using FBoxClientDriver.Contract;
using InfluxData.Net.InfluxDb;
using log4net;
using Microsoft.AspNet.SignalR.Client;
using Microsoft.AspNetCore.Connections;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Ropin.Environmentally.DcsService.AppSetingModel;
using Ropin.Inspection.Model.Entities;
using Ropin.Inspection.Model.SearchModel.DEV;
using Ropin.Inspection.Repository;
using Ropin.Inspection.Repository.DEV.Interface;
using System;
using System.Runtime.Intrinsics.Arm;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using static System.Net.WebRequestMethods;
using Ropin.Inspection.Common.Helper;
using Ropin.Inspection.Model.ViewModel.DEV;
using System.Collections.Generic;
using Ropin.Inspection.Repository.LGS.Interface;
using System.Net.Http;
using System.Net.Sockets;
using System.Net;
using FBoxClientDriver.Contract.Entity;
using System.IO;
using System.Linq;
using System.IO.Pipelines;
using Ubiety.Dns.Core;
using Microsoft.AspNet.SignalR.Client.Http;
using Renci.SshNet;
using static Zstandard.Net.ZstandardInterop;
using System.Net.Http.Headers;
using LinqKit;
using Ropin.Environmentally.DcsService.Service.Model;

namespace Ropin.Environmentally.DcsService.Service
{
    /// <summary>
    /// Hosted service that consumes command messages from a RabbitMQ queue and forwards each
    /// command to its target box over the protocol named in the message. Only MQTT and HTTP are
    /// dispatched by <see cref="CmdExecute"/>; the TCP/UDP/LoRa helpers are not called from it.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this file reached review collapsed onto a few physical lines, so every
    /// <c>//</c> comment swallowed the code that followed it, and with its generic type arguments
    /// stripped. Line structure is restored here. Generic arguments are restored only where the
    /// surrounding code pins them to a framework type (<c>Task&lt;bool&gt;</c>,
    /// <c>IServiceScopeFactory</c>, <c>Dictionary&lt;string, string&gt;</c>). Places that need a
    /// project-declared type are left exactly as received and tagged NOTE(review), because those
    /// type names are not visible from this file and must be restored from version control.
    /// </remarks>
    public class RabbitMQReceiveService : IHostedService, IDisposable
    {
        private readonly IServiceProvider _provider;
        private InfluxDbClient clientDb;
        private static readonly ILog log = LogManager.GetLogger(typeof(RabbitMQReceiveService));
        private readonly RabbitMQModel _rabbitMQModel;
        private readonly IniInfluxDB _IniInfluxData;
        private readonly CmdModel _cmdModel;

        /// <summary>Stores the injected service provider and configuration models.</summary>
        /// <param name="provider">Root provider used to create a DI scope per command batch.</param>
        /// <param name="rabbitMQModel">RabbitMQ host, credentials, exchange, queue and routing key.</param>
        /// <param name="IniInfluxData">InfluxDB settings (stored; not read by the code in this class).</param>
        /// <param name="cmdModel">MQTT and HTTP endpoint settings used when forwarding commands.</param>
        public RabbitMQReceiveService(IServiceProvider provider, RabbitMQModel rabbitMQModel, IniInfluxDB IniInfluxData, CmdModel cmdModel)
        {
            this._provider = provider;
            _rabbitMQModel = rabbitMQModel;
            _IniInfluxData = IniInfluxData;
            _cmdModel = cmdModel;
        }

        private RabbitMQ.Client.IConnection con;
        private RabbitMQ.Client.IModel channel;

        /// <summary>
        /// Starts the RabbitMQ consumer on a background task and returns immediately, so host
        /// start-up is not blocked by the broker connection.
        /// </summary>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            Task.Run(async () =>
            {
                await AddRabbitMQ();
            });
            return Task.CompletedTask;
        }

        /// <summary>
        /// Connects to RabbitMQ, declares and binds the configured direct exchange and queue, and
        /// registers a consumer that runs <see cref="CmdExecute"/> for every message.
        /// Messages are acknowledged manually: ack when <see cref="CmdExecute"/> returns true,
        /// nack with requeue otherwise. Any exception is logged and swallowed.
        /// </summary>
        public async Task AddRabbitMQ()
        {
            try
            {
                var factory = new ConnectionFactory()
                {
                    HostName = _rabbitMQModel.HostName,       // broker IP / host name
                    Port = _rabbitMQModel.Port,               // broker port
                    UserName = _rabbitMQModel.UserName,       // account
                    VirtualHost = _rabbitMQModel.VirtualHost,
                    Password = _rabbitMQModel.Password,       // password
                };
                if (con == null || con.IsOpen == false)
                {
                    con = factory.CreateConnection(); // create the connection
                }
                if (channel == null || channel.IsOpen == false)
                {
                    channel = con.CreateModel(); // create the channel (session)
                }
                channel.ExchangeDeclare(_rabbitMQModel.ExchangeName, type: ExchangeType.Direct);

                // Declare the queue.
                var queueName = channel.QueueDeclare(
                    queue: _rabbitMQModel.QueueName, // queue name
                    durable: false,                  // not persisted
                    exclusive: false,
                    autoDelete: false,
                    arguments: null
                ).QueueName;
                channel.QueueBind(queueName, _rabbitMQModel.ExchangeName, _rabbitMQModel.RoutingKey);

                // Deliver at most one unacknowledged message to this consumer at a time.
                channel.BasicQos(0, 1, false);
                channel.ConfirmSelect(); // turn on confirm mode for this channel

                // Create the consumer.
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += async (model, ea) =>
                {
                    try
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        log.Info("【RabbitMQ】" + message);
                        // NOTE(review): the generic type argument (a List of the command DTO) was
                        // lost from this call before review; restore it from version control.
                        var ReceiveData = JsonConvert.DeserializeObject>(message);
                        ReceiveData = ReceiveData.OrderBy(x => x.sort).ToList();
                        bool result = await CmdExecute(ReceiveData);
                        if (result)
                        {
                            // Confirm the message has been handled.
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        }
                        else
                        {
                            // Requeue; dead-lettering would need a custom implementation.
                            channel.BasicNack(ea.DeliveryTag, false, true);
                        }
                    }
                    catch (Exception ex)
                    {
                        log.Info("【RabbitMQ-接收消息处理异常了】" + ex.Message);
                        // This lambda is async void: an exception escaping from here is never
                        // observed by any caller, so the nack itself must not be allowed to throw
                        // (it does when the channel has already been closed).
                        try
                        {
                            channel.BasicNack(ea.DeliveryTag, false, true);
                        }
                        catch (Exception nackEx)
                        {
                            log.Info("【RabbitMQ-接收消息处理异常了】" + nackEx.Message);
                        }
                    }
                };

                // Start consuming with manual acknowledgement (autoAck: false).
                channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
            }
            catch (Exception ex)
            {
                log.Info("【异常-RabbitMQ】" + ex.Message);
            }
        }

        /// <summary>
        /// Executes a batch of commands in order. For each item the enabled box record
        /// (<c>C_Status == "1"</c>) is looked up and the command is sent over the item's protocol.
        /// </summary>
        /// <param name="list">Commands to execute; null entries are skipped.</param>
        /// <returns>
        /// The outcome of the last item that set it: for MQTT items with an event id, the result
        /// of updating that event's status; true for items that did not go through MQTT.
        /// False when nothing set it or an exception was caught.
        /// </returns>
        // NOTE(review): the element type of the List parameter was lost before review; restore it
        // from version control.
        public async Task<bool> CmdExecute(List list)
        {
            bool cmdResult = false;
            try
            {
                using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
                {
                    // NOTE(review): the generic type arguments of the two GetService calls (the
                    // large-screen-event repository and the box repository interfaces) were lost
                    // before review; restore them from version control.
                    var largeScreenEventRepository = scope.ServiceProvider.GetService();
                    var ItdevBoxRepository = scope.ServiceProvider.GetService();
                    foreach (var item in list)
                    {
                        if (item == null) continue;

                        var items = await ItdevBoxRepository.GetByConditionAsync(C => C.C_ID == item.BoxId && C.C_Status == "1");
                        var content = items.FirstOrDefault();
                        if (content == null)
                        {
                            log.Info($"数据错误-CmdExecut:盒子{item.BoxId},没有找到数据");
                        }

                        // Status codes: 0 = disabled; 1 = enabled; 2 = sent OK; 3 = send failed;
                        // 4 = executed OK; 5 = execution failed.
                        string satus = "2";
                        bool bol = false;
                        switch (item.protocol)
                        {
                            case "MQTT":
                            case "COME_TYPE_001":
                                // A missing box record is treated as a failed send instead of
                                // dereferencing null (which previously aborted the whole batch).
                                bol = content != null && await MQTTService(item.BoxId, item.pms, content.C_BoxNo);
                                satus = bol ? "4" : "5";
                                break;
                            case "HTTP":
                            case "COME_TYPE_002":
                                bol = content != null && await HTTPService(item.BoxId, item.pms, content.C_BoxNo);
                                break;
                            case "TCP":
                            case "COME_TYPE_003":
                            case "UDP":
                            case "COME_TYPE_004":
                            default:
                                // Not implemented: nothing is sent for these protocols.
                                break;
                        }

                        if (satus == "4" || satus == "5")
                        {
                            if (!string.IsNullOrEmpty(item.EveId))
                            {
                                cmdResult = await largeScreenEventRepository.UpdateLargeScreenEventStatusBYId(item.EveId, satus);
                            }
                        }
                        else
                        {
                            cmdResult = true;
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                log.Info("【异常-CmdExecute】" + ex.Message);
            }
            return cmdResult;
        }

        #region MQTT
        /// <summary>
        /// Publishes <paramref name="param"/> to the topic <c>{Mqtt.Theme}/{boxNo}</c> on the
        /// configured MQTT broker with QoS "at least once", using a fresh client per call.
        /// </summary>
        /// <param name="boxCode">Box identifier (not used by this method).</param>
        /// <param name="param">Payload to publish.</param>
        /// <param name="boxNo">Box number appended to the configured topic prefix.</param>
        /// <returns>True when connect and publish completed without an exception; otherwise false.</returns>
        public async Task<bool> MQTTService(string boxCode, string param, string boxNo)
        {
            bool bol = true;
            try
            {
                var factory = new MqttFactory();
                // Dispose the client so its socket is released after every call.
                using (var mqttClient = factory.CreateMqttClient())
                {
                    try
                    {
                        var optionsBuilder = new MqttClientOptionsBuilder()
                            .WithTcpServer(_cmdModel.Mqtt.Ip, _cmdModel.Mqtt.Port)              // broker address and port
                            .WithCredentials(_cmdModel.Mqtt.UserName, _cmdModel.Mqtt.Password)  // broker credentials
                            .WithClientId(Guid.NewGuid().ToString())                            // unique client id per call
                            .WithCleanSession()
                            .WithTls(new MqttClientOptionsBuilderTlsParameters
                            {
                                UseTls = false // TLS disabled
                            }).Build();
                        await mqttClient.ConnectAsync(optionsBuilder);

                        string topic = _cmdModel.Mqtt.Theme + "/" + boxNo;
                        // Publish the message.
                        var message = new MqttApplicationMessageBuilder()
                            .WithTopic(topic)
                            .WithPayload(param)
                            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                            .Build();
                        var publishResult = await mqttClient.PublishAsync(message);
                        log.Info($"【MQTT】【PublishAsync】{JsonConvert.SerializeObject(publishResult)}");
                    }
                    catch (Exception e)
                    {
                        log.Info("【异常-MqttFactory】" + e.Message);
                        bol = false;
                    }

                    // Disconnect only if the connection was actually established.
                    if (mqttClient.IsConnected)
                    {
                        await mqttClient.DisconnectAsync();
                    }
                }
            }
            catch (Exception ex)
            {
                log.Info("【异常-MQTTService】" + ex.Message);
                bol = false;
            }
            return bol;
        }
        #endregion

        #region HTTP【90%】
        public static TokenResult tokenResult;
        public static DateTime tokenTime;

        /// <summary>
        /// Web-API path: for every enabled device spot of the box, posts the command's
        /// <c>type</c>/<c>value</c> as a form to <c>{Http.Api}?boxNo={boxNo}</c> with a bearer
        /// token. The token is cached in static fields and refreshed after 7200 seconds.
        /// </summary>
        /// <param name="boxCode">Box code used to look up the device spots.</param>
        /// <param name="param">JSON payload carrying <c>type</c> and <c>value</c>.</param>
        /// <param name="boxNo">Box number passed in the query string.</param>
        /// <returns>
        /// True when at least one request was sent and every request returned a success status
        /// code; otherwise false.
        /// </returns>
        public async Task<bool> HTTPService(string boxCode, string param, string boxNo)
        {
            bool result = false;
            try
            {
                // NOTE(review): the generic type argument (the DTO exposing `type` and `value`)
                // was lost from this call before review; restore it from version control.
                var ReceiveData = JsonConvert.DeserializeObject(param);
                if (ReceiveData == null)
                {
                    log.Info($"参数错误-HTTPService【boxCode:{boxCode}】【param:{param}】");
                    return false; // nothing to send; previously fell through and dereferenced null
                }

                if (tokenResult == null || DateTime.Now >= tokenTime)
                {
                    tokenResult = FanyiHelper.GetToken(new AccessUser
                    {
                        client_id = _cmdModel.Http.clientId,
                        client_secret = _cmdModel.Http.clientSecret
                    });
                    tokenTime = DateTime.Now.AddSeconds(7200);
                }

                using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
                {
                    // NOTE(review): the generic type arguments of GetService (device-spot
                    // repository interface) and PredicateBuilder.New (its entity type) were lost
                    // before review; restore them from version control.
                    var TdevBoxDevSpotRepository = scope.ServiceProvider.GetService();
                    var predicate = PredicateBuilder.New(true); // composable filter
                    predicate = predicate.And(i => i.C_Status.Equals("1"));
                    predicate = predicate.And(i => i.C_BoxCode.Equals(boxCode));
                    var items = await TdevBoxDevSpotRepository.GetByConditionAsync(predicate);
                    if (!items.Any())
                    {
                        log.Info($"没有数据-HTTPService【boxCode:{boxCode}】里面没有设备点数据");
                    }

                    bool anySent = false;
                    bool allSucceeded = true;
                    // One client for the whole batch instead of one per device spot.
                    using (var httpClient = new HttpClient())
                    {
                        foreach (var item in items)
                        {
                            if (item == null) continue;

                            httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", tokenResult.access_token);
                            var dict = new Dictionary<string, string>();
                            dict["id"] = item.C_ID;                 // monitoring point uid; when set, name is not needed
                            dict["name"] = item.C_ChineseName;      // monitoring point name; when set, id is not needed
                            dict["groupname"] = item.C_GroupName;   // group name, disambiguates equal names across groups
                            // type 0: value is in the type configured on the monitoring point;
                            // type 1: integers are decimal, floats and others are sent as-is.
                            dict["type"] = ReceiveData.type;
                            dict["value"] = ReceiveData.value;

                            using (var content = new FormUrlEncodedContent(dict))
                            {
                                try
                                {
                                    // Await the request: the task used to be inspected before it
                                    // had completed and was then blocked on with .Result.
                                    using (var response = await httpClient.PostAsync($"{_cmdModel.Http.Api}?boxNo={boxNo}", content))
                                    {
                                        var postResult = await response.Content.ReadAsStringAsync();
                                        log.Info($"接口数据返回-HTTPService【{JsonConvert.SerializeObject(postResult)}】");
                                        anySent = true;
                                        allSucceeded &= response.IsSuccessStatusCode;
                                    }
                                }
                                catch (HttpRequestException)
                                {
                                    log.Info($"接口数据返回-HTTPService:IsFaulted=true【boxCode:{boxCode}】");
                                    allSucceeded = false;
                                }
                            }
                        }
                    }
                    result = anySent && allSucceeded;
                }
            }
            catch (Exception ex)
            {
                log.Info("【异常-HTTPService】" + ex.Message);
                result = false;
            }
            return result;
        }

        /// <summary>
        /// IP + port path: posts <paramref name="param"/> as JSON to
        /// <c>{Http.Api}?boxNo={boxCode}</c> using a client whose base address is a fixed gateway.
        /// </summary>
        /// <returns>True when the response has a success status code; false on any exception.</returns>
        public async Task<bool> HTTPService1(string boxCode, string param)
        {
            bool result = false;
            try
            {
                var gatewayUri = $"http://60.204.212.71:1883/"; // NOTE(review): hard-coded gateway — confirm
                using (var httpClient = new HttpClient())
                {
                    httpClient.BaseAddress = new Uri(gatewayUri);
                    // Payload to send.
                    var data = new StringContent(param, Encoding.UTF8, "application/json");
                    var response = await httpClient.PostAsync($"{_cmdModel.Http.Api}?boxNo={boxCode}", data);
                    // Throws for a non-success status, which lands in the catch below.
                    response.EnsureSuccessStatusCode();
                    log.Info($"数据发送成功-HTTPService【{JsonConvert.SerializeObject(response)}】");
                    string responseBody = await response.Content.ReadAsStringAsync();
                    Console.WriteLine(responseBody);
                    result = true; // previously never set, so the method always reported failure
                }
            }
            catch (Exception ex)
            {
                log.Info("【异常-HTTPService】" + ex.Message);
                result = false;
            }
            return result;
        }
        #endregion

        #region TCP【50%】
        /// <summary>
        /// Connects to the fixed gateway, reads one chunk (up to 1024 bytes) and, if it is not
        /// empty, writes <paramref name="param"/> back as UTF-8.
        /// </summary>
        /// <returns>True when the payload was written; false otherwise or on any exception.</returns>
        public bool TCPService(string boxCode, string param)
        {
            bool result = false;
            try
            {
                // Gateway address and port. NOTE(review): hard-coded — confirm.
                string host = "60.204.212.71";
                int port = 1883;
                using (TcpClient client = new TcpClient())
                {
                    client.Connect(host, port);
                    using (NetworkStream stream = client.GetStream())
                    {
                        // Receive first.
                        byte[] bytes = new byte[1024];
                        int bytesRead = stream.Read(bytes, 0, bytes.Length);
                        string data = Encoding.UTF8.GetString(bytes, 0, bytesRead);
                        Console.WriteLine("Received: " + data);
                        log.Info($"数据接收-TCPService-Read【{data}】");
                        if (!string.IsNullOrEmpty(data))
                        {
                            // Then send.
                            byte[] responseBytes = Encoding.UTF8.GetBytes(param);
                            stream.Write(responseBytes, 0, responseBytes.Length);
                            result = true; // previously never set
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                log.Info("【异常-TCPService】" + ex.Message);
                result = false;
            }
            return result;
        }

        /// <summary>
        /// Connects to the fixed gateway, writes <paramref name="param"/> as UTF-8, then reads and
        /// logs one response chunk if any bytes arrive.
        /// </summary>
        /// <returns>True when the write and the read completed without an exception.</returns>
        public bool TCPService1(string boxCode, string param)
        {
            bool result = false;
            try
            {
                // Gateway address and port. NOTE(review): hard-coded — confirm.
                string host = "60.204.212.71";
                int port = 1883;
                using (TcpClient client = new TcpClient())
                {
                    client.Connect(host, port);
                    using (NetworkStream stream = client.GetStream())
                    {
                        // Send the payload.
                        byte[] buffer = Encoding.UTF8.GetBytes(param);
                        stream.Write(buffer, 0, buffer.Length);

                        // Read the response, if any.
                        buffer = new byte[client.ReceiveBufferSize];
                        int bytesRead = stream.Read(buffer, 0, client.ReceiveBufferSize);
                        if (bytesRead > 0)
                        {
                            string response = Encoding.UTF8.GetString(buffer, 0, bytesRead);
                            log.Info($"数据发送成功-TCPService【{JsonConvert.SerializeObject(response)}】");
                        }
                        result = true; // previously never set
                    }
                }
            }
            catch (Exception ex)
            {
                log.Info("【异常-TCPService】" + ex.Message);
                result = false;
            }
            return result;
        }
        #endregion

        #region UDP【50%】
        /// <summary>
        /// Waits for one datagram from the fixed gateway endpoint, then sends
        /// <paramref name="param"/> to it as UTF-8.
        /// </summary>
        /// <returns>True when the datagram was sent; false on any exception.</returns>
        public bool UDPService(string boxCode, string param)
        {
            bool result = false;
            try
            {
                // Gateway address and port. NOTE(review): hard-coded — confirm.
                string gatewayIp = "60.204.212.71";
                int gatewayPort = 1883;
                byte[] data = Encoding.UTF8.GetBytes(param);
                using (UdpClient client = new UdpClient())
                {
                    IPEndPoint endPoint = new IPEndPoint(IPAddress.Parse(gatewayIp), gatewayPort);
                    try
                    {
                        // NOTE(review): the client has no local binding and nothing has been sent
                        // yet, so this Receive looks like it cannot succeed — confirm the intended
                        // handshake order.
                        client.Receive(ref endPoint);
                        int sendResult = client.Send(data, data.Length, endPoint);
                        log.Info($"数据发送成功-UDPService【{sendResult}】");
                        result = true; // previously never set
                    }
                    catch (Exception ex)
                    {
                        log.Info("【数据发送失败-异常-UDPService】" + ex.Message);
                        result = false;
                    }
                }
            }
            catch (Exception ex)
            {
                log.Info("【异常-UDPService】" + ex.Message);
                result = false;
            }
            return result;
        }

        /// <summary>
        /// Diagnostic listener: binds a UDP socket on 127.0.0.1:1883, waits for one datagram, then
        /// loops forever printing datagrams whose sender address equals the bound address.
        /// Never returns normally; exceptions raised during setup propagate to the caller.
        /// </summary>
        public bool UDPService2(string boxCode, string param)
        {
            // Fixed local address and port.
            IPAddress ip = IPAddress.Parse("127.0.0.1");
            int port = 1883;
            IPEndPoint endPoint = new IPEndPoint(ip, port);

            // Dispose the raw socket when the method exits (it used to be leaked).
            using (Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp))
            {
                socket.Bind(endPoint);
                EndPoint endPoint1 = new IPEndPoint(IPAddress.Any, 0);

                // Block until the first datagram arrives.
                byte[] bytes = new byte[1024];
                socket.ReceiveFrom(bytes, ref endPoint1);

                // NOTE(review): new UdpClient(port) binds the port itself while the socket above
                // still holds it, and the explicit Bind below binds a second time; this looks
                // likely to throw before the loop is reached — confirm the intended design.
                using (UdpClient udpClient = new UdpClient(port))
                {
                    udpClient.Client.SetSocketOption(SocketOptionLevel.Udp, SocketOptionName.ReuseAddress, true);
                    udpClient.Client.Bind(new IPEndPoint(ip, port));
                    Console.WriteLine($"Listening for UDP data on {udpClient.Client.LocalEndPoint}");

                    // Receive forever.
                    while (true)
                    {
                        try
                        {
                            IPEndPoint remoteEndPoint = null;
                            byte[] buffer = udpClient.Receive(ref remoteEndPoint);

                            // Ignore senders other than the expected address. IPAddress has no
                            // == / != overloads, so the old `!=` compared references and
                            // rejected every datagram; Equals compares the address value.
                            if (!ip.Equals(remoteEndPoint.Address)) continue;

                            string message = Encoding.UTF8.GetString(buffer);
                            Console.WriteLine($"Received: {message} from {remoteEndPoint}");
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(e.ToString());
                        }
                    }
                }
            }
        }
        #endregion

        #region LoRa【50%】
        /// <summary>
        /// Opens a TCP connection to the fixed gateway and writes <paramref name="param"/> followed
        /// by a newline. The payload is expected to already be a LoRa-formatted packet.
        /// </summary>
        /// <returns>True when the payload was written and flushed; false on any exception.</returns>
        public bool LoRaService(string boxCode, string param)
        {
            bool result = false;
            try
            {
                // NOTE(review): hard-coded gateway — confirm.
                string _gatewayAddress = "60.204.212.71";
                int _gatewayPort = 1883;
                using (TcpClient client = new TcpClient(_gatewayAddress, _gatewayPort))
                using (NetworkStream stream = client.GetStream())
                using (StreamWriter writer = new StreamWriter(stream))
                {
                    writer.WriteLine(param);
                    writer.Flush();
                    result = true; // previously never set
                }
            }
            catch (Exception ex)
            {
                log.Info("【异常-LoRaService】" + ex.Message);
                result = false;
            }
            return result;
        }
        #endregion

        /// <summary>Stops the service by closing the RabbitMQ channel and connection.</summary>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            Dispose();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Closes and disposes the RabbitMQ channel and connection. Safe to call more than once
        /// (it is invoked from <see cref="StopAsync"/> and may be invoked again by the owner of
        /// this instance) and never throws.
        /// </summary>
        public void Dispose()
        {
            try
            {
                if (channel != null)
                {
                    if (channel.IsOpen)
                    {
                        channel.Close();
                    }
                    channel.Dispose();
                }
            }
            catch (Exception ex)
            {
                log.Info("【异常-RabbitMQ】" + ex.Message);
            }
            finally
            {
                channel = null;
            }

            try
            {
                if (con != null)
                {
                    // Closing an already-closed connection throws, so check first.
                    if (con.IsOpen)
                    {
                        con.Close();
                    }
                    con.Dispose();
                }
            }
            catch (Exception ex)
            {
                log.Info("【异常-RabbitMQ】" + ex.Message);
            }
            finally
            {
                con = null;
            }
        }
    }
}