RabbitMQReceiveService.cs 9.6 KB


  1. using log4net;
  2. using Microsoft.Extensions.Hosting;
  3. using RabbitMQ.Client;
  4. using System;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using Ropin.Environmentally.AlarmService.Model;
  9. using RabbitMQ.Client.Events;
  10. using System.Net.Http;
  11. using System.Net;
  12. using Ropin.Inspection.Model;
  13. using Newtonsoft.Json;
  14. using Ropin.Inspection.Service;
  15. namespace Ropin.Environmentally.AlarmService.Service
  16. {
  /// <summary>
  /// Hosted service that consumes alarm messages from the configured RabbitMQ queue and
  /// forwards each message through <see cref="IPushMsgService"/>.
  /// </summary>
  public class RabbitMQReceiveService : IHostedService, IDisposable
  {
      private static readonly ILog log = LogManager.GetLogger(typeof(RabbitMQReceiveService));

      private readonly IServiceProvider _provider;
      private readonly IHttpClientFactory httpClientFactory;
      private readonly CofingSetModel _cofingSetModel;
      private readonly RabbitMQModel _rabbitMQModel;
      private readonly IPushMsgService _pushMsgService;

      // Created lazily by AddRabbitMQ and released by Dispose.
      private IConnection con;
      private IModel channel;

      /// <summary>
      /// Creates the service. The RabbitMQ settings are read from
      /// <paramref name="cofingSetModel"/> when it is not null.
      /// </summary>
      /// <param name="provider">Service provider (stored, not used by this class).</param>
      /// <param name="cofingSetModel">Application settings, including the RabbitMQ section.</param>
      /// <param name="httpClientFactory">Factory used by <see cref="WebScadaAlarmExecuteAPI"/>.</param>
      /// <param name="pushMsgService">Service that pushes the received alarm message.</param>
      public RabbitMQReceiveService(IServiceProvider provider, CofingSetModel cofingSetModel, IHttpClientFactory httpClientFactory, IPushMsgService pushMsgService)
      {
          _provider = provider;
          _cofingSetModel = cofingSetModel;
          if (cofingSetModel != null)
          {
              _rabbitMQModel = cofingSetModel.RabbitMQ;
          }
          this.httpClientFactory = httpClientFactory;
          _pushMsgService = pushMsgService;
      }

      /// <summary>
      /// Starts the consumer in the background and returns immediately.
      /// </summary>
      /// <param name="cancellationToken">Signals that host start-up was aborted.</param>
      /// <returns>A completed task.</returns>
      public Task StartAsync(CancellationToken cancellationToken)
      {
          // AddRabbitMQ opens the connection with blocking calls, so it is pushed to the
          // thread pool to keep host start-up from waiting on the broker. AddRabbitMQ logs
          // its own failures, so the returned task is intentionally not awaited.
          Task.Run(() => AddRabbitMQ(), cancellationToken);
          return Task.CompletedTask;
      }

      /// <summary>
      /// Opens the connection/channel if needed, declares and binds the exchange and queue
      /// from configuration, and registers a consumer with manual acknowledgements.
      /// Failures are logged and not rethrown.
      /// </summary>
      /// <returns>A completed task; all work in this method is synchronous.</returns>
      public Task AddRabbitMQ()
      {
          if (_rabbitMQModel == null)
          {
              log.Error("【异常-RabbitMQ】RabbitMQ configuration is missing; consumer not started.");
              return Task.CompletedTask;
          }

          try
          {
              var factory = new ConnectionFactory
              {
                  HostName = _rabbitMQModel.HostName,
                  Port = _rabbitMQModel.Port,
                  UserName = _rabbitMQModel.UserName,
                  VirtualHost = _rabbitMQModel.VirtualHost,
                  Password = _rabbitMQModel.Password,
                  // Required for AsyncEventingBasicConsumer: the handler below awaits, and an
                  // async lambda on the synchronous consumer would be async void.
                  DispatchConsumersAsync = true,
              };

              if (con == null || !con.IsOpen)
              {
                  con = factory.CreateConnection();
              }
              if (channel == null || !channel.IsOpen)
              {
                  channel = con.CreateModel();
              }

              // Pin the channel the consumer is registered on so acknowledgements always go
              // back to the channel that delivered the message.
              IModel consumerChannel = channel;

              consumerChannel.ExchangeDeclare(_rabbitMQModel.ExchangeName, type: ExchangeType.Direct);
              string queueName = consumerChannel.QueueDeclare(
                  queue: _rabbitMQModel.QueueName,
                  durable: false,
                  exclusive: false,
                  autoDelete: false,
                  arguments: null).QueueName;
              consumerChannel.QueueBind(queueName, _rabbitMQModel.ExchangeName, _rabbitMQModel.RoutingKey);

              // Prefetch of 1: the broker sends no further message until the current one is
              // acknowledged.
              consumerChannel.BasicQos(0, 1, false);

              // NOTE(review): this enables publisher confirms; nothing in this class publishes
              // on the channel, so it looks unnecessary for consuming. Kept to preserve the
              // existing channel setup - confirm before removing.
              consumerChannel.ConfirmSelect();

              var consumer = new AsyncEventingBasicConsumer(consumerChannel);
              consumer.Received += (sender, ea) => HandleDeliveryAsync(consumerChannel, ea);

              // Consume from the name returned by QueueDeclare (differs from the configured
              // name when that is empty). autoAck is false: messages are acknowledged manually.
              consumerChannel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
          }
          catch (Exception ex)
          {
              log.Error("【异常-RabbitMQ】" + ex.Message, ex);
          }

          return Task.CompletedTask;
      }

      /// <summary>
      /// Processes one delivery: decodes the body as UTF-8, forwards it via
      /// <see cref="WebScadaAlarmExecute"/>, then acks on success or nacks with requeue.
      /// Never throws.
      /// </summary>
      private async Task HandleDeliveryAsync(IModel deliveryChannel, BasicDeliverEventArgs ea)
      {
          bool handled = false;
          try
          {
              // Decode the payload before the first await.
              string message = Encoding.UTF8.GetString(ea.Body.ToArray());
              log.Info("【RabbitMQ】" + message);
              handled = await WebScadaAlarmExecute(message);
          }
          catch (Exception ex)
          {
              log.Error("【RabbitMQ-接收消息处理异常了】" + ex.Message, ex);
          }

          try
          {
              if (handled)
              {
                  deliveryChannel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
              }
              else
              {
                  // NOTE(review): requeue=true means a message that can never be processed
                  // (e.g. malformed JSON) is redelivered indefinitely; consider a dead-letter
                  // exchange or a retry limit.
                  deliveryChannel.BasicNack(ea.DeliveryTag, false, true);
              }
          }
          catch (Exception ex)
          {
              // The channel may already be closed; log instead of letting the failure escape
              // from the consumer callback.
              log.Error("【RabbitMQ-接收消息处理异常了】" + ex.Message, ex);
          }
      }

      /// <summary>
      /// Deserializes the alarm payload and pushes it through <see cref="IPushMsgService"/>,
      /// targeting the "rab.video.record.mqtt" queue on the configured broker.
      /// </summary>
      /// <param name="msgData">JSON text of a <c>TpushMsgModel</c>.</param>
      /// <returns>True when the push service reports success; false on failure or exception.</returns>
      public async Task<bool> WebScadaAlarmExecute(string msgData)
      {
          try
          {
              var pushMsg = JsonConvert.DeserializeObject<TpushMsgModel>(msgData);
              if (pushMsg == null)
              {
                  // Empty or literal "null" payload: nothing to push.
                  log.Error("【异常-WebScadaAlarmExecute】message body deserialized to null.");
                  return false;
              }

              var rabbitMQModels = new Inspection.Common.Helper.RabbitMQModel
              {
                  QueueName = "rab.video.record.mqtt",
                  UserName = _rabbitMQModel.UserName,
                  Password = _rabbitMQModel.Password,
                  HostName = _rabbitMQModel.HostName,
                  Port = _rabbitMQModel.Port,
                  VirtualHost = _rabbitMQModel.VirtualHost,
              };
              // Exchange and routing key are derived from the queue name.
              rabbitMQModels.ExchangeName = rabbitMQModels.QueueName + ".DirectExchange";
              rabbitMQModels.RoutingKey = rabbitMQModels.QueueName + ".key";

              return await _pushMsgService.PushAlarmMsgAsync(pushMsg, pushMsg.Subject, rabbitMQModels);
          }
          catch (Exception ex)
          {
              log.Error("【异常-WebScadaAlarmExecute】" + ex.Message, ex);
              return false;
          }
      }

      /// <summary>
      /// Posts the alarm payload as JSON to the configured push-message HTTP API.
      /// </summary>
      /// <param name="msgData">JSON text sent as the request body.</param>
      /// <returns>
      /// True when the API answers HTTP 200 with a body whose <c>code</c> is "0";
      /// false otherwise or on exception.
      /// </returns>
      public async Task<bool> WebScadaAlarmExecuteAPI(string msgData)
      {
          try
          {
              using (HttpClient httpClient = httpClientFactory.CreateClient())
              using (var request = new HttpRequestMessage(HttpMethod.Post, new Uri(_cofingSetModel.PublicPushMessageAPI)))
              {
                  request.Content = new StringContent(msgData, Encoding.UTF8, "application/json");

                  using (HttpResponseMessage response = await httpClient.SendAsync(request))
                  {
                      if (response.StatusCode != HttpStatusCode.OK)
                      {
                          log.Error($"【错误-WebScadaAlarmExecuteAPI】WebScada服务的设备报警 发送数据失败【{msgData}】");
                          return false;
                      }

                      // Example failure body: {"code":-1001,"message":"参数错误。"}
                      string responseResult = await response.Content.ReadAsStringAsync();
                      receiveModel resultReturn = JsonConvert.DeserializeObject<receiveModel>(responseResult);
                      return resultReturn != null && resultReturn.code == "0";
                  }
              }
          }
          catch (Exception ex)
          {
              log.Error("【异常-WebScadaAlarmExecuteAPI】" + ex.Message, ex);
              return false;
          }
      }

      /// <summary>
      /// Stops the service by releasing the channel and connection.
      /// </summary>
      /// <param name="cancellationToken">Not used.</param>
      /// <returns>A completed task.</returns>
      public Task StopAsync(CancellationToken cancellationToken)
      {
          Dispose();
          return Task.CompletedTask;
      }

      /// <summary>
      /// Closes and disposes the channel, then the connection. Safe to call more than once
      /// (it is invoked from <see cref="StopAsync"/> and may be invoked again by the host),
      /// and never throws.
      /// </summary>
      public void Dispose()
      {
          IModel ch = channel;
          channel = null;
          if (ch != null)
          {
              try
              {
                  if (ch.IsOpen)
                  {
                      ch.Close();
                  }
                  ch.Dispose();
              }
              catch (Exception ex)
              {
                  log.Warn("【异常-RabbitMQ】" + ex.Message, ex);
              }
          }

          IConnection connection = con;
          con = null;
          if (connection != null)
          {
              try
              {
                  if (connection.IsOpen)
                  {
                      connection.Close();
                  }
                  connection.Dispose();
              }
              catch (Exception ex)
              {
                  log.Warn("【异常-RabbitMQ】" + ex.Message, ex);
              }
          }
      }
  }
  /// <summary>
  /// Response envelope with a status code and a human-readable message,
  /// e.g. <c>{"code":-1001,"message":"..."}</c>.
  /// </summary>
  public class receiveModel
  {
      /// <summary>Status code returned in the response body.</summary>
      public string code { get; set; }

      /// <summary>Message text accompanying the status code.</summary>
      public string message { get; set; }
  }
  217. }