using Core.RabbitMQBus.Common;
using Microsoft.Extensions.Options;
using Polly;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System;
using System.Net.Sockets;

namespace Core.RabbitMQBus.EventBus
{
    /// <summary>
    /// Owns a single RabbitMQ connection built from <see cref="RabbitMQOptions"/>,
    /// re-creates it when it is found closed, and hands out channels on it.
    /// </summary>
    public class ConnectionChannel : IConnectionChannel, IDisposable
    {
        /// <summary>Number of retries after the first failed connection attempt.</summary>
        private const int ConnectRetryCount = 5;

        /// <summary>Pause between two connection attempts.</summary>
        private static readonly TimeSpan ConnectRetryDelay = TimeSpan.FromSeconds(1);

        private readonly RabbitMQOptions _options;

        // Serializes every read/write of _connection: the field is touched both by
        // public callers and by the connection event handlers below.
        private readonly object _syncRoot = new object();

        private IConnection _connection;

        // Set once by Dispose(); stops the event handlers from reopening the connection.
        private volatile bool _disposed;

        /// <summary>
        /// Stores the configured options and opens the initial connection.
        /// </summary>
        /// <param name="options">Wrapper whose <c>Value</c> supplies host, port, credentials, virtual host and exchange name.</param>
        /// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
        // NOTE(review): the generic argument was missing in the reviewed text; RabbitMQOptions is
        // inferred from the type of the _options field that options.Value is assigned to.
        public ConnectionChannel(IOptions<RabbitMQOptions> options)
        {
            if (options == null)
            {
                throw new ArgumentNullException(nameof(options));
            }

            _options = options.Value;
            TryConnection();
        }

        /// <summary>
        /// Exchange name taken from the configured options.
        /// </summary>
        public string ExchangeName => _options.ExchangeName;

        /// <summary>
        /// Creates a new channel on the current connection, opening the connection first if needed.
        /// </summary>
        /// <returns>A newly created channel.</returns>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public IModel GetChannel()
        {
            // Go through GetConnection() so a null or closed connection is replaced
            // instead of failing on the raw field.
            return GetConnection().CreateModel();
        }

        /// <summary>
        /// Returns the current connection when it is open; otherwise creates a new one
        /// from the configured options and subscribes to its events.
        /// </summary>
        /// <returns>An open connection.</returns>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public IConnection GetConnection()
        {
            lock (_syncRoot)
            {
                if (_disposed)
                {
                    throw new ObjectDisposedException(nameof(ConnectionChannel));
                }

                if (_connection != null && _connection.IsOpen)
                {
                    return _connection;
                }

                // The stale connection must not keep calling back into this object.
                // It is deliberately not disposed here: this method can be reached from that
                // connection's own shutdown handler (Connection_ConnectionShutdown -> TryConnection).
                DetachHandlers(_connection);
                _connection = null;

                var connectionFactory = new ConnectionFactory
                {
                    HostName = _options.HostName,
                    Port = _options.Port,
                    VirtualHost = _options.VirtualHost,
                    UserName = _options.UserName,
                    Password = _options.Password
                };

                // Publish the field only after CreateConnection() succeeded, so a failed
                // attempt never leaves a half-initialized connection behind.
                var connection = connectionFactory.CreateConnection();
                connection.ConnectionShutdown += Connection_ConnectionShutdown;
                connection.CallbackException += Connection_CallbackException;
                connection.ConnectionBlocked += Connection_ConnectionBlocked;

                _connection = connection;
                return connection;
            }
        }

        /// <summary>
        /// Ensures an open connection exists, retrying up to five times with a one second
        /// pause between attempts. The last failure is rethrown once retries are exhausted.
        /// </summary>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public void TryConnection()
        {
            // NOTE(review): the exception type arguments were missing in the reviewed text;
            // SocketException and BrokerUnreachableException are assumed from the imported
            // System.Net.Sockets and RabbitMQ.Client.Exceptions namespaces - confirm.
            var policy = Policy.Handle<SocketException>()
                .Or<BrokerUnreachableException>()
                .WaitAndRetry(ConnectRetryCount, attempt => ConnectRetryDelay, (ex, delay) =>
                {
                    // TODO: log the failed attempt.
                });

            policy.Execute(() =>
            {
                GetConnection();
            });
        }

        /// <summary>
        /// Handles the connection-blocked notification by re-checking the connection.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event details (unused).</param>
        private void Connection_ConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
        {
            ReconnectIfActive();
        }

        /// <summary>
        /// Handles a callback exception reported by the connection by re-checking the connection.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event details (unused).</param>
        private void Connection_CallbackException(object sender, CallbackExceptionEventArgs e)
        {
            ReconnectIfActive();
        }

        /// <summary>
        /// Handles connection shutdown by attempting to reconnect.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event details (unused).</param>
        private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            ReconnectIfActive();
        }

        /// <summary>
        /// Closes and releases the connection. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            IConnection connection;

            lock (_syncRoot)
            {
                if (_disposed)
                {
                    return;
                }

                _disposed = true;
                connection = _connection;
                _connection = null;

                // Unsubscribe before closing; otherwise the shutdown event raised by
                // Close() would call TryConnection() and reopen the connection.
                DetachHandlers(connection);
            }

            if (connection == null)
            {
                return;
            }

            // Close outside the lock so a handler that is already running and waiting
            // for _syncRoot cannot stall the shutdown.
            if (connection.IsOpen)
            {
                connection.Close();
            }

            connection.Dispose();
        }

        /// <summary>
        /// Shared body of the connection event handlers: reconnects unless this instance is disposed.
        /// </summary>
        private void ReconnectIfActive()
        {
            if (_disposed)
            {
                return;
            }

            try
            {
                TryConnection();
            }
            catch (ObjectDisposedException)
            {
                // Dispose() ran between the check above and the reconnect; nothing left to do.
            }
        }

        /// <summary>
        /// Removes this object's event subscriptions from <paramref name="connection"/>; no-op for <c>null</c>.
        /// </summary>
        /// <param name="connection">Connection to unsubscribe from, or <c>null</c>.</param>
        private void DetachHandlers(IConnection connection)
        {
            if (connection == null)
            {
                return;
            }

            connection.ConnectionShutdown -= Connection_ConnectionShutdown;
            connection.CallbackException -= Connection_CallbackException;
            connection.ConnectionBlocked -= Connection_ConnectionBlocked;
        }
    }
}