using log4net; using Microsoft.Extensions.Hosting; using RabbitMQ.Client; using System; using System.Text; using System.Threading; using System.Threading.Tasks; using Ropin.Environmentally.AlarmService.Model; using RabbitMQ.Client.Events; using System.Net.Http; using System.Net; using Ropin.Inspection.Model; using Newtonsoft.Json; using Ropin.Inspection.Service; namespace Ropin.Environmentally.AlarmService.Service { public class RabbitMQReceiveService : IHostedService, IDisposable { private readonly IServiceProvider _provider; private static readonly ILog log = LogManager.GetLogger(typeof(RabbitMQReceiveService)); private readonly IHttpClientFactory httpClientFactory; private readonly CofingSetModel _cofingSetModel; private readonly RabbitMQModel _rabbitMQModel; private readonly IPushMsgService _pushMsgService; public RabbitMQReceiveService(IServiceProvider provider, CofingSetModel cofingSetModel, IHttpClientFactory httpClientFactory, IPushMsgService pushMsgService)//, IPushMsgService pushMsgService { this._provider = provider; _cofingSetModel = cofingSetModel; if (cofingSetModel != null) { _rabbitMQModel = cofingSetModel.RabbitMQ; } this.httpClientFactory = httpClientFactory; _pushMsgService = pushMsgService; } private IConnection con; private IModel channel; public Task StartAsync(CancellationToken cancellationToken) { Task.Run(async () => { //TpushMsgModel msgModel = new TpushMsgModel(); //msgModel.C_DevStoreCode = "8453d5ed-8a21-4880-88e7-f872e93551bf"; //msgModel.C_MsgTypeCode = "strs"; //msgModel.Subject = "str"; //string msgData = JsonConvert.SerializeObject(msgModel); //await WebScadaAlarmExecute(msgData); await AddRabbitMQ(); //string msg = "{\"C_DevStoreCode\":\"015ea688-0698-408c-94b7-0475b14a20fc\",\"C_MsgTypeCode\":\"MSG_TYPE_011\",\"Subject\":\"设备开启关闭报警\",\"Msg\":\"关闭\",\"DevNumber\":\"20230413\",\"DevName\":\"沸石转轮催化燃烧设备\",\"UserName\":\"设备\",\"UserMobile\":null,\"DevAddress\":null,\"DevOpsName\":null,\"CreateOn\":\"03/06/2025 16:38:43\",\"GenerationType\":1,\"msgStatus\":4}"; //bool result= await WebScadaAlarmExecute(msg); //while (true) //{ // await Task.Delay(5000);//10000:10s //} }); return Task.CompletedTask; } public async Task AddRabbitMQ() { try { var factory = new ConnectionFactory() { HostName = _rabbitMQModel.HostName,//"60.204.212.71",//IP地址 Port = _rabbitMQModel.Port,//5672,//端口号 UserName = _rabbitMQModel.UserName,//"guest",//用户账号 VirtualHost = _rabbitMQModel.VirtualHost,//"/", Password = _rabbitMQModel.Password,// "guest"//用户密码 }; if (con == null || con.IsOpen == false) { con = factory.CreateConnection();//创建连接对象 } if (channel == null || channel.IsOpen == false) { channel = con.CreateModel();//创建连接会话对象 } channel.ExchangeDeclare(_rabbitMQModel.ExchangeName, type: ExchangeType.Direct); // Direct 交换机示例 //声明队列 var queueName = channel.QueueDeclare( queue: _rabbitMQModel.QueueName, //消息队列名称 durable: false, //是否缓存 exclusive: false, autoDelete: false, arguments: null ).QueueName; channel.QueueBind(queueName, _rabbitMQModel.ExchangeName, _rabbitMQModel.RoutingKey); //告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息 channel.BasicQos(0, 1, false); channel.ConfirmSelect(); // 开启消息确认模式 //创建消费者对象 var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => { try { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); log.Info("【RabbitMQ】" + message); bool result = await WebScadaAlarmExecute(message); if (result) { // 确认消息已处理 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } else { channel.BasicNack(ea.DeliveryTag, false, true); // 重新入队或发送到死信队列的逻辑需要自定义实现 } } catch (Exception ex) { channel.BasicNack(ea.DeliveryTag, false, true); // 重新入队或发送到死信队列的逻辑需要自定义实现 log.Info("【RabbitMQ-接收消息处理异常了】" + ex.Message); } }; //消费者开启监听 将autoAck设置false 手动确认关闭;true:自动关闭; channel.BasicConsume(queue: _rabbitMQModel.QueueName, autoAck: false, consumer: consumer); } catch (Exception ex) { log.Info("【异常-RabbitMQ】" + ex.Message); } } /// /// WebScada服务的设备报警 /// /// /// public async Task WebScadaAlarmExecute(string msgData) { bool result = false; try { var pushMsg = JsonConvert.DeserializeObject(msgData); Inspection.Common.Helper.RabbitMQModel rabbitMQModels =new Inspection.Common.Helper.RabbitMQModel(); rabbitMQModels.QueueName = "rab.video.record.mqtt"; //消息队列名称 rabbitMQModels.UserName = _rabbitMQModel.UserName; rabbitMQModels.Password = _rabbitMQModel.Password; rabbitMQModels.HostName = _rabbitMQModel.HostName; rabbitMQModels.Port = _rabbitMQModel.Port; rabbitMQModels.VirtualHost = _rabbitMQModel.VirtualHost; rabbitMQModels.ExchangeName = rabbitMQModels.QueueName+".DirectExchange"; rabbitMQModels.RoutingKey = rabbitMQModels.QueueName + ".key"; bool bols = await _pushMsgService.PushAlarmMsgAsync(pushMsg, pushMsg.Subject, rabbitMQModels); if (bols) { result = true; } } catch (Exception ex) { log.Info("【异常-WebScadaAlarmExecute】" + ex.Message); } return result; } /// /// WebScada服务的设备报警 /// /// /// public async Task WebScadaAlarmExecuteAPI(string msgData) { bool result = false; try { using (HttpClient httpClient = httpClientFactory.CreateClient()) { var httpRequestMessage = new HttpRequestMessage { Method = HttpMethod.Post, RequestUri = new Uri(_cofingSetModel.PublicPushMessageAPI), Content = new StringContent(msgData, Encoding.UTF8, "application/json") }; var response = await httpClient.SendAsync(httpRequestMessage); string responseResult = await response.Content.ReadAsStringAsync(); if (response.StatusCode != HttpStatusCode.OK) { log.Info($"【错误-WebScadaAlarmExecute】WebScada服务的设备报警 发送数据失败【{msgData}】"); } else { //"{\"code\":-1001,\"message\":\"参数错误。\"}" receiveModel resultReturn = JsonConvert.DeserializeObject(responseResult); if (resultReturn.code == "0") { result = true; } } } } catch (Exception ex) { log.Info("【异常-WebScadaAlarmExecute】" + ex.Message); } return result; } public Task StopAsync(CancellationToken cancellationToken) { Dispose(); return Task.CompletedTask; } public void Dispose() { if (channel!=null) { channel.Close(); } if (con!=null) { con.Close(); } } } public class receiveModel { public string code { get; set; } public string message { get; set; } } }