using Core.RabbitMQBus.Log;
using RabbitMQ.Client;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Core.RabbitMQBus.EventBus
{
    /// <summary>
    /// Publishes JSON-serialized messages to the durable topic exchange named by the
    /// injected connection channel, with publisher confirms enabled on the channel.
    /// Messages awaiting a broker confirmation are tracked by publish sequence number
    /// so that negatively acknowledged messages can be logged.
    /// </summary>
    public class RabbitMqPublisher : IRabbitMqPublisher
    {
        private readonly ILoggerHelper _loggerHelper;
        private readonly IConnectionChannel _connectionChannel;
        private readonly IModel _channel;

        // Serializes "read NextPublishSeqNo, then BasicPublish" so the sequence number
        // recorded for a message is the one that publish actually uses. The RabbitMQ
        // .NET client documentation states channels must not be shared by concurrently
        // publishing threads.
        private readonly object _publishLock = new object();

        // Publish sequence number -> serialized message body, for messages that have
        // been published but not yet confirmed (acked or nacked) by the broker.
        private readonly ConcurrentDictionary<ulong, string> _pendingConfirms =
            new ConcurrentDictionary<ulong, string>();

        /// <summary>
        /// Obtains the channel, declares the durable topic exchange, enables publisher
        /// confirms and subscribes to the channel's ack/nack events.
        /// </summary>
        /// <param name="connectionChannel">Supplies the channel and the exchange name.</param>
        /// <param name="loggerHelper">Logger used for nack and publish-failure messages.</param>
        public RabbitMqPublisher(IConnectionChannel connectionChannel, ILoggerHelper loggerHelper)
        {
            _loggerHelper = loggerHelper;
            _connectionChannel = connectionChannel;
            _channel = _connectionChannel.GetChannel();

            // Declare the exchange.
            _channel.ExchangeDeclare(exchange: _connectionChannel.ExchangeName, type: "topic", durable: true);

            // Enable publisher confirms.
            _channel.ConfirmSelect();

            // Positive confirmation: forget the confirmed message(s).
            _channel.BasicAcks += (s, e) =>
            {
                if (e.Multiple)
                {
                    // Multiple: every pending tag up to and including DeliveryTag.
                    foreach (var tag in PendingTagsUpTo(e.DeliveryTag))
                    {
                        _pendingConfirms.TryRemove(tag, out _);
                    }
                }
                else
                {
                    // Single message.
                    _pendingConfirms.TryRemove(e.DeliveryTag, out _);
                }
            };

            // Negative confirmation: log and forget the rejected message(s).
            _channel.BasicNacks += (s, e) =>
            {
                if (e.Multiple)
                {
                    // Multiple: every pending tag up to and including DeliveryTag.
                    foreach (var tag in PendingTagsUpTo(e.DeliveryTag))
                    {
                        confirmedBasicNackLog(tag);
                    }
                }
                else
                {
                    // Single message.
                    confirmedBasicNackLog(e.DeliveryTag);
                }
            };
        }

        /// <summary>
        /// Logs a negatively acknowledged message and stops tracking it.
        /// </summary>
        /// <param name="sequenceNumber">Publish sequence number (delivery tag) of the message.</param>
        public void confirmedBasicNackLog(ulong sequenceNumber)
        {
            // Remove and fetch in one atomic step; body is null when the tag is unknown,
            // which is what the log call receives in that case.
            _pendingConfirms.TryRemove(sequenceNumber, out string body);
            _loggerHelper.LogInfo("RabbitMQ发布者确认异常", body);
        }

        /// <summary>
        /// Publishes an event.
        /// </summary>
        /// <param name="queueName">Passed to BasicPublish as the routing key.</param>
        /// <param name="content">Object to serialize to JSON and send as the message body.</param>
        /// <remarks>Any exception is logged and then rethrown unchanged.</remarks>
        public void Publish(string queueName, object content)
        {
            try
            {
                PublishCore(queueName, content);
            }
            catch (Exception ex)
            {
                _loggerHelper.LogInfo("发布消息异常", ex.Message);
                throw;
            }
        }

        /// <summary>
        /// Publishes an event, logging failures asynchronously.
        /// </summary>
        /// <param name="queueName">Passed to BasicPublish as the routing key.</param>
        /// <param name="content">Object to serialize to JSON and send as the message body.</param>
        /// <returns>A task that faults with the original exception if publishing fails.</returns>
        /// <remarks>
        /// The publish itself runs synchronously on the caller's thread; only the
        /// failure logging is awaited.
        /// </remarks>
        public async Task PublishAsync(string queueName, object content)
        {
            try
            {
                PublishCore(queueName, content);
            }
            catch (Exception ex)
            {
                await _loggerHelper.LogInfoAsync("发布消息异常", ex.Message);
                throw;
            }
        }

        /// <summary>
        /// Returns a snapshot of the pending sequence numbers that are less than or
        /// equal to <paramref name="deliveryTag"/>.
        /// </summary>
        private ulong[] PendingTagsUpTo(ulong deliveryTag)
        {
            return _pendingConfirms
                .Where(entry => entry.Key <= deliveryTag)
                .Select(entry => entry.Key)
                .ToArray();
        }

        /// <summary>
        /// Serializes <paramref name="content"/>, records it as pending under the
        /// channel's next publish sequence number and publishes it as a persistent
        /// message.
        /// </summary>
        private void PublishCore(string queueName, object content)
        {
            var message = SwifterJsonSerializer.SerializeObject(content);

            // UTF-8 rather than Encoding.Default: Default is the machine's ANSI code
            // page on .NET Framework, which is lossy and differs between hosts.
            var body = Encoding.UTF8.GetBytes(message);

            lock (_publishLock)
            {
                var sequenceNumber = _channel.NextPublishSeqNo;
                _pendingConfirms.TryAdd(sequenceNumber, message);

                try
                {
                    // Mark the message persistent.
                    var properties = _channel.CreateBasicProperties();
                    properties.Persistent = true;

                    // Send the message.
                    _channel.BasicPublish(_connectionChannel.ExchangeName, queueName, properties, body);
                }
                catch
                {
                    // The publish failed, so no confirmation will arrive for this
                    // entry; drop it so it neither leaks nor blocks a later TryAdd
                    // for the same sequence number.
                    _pendingConfirms.TryRemove(sequenceNumber, out _);
                    throw;
                }
            }
        }
    }
}