RabbitMqSubscriber.cs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. 
  2. using RabbitMQ.Client;
  3. using RabbitMQ.Client.Events;
  4. using RabbitMQ.Client.Exceptions;
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Net.Sockets;
  8. using System.Reflection;
  9. using System.Text;
  10. using System.Threading;
  11. using System.Threading.Tasks;
  12. using Microsoft.Extensions.Logging;
  13. using Core.RabbitMQBus.Log;
  14. using Core.RabbitMQBus.Common;
  15. namespace Core.RabbitMQBus.EventBus
  16. {
  /// <summary>
  /// Consumes messages from a RabbitMQ queue and dispatches each delivery to the
  /// subscriber method that <see cref="RegisterSubscriberCache"/> returns for the
  /// delivery's routing key.
  /// </summary>
  public class RabbitMqSubscriber : IRabbitMqSubscriber
  {
      /// <summary>
      /// Prefetch count handed to <c>BasicQos</c> before consuming starts.
      /// </summary>
      private const ushort PrefetchCount = 1000;

      private readonly ILogger<RabbitMqSubscriber> _rabbitMqSubscriberLogger;
      private readonly ILoggerHelper _loggerHelper;
      private readonly IConnectionChannel _connectionChannel;

      /// <summary>
      /// Creates a subscriber that obtains its channel and exchange name from
      /// <paramref name="connectionChannel"/>.
      /// </summary>
      /// <param name="connectionChannel">Source of the channel and of the exchange name.</param>
      /// <param name="loggerHelper">Project logging helper used for lifecycle and failure messages.</param>
      /// <param name="rabbitMqSubscriberLogger">Framework logger used for start-up and error messages.</param>
      public RabbitMqSubscriber(IConnectionChannel connectionChannel, ILoggerHelper loggerHelper, ILogger<RabbitMqSubscriber> rabbitMqSubscriberLogger)
      {
          _loggerHelper = loggerHelper;
          _connectionChannel = connectionChannel;
          _rabbitMqSubscriberLogger = rabbitMqSubscriberLogger;
      }

      /// <summary>
      /// Subscribes to events: declares the durable queue <paramref name="queueName"/>,
      /// binds it to the configured exchange using the queue name as routing key, and
      /// starts a consumer with manual acknowledgement.
      /// </summary>
      /// <param name="queueName">Name of the queue; also used as the binding routing key.</param>
      /// <exception cref="ArgumentException"><paramref name="queueName"/> is null, empty or whitespace.</exception>
      public void Subscriber(string queueName)
      {
          if (string.IsNullOrWhiteSpace(queueName))
          {
              throw new ArgumentException("Queue name must not be null or empty.", nameof(queueName));
          }

          var channel = _connectionChannel.GetChannel();

          // Declare the queue.
          channel.QueueDeclare(queue: queueName, durable: true, false, false, null);
          // Bind the queue to the exchange.
          channel.QueueBind(queue: queueName, exchange: _connectionChannel.ExchangeName, routingKey: queueName);

          channel.BasicQos(0, PrefetchCount, false);

          var consumer = new EventingBasicConsumer(channel);
          consumer.Shutdown += OnConsumerShutdown;
          consumer.Registered += OnConsumerRegistered;
          consumer.Unregistered += OnConsumerUnregistered;
          consumer.ConsumerCancelled += OnConsumerConsumerCancelled;
          consumer.Received += (model, ea) =>
          {
              // Every delivery is settled exactly once: ack on success, otherwise
              // nack with requeue so the message is not lost.
              // NOTE(review): a message that can never succeed will be requeued
              // repeatedly; consider a dead-letter exchange if that is a concern.
              if (TryProcessDelivery(ea))
              {
                  channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
              }
              else
              {
                  channel.BasicNack(ea.DeliveryTag, false, true);
              }
          };

          channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
      }

      /// <summary>
      /// Decodes one delivery, invokes the subscriber method registered for its
      /// routing key and reports whether that method succeeded. Failures are logged
      /// here; no exception is allowed to escape, so the caller can always ack or nack.
      /// </summary>
      /// <param name="ea">The delivery received from the broker.</param>
      /// <returns><c>true</c> when the subscriber method reported success; otherwise <c>false</c>.</returns>
      private bool TryProcessDelivery(BasicDeliverEventArgs ea)
      {
          try
          {
              var subscriberServiceMethod = RegisterSubscriberCache.GetSubscriberMethod(ea.RoutingKey);
              if (subscriberServiceMethod == null)
              {
                  _loggerHelper.LogInfo("订阅执行异常", $"No subscriber method is registered for routing key '{ea.RoutingKey}'.");
                  return false;
              }

              // NOTE(review): Encoding.Default is platform dependent on .NET Framework.
              // It is kept because it must match whatever the publisher encodes with;
              // confirm both sides and switch them to Encoding.UTF8 together.
              var message = Encoding.Default.GetString(ea.Body);
              var obj = SwifterJsonSerializer.DeserializeObject(message, subscriberServiceMethod.MethodParameterType);
              var methodTask = (Task<ExecuteMethodResult>)subscriberServiceMethod.MethodInfo.Invoke(
                  subscriberServiceMethod.SubscriberService,
                  new object[] { obj });

              // This event handler is synchronous, so the task has to be waited on.
              // GetAwaiter().GetResult() rethrows the original exception instead of
              // wrapping it in an AggregateException the way .Result does.
              var result = methodTask.GetAwaiter().GetResult();
              if (result.IsSucceed)
              {
                  return true;
              }

              // Interpolation tolerates a null Message, unlike Message.ToString().
              _loggerHelper.LogInfo("订阅执行异常", $"{result.Message}");
              return false;
          }
          catch (Exception ex)
          {
              // MethodInfo.Invoke wraps exceptions thrown by the target; log the real cause.
              var cause = (ex as TargetInvocationException)?.InnerException ?? ex;
              _rabbitMqSubscriberLogger.LogError(cause, "Failed to process message with routing key {RoutingKey}", ea.RoutingKey);
              _loggerHelper.LogInfo("订阅执行异常", cause.Message);
              return false;
          }
      }

      /// <summary>
      /// Logs that the consumer was cancelled.
      /// </summary>
      /// <param name="sender">The consumer raising the event.</param>
      /// <param name="e">Event data carrying the consumer tag.</param>
      private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e)
      {
          _loggerHelper.LogInfo("取消消费者", e.ConsumerTag);
      }

      /// <summary>
      /// Logs that the consumer was unregistered.
      /// </summary>
      /// <param name="sender">The consumer raising the event.</param>
      /// <param name="e">Event data carrying the consumer tag.</param>
      private void OnConsumerUnregistered(object sender, ConsumerEventArgs e)
      {
          _loggerHelper.LogInfo("消费者取消注册", e.ConsumerTag);
      }

      /// <summary>
      /// Logs that the consumer was registered, through both loggers.
      /// </summary>
      /// <param name="sender">The consumer raising the event.</param>
      /// <param name="e">Event data carrying the consumer tag.</param>
      private void OnConsumerRegistered(object sender, ConsumerEventArgs e)
      {
          _rabbitMqSubscriberLogger.LogInformation($"消费者已启动:{e.ConsumerTag}");
          _loggerHelper.LogInfo("消费者注册", e.ConsumerTag);
      }

      /// <summary>
      /// Logs the reply text reported when the consumer shuts down.
      /// </summary>
      /// <param name="sender">The consumer raising the event.</param>
      /// <param name="e">Shutdown details, including the reply text.</param>
      private void OnConsumerShutdown(object sender, ShutdownEventArgs e)
      {
          _loggerHelper.LogInfo("消费者宕机", e.ReplyText);
      }
  }
  103. }