using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Core.RabbitMQBus.Common;
using Core.RabbitMQBus.Log;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

namespace Core.RabbitMQBus.EventBus
{
    /// <summary>
    /// Declares and binds a queue, consumes its messages, and dispatches each
    /// delivery to the subscriber method that <c>RegisterSubscriberCache</c>
    /// returns for the delivery's routing key.
    /// </summary>
    public class RabbitMqSubscriber : IRabbitMqSubscriber
    {
        /// <summary>
        /// Prefetch count passed to <c>BasicQos</c> before consuming starts.
        /// </summary>
        private const ushort PrefetchCount = 1000;

        private readonly ILogger _rabbitMqSubscriberLogger;
        private readonly ILoggerHelper _loggerHelper;
        private readonly IConnectionChannel _connectionChannel;

        /// <summary>
        /// Creates a subscriber that obtains its channel and exchange name from
        /// <paramref name="connectionChannel"/>.
        /// </summary>
        /// <param name="connectionChannel">Source of the channel and of the exchange name used for binding.</param>
        /// <param name="loggerHelper">Helper used for consumer lifecycle and failure messages.</param>
        /// <param name="rabbitMqSubscriberLogger">Logger used for consumer start-up and error messages.</param>
        public RabbitMqSubscriber(IConnectionChannel connectionChannel, ILoggerHelper loggerHelper, ILogger rabbitMqSubscriberLogger)
        {
            _loggerHelper = loggerHelper;
            _connectionChannel = connectionChannel;
            _rabbitMqSubscriberLogger = rabbitMqSubscriberLogger;
        }

        /// <summary>
        /// Subscribes to events: declares <paramref name="queueName"/> as a durable queue,
        /// binds it to the configured exchange using the queue name as routing key, and
        /// starts a manually-acknowledged consumer on it.
        /// </summary>
        /// <param name="queueName">Name of the queue to declare, bind and consume; also used as the routing key.</param>
        /// <exception cref="ArgumentNullException"><paramref name="queueName"/> is <c>null</c>.</exception>
        public void Subscriber(string queueName)
        {
            if (queueName == null)
            {
                throw new ArgumentNullException(nameof(queueName));
            }

            var channel = _connectionChannel.GetChannel();

            // Declare the queue.
            channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            // Bind the queue to the exchange.
            channel.QueueBind(queue: queueName, exchange: _connectionChannel.ExchangeName, routingKey: queueName);

            var consumer = new EventingBasicConsumer(channel);
            channel.BasicQos(prefetchSize: 0, prefetchCount: PrefetchCount, global: false);

            consumer.Shutdown += OnConsumerShutdown;
            consumer.Registered += OnConsumerRegistered;
            consumer.Unregistered += OnConsumerUnregistered;
            consumer.ConsumerCancelled += OnConsumerConsumerCancelled;
            consumer.Received += (model, ea) =>
            {
                // Acknowledge outside of TryDispatch so that a failing ack is never
                // followed by a nack for the same delivery tag.
                if (TryDispatch(ea))
                {
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
                else
                {
                    // NOTE(review): requeue is true, so a message that can never be
                    // processed will be redelivered indefinitely; consider a
                    // dead-letter exchange if that is a concern.
                    channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                }
            };

            channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        }

        /// <summary>
        /// Decodes a delivery, invokes the subscriber method registered for its routing key
        /// and reports whether the handler succeeded. Never throws: any failure is logged
        /// and reported as <c>false</c> so that the caller can reject the delivery.
        /// </summary>
        /// <param name="ea">The delivery received from the broker.</param>
        /// <returns><c>true</c> when the handler's result reports success; otherwise <c>false</c>.</returns>
        private bool TryDispatch(BasicDeliverEventArgs ea)
        {
            try
            {
                // NOTE(review): Encoding.Default is platform dependent on .NET Framework
                // (system ANSI code page); confirm the publisher uses the same encoding,
                // and prefer an explicit Encoding.UTF8 on both sides.
                var message = Encoding.Default.GetString(ea.Body);
                var subscriberServiceMethod = RegisterSubscriberCache.GetSubscriberMethod(ea.RoutingKey);
                var payload = SwifterJsonSerializer.DeserializeObject(message, subscriberServiceMethod.MethodParameterType);
                var handlerTask = (Task)subscriberServiceMethod.MethodInfo.Invoke(
                    subscriberServiceMethod.SubscriberService,
                    new object[] { payload });

                // The Received callback is synchronous, so the handler task is waited on
                // here. GetAwaiter().GetResult() rethrows the handler's own exception
                // rather than wrapping it in an AggregateException.
                handlerTask.GetAwaiter().GetResult();

                // NOTE(review): the cast above is to non-generic Task, which exposes no
                // Result member, so the result is read late-bound. If every subscriber
                // method returns one known Task<TResult>, cast to that type instead and
                // drop the use of dynamic.
                dynamic outcome = ((dynamic)handlerTask).Result;
                bool succeeded = outcome.IsSucceed;
                if (succeeded)
                {
                    return true;
                }

                // Convert.ToString tolerates a null Message instead of throwing.
                string detail = Convert.ToString((object)outcome.Message);
                _loggerHelper.LogInfo("订阅执行异常", detail);
                return false;
            }
            catch (Exception ex)
            {
                // Covers a missing registration, an undecodable payload and exceptions
                // thrown by the handler itself; without this the delivery would be left
                // neither acked nor nacked.
                _rabbitMqSubscriberLogger.LogError(ex, "订阅执行异常:{RoutingKey}", ea.RoutingKey);
                _loggerHelper.LogInfo("订阅执行异常", $"{ea.RoutingKey}: {ex}");
                return false;
            }
        }

        /// <summary>
        /// Logs that the consumer was cancelled.
        /// </summary>
        /// <param name="sender">The consumer raising the event.</param>
        /// <param name="e">Event data carrying the consumer tag.</param>
        private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e)
        {
            _loggerHelper.LogInfo("取消消费者", e.ConsumerTag);
        }

        /// <summary>
        /// Logs that the consumer was unregistered.
        /// </summary>
        /// <param name="sender">The consumer raising the event.</param>
        /// <param name="e">Event data carrying the consumer tag.</param>
        private void OnConsumerUnregistered(object sender, ConsumerEventArgs e)
        {
            _loggerHelper.LogInfo("消费者取消注册", e.ConsumerTag);
        }

        /// <summary>
        /// Logs that the consumer was registered, to both the logger and the logger helper.
        /// </summary>
        /// <param name="sender">The consumer raising the event.</param>
        /// <param name="e">Event data carrying the consumer tag.</param>
        private void OnConsumerRegistered(object sender, ConsumerEventArgs e)
        {
            _rabbitMqSubscriberLogger.LogInformation($"消费者已启动:{e.ConsumerTag}");
            _loggerHelper.LogInfo("消费者注册", e.ConsumerTag);
        }

        /// <summary>
        /// Logs the reply text reported when the consumer shuts down.
        /// </summary>
        /// <param name="sender">The consumer raising the event.</param>
        /// <param name="e">Shutdown details, including the reply text.</param>
        private void OnConsumerShutdown(object sender, ShutdownEventArgs e)
        {
            _loggerHelper.LogInfo("消费者宕机", e.ReplyText);
        }
    }
}