using Core.RabbitMQBus.Log;
using RabbitMQ.Client;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Core.RabbitMQBus.EventBus
{
    /// <summary>
    /// Publishes JSON-serialized messages to the topic exchange exposed by
    /// <see cref="IConnectionChannel"/>, with RabbitMQ publisher confirms enabled.
    /// Every published body is remembered under its publish sequence number until the
    /// broker acknowledges it; negatively acknowledged bodies are written to the log.
    /// </summary>
    public class RabbitMqPublisher : IRabbitMqPublisher
    {
        private readonly ILoggerHelper _loggerHelper;
        private readonly IConnectionChannel _connectionChannel;
        private readonly IModel _channel;

        // Serializes "read NextPublishSeqNo + BasicPublish" so the sequence number stored
        // for a message is the one the channel actually assigns to that publish.
        private readonly object _publishLock = new object();

        // Messages published but not yet confirmed by the broker, keyed by publish sequence number.
        private readonly ConcurrentDictionary<ulong, string> _unconfirmedMessages = new ConcurrentDictionary<ulong, string>();

        /// <summary>
        /// Obtains a channel from <paramref name="connectionChannel"/>, declares the durable
        /// topic exchange, switches the channel into confirm mode and subscribes to the
        /// broker's ack/nack notifications.
        /// </summary>
        /// <param name="connectionChannel">Provides the channel and the exchange name.</param>
        /// <param name="loggerHelper">Logger used for nack and publish-failure messages.</param>
        public RabbitMqPublisher(IConnectionChannel connectionChannel, ILoggerHelper loggerHelper)
        {
            _loggerHelper = loggerHelper;
            _connectionChannel = connectionChannel;
            _channel = _connectionChannel.GetChannel();

            // Declare the exchange.
            _channel.ExchangeDeclare(exchange: _connectionChannel.ExchangeName, type: "topic", durable: true);

            // Enable publisher confirms.
            _channel.ConfirmSelect();

            // Positive confirmation: the message(s) no longer need to be tracked.
            _channel.BasicAcks += (s, e) =>
            {
                foreach (var deliveryTag in ResolveDeliveryTags(e.DeliveryTag, e.Multiple))
                {
                    _unconfirmedMessages.TryRemove(deliveryTag, out _);
                }
            };

            // Negative confirmation: log each affected message and stop tracking it.
            _channel.BasicNacks += (s, e) =>
            {
                foreach (var deliveryTag in ResolveDeliveryTags(e.DeliveryTag, e.Multiple))
                {
                    confirmedBasicNackLog(deliveryTag);
                }
            };
        }

        /// <summary>
        /// Logs the body of a negatively acknowledged message and removes it from the
        /// set of unconfirmed messages.
        /// </summary>
        /// <param name="sequenceNumber">Publish sequence number (delivery tag) of the message.</param>
        public void confirmedBasicNackLog(ulong sequenceNumber)
        {
            // Remove and fetch in one atomic step; body is null when the tag is not tracked.
            _unconfirmedMessages.TryRemove(sequenceNumber, out string body);
            _loggerHelper.LogInfo("RabbitMQ发布者确认异常", body);
        }

        /// <summary>
        /// Serializes <paramref name="content"/> to JSON and publishes it as a persistent message.
        /// </summary>
        /// <param name="queueName">Passed to <c>BasicPublish</c> as the routing key.</param>
        /// <param name="content">Object to serialize and send.</param>
        /// <remarks>Any exception is logged and then rethrown to the caller.</remarks>
        public void Publish(string queueName, object content)
        {
            try
            {
                PublishCore(queueName, content);
            }
            catch (Exception ex)
            {
                _loggerHelper.LogInfo("发布消息异常", ex.Message);
                throw;
            }
        }

        /// <summary>
        /// Task-returning counterpart of <see cref="Publish"/>. The publish itself runs
        /// synchronously; only the failure logging is awaited.
        /// </summary>
        /// <param name="queueName">Passed to <c>BasicPublish</c> as the routing key.</param>
        /// <param name="content">Object to serialize and send.</param>
        /// <returns>A task that faults with the original exception when publishing fails.</returns>
        public async Task PublishAsync(string queueName, object content)
        {
            try
            {
                PublishCore(queueName, content);
            }
            catch (Exception ex)
            {
                await _loggerHelper.LogInfoAsync("发布消息异常", ex.Message);
                throw;
            }
        }

        /// <summary>
        /// Returns the delivery tags a confirmation refers to: every tracked tag up to and
        /// including <paramref name="deliveryTag"/> when <paramref name="multiple"/> is set,
        /// otherwise just <paramref name="deliveryTag"/>.
        /// </summary>
        private ulong[] ResolveDeliveryTags(ulong deliveryTag, bool multiple)
        {
            if (!multiple)
            {
                return new[] { deliveryTag };
            }

            // Snapshot the matching keys so callers can remove entries while iterating.
            return _unconfirmedMessages
                .Where(pair => pair.Key <= deliveryTag)
                .Select(pair => pair.Key)
                .ToArray();
        }

        /// <summary>
        /// Shared publish path: serializes the content, records it under the next publish
        /// sequence number and sends it as a persistent message.
        /// </summary>
        private void PublishCore(string queueName, object content)
        {
            var message = SwifterJsonSerializer.SerializeObject(content);
            // Explicit UTF-8: Encoding.Default depends on the OS code page on .NET Framework
            // (it is already UTF-8 on .NET Core and later).
            var body = Encoding.UTF8.GetBytes(message);

            lock (_publishLock)
            {
                var sequenceNumber = _channel.NextPublishSeqNo;
                _unconfirmedMessages.TryAdd(sequenceNumber, message);
                try
                {
                    // Mark the message as persistent.
                    var properties = _channel.CreateBasicProperties();
                    properties.Persistent = true;

                    // Send the message.
                    _channel.BasicPublish(_connectionChannel.ExchangeName, queueName, properties, body);
                }
                catch
                {
                    // No confirm will arrive for a publish that failed, so stop tracking it.
                    _unconfirmedMessages.TryRemove(sequenceNumber, out _);
                    throw;
                }
            }
        }

    }
}