123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- using Core.RabbitMQBus.Common;
- using Microsoft.Extensions.Options;
- using Polly;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using RabbitMQ.Client.Exceptions;
- using System;
- using System.Net.Sockets;
- namespace Core.RabbitMQBus.EventBus
- {
- public class ConnectionChannel : IConnectionChannel, IDisposable
- {
- private readonly RabbitMQOptions _options;
- private IConnection _connection;
- public ConnectionChannel(IOptions<RabbitMQOptions> options)
- {
- _options = options.Value;
- TryConnection();
- }
- /// <summary>
- /// 交换机名称
- /// </summary>
- public string ExchangeName => _options.ExchangeName;
- /// <summary>
- /// 获取Channel
- /// </summary>
- /// <returns></returns>
- public IModel GetChannel()
- {
- return _connection.CreateModel();
- }
- /// <summary>
- /// 创建连接
- /// </summary>
- /// <returns></returns>
- public IConnection GetConnection()
- {
- if (_connection != null && _connection.IsOpen)
- {
- return _connection;
- }
- var connectionFactory = new ConnectionFactory
- {
- UserName = _options.UserName,
- Port = _options.Port,
- Password = _options.Password,
- VirtualHost = _options.VirtualHost,
- HostName = _options.HostName
- };
- _connection = connectionFactory.CreateConnection();
- _connection.ConnectionShutdown += Connection_ConnectionShutdown;
- _connection.CallbackException += Connection_CallbackException;
- _connection.ConnectionBlocked += Connection_ConnectionBlocked;
- return _connection;
- }
- /// <summary>
- /// 重试连接
- /// </summary>
- /// <returns></returns>
- public void TryConnection()
- {
- var policy = Policy.Handle<SocketException>().Or<BrokerUnreachableException>()
- .WaitAndRetry(5, p => TimeSpan.FromSeconds(1), (ex, time) =>
- {
- //记录异常日志
- });
- policy.Execute(() =>
- {
- try
- {
- GetConnection();
- }
- catch (Exception)
- {
- throw;
- }
- });
- }
- /// <summary>
- /// 连接被阻止的异常
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e"></param>
- private void Connection_ConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
- {
- TryConnection();
- }
- /// <summary>
- /// 回调异常
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e"></param>
- private void Connection_CallbackException(object sender, CallbackExceptionEventArgs e)
- {
- TryConnection();
- }
- /// <summary>
- /// 连接异常
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e"></param>
- private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
- {
- TryConnection();
- }
- /// <summary>
- /// 释放连接通到
- /// </summary>
- public void Dispose()
- {
- if (_connection.IsOpen)
- {
- _connection.Close();
- }
- }
- }
- }
|