123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
-
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using RabbitMQ.Client.Exceptions;
- using System;
- using System.Collections.Generic;
- using System.Net.Sockets;
- using System.Reflection;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using Microsoft.Extensions.Logging;
- using Core.RabbitMQBus.Log;
- using Core.RabbitMQBus.Common;
- namespace Core.RabbitMQBus.EventBus
- {
- public class RabbitMqSubscriber : IRabbitMqSubscriber
- {
- private readonly ILogger<RabbitMqSubscriber> _rabbitMqSubscriberLogger;
- private readonly ILoggerHelper _loggerHelper;
- private readonly IConnectionChannel _connectionChannel;
- public RabbitMqSubscriber(IConnectionChannel connectionChannel, ILoggerHelper loggerHelper, ILogger<RabbitMqSubscriber> rabbitMqSubscriberLogger)
- {
- _loggerHelper = loggerHelper;
- _connectionChannel = connectionChannel;
- _rabbitMqSubscriberLogger = rabbitMqSubscriberLogger;
- }
- /// <summary>
- /// 订阅事件
- /// </summary>
- /// <returns></returns>
- public void Subscriber(string queueName)
- {
- var channel = _connectionChannel.GetChannel();
- //声明队列
- channel.QueueDeclare(queue: queueName, durable: true, false, false, null);
- //绑定交换机队列
- channel.QueueBind(queue: queueName, exchange: _connectionChannel.ExchangeName, routingKey: queueName);
- var consumer = new EventingBasicConsumer(channel);
- channel.BasicQos(0, 1000, false);
- consumer.Shutdown += OnConsumerShutdown;
- consumer.Registered += OnConsumerRegistered;
- consumer.Unregistered += OnConsumerUnregistered;
- consumer.ConsumerCancelled += OnConsumerConsumerCancelled;
- consumer.Received += (model, ea) =>
- {
- var body = ea.Body;
- var message = Encoding.Default.GetString(body);
- var subscriberServiceMethod = RegisterSubscriberCache.GetSubscriberMethod(ea.RoutingKey);
- var obj = SwifterJsonSerializer.DeserializeObject(message, subscriberServiceMethod.MethodParameterType);
- var methodTask = (Task<ExecuteMethodResult>)subscriberServiceMethod.MethodInfo.Invoke(subscriberServiceMethod.SubscriberService,
- new object[] { obj });
- if (methodTask.Result.IsSucceed)
- {
- channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
- }
- else
- {
- _loggerHelper.LogInfo("订阅执行异常", methodTask.Result.Message.ToString());
- channel.BasicNack(ea.DeliveryTag, false, true);
- }
- };
- channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
- }
- /// <summary>
- /// 取消消费者
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e"></param>
- private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e)
- {
- _loggerHelper.LogInfo("取消消费者", e.ConsumerTag);
- }
- /// <summary>
- /// 消费者取消注册
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e"></param>
- private void OnConsumerUnregistered(object sender, ConsumerEventArgs e)
- {
- _loggerHelper.LogInfo("消费者取消注册", e.ConsumerTag);
- }
- /// <summary>
- /// 消费者注册
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e"></param>
- private void OnConsumerRegistered(object sender, ConsumerEventArgs e)
- {
- _rabbitMqSubscriberLogger.LogInformation($"消费者已启动:{e.ConsumerTag}");
- _loggerHelper.LogInfo("消费者注册", e.ConsumerTag);
- }
- /// <summary>
- /// 消费者宕机
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e"></param>
- private void OnConsumerShutdown(object sender, ShutdownEventArgs e)
- {
- _loggerHelper.LogInfo("消费者宕机", e.ReplyText);
- }
- }
- }
|