123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- using Core.RabbitMQBus.Log;
- using RabbitMQ.Client;
- using System;
- using System.Collections.Concurrent;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
namespace Core.RabbitMQBus.EventBus
{
    /// <summary>
    /// Publishes JSON-serialized messages to the durable topic exchange named by
    /// <see cref="IConnectionChannel.ExchangeName"/>, with publisher confirms enabled
    /// on the channel. Messages that the broker negatively acknowledges are written
    /// to the log.
    /// </summary>
    public class RabbitMqPublisher : IRabbitMqPublisher
    {
        private readonly ILoggerHelper _loggerHelper;
        private readonly IConnectionChannel _connectionChannel;
        private readonly IModel _channel;

        // Guards the "read NextPublishSeqNo, then BasicPublish" pair. Without it two
        // concurrent callers could register their message under the wrong sequence
        // number, and the channel would be used by several threads at once.
        private readonly object _publishLock = new object();

        // Serialized messages that were handed to the channel but are not yet
        // confirmed, keyed by publish sequence number. Concurrent because the
        // ack/nack callbacks may run on a different thread than the publisher.
        private readonly ConcurrentDictionary<ulong, string> _pendingConfirms =
            new ConcurrentDictionary<ulong, string>();

        /// <summary>
        /// Obtains the channel, declares the durable topic exchange, switches the
        /// channel to confirm mode and subscribes to the ack/nack events.
        /// </summary>
        /// <param name="connectionChannel">Supplies the channel and the exchange name.</param>
        /// <param name="loggerHelper">Logger used for nacked messages and publish failures.</param>
        /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
        public RabbitMqPublisher(IConnectionChannel connectionChannel, ILoggerHelper loggerHelper)
        {
            _loggerHelper = loggerHelper ?? throw new ArgumentNullException(nameof(loggerHelper));
            _connectionChannel = connectionChannel ?? throw new ArgumentNullException(nameof(connectionChannel));
            _channel = _connectionChannel.GetChannel();

            // Declare the exchange.
            _channel.ExchangeDeclare(exchange: _connectionChannel.ExchangeName, type: "topic", durable: true);

            // Enable publisher confirms.
            _channel.ConfirmSelect();

            // Positive acknowledgement: the message(s) no longer need to be tracked.
            _channel.BasicAcks += (s, e) =>
            {
                foreach (var deliveryTag in ResolveDeliveryTags(e.DeliveryTag, e.Multiple))
                {
                    _pendingConfirms.TryRemove(deliveryTag, out _);
                }
            };

            // Negative acknowledgement: log every affected message and stop tracking it.
            _channel.BasicNacks += (s, e) =>
            {
                foreach (var deliveryTag in ResolveDeliveryTags(e.DeliveryTag, e.Multiple))
                {
                    confirmedBasicNackLog(deliveryTag);
                }
            };
        }

        /// <summary>
        /// Logs the body of a negatively acknowledged message and removes it from
        /// the set of pending confirmations.
        /// </summary>
        /// <param name="sequenceNumber">Publish sequence number (delivery tag) of the message.</param>
        public void confirmedBasicNackLog(ulong sequenceNumber)
        {
            // A single TryRemove both fetches and drops the entry atomically; body
            // stays null when the sequence number is not (or no longer) tracked.
            _pendingConfirms.TryRemove(sequenceNumber, out string body);
            _loggerHelper.LogInfo("RabbitMQ发布者确认异常", body);
        }

        /// <summary>
        /// Serializes <paramref name="content"/> to JSON and publishes it as a
        /// persistent message.
        /// </summary>
        /// <param name="queueName">Passed to <c>BasicPublish</c> as the routing key.</param>
        /// <param name="content">Object to serialize and send.</param>
        /// <remarks>Any exception is logged and then rethrown unchanged.</remarks>
        public void Publish(string queueName, object content)
        {
            try
            {
                PublishCore(queueName, content);
            }
            catch (Exception ex)
            {
                _loggerHelper.LogInfo("发布消息异常", ex.Message);
                throw;
            }
        }

        /// <summary>
        /// Task-returning variant of <see cref="Publish"/>. The publish itself runs
        /// synchronously; only the failure logging is awaited.
        /// </summary>
        /// <param name="queueName">Passed to <c>BasicPublish</c> as the routing key.</param>
        /// <param name="content">Object to serialize and send.</param>
        /// <returns>A task that completes once the message was handed to the channel.</returns>
        /// <remarks>Any exception is logged and then rethrown through the returned task.</remarks>
        public async Task PublishAsync(string queueName, object content)
        {
            try
            {
                PublishCore(queueName, content);
            }
            catch (Exception ex)
            {
                await _loggerHelper.LogInfoAsync("发布消息异常", ex.Message);
                throw;
            }
        }

        /// <summary>
        /// Expands an ack/nack notification into the sequence numbers it covers:
        /// the single tag, or every tracked tag up to and including it when
        /// <paramref name="multiple"/> is set.
        /// </summary>
        private ulong[] ResolveDeliveryTags(ulong deliveryTag, bool multiple)
        {
            if (!multiple)
            {
                return new[] { deliveryTag };
            }

            // Materialize first so the callers can remove entries while iterating.
            return _pendingConfirms
                .Where(pending => pending.Key <= deliveryTag)
                .Select(pending => pending.Key)
                .ToArray();
        }

        /// <summary>
        /// Shared implementation of <see cref="Publish"/> and <see cref="PublishAsync"/>:
        /// serializes the payload, registers it for confirmation and publishes it.
        /// </summary>
        private void PublishCore(string queueName, object content)
        {
            var message = SwifterJsonSerializer.SerializeObject(content);

            // UTF-8 rather than Encoding.Default: the latter is the machine's ANSI
            // code page on .NET Framework and would corrupt non-ASCII JSON.
            var body = Encoding.UTF8.GetBytes(message);

            lock (_publishLock)
            {
                var sequenceNumber = _channel.NextPublishSeqNo;

                // Register before publishing so a fast ack/nack always finds the entry.
                _pendingConfirms[sequenceNumber] = message;

                try
                {
                    // Mark the message as persistent.
                    IBasicProperties properties = _channel.CreateBasicProperties();
                    properties.Persistent = true;

                    // Send the message.
                    _channel.BasicPublish(_connectionChannel.ExchangeName, queueName, properties, body);
                }
                catch
                {
                    // No confirmation will arrive for a failed publish; drop the
                    // entry so it does not linger or shadow a later message.
                    _pendingConfirms.TryRemove(sequenceNumber, out _);
                    throw;
                }
            }
        }
    }
}
|