RabbitMQReceiveService.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. using log4net;
  2. using Microsoft.Extensions.Hosting;
  3. using RabbitMQ.Client;
  4. using System;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using Ropin.Environmentally.AlarmService.Model;
  9. using RabbitMQ.Client.Events;
  10. using System.Net.Http;
  11. using System.Net;
  12. using Ropin.Inspection.Model;
  13. using Newtonsoft.Json;
  14. using Ropin.Inspection.Service;
  15. using Newtonsoft.Json.Linq;
  16. using System.Collections.Generic;
  17. using System.Text.RegularExpressions;
  18. using InfluxData.Net.Common.Enums;
  19. using InfluxData.Net.InfluxDb;
  20. using System.Linq;
  21. using InfluxData.Net.InfluxDb.Models;
  22. namespace Ropin.Environmentally.AlarmService.Service
  23. {
  24. public class RabbitMQReceiveService : IHostedService, IDisposable
  25. {
  /// <summary>Service provider handed in by the host; stored but not read by this class.</summary>
  private readonly IServiceProvider _provider;

  /// <summary>Shared log4net logger for this service.</summary>
  private static readonly ILog log = LogManager.GetLogger(typeof(RabbitMQReceiveService));

  /// <summary>Factory used by <see cref="WebScadaAlarmExecuteAPI"/> to create HTTP clients.</summary>
  private readonly IHttpClientFactory httpClientFactory;

  /// <summary>Application configuration; may be null when none was supplied.</summary>
  private readonly CofingSetModel _cofingSetModel;

  /// <summary>RabbitMQ section of the configuration; left unset when the configuration is null.</summary>
  private readonly RabbitMQModel _rabbitMQModel;

  /// <summary>Publisher used by <see cref="WebScadaAlarmExecute"/> to push alarm messages.</summary>
  private readonly IPushMsgService _pushMsgService;

  /// <summary>InfluxDB client created by <see cref="IniInflux"/> during construction.</summary>
  private InfluxDbClient clientDb;

  /// <summary>RabbitMQ connection; created by <see cref="AddRabbitMQ"/>.</summary>
  private IConnection con;

  /// <summary>RabbitMQ channel used for consuming and acknowledging; created by <see cref="AddRabbitMQ"/>.</summary>
  private IModel channel;

  /// <summary>
  /// Stores the injected dependencies, picks the RabbitMQ settings out of the configuration
  /// (when one is supplied) and creates the InfluxDB client.
  /// </summary>
  /// <param name="provider">Host service provider.</param>
  /// <param name="cofingSetModel">Application configuration; null is tolerated here.</param>
  /// <param name="httpClientFactory">Factory for outgoing HTTP calls.</param>
  /// <param name="pushMsgService">Alarm message publisher.</param>
  public RabbitMQReceiveService(IServiceProvider provider, CofingSetModel cofingSetModel, IHttpClientFactory httpClientFactory, IPushMsgService pushMsgService)
  {
      _provider = provider;
      _cofingSetModel = cofingSetModel;

      // Only dereference the configuration when it was actually provided.
      if (cofingSetModel != null)
      {
          _rabbitMQModel = cofingSetModel.RabbitMQ;
      }

      IniInflux();

      this.httpClientFactory = httpClientFactory;
      _pushMsgService = pushMsgService;
  }
  /// <summary>
  /// Starts the RabbitMQ consumer on a background task and returns immediately so that
  /// host startup is not held up by the connection attempt.
  /// </summary>
  /// <param name="cancellationToken">
  /// Startup token; if it is already cancelled the background work is not scheduled.
  /// </param>
  /// <returns>A completed task.</returns>
  public Task StartAsync(CancellationToken cancellationToken)
  {
      // Fire-and-forget is deliberate: AddRabbitMQ wraps its whole body in a try/catch and
      // logs failures, so the discarded task carries no unobserved exception.
      _ = Task.Run(() => AddRabbitMQ(), cancellationToken);
      return Task.CompletedTask;
  }
  /// <summary>
  /// Opens (or reuses) the RabbitMQ connection and channel, declares the configured direct
  /// exchange, durable queue and binding, and starts a manual-ack consumer that buffers
  /// deliveries and hands them to <see cref="AddData"/> in batches.
  /// </summary>
  /// <remarks>
  /// Every failure is logged and swallowed, so the returned task always completes successfully.
  /// A batch is flushed only once the batch size is reached; a smaller remainder stays buffered
  /// and unacknowledged until more messages arrive.
  /// </remarks>
  /// <returns>A completed task; the setup work itself runs synchronously.</returns>
  public Task AddRabbitMQ()
  {
      try
      {
          if (_cofingSetModel == null)
          {
              log.Error("【异常-RabbitMQ】configuration is missing; RabbitMQ consumer was not started.");
              return Task.CompletedTask;
          }

          var factory = new ConnectionFactory()
          {
              HostName = _rabbitMQModel.HostName,
              Port = _rabbitMQModel.Port,
              UserName = _rabbitMQModel.UserName,
              VirtualHost = _rabbitMQModel.VirtualHost,
              Password = _rabbitMQModel.Password,
          };

          // Reuse an open connection/channel; create new ones otherwise.
          if (con == null || con.IsOpen == false)
          {
              con = factory.CreateConnection();
          }
          if (channel == null || channel.IsOpen == false)
          {
              channel = con.CreateModel();
          }

          channel.ExchangeDeclare(_rabbitMQModel.ExchangeName, type: ExchangeType.Direct);

          var queueName = channel.QueueDeclare(
              queue: _rabbitMQModel.QueueName,
              durable: true,
              exclusive: false,
              autoDelete: false,
              arguments: null
          ).QueueName;
          channel.QueueBind(queueName, _rabbitMQModel.ExchangeName, _rabbitMQModel.RoutingKey);

          // The prefetch count equals the batch size, so the broker delivers at most one
          // full batch of unacknowledged messages to this consumer at a time.
          const ushort batchSize = 5000;
          channel.BasicQos(prefetchSize: 0, prefetchCount: batchSize, global: false);
          channel.ConfirmSelect();

          var consumer = new EventingBasicConsumer(channel);
          var pendingMessages = new List<string>(batchSize);
          var pendingTags = new List<ulong>(batchSize);

          // One generator for the lifetime of the consumer instead of one per message.
          // NOTE(review): assumes the client invokes Received one delivery at a time per
          // channel, so the buffers and this generator are not touched concurrently - confirm
          // for the RabbitMQ.Client version in use.
          var placeholderValues = new Random();

          // The handler is an async lambda on a synchronous event (effectively async void),
          // so nothing may escape it: every path below is inside try/catch.
          consumer.Received += async (sender, ea) =>
          {
              List<ulong> flushTags = null;
              try
              {
                  var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                  log.Info("【RabbitMQ】" + message);

                  // NOTE(review): the decoded payload is only logged; what gets stored is a
                  // random value in [0, 100]. This looks like test scaffolding - confirm
                  // before relying on the written data.
                  pendingMessages.Add(placeholderValues.Next(0, 101).ToString());
                  pendingTags.Add(ea.DeliveryTag);

                  if (pendingMessages.Count < batchSize)
                  {
                      return;
                  }

                  // Snapshot and reset BOTH buffers before awaiting, so the flush works on
                  // its own copy and the tag buffer no longer grows without bound.
                  var flushMessages = new List<string>(pendingMessages);
                  flushTags = new List<ulong>(pendingTags);
                  pendingMessages.Clear();
                  pendingTags.Clear();

                  bool written = await AddData(flushMessages, flushTags);
                  Console.WriteLine(flushMessages.Count);

                  if (!written)
                  {
                      // AddData acknowledges only after a successful write; a false result
                      // means the batch is still unacknowledged, so hand it back to the broker
                      // instead of leaving it stuck against the prefetch limit.
                      log.Warn("【RabbitMQ】batch produced no points; requeueing " + flushTags.Count + " deliveries.");
                      TryRequeue(flushTags[flushTags.Count - 1], true);
                  }
              }
              catch (Exception ex)
              {
                  log.Error("【RabbitMQ-接收消息处理异常了】" + ex.Message, ex);

                  if (flushTags != null && flushTags.Count > 0)
                  {
                      // The flush failed: requeue the whole batch, not just the last delivery.
                      TryRequeue(flushTags[flushTags.Count - 1], true);
                  }
                  else
                  {
                      TryRequeue(ea.DeliveryTag, false);
                  }
              }
          };

          // autoAck is false: deliveries are acknowledged manually after a successful write.
          channel.BasicConsume(queue: _rabbitMQModel.QueueName, autoAck: false, consumer: consumer);
      }
      catch (Exception ex)
      {
          log.Error("【异常-RabbitMQ】" + ex.Message, ex);
      }

      return Task.CompletedTask;
  }

  /// <summary>
  /// Negatively acknowledges a delivery (or every outstanding delivery up to it) with requeue,
  /// logging instead of throwing so it is safe to call from the consumer's catch block.
  /// </summary>
  /// <param name="deliveryTag">Delivery tag to reject.</param>
  /// <param name="multiple">True to reject all unacknowledged deliveries up to and including the tag.</param>
  private void TryRequeue(ulong deliveryTag, bool multiple)
  {
      try
      {
          IModel current = channel;
          if (current != null && current.IsOpen)
          {
              current.BasicNack(deliveryTag, multiple, true);
          }
      }
      catch (Exception ex)
      {
          log.Error("【RabbitMQ】BasicNack failed: " + ex.Message, ex);
      }
  }
  /// <summary>
  /// Creates the InfluxDB client used by <see cref="AddData"/> and stores it in <c>clientDb</c>.
  /// </summary>
  private void IniInflux()
  {
      // NOTE(review): the endpoint, user name and password are hard-coded in source;
      // consider moving them to configuration / a secret store.
      const string endpoint = "http://124.71.132.255:8085/";
      const string user = "admin";
      const string password = "123456";

      clientDb = new InfluxDbClient(endpoint, user, password, InfluxDbVersion.Latest);
  }
  /// <summary>
  /// Converts each entry of <paramref name="devs"/> into an InfluxDB point, writes the points
  /// in one call and, after the write succeeds, acknowledges the RabbitMQ deliveries up to the
  /// last tag in <paramref name="deliveryTags"/>.
  /// </summary>
  /// <param name="devs">Numeric values as invariant-culture strings; unparseable, NaN or infinite entries are skipped.</param>
  /// <param name="deliveryTags">Delivery tags of the batch; the last one is acknowledged with <c>multiple: true</c>. May be null or empty, in which case nothing is acknowledged.</param>
  /// <returns>
  /// True when at least one point was written; false when there was nothing to write
  /// (in that case nothing is acknowledged).
  /// </returns>
  public async Task<bool> AddData(List<string> devs, List<ulong> deliveryTags)
  {
      if (devs == null || devs.Count == 0)
      {
          return false;
      }

      const string dbName = "fanyidb";
      const string measurement = "fanyidev";
      const string devSpotCode = "654654541321545";
      const string devName = "ceshi";

      IList<Point> points = new List<Point>(devs.Count);
      int skipped = 0;

      foreach (string dev in devs)
      {
          // Parse with the invariant culture: the values are machine data, and a
          // current-culture parse would read "1.5" differently on comma-decimal systems.
          float value;
          bool parsed = float.TryParse(
              dev,
              System.Globalization.NumberStyles.Float | System.Globalization.NumberStyles.AllowThousands,
              System.Globalization.CultureInfo.InvariantCulture,
              out value);

          // NaN/Infinity are skipped as well; presumably the store rejects them and one such
          // value would fail the whole batch - TODO confirm against the InfluxDB version in use.
          if (!parsed || float.IsNaN(value) || float.IsInfinity(value))
          {
              skipped++;
              continue;
          }

          points.Add(new Point()
          {
              Name = measurement,
              Tags = new Dictionary<string, object>()
              {
                  { "Id", devSpotCode },
                  { "name", devName }
              },
              Fields = new Dictionary<string, object>()
              {
                  { "Val", value }
              },
              Timestamp = DateTime.UtcNow
          });
      }

      if (skipped > 0)
      {
          log.Warn("【InfluxDB】skipped " + skipped + " of " + devs.Count + " values that were not finite numbers.");
      }

      if (!points.Any())
      {
          return false;
      }

      await clientDb.Client.WriteAsync(points, dbName);

      // Acknowledge only after the write succeeded. Guard the tag list and the channel so a
      // missing tag cannot throw after the data has already been stored.
      IModel current = channel;
      if (current != null && deliveryTags != null && deliveryTags.Count > 0)
      {
          ulong lastDeliveryTag = deliveryTags[deliveryTags.Count - 1];
          current.BasicAck(deliveryTag: lastDeliveryTag, multiple: true);
      }

      return true;
  }
  /// <summary>
  /// Pushes a WebScada device alarm through the push-message service, targeting the
  /// "rab.video.record.mqtt" queue on the configured RabbitMQ server.
  /// </summary>
  /// <param name="msgData">JSON text that deserializes to a <c>TpushMsgModel</c>.</param>
  /// <returns>The result reported by the push service; false when any exception occurs.</returns>
  public async Task<bool> WebScadaAlarmExecute(string msgData)
  {
      try
      {
          TpushMsgModel alarm = JsonConvert.DeserializeObject<TpushMsgModel>(msgData);

          // Same server and credentials as the consumer; only the queue differs.
          var target = new Inspection.Common.Helper.RabbitMQModel
          {
              QueueName = "rab.video.record.mqtt",
              UserName = _rabbitMQModel.UserName,
              Password = _rabbitMQModel.Password,
              HostName = _rabbitMQModel.HostName,
              Port = _rabbitMQModel.Port,
              VirtualHost = _rabbitMQModel.VirtualHost
          };

          // Exchange and routing key are derived from the queue name.
          target.ExchangeName = target.QueueName + ".DirectExchange";
          target.RoutingKey = target.QueueName + ".key";

          return await _pushMsgService.PushAlarmMsgAsync(alarm, alarm.Subject, target);
      }
      catch (Exception ex)
      {
          log.Info("【异常-WebScadaAlarmExecute】" + ex.Message);
          return false;
      }
  }
  /// <summary>
  /// Sends a WebScada device alarm by POSTing <paramref name="msgData"/> as JSON to the
  /// configured <c>PublicPushMessageAPI</c> endpoint.
  /// </summary>
  /// <param name="msgData">JSON request body.</param>
  /// <returns>
  /// True only when the endpoint answers 200 OK and the response body's <c>code</c> is "0";
  /// false otherwise, including when any exception occurs.
  /// </returns>
  public async Task<bool> WebScadaAlarmExecuteAPI(string msgData)
  {
      try
      {
          using (HttpClient httpClient = httpClientFactory.CreateClient())
          using (var request = new HttpRequestMessage(HttpMethod.Post, new Uri(_cofingSetModel.PublicPushMessageAPI)))
          {
              request.Content = new StringContent(msgData, Encoding.UTF8, "application/json");

              // Request and response are disposed so their content streams are released promptly.
              using (HttpResponseMessage response = await httpClient.SendAsync(request))
              {
                  string responseBody = await response.Content.ReadAsStringAsync();

                  if (response.StatusCode != HttpStatusCode.OK)
                  {
                      log.Info($"【错误-WebScadaAlarmExecute】WebScada服务的设备报警 发送数据失败【{msgData}】");
                      return false;
                  }

                  // Example failure body: {"code":-1001,"message":"..."}; an empty body
                  // deserializes to null and counts as failure.
                  receiveModel parsed = JsonConvert.DeserializeObject<receiveModel>(responseBody);
                  return parsed != null && parsed.code == "0";
              }
          }
      }
      catch (Exception ex)
      {
          log.Error("【异常-WebScadaAlarmExecute】" + ex.Message, ex);
          return false;
      }
  }
  /// <summary>
  /// Stops the service by releasing the RabbitMQ channel and connection via <see cref="Dispose"/>.
  /// </summary>
  /// <param name="cancellationToken">Shutdown token; not used.</param>
  /// <returns>A completed task.</returns>
  public Task StopAsync(CancellationToken cancellationToken)
  {
      this.Dispose();
      return Task.CompletedTask;
  }
  /// <summary>
  /// Closes and disposes the RabbitMQ channel and connection. Safe to call more than once
  /// (it is invoked from <see cref="StopAsync"/> and may be invoked again by the host) and
  /// never throws: close failures are logged.
  /// </summary>
  public void Dispose()
  {
      // Detach the fields first so a repeated call finds nothing left to release.
      IModel openChannel = channel;
      channel = null;
      IConnection openConnection = con;
      con = null;

      try
      {
          if (openChannel != null)
          {
              if (openChannel.IsOpen)
              {
                  openChannel.Close();
              }
              openChannel.Dispose();
          }
      }
      catch (Exception ex)
      {
          log.Warn("【RabbitMQ】closing the channel failed: " + ex.Message, ex);
      }

      try
      {
          if (openConnection != null)
          {
              if (openConnection.IsOpen)
              {
                  openConnection.Close();
              }
              openConnection.Dispose();
          }
      }
      catch (Exception ex)
      {
          log.Warn("【RabbitMQ】closing the connection failed: " + ex.Message, ex);
      }
  }
  306. }
  /// <summary>
  /// Shape of the JSON body returned by the push-message API, as read by
  /// <c>WebScadaAlarmExecuteAPI</c>.
  /// </summary>
  public class receiveModel
  {
      /// <summary>Result code; the caller treats the string "0" as success.</summary>
      public string code { get; set; }

      /// <summary>Message text accompanying the code.</summary>
      public string message { get; set; }
  }
  312. }