ConnectionChannel.cs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. using Core.RabbitMQBus.Common;
  2. using Microsoft.Extensions.Options;
  3. using Polly;
  4. using RabbitMQ.Client;
  5. using RabbitMQ.Client.Events;
  6. using RabbitMQ.Client.Exceptions;
  7. using System;
  8. using System.Net.Sockets;
  9. namespace Core.RabbitMQBus.EventBus
  10. {
  /// <summary>
  /// Holds a single RabbitMQ connection built from <see cref="RabbitMQOptions"/>,
  /// re-creates it when it is found closed, and hands out channels on it.
  /// </summary>
  public class ConnectionChannel : IConnectionChannel, IDisposable
  {
      private readonly RabbitMQOptions _options;

      // Serialises connection creation and disposal: GetConnection is reachable both
      // from the connection event handlers below and from ordinary callers.
      private readonly object _syncRoot = new object();

      private IConnection _connection;
      private volatile bool _disposed;

      /// <summary>
      /// Stores the configured options and opens the initial connection.
      /// </summary>
      /// <param name="options">Wrapper around the RabbitMQ settings.</param>
      /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
      public ConnectionChannel(IOptions<RabbitMQOptions> options)
      {
          if (options == null)
          {
              throw new ArgumentNullException(nameof(options));
          }

          _options = options.Value;
          TryConnection();
      }

      /// <summary>
      /// Name of the exchange, taken from the configured options.
      /// </summary>
      public string ExchangeName => _options.ExchangeName;

      /// <summary>
      /// Creates a new channel on the current connection, opening a connection
      /// first if none is open.
      /// </summary>
      /// <returns>The newly created channel.</returns>
      public IModel GetChannel()
      {
          // Go through GetConnection so a closed connection is replaced
          // instead of failing on the stale instance.
          return GetConnection().CreateModel();
      }

      /// <summary>
      /// Returns the current connection when it is open; otherwise creates a new
      /// one from the configured options and subscribes to its events.
      /// </summary>
      /// <returns>An open connection.</returns>
      /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
      public IConnection GetConnection()
      {
          lock (_syncRoot)
          {
              if (_disposed)
              {
                  throw new ObjectDisposedException(nameof(ConnectionChannel));
              }

              if (_connection != null && _connection.IsOpen)
              {
                  return _connection;
              }

              if (_connection != null)
              {
                  // Stop listening to the connection that is being replaced.
                  // It is only detached here, not closed or disposed, because this
                  // method can run from inside that connection's own event callback.
                  Unsubscribe(_connection);
              }

              var connectionFactory = new ConnectionFactory
              {
                  UserName = _options.UserName,
                  Port = _options.Port,
                  Password = _options.Password,
                  VirtualHost = _options.VirtualHost,
                  HostName = _options.HostName
              };

              var connection = connectionFactory.CreateConnection();
              Subscribe(connection);
              _connection = connection;
              return connection;
          }
      }

      /// <summary>
      /// Ensures a connection is available, retrying up to five times with a
      /// one-second pause when a <see cref="SocketException"/> or
      /// <see cref="BrokerUnreachableException"/> is thrown.
      /// </summary>
      public void TryConnection()
      {
          var policy = Policy
              .Handle<SocketException>()
              .Or<BrokerUnreachableException>()
              .WaitAndRetry(5, attempt => TimeSpan.FromSeconds(1), (ex, delay) =>
              {
                  // TODO: log the failed attempt (this class has no logger).
              });

          policy.Execute(() =>
          {
              GetConnection();
          });
      }

      /// <summary>
      /// Handles the connection-blocked notification by making sure a connection is available.
      /// </summary>
      /// <param name="sender">Event source.</param>
      /// <param name="e">Event data.</param>
      private void Connection_ConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
      {
          ReconnectIfActive();
      }

      /// <summary>
      /// Handles a callback exception by making sure a connection is available.
      /// </summary>
      /// <param name="sender">Event source.</param>
      /// <param name="e">Event data.</param>
      private void Connection_CallbackException(object sender, CallbackExceptionEventArgs e)
      {
          ReconnectIfActive();
      }

      /// <summary>
      /// Handles connection shutdown by attempting to reconnect.
      /// </summary>
      /// <param name="sender">Event source.</param>
      /// <param name="e">Event data.</param>
      private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
      {
          ReconnectIfActive();
      }

      /// <summary>
      /// Closes and disposes the connection. Safe to call more than once.
      /// </summary>
      public void Dispose()
      {
          IConnection connection;
          lock (_syncRoot)
          {
              if (_disposed)
              {
                  return;
              }

              _disposed = true;
              connection = _connection;
              _connection = null;
          }

          if (connection == null)
          {
              return;
          }

          // Detach before closing: otherwise the shutdown handler would run
          // TryConnection and open a fresh connection during disposal.
          Unsubscribe(connection);
          try
          {
              if (connection.IsOpen)
              {
                  connection.Close();
              }
          }
          finally
          {
              connection.Dispose();
          }
      }

      // Runs the retrying connect unless this instance has been disposed.
      private void ReconnectIfActive()
      {
          if (_disposed)
          {
              return;
          }

          try
          {
              TryConnection();
          }
          catch (ObjectDisposedException)
          {
              // Disposed between the check above and the connect attempt; nothing to do.
          }
      }

      // Attaches this instance's handlers to the given connection's events.
      private void Subscribe(IConnection connection)
      {
          connection.ConnectionShutdown += Connection_ConnectionShutdown;
          connection.CallbackException += Connection_CallbackException;
          connection.ConnectionBlocked += Connection_ConnectionBlocked;
      }

      // Removes this instance's handlers from the given connection's events.
      private void Unsubscribe(IConnection connection)
      {
          connection.ConnectionShutdown -= Connection_ConnectionShutdown;
          connection.CallbackException -= Connection_CallbackException;
          connection.ConnectionBlocked -= Connection_ConnectionBlocked;
      }
  }
  117. }