using log4net;
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Ropin.Environmentally.AlarmService.Model;
using RabbitMQ.Client.Events;
using System.Net.Http;
using System.Net;
using Ropin.Inspection.Model;
using Newtonsoft.Json;
using Ropin.Inspection.Service;
using Newtonsoft.Json.Linq;
using System.Collections.Generic;
using System.Globalization;
using System.Text.RegularExpressions;
using InfluxData.Net.Common.Enums;
using InfluxData.Net.InfluxDb;
using System.Linq;
using InfluxData.Net.InfluxDb.Models;

namespace Ropin.Environmentally.AlarmService.Service
{
    /// <summary>
    /// Hosted service that consumes messages from a RabbitMQ queue in batches and
    /// writes one InfluxDB point per message. Also exposes helpers that forward an
    /// alarm message through <see cref="IPushMsgService"/> or an HTTP endpoint.
    /// </summary>
    public class RabbitMQReceiveService : IHostedService, IDisposable
    {
        /// <summary>
        /// Number of messages collected before a batch is written. It is also used as
        /// the channel prefetch count, so the broker never has more than one batch
        /// outstanding (unacknowledged) at a time.
        /// </summary>
        private const ushort PrefetchBatchSize = 5000;

        private readonly IServiceProvider _provider;
        private static readonly ILog log = LogManager.GetLogger(typeof(RabbitMQReceiveService));
        private readonly IHttpClientFactory httpClientFactory;
        private readonly CofingSetModel _cofingSetModel;
        private readonly RabbitMQModel _rabbitMQModel;
        private readonly IPushMsgService _pushMsgService;
        private InfluxDbClient clientDb;

        private IConnection con;
        private IModel channel;

        /// <summary>
        /// Stores the injected dependencies, reads the RabbitMQ settings from
        /// <paramref name="cofingSetModel"/> (when it is not null) and creates the InfluxDB client.
        /// </summary>
        public RabbitMQReceiveService(IServiceProvider provider, CofingSetModel cofingSetModel, IHttpClientFactory httpClientFactory, IPushMsgService pushMsgService)
        {
            this._provider = provider;
            _cofingSetModel = cofingSetModel;
            if (cofingSetModel != null)
            {
                _rabbitMQModel = cofingSetModel.RabbitMQ;
            }
            IniInflux();
            this.httpClientFactory = httpClientFactory;
            _pushMsgService = pushMsgService;
        }

        /// <summary>
        /// Starts the consumer on the thread pool and returns immediately, so the
        /// connection attempt does not hold up host startup.
        /// </summary>
        /// <param name="cancellationToken">Not observed by this implementation.</param>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            // Fire-and-forget is deliberate: AddRabbitMQ catches and logs its own failures.
            _ = Task.Run(() => AddRabbitMQ());
            return Task.CompletedTask;
        }

        /// <summary>
        /// Opens (or reuses) the connection and channel, declares and binds the configured
        /// exchange/queue, and registers the batching consumer. Failures are logged, not thrown.
        /// </summary>
        /// <returns>An already-completed task; all work is done synchronously.</returns>
        public Task AddRabbitMQ()
        {
            try
            {
                var factory = new ConnectionFactory()
                {
                    HostName = _rabbitMQModel.HostName,
                    Port = _rabbitMQModel.Port,
                    UserName = _rabbitMQModel.UserName,
                    VirtualHost = _rabbitMQModel.VirtualHost,
                    Password = _rabbitMQModel.Password,
                };

                if (con == null || con.IsOpen == false)
                {
                    con = factory.CreateConnection();
                }
                if (channel == null || channel.IsOpen == false)
                {
                    channel = con.CreateModel();
                }

                channel.ExchangeDeclare(_rabbitMQModel.ExchangeName, type: ExchangeType.Direct);

                // Declare the (durable, non-exclusive, non-auto-delete) queue and bind it to the exchange.
                var queueName = channel.QueueDeclare(
                    queue: _rabbitMQModel.QueueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null
                ).QueueName;
                channel.QueueBind(queueName, _rabbitMQModel.ExchangeName, _rabbitMQModel.RoutingKey);

                // prefetchCount bounds how many unacknowledged messages the broker will deliver,
                // which is what lets a whole batch accumulate before anything is acknowledged.
                channel.BasicQos(prefetchSize: 0, prefetchCount: PrefetchBatchSize, global: false);
                channel.ConfirmSelect();

                var consumer = new EventingBasicConsumer(channel);
                List<ulong> batchDeliveryTags = new List<ulong>();
                List<string> batchMessages = new List<string>();

                consumer.Received += async (model, ea) =>
                {
                    try
                    {
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        log.Info("【RabbitMQ】" + message);

                        // NOTE(review): the payload is only logged; a random 0-100 value is stored
                        // in its place. This looks like test scaffolding - confirm before production use.
                        batchMessages.Add(new Random().Next(0, 101).ToString());
                        batchDeliveryTags.Add(ea.DeliveryTag);

                        if (batchMessages.Count < PrefetchBatchSize)
                        {
                            return;
                        }

                        bool stored = await AddData(batchMessages, batchDeliveryTags);
                        Console.WriteLine(batchMessages.Count);

                        if (!stored)
                        {
                            // Nothing in the batch could be converted, so AddData acknowledged nothing.
                            // Leaving the batch unacknowledged would fill the prefetch window and stall
                            // the consumer forever. Reject without requeue, which matches how
                            // unconvertible items inside a successful batch are already dropped.
                            log.Warn("【RabbitMQ】batch produced no writable points; rejecting " + batchDeliveryTags.Count + " message(s) without requeue");
                            channel.BasicNack(batchDeliveryTags[batchDeliveryTags.Count - 1], multiple: true, requeue: false);
                        }

                        // Reset both lists together; the tag list previously grew without bound.
                        batchMessages.Clear();
                        batchDeliveryTags.Clear();
                    }
                    catch (Exception ex)
                    {
                        // Log first so the failure is recorded even if the nack below fails too.
                        log.Error("【RabbitMQ-接收消息处理异常了】" + ex.Message, ex);
                        TryNackAndRequeue(ea.DeliveryTag);
                    }
                };

                // autoAck is false: messages are acknowledged manually, per batch.
                channel.BasicConsume(queue: _rabbitMQModel.QueueName, autoAck: false, consumer: consumer);
            }
            catch (Exception ex)
            {
                log.Error("【异常-RabbitMQ】" + ex.Message, ex);
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Negatively acknowledges a single delivery with requeue, logging instead of throwing.
        /// The consumer callback is an async void lambda, so an escaping exception there
        /// cannot be observed by any caller.
        /// </summary>
        private void TryNackAndRequeue(ulong deliveryTag)
        {
            try
            {
                channel.BasicNack(deliveryTag, false, true);
            }
            catch (Exception nackEx)
            {
                log.Error("【RabbitMQ】BasicNack failed: " + nackEx.Message, nackEx);
            }
        }

        /// <summary>
        /// Creates the InfluxDB client used by <see cref="AddData"/>.
        /// </summary>
        private void IniInflux()
        {
            // NOTE(review): endpoint and credentials are hard-coded in source. They should come
            // from configuration / a secret store; left unchanged here because the configuration
            // model's shape is not visible from this file.
            var infuxUrl = "http://124.71.132.255:8085/";
            var infuxUser = "admin";
            var infuxPwd = "123456";
            clientDb = new InfluxDbClient(infuxUrl, infuxUser, infuxPwd, InfluxDbVersion.Latest);
        }

        /// <summary>
        /// Writes one InfluxDB point per entry of <paramref name="devs"/> and, after a successful
        /// write, acknowledges every delivery up to the last tag in <paramref name="deliveryTags"/>.
        /// </summary>
        /// <param name="devs">Numeric values as strings; entries that cannot be converted are logged and skipped.</param>
        /// <param name="deliveryTags">Delivery tags of the batch; only the last one is used (multiple-ack).</param>
        /// <returns>
        /// <c>true</c> when at least one point was written; <c>false</c> when there was nothing to write
        /// (in which case nothing is acknowledged).
        /// </returns>
        public async Task<bool> AddData(List<string> devs, List<ulong> deliveryTags)
        {
            if (devs == null || devs.Count == 0)
            {
                return false;
            }

            const string dbName = "fanyidb";
            var points = new List<Point>(devs.Count);

            foreach (string dev in devs)
            {
                try
                {
                    // NOTE(review): tag values are fixed placeholders; the original commented-out code
                    // suggests they are meant to come from the message's id/value/unitName fields.
                    string strDevSpotCode = "654654541321545";
                    string strName = "ceshi";

                    // Machine-generated data: parse culture-independently.
                    float dValue = Convert.ToSingle(dev, CultureInfo.InvariantCulture);

                    points.Add(new Point()
                    {
                        Name = "fanyidev", // measurement name
                        Tags = new Dictionary<string, object>()
                        {
                            { "Id", strDevSpotCode },
                            { "name", strName }
                        },
                        Fields = new Dictionary<string, object>()
                        {
                            { "Val", dValue }
                        },
                        Timestamp = DateTime.UtcNow
                    });
                }
                catch (Exception ex)
                {
                    // Skip the bad entry, but leave a trace instead of swallowing it silently.
                    log.Warn("【InfluxDB】skipping value that could not be converted: " + dev, ex);
                }
            }

            if (!points.Any())
            {
                return false;
            }

            await clientDb.Client.WriteAsync(points, dbName);

            // Acknowledge the whole batch with a single multiple-ack on the last tag.
            // Guarded so an empty tag list cannot throw after the data has already been written.
            if (deliveryTags != null && deliveryTags.Count > 0)
            {
                ulong lastDeliveryTag = deliveryTags[^1];
                channel.BasicAck(deliveryTag: lastDeliveryTag, multiple: true);
            }

            return true;
        }

        /// <summary>
        /// Deserializes <paramref name="msgData"/> and pushes it as a device alarm through
        /// <see cref="IPushMsgService"/>, targeting the "rab.video.record.mqtt" queue on the
        /// same broker settings this service uses.
        /// </summary>
        /// <param name="msgData">JSON text of the alarm message.</param>
        /// <returns><c>true</c> when the push service reports success; <c>false</c> otherwise, including on any exception.</returns>
        public async Task<bool> WebScadaAlarmExecute(string msgData)
        {
            bool result = false;
            try
            {
                var pushMsg = JsonConvert.DeserializeObject<TpushMsgModel>(msgData);
                if (pushMsg == null)
                {
                    log.Warn("【WebScadaAlarmExecute】message body deserialized to null; nothing pushed");
                    return false;
                }

                Inspection.Common.Helper.RabbitMQModel rabbitMQModels = new Inspection.Common.Helper.RabbitMQModel();
                rabbitMQModels.QueueName = "rab.video.record.mqtt";
                rabbitMQModels.UserName = _rabbitMQModel.UserName;
                rabbitMQModels.Password = _rabbitMQModel.Password;
                rabbitMQModels.HostName = _rabbitMQModel.HostName;
                rabbitMQModels.Port = _rabbitMQModel.Port;
                rabbitMQModels.VirtualHost = _rabbitMQModel.VirtualHost;
                rabbitMQModels.ExchangeName = rabbitMQModels.QueueName + ".DirectExchange";
                rabbitMQModels.RoutingKey = rabbitMQModels.QueueName + ".key";

                result = await _pushMsgService.PushAlarmMsgAsync(pushMsg, pushMsg.Subject, rabbitMQModels);
            }
            catch (Exception ex)
            {
                log.Error("【异常-WebScadaAlarmExecute】" + ex.Message, ex);
            }
            return result;
        }

        /// <summary>
        /// Posts <paramref name="msgData"/> as JSON to the configured public push-message API.
        /// </summary>
        /// <param name="msgData">JSON text sent as the request body.</param>
        /// <returns>
        /// <c>true</c> only when the response status is 200 OK and the returned body's <c>code</c> is "0";
        /// <c>false</c> otherwise, including on any exception.
        /// </returns>
        public async Task<bool> WebScadaAlarmExecuteAPI(string msgData)
        {
            bool result = false;
            try
            {
                using (HttpClient httpClient = httpClientFactory.CreateClient())
                using (var httpRequestMessage = new HttpRequestMessage
                {
                    Method = HttpMethod.Post,
                    RequestUri = new Uri(_cofingSetModel.PublicPushMessageAPI),
                    Content = new StringContent(msgData, Encoding.UTF8, "application/json")
                })
                using (var response = await httpClient.SendAsync(httpRequestMessage))
                {
                    string responseResult = await response.Content.ReadAsStringAsync();
                    if (response.StatusCode != HttpStatusCode.OK)
                    {
                        log.Info($"【错误-WebScadaAlarmExecute】WebScada服务的设备报警 发送数据失败【{msgData}】");
                    }
                    else
                    {
                        // Example failure body: {"code":-1001,"message":"参数错误。"}
                        receiveModel resultReturn = JsonConvert.DeserializeObject<receiveModel>(responseResult);

                        // An empty or "null" body deserializes to null; treat that as failure.
                        if (resultReturn != null && resultReturn.code == "0")
                        {
                            result = true;
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                log.Error("【异常-WebScadaAlarmExecute】" + ex.Message, ex);
            }
            return result;
        }

        /// <summary>
        /// Stops the service by releasing the channel and connection.
        /// </summary>
        /// <param name="cancellationToken">Not observed by this implementation.</param>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            Dispose();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Closes and disposes the channel, then the connection. Never throws and is safe to
        /// call more than once (StopAsync calls it, and so may the owner of this instance).
        /// </summary>
        public void Dispose()
        {
            try
            {
                if (channel != null)
                {
                    if (channel.IsOpen)
                    {
                        channel.Close();
                    }
                    channel.Dispose();
                }
            }
            catch (Exception ex)
            {
                log.Warn("【RabbitMQ】error while closing channel: " + ex.Message, ex);
            }

            try
            {
                if (con != null)
                {
                    if (con.IsOpen)
                    {
                        con.Close();
                    }
                    con.Dispose();
                }
            }
            catch (Exception ex)
            {
                log.Warn("【RabbitMQ】error while closing connection: " + ex.Message, ex);
            }
        }
    }

    /// <summary>
    /// Shape of the JSON body returned by the public push-message API.
    /// </summary>
    public class receiveModel
    {
        public string code { get; set; }
        public string message { get; set; }
    }
}