using log4net;
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Ropin.Environmentally.AlarmService.Model;
using RabbitMQ.Client.Events;
using System.Net.Http;
using System.Net;
using Ropin.Inspection.Model;
using Newtonsoft.Json;
using Ropin.Inspection.Service;
namespace Ropin.Environmentally.AlarmService.Service
{
public class RabbitMQReceiveService : IHostedService, IDisposable
{
private readonly IServiceProvider _provider;
private static readonly ILog log = LogManager.GetLogger(typeof(RabbitMQReceiveService));
private readonly IHttpClientFactory httpClientFactory;
private readonly CofingSetModel _cofingSetModel;
private readonly RabbitMQModel _rabbitMQModel;
private readonly IPushMsgService _pushMsgService;
public RabbitMQReceiveService(IServiceProvider provider, CofingSetModel cofingSetModel, IHttpClientFactory httpClientFactory, IPushMsgService pushMsgService)//, IPushMsgService pushMsgService
{
this._provider = provider;
_cofingSetModel = cofingSetModel;
if (cofingSetModel != null)
{
_rabbitMQModel = cofingSetModel.RabbitMQ;
}
this.httpClientFactory = httpClientFactory;
_pushMsgService = pushMsgService;
}
private IConnection con;
private IModel channel;
public Task StartAsync(CancellationToken cancellationToken)
{
Task.Run(async () =>
{
//TpushMsgModel msgModel = new TpushMsgModel();
//msgModel.C_DevStoreCode = "8453d5ed-8a21-4880-88e7-f872e93551bf";
//msgModel.C_MsgTypeCode = "strs";
//msgModel.Subject = "str";
//string msgData = JsonConvert.SerializeObject(msgModel);
//await WebScadaAlarmExecute(msgData);
await AddRabbitMQ();
//string msg = "{\"C_DevStoreCode\":\"015ea688-0698-408c-94b7-0475b14a20fc\",\"C_MsgTypeCode\":\"MSG_TYPE_011\",\"Subject\":\"设备开启关闭报警\",\"Msg\":\"关闭\",\"DevNumber\":\"20230413\",\"DevName\":\"沸石转轮催化燃烧设备\",\"UserName\":\"设备\",\"UserMobile\":null,\"DevAddress\":null,\"DevOpsName\":null,\"CreateOn\":\"03/06/2025 16:38:43\",\"GenerationType\":1,\"msgStatus\":4}";
//bool result= await WebScadaAlarmExecute(msg);
//while (true)
//{
// await Task.Delay(5000);//10000:10s
//}
});
return Task.CompletedTask;
}
public async Task AddRabbitMQ()
{
try
{
var factory = new ConnectionFactory()
{
HostName = _rabbitMQModel.HostName,//"60.204.212.71",//IP地址
Port = _rabbitMQModel.Port,//5672,//端口号
UserName = _rabbitMQModel.UserName,//"guest",//用户账号
VirtualHost = _rabbitMQModel.VirtualHost,//"/",
Password = _rabbitMQModel.Password,// "guest"//用户密码
};
if (con == null || con.IsOpen == false)
{
con = factory.CreateConnection();//创建连接对象
}
if (channel == null || channel.IsOpen == false)
{
channel = con.CreateModel();//创建连接会话对象
}
channel.ExchangeDeclare(_rabbitMQModel.ExchangeName, type: ExchangeType.Direct); // Direct 交换机示例
//声明队列
var queueName = channel.QueueDeclare(
queue: _rabbitMQModel.QueueName, //消息队列名称
durable: false, //是否缓存
exclusive: false,
autoDelete: false,
arguments: null
).QueueName;
channel.QueueBind(queueName, _rabbitMQModel.ExchangeName, _rabbitMQModel.RoutingKey);
//告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息
channel.BasicQos(0, 1, false);
channel.ConfirmSelect(); // 开启消息确认模式
//创建消费者对象
var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
log.Info("【RabbitMQ】" + message);
bool result = await WebScadaAlarmExecute(message);
if (result)
{
// 确认消息已处理
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
else
{
channel.BasicNack(ea.DeliveryTag, false, true); // 重新入队或发送到死信队列的逻辑需要自定义实现
}
}
catch (Exception ex)
{
channel.BasicNack(ea.DeliveryTag, false, true); // 重新入队或发送到死信队列的逻辑需要自定义实现
log.Info("【RabbitMQ-接收消息处理异常了】" + ex.Message);
}
};
//消费者开启监听 将autoAck设置false 手动确认关闭;true:自动关闭;
channel.BasicConsume(queue: _rabbitMQModel.QueueName, autoAck: false, consumer: consumer);
}
catch (Exception ex)
{
log.Info("【异常-RabbitMQ】" + ex.Message);
}
}
///
/// WebScada服务的设备报警
///
///
///
public async Task WebScadaAlarmExecute(string msgData)
{
bool result = false;
try
{
var pushMsg = JsonConvert.DeserializeObject(msgData);
Inspection.Common.Helper.RabbitMQModel rabbitMQModels =new Inspection.Common.Helper.RabbitMQModel();
rabbitMQModels.QueueName = "rab.video.record.mqtt"; //消息队列名称
rabbitMQModels.UserName = _rabbitMQModel.UserName;
rabbitMQModels.Password = _rabbitMQModel.Password;
rabbitMQModels.HostName = _rabbitMQModel.HostName;
rabbitMQModels.Port = _rabbitMQModel.Port;
rabbitMQModels.VirtualHost = _rabbitMQModel.VirtualHost;
rabbitMQModels.ExchangeName = rabbitMQModels.QueueName+".DirectExchange";
rabbitMQModels.RoutingKey = rabbitMQModels.QueueName + ".key";
bool bols = await _pushMsgService.PushAlarmMsgAsync(pushMsg, pushMsg.Subject, rabbitMQModels);
if (bols)
{
result = true;
}
}
catch (Exception ex)
{
log.Info("【异常-WebScadaAlarmExecute】" + ex.Message);
}
return result;
}
///
/// WebScada服务的设备报警
///
///
///
public async Task WebScadaAlarmExecuteAPI(string msgData)
{
bool result = false;
try
{
using (HttpClient httpClient = httpClientFactory.CreateClient())
{
var httpRequestMessage = new HttpRequestMessage
{
Method = HttpMethod.Post,
RequestUri = new Uri(_cofingSetModel.PublicPushMessageAPI),
Content = new StringContent(msgData, Encoding.UTF8, "application/json")
};
var response = await httpClient.SendAsync(httpRequestMessage);
string responseResult = await response.Content.ReadAsStringAsync();
if (response.StatusCode != HttpStatusCode.OK)
{
log.Info($"【错误-WebScadaAlarmExecute】WebScada服务的设备报警 发送数据失败【{msgData}】");
}
else
{
//"{\"code\":-1001,\"message\":\"参数错误。\"}"
receiveModel resultReturn = JsonConvert.DeserializeObject(responseResult);
if (resultReturn.code == "0") { result = true; }
}
}
}
catch (Exception ex)
{
log.Info("【异常-WebScadaAlarmExecute】" + ex.Message);
}
return result;
}
public Task StopAsync(CancellationToken cancellationToken)
{
Dispose();
return Task.CompletedTask;
}
public void Dispose()
{
if (channel!=null)
{
channel.Close();
}
if (con!=null)
{
con.Close();
}
}
}
public class receiveModel
{
public string code { get; set; }
public string message { get; set; }
}
}