using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using FBoxClientDriver.Contract.Entity;
using InfluxData.Net.Common.Enums;
using InfluxData.Net.InfluxDb;
using InitQ.Cache;
using log4net;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Ropin.Inspection.Common.Helper;
using Ropin.Inspection.Model;
using Ropin.Inspection.Model.Entities;
using Ropin.Inspection.Model.ViewModel.DEV;
using Ropin.Inspection.Repository;
using Ropin.Inspection.Repository.DEV.Interface;

namespace Ropin.Environmentally.LedgeService1
{
    /// <summary>
    /// Hosted service that consumes device-status messages from a RabbitMQ queue and, for each
    /// message, writes an operating-ledger row, a device-store log row and an updated run record
    /// through the database context, reading aggregated values from InfluxDB and cached values
    /// from the redis service.
    /// </summary>
    /// <remarks>
    /// NOTE(review): in the reviewed text several generic type arguments were missing (the calls
    /// appear as <c>GetService()</c>, <c>List</c>, <c>DeserializeObject(...)</c>). Arguments that
    /// are determined by a visible declaration have been written out; the remaining ones are marked
    /// with NOTE(review) below and must be restored from the original file.
    /// </remarks>
    public class RabbitMQService : IHostedService, IDisposable
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(RabbitMQService));

        private readonly IServiceProvider _provider;
        private readonly RabbitMQModel _rabbitMQModel;
        private readonly IniInfluxData _IniInfluxData;

        private InfluxDbClient clientDb;
        private IConnection con;
        private IModel channel;

        /// <summary>
        /// Stores the injected dependencies and creates the InfluxDB client.
        /// </summary>
        /// <param name="provider">Root service provider used to create per-operation scopes.</param>
        /// <param name="rabbitMQModel">RabbitMQ connection, exchange, queue and routing settings.</param>
        /// <param name="IniInfluxData">InfluxDB endpoint and credentials.</param>
        public RabbitMQService(IServiceProvider provider, RabbitMQModel rabbitMQModel, IniInfluxData IniInfluxData)
        {
            this._provider = provider;
            _rabbitMQModel = rabbitMQModel;
            _IniInfluxData = IniInfluxData;
            IniInflux();
        }

        /// <summary>
        /// Starts the consumer on a thread-pool thread and returns immediately.
        /// </summary>
        /// <param name="cancellationToken">Not used by this implementation.</param>
        /// <returns>A completed task.</returns>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            // Fire-and-forget: AddRabbitMQ catches and logs its own exceptions.
            Task.Run(async () =>
            {
                await AddRabbitMQ();
            });
            return Task.CompletedTask;
        }

        /// <summary>
        /// Closes the channel and connection.
        /// </summary>
        /// <param name="cancellationToken">Not used by this implementation.</param>
        /// <returns>A completed task.</returns>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            Dispose();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Closes and disposes the channel and the connection. Safe to call when the connection was
        /// never established and safe to call more than once (it is invoked from
        /// <see cref="StopAsync"/> and may be invoked again by the owner of this instance).
        /// Never throws; failures are logged.
        /// </summary>
        public void Dispose()
        {
            var currentChannel = channel;
            channel = null;
            if (currentChannel != null)
            {
                try
                {
                    if (currentChannel.IsOpen)
                    {
                        currentChannel.Close();
                    }
                    currentChannel.Dispose();
                }
                catch (Exception ex)
                {
                    log.Warn("【RabbitMQ】channel close failed: " + ex.Message, ex);
                }
            }

            var currentConnection = con;
            con = null;
            if (currentConnection != null)
            {
                try
                {
                    if (currentConnection.IsOpen)
                    {
                        currentConnection.Close();
                    }
                    currentConnection.Dispose();
                }
                catch (Exception ex)
                {
                    log.Warn("【RabbitMQ】connection close failed: " + ex.Message, ex);
                }
            }
        }

        /// <summary>
        /// Opens (or re-opens) the connection and channel, declares and binds the exchange and queue
        /// from the configured settings, and registers a consumer with manual acknowledgement.
        /// </summary>
        /// <remarks>
        /// For each delivery the message body is decoded as UTF-8 JSON and passed to
        /// <c>DevStatusChange</c> with status "0". The delivery is acked when that call returns
        /// true and nacked with requeue otherwise. Payloads that cannot be parsed are rejected
        /// without requeue, because redelivering them can never succeed and, with a prefetch count
        /// of 1, would block every message behind them.
        /// All exceptions are caught and logged; this method does not throw.
        /// </remarks>
        /// <returns>A task that completes once the consumer is registered or setup has failed.</returns>
        public async Task AddRabbitMQ()
        {
            try
            {
                var factory = new ConnectionFactory()
                {
                    HostName = _rabbitMQModel.HostName,       // broker address
                    Port = _rabbitMQModel.Port,               // broker port
                    UserName = _rabbitMQModel.UserName,       // account name
                    VirtualHost = _rabbitMQModel.VirtualHost,
                    Password = _rabbitMQModel.Password,       // account password
                };

                if (con == null || con.IsOpen == false)
                {
                    con = factory.CreateConnection();
                }
                if (channel == null || channel.IsOpen == false)
                {
                    channel = con.CreateModel();
                }

                channel.ExchangeDeclare(_rabbitMQModel.ExchangeName, type: ExchangeType.Direct);

                // Declare the queue (non-durable, shared, not auto-deleted) and bind it to the exchange.
                var queueName = channel.QueueDeclare(
                    queue: _rabbitMQModel.QueueName,
                    durable: false,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null
                ).QueueName;
                channel.QueueBind(queueName, _rabbitMQModel.ExchangeName, _rabbitMQModel.RoutingKey);

                // Deliver one message at a time: nothing further is sent until the previous one is acked/nacked.
                channel.BasicQos(0, 1, false);
                // NOTE(review): ConfirmSelect enables publisher confirms; this class only consumes — confirm it is needed.
                channel.ConfirmSelect();

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += async (model, ea) =>
                {
                    try
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        log.Info("【RabbitMQ】" + message);

                        // NOTE(review): the generic type argument of DeserializeObject is missing in the reviewed
                        // text; restore the original message type (it must expose devStoreCode, devRunSpot, time).
                        var receiveData = JsonConvert.DeserializeObject(message);
                        if (receiveData == null)
                        {
                            // An empty / "null" payload can never be processed: reject instead of requeueing forever.
                            log.Error("【RabbitMQ-接收消息处理异常了】" + "empty payload");
                            channel.BasicNack(ea.DeliveryTag, false, false);
                            return;
                        }

                        bool result = await DevStatusChange("0", receiveData.devStoreCode, receiveData.devRunSpot, receiveData.time);
                        if (result)
                        {
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        }
                        else
                        {
                            // Requeue; dead-lettering would need to be implemented separately.
                            // NOTE(review): DevStatusChange also returns false when no ledger is configured for
                            // the device, which makes this message requeue indefinitely — confirm that is intended.
                            channel.BasicNack(ea.DeliveryTag, false, true);
                        }
                    }
                    catch (JsonException ex)
                    {
                        // Malformed JSON will fail identically on every redelivery: reject without requeue.
                        log.Error("【RabbitMQ-接收消息处理异常了】" + ex.Message, ex);
                        TryNack(ea.DeliveryTag, false);
                    }
                    catch (Exception ex)
                    {
                        log.Error("【RabbitMQ-接收消息处理异常了】" + ex.Message, ex);
                        // The handler is async void: a throwing nack (e.g. closed channel) must not escape it.
                        TryNack(ea.DeliveryTag, true);
                    }
                };

                // autoAck: false => deliveries are acknowledged manually in the handler above.
                channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
            }
            catch (Exception ex)
            {
                log.Error("【异常-RabbitMQ】" + ex.Message, ex);
            }
        }

        /// <summary>
        /// Negatively acknowledges a single delivery, logging instead of throwing on failure.
        /// </summary>
        /// <param name="deliveryTag">Delivery tag of the message to nack.</param>
        /// <param name="requeue">Whether the broker should requeue the message.</param>
        private void TryNack(ulong deliveryTag, bool requeue)
        {
            try
            {
                channel.BasicNack(deliveryTag, false, requeue);
            }
            catch (Exception ex)
            {
                log.Error("【RabbitMQ-接收消息处理异常了】" + ex.Message, ex);
            }
        }

        #region Operating ledger (dynamic)

        /// <summary>
        /// Processes one device status change: when <paramref name="devStatus"/> is "0", builds and
        /// saves an operating-ledger row from the newest ledger configuration of the device; then
        /// always appends a device-store log row and updates the device's serialized run record.
        /// </summary>
        /// <param name="devStatus">
        /// Status code. "0" triggers the ledger row. The log row is written with type "2"/"开启" when
        /// the value is "1" and type "3"/"关闭" otherwise.
        /// </param>
        /// <param name="devStoreCode">Device-store id; matched against <c>TDEV_DevStore.C_ID</c>.</param>
        /// <param name="devRunSpot">Run-spot id used to build the redis keys for the run start/end times.</param>
        /// <param name="time">Value stored under the "Date" key of the ledger content.</param>
        /// <returns>
        /// True only when a ledger row was saved. False when no ledger was written (status not "0",
        /// no usable configuration, save affected no rows) or when any exception occurred; exceptions
        /// are logged and not rethrown.
        /// </returns>
        private async Task<bool> DevStatusChange(string devStatus, string devStoreCode, string devRunSpot, DateTime? time)
        {
            bool result = false;
            try
            {
                using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
                {
                    // NOTE(review): type argument missing in the reviewed text — restore the redis/cache service type.
                    var redisService = scope.ServiceProvider.GetService();
                    DateTime startTime = await redisService.GetAsync<DateTime>("fanyibox_devStartRun_" + devRunSpot);
                    DateTime endTime = await redisService.GetAsync<DateTime>("fanyibox_devEndRun_" + devRunSpot);

                    // NOTE(review): type arguments missing in the reviewed text for the next three services —
                    // restore the dev-ops record repository, ledger-config service and push-message repository types.
                    var devOpsRecordRepository = scope.ServiceProvider.GetService();
                    var accountConfigService = scope.ServiceProvider.GetService();
                    var pushMsgResultRepository = scope.ServiceProvider.GetService();

                    var solidWasteRecordItems = await devOpsRecordRepository.GetRecordsConditionAsync(new TmtnDevOpsRecordDetailSearchModel
                    {
                        bSolidWaste = true,
                        C_DevStoreCode = devStoreCode,
                        IsPagination = false,
                        Start = startTime,
                        End = endTime
                    });
                    var devAccountConfig = await accountConfigService.GetByConditionAsync(t => t.C_DevStoreCode == devStoreCode);

                    // NOTE(review): type argument missing in the reviewed text — restore the DbContext type.
                    using (var dbContext = scope.ServiceProvider.GetService())
                    {
                        var devStore = dbContext.TDEV_DevStore.Where(x => x.C_ID == devStoreCode).FirstOrDefault();

                        // Operating ledger
                        if (devStatus == "0")
                        {
                            var devOpeAccountS = dbContext.GetDbSet<TDEV_DevOpeAccount>();
                            // Only the most recently created configuration is used.
                            var devAccountConfigModel = devAccountConfig.OrderByDescending(t => t.D_CreateOn).FirstOrDefault();
                            if (devAccountConfigModel != null && !string.IsNullOrEmpty(devAccountConfigModel.C_Config))
                            {
                                TdevDevOpeAccountConfigViewModel configList = JsonConvert.DeserializeObject<TdevDevOpeAccountConfigViewModel>(devAccountConfigModel.C_Config);
                                if (configList != null)
                                {
                                    IEnumerable<TDEV_WebScadaDevSpot> items = redisService.Get<IEnumerable<TDEV_WebScadaDevSpot>>("fanyibox_devStore_" + devStoreCode + "_spot");

                                    bool isAlarm = false;
                                    if (HasRunWindow(startTime, endTime))
                                    {
                                        isAlarm = await GetDevAlarmData(devStoreCode, startTime, endTime) >= 1;
                                    }

                                    // Collect the solid-waste payloads of records whose status is "4".
                                    // NOTE(review): the element type of this list (and of DeserializeObject below) is
                                    // missing in the reviewed text — restore it; it must expose NameSpecification,
                                    // SolidWasteNumber and DropNumber.
                                    List DevOpsRecordSolidWaste = new List();
                                    if (solidWasteRecordItems.Any() && solidWasteRecordItems.FirstOrDefault() != null)
                                    {
                                        foreach (var record in solidWasteRecordItems.ToList())
                                        {
                                            if (record.C_Status == "4" && !string.IsNullOrWhiteSpace(record.C_SolidWaste))
                                            {
                                                DevOpsRecordSolidWaste.Add(JsonConvert.DeserializeObject(record.C_SolidWaste));
                                            }
                                        }
                                    }

                                    Dictionary<string, object> datas = new Dictionary<string, object>();

                                    #region Ledger content
                                    datas.Add("Date", time);
                                    foreach (Inspection.Model.ViewModel.DEV.RunSpotConfig item in configList.RunSpotConfigList)
                                    {
                                        if (item.Name == "DevStoreName")
                                        {
                                            datas.Add(item.Name, devStore.C_Name);
                                        }
                                        else if (item.Name == "RunStartTime")
                                        {
                                            datas.Add(item.Name, startTime);
                                        }
                                        else if (item.Name == "RunEndTime")
                                        {
                                            datas.Add(item.Name, DateTime.Now);
                                        }
                                        else if (item.Name == "RunWhetherNormal")
                                        {
                                            // NOTE(review): this stores isAlarm (true when the alarm maximum is >= 1)
                                            // under "RunWhetherNormal" — confirm the polarity is intended.
                                            datas.Add(item.Name, isAlarm);
                                        }
                                        else if (item.Name == "DischargeTime")
                                        {
                                            datas.Add(item.Name, (DateTime.Now - startTime).TotalHours.ToString("F2"));
                                        }
                                        else if (item.Name == "ConsumableName" || item.Name == "WasteName")
                                        {
                                            datas.Add(item.Name, DevOpsRecordSolidWaste.Any()
                                                ? string.Join(",", DevOpsRecordSolidWaste.Select(x => x.NameSpecification).ToList().ToArray())
                                                : "/");
                                        }
                                        else if (item.Name == "ConsumableReplacementQuantity")
                                        {
                                            int number = DevOpsRecordSolidWaste.Any()
                                                ? DevOpsRecordSolidWaste.Sum(x => (string.IsNullOrEmpty(x.SolidWasteNumber) ? 0 : Convert.ToInt32(x.SolidWasteNumber)))
                                                : 0;
                                            datas.Add(item.Name, number);
                                        }
                                        else if (item.Name == "WasteProduction")
                                        {
                                            int number = DevOpsRecordSolidWaste.Any()
                                                ? DevOpsRecordSolidWaste.Sum(x => (string.IsNullOrEmpty(x.DropNumber) ? 0 : Convert.ToInt32(x.DropNumber)))
                                                : 0;
                                            datas.Add(item.Name, number);
                                        }
                                        else if (item.BReadDevSpot)
                                        {
                                            // Value comes from the device spot: mean / max / min over the run window.
                                            string vals = "0";
                                            string valsMax = "0";
                                            string valsMin = "0";
                                            TDEV_WebScadaDevSpot webScadaDevSpot = items?.Where(x => x.C_Name == item.DevSpotName).FirstOrDefault();
                                            if (webScadaDevSpot == null)
                                            {
                                                datas.Add(item.Name, vals);
                                                datas.Add(item.Name + "_Max", valsMax);
                                                datas.Add(item.Name + "_Min", valsMin);
                                                continue;
                                            }

                                            string SpotId = webScadaDevSpot.C_DevSpotCode.ToString();
                                            if (!string.IsNullOrEmpty(SpotId) && HasRunWindow(startTime, endTime))
                                            {
                                                // The window is extended to "now" rather than the cached end time.
                                                endTime = DateTime.Now;
                                                vals = await GetMeanData(SpotId, startTime, endTime);
                                                valsMax = await GetDataMax(SpotId, startTime, endTime);
                                                valsMin = await GetDataMin(SpotId, startTime, endTime);
                                                if (vals == null)
                                                {
                                                    // No mean under the spot code: retry using the spot's own id.
                                                    SpotId = webScadaDevSpot.C_ID.ToString();
                                                    vals = await GetMeanData(SpotId, startTime, endTime);
                                                    valsMax = await GetDataMax(SpotId, startTime, endTime);
                                                    valsMin = await GetDataMin(SpotId, startTime, endTime);
                                                }
                                            }

                                            vals = string.IsNullOrEmpty(vals) ? "0" : Convert.ToDouble(vals).ToString("0.0");
                                            valsMax = string.IsNullOrEmpty(valsMax) ? "0" : Convert.ToDouble(valsMax).ToString("0.0");
                                            valsMin = string.IsNullOrEmpty(valsMin) ? "0" : Convert.ToDouble(valsMin).ToString("0.0");
                                            datas.Add(item.Name, vals);
                                            datas.Add(item.Name + "_Max", valsMax);
                                            datas.Add(item.Name + "_Min", valsMin);
                                        }
                                        else
                                        {
                                            // Static value taken from the first configuration entry with this name.
                                            var itemData = configList.RunSpotConfigList?.Where(x => x.Name == item.Name).FirstOrDefault();
                                            datas.Add(item.Name, itemData?.Value);
                                        }
                                    }
                                    #endregion

                                    await devOpeAccountS.AddRangeAsync(new TDEV_DevOpeAccount
                                    {
                                        C_ID = Guid.NewGuid().ToString(),
                                        C_DevStoreCode = devStoreCode,
                                        C_Content = JsonConvert.SerializeObject(datas),
                                        C_Remark = "",
                                        C_DevOpeAccountConfigCode = devAccountConfigModel.C_ID,
                                        C_CreateBy = Guid.Parse("6e864cbc-5252-11ec-8681-fa163e02b3e4"),
                                        D_CreateOn = DateTime.Now
                                    });
                                    var qty = await dbContext.SaveChangesAsync();
                                    result = qty > 0;
                                    if (!result)
                                    {
                                        log.Info($"台账保存失败:【{JsonConvert.SerializeObject(datas)}】");
                                    }
                                }
                            }
                        }

                        // Device-store log row (written for every status).
                        var devStoreLog = dbContext.GetDbSet<TDEV_DevStoreLog>();
                        await devStoreLog.AddAsync(new TDEV_DevStoreLog
                        {
                            C_ID = Guid.NewGuid().ToString(),
                            C_DeviceCode = devStoreCode,
                            C_Type = devStatus == "1" ? "2" : "3",
                            C_LogMsg = devStatus == "1" ? "开启" : "关闭",
                            C_CreateBy = Guid.Parse("6e864cbc-5252-11ec-8681-fa163e02b3e4"),
                            D_CreateOn = DateTime.Now
                        });
                        int sumbitLog = await dbContext.SaveChangesAsync();
                        if (sumbitLog <= 0)
                        {
                            log.Info($"[TDEV_DevStoreLog]保存失败:【devStoreCode={devStoreCode}】");
                        }

                        #region Owner device run record
                        try
                        {
                            DevStoreRunRecord runRecord = new DevStoreRunRecord();
                            if (!string.IsNullOrEmpty(devStore.C_RunRecord))
                            {
                                // Existing record: add the time accumulated since the last recorded off date.
                                runRecord = JsonConvert.DeserializeObject<DevStoreRunRecord>(devStore.C_RunRecord);
                                TmtnPushMsgResultSearchModel ResultSearchMode = new TmtnPushMsgResultSearchModel();
                                ResultSearchMode.C_DevStoreCode = devStoreCode;
                                ResultSearchMode.D_Start = Convert.ToDateTime(runRecord.LastOffDate);
                                DevAlarmCount alarmCount = await pushMsgResultRepository.GetPushMsgResultContentAsync(ResultSearchMode);
                                if (alarmCount.TotalTime != "0.0.0")
                                {
                                    runRecord.LastOffDate = alarmCount.LastOffDate;
                                    runRecord.RunDuration = alarmCount.nowTime;

                                    // Durations are "days.hours.minutes" strings; add them field by field with carry.
                                    var arrayRun = runRecord.TotalRunDuration.Split('.');
                                    var runData = alarmCount.TotalTime.Split(".");
                                    int m = Convert.ToInt32(arrayRun[2]) + Convert.ToInt32(runData[2]);
                                    int h = 0;
                                    if (m >= 60)
                                    {
                                        m = m - 60;
                                        h++;
                                    }
                                    h = h + Convert.ToInt32(arrayRun[1]) + Convert.ToInt32(runData[1]);
                                    int d = 0;
                                    if (h >= 24)
                                    {
                                        h = h - 24;
                                        d++;
                                    }
                                    d = d + Convert.ToInt32(arrayRun[0]) + Convert.ToInt32(runData[0]);
                                    runRecord.TotalRunDuration = d + "." + h + "." + m;

                                    if (!string.IsNullOrEmpty(runRecord.FiratOnDate) && !string.IsNullOrEmpty(runRecord.LastOffDate))
                                    {
                                        DateTime time1 = Convert.ToDateTime(runRecord.FiratOnDate);
                                        DateTime time2 = Convert.ToDateTime(runRecord.LastOffDate);
                                        TimeSpan diff = time2.Subtract(time1);
                                        // NOTE(review): each borrow below is applied after the field it should affect
                                        // has already been computed (d-- after day, h-- after hours), so the borrow
                                        // never reaches the result. Left as-is because the intended formula
                                        // (run minus elapsed vs. elapsed minus run) is not visible here — confirm and fix.
                                        int day = d - diff.Days;
                                        if (diff.Hours > h)
                                        {
                                            d--;
                                            h = h + 24;
                                        }
                                        int hours = h - diff.Hours;
                                        if (diff.Minutes > m)
                                        {
                                            h--;
                                            m = m + 60;
                                        }
                                        int minutes = m - diff.Minutes;
                                        runRecord.TotalSpotDuration = day + "." + hours + "." + minutes;
                                    }

                                    devStore.C_RunRecord = JsonConvert.SerializeObject(runRecord);
                                    dbContext.TDEV_DevStore.Update(devStore);
                                    await dbContext.SaveChangesAsync();
                                }
                            }
                            else
                            {
                                // No record yet: initialise it from the full push-message history of the device.
                                TmtnPushMsgResultSearchModel ResultSearchMode = new TmtnPushMsgResultSearchModel();
                                ResultSearchMode.C_DevStoreCode = devStoreCode;
                                DevAlarmCount alarmCount = await pushMsgResultRepository.GetPushMsgResultContentAsync(ResultSearchMode);
                                runRecord.LastOffDate = alarmCount.LastOffDate;
                                runRecord.FiratOnDate = alarmCount.FiratOnDate;
                                runRecord.TotalRunDuration = alarmCount.TotalTime;
                                runRecord.TotalSpotDuration = alarmCount.TotalSpotTime;
                                runRecord.RunDuration = alarmCount.nowTime;

                                int RepairOrder = dbContext.TMTN_RepairOrder.Where(x => x.C_DevStoreCode == devStoreCode && x.C_Status == "7").Count();
                                TmtnDevOpsDetailSearchModel tmtnDevOps = new TmtnDevOpsDetailSearchModel();
                                tmtnDevOps.C_Status = "7";
                                tmtnDevOps.C_DevStoreCode = devStoreCode;
                                // Awaited instead of blocking on .Result inside an async method.
                                var DevOps = await devOpsRecordRepository.GetDevOpsList(tmtnDevOps);
                                runRecord.MTNRepairOrder = RepairOrder;
                                runRecord.MTNDevOps = DevOps.Count();

                                devStore.C_RunRecord = JsonConvert.SerializeObject(runRecord);
                                dbContext.TDEV_DevStore.Update(devStore);
                                await dbContext.SaveChangesAsync();
                            }
                        }
                        catch (Exception ex)
                        {
                            // A run-record failure must not undo the ledger result computed above.
                            log.Error($"【业主设备运行记录】【异常信息:{ex.Message}】【devStoreCode={devStoreCode}】", ex);
                        }
                        #endregion
                    }
                }
            }
            catch (Exception ex)
            {
                log.Error($"【台账异常】【异常信息:{ex.Message}】【devStoreCode={devStoreCode};devRunSpot={devRunSpot};time={time}】", ex);
            }
            return result;
        }

        /// <summary>
        /// Returns true when both ends of the run window pass <c>IsNotEmptyOrNull</c> and neither is
        /// <see cref="DateTime.MinValue"/>.
        /// </summary>
        private static bool HasRunWindow(DateTime startTime, DateTime endTime)
        {
            return startTime.IsNotEmptyOrNull() && endTime.IsNotEmptyOrNull()
                && startTime != DateTime.MinValue && endTime != DateTime.MinValue;
        }

        /// <summary>
        /// Creates the InfluxDB client from the configured URL, user and password.
        /// </summary>
        private void IniInflux()
        {
            var infuxUrl = _IniInfluxData.infuxUrl;
            var infuxUser = _IniInfluxData.infuxUser;
            var infuxPwd = _IniInfluxData.infuxPwd;
            clientDb = new InfluxDbClient(infuxUrl, infuxUser, infuxPwd, InfluxDbVersion.Latest);
        }

        /// <summary>
        /// Queries <c>mean(Val)</c> of measurement <c>fanyidev</c> for the given id and time window
        /// and caches the value under <c>fanyibox_devspot_mean_{id}</c>.
        /// </summary>
        /// <param name="id">Value matched against the <c>Id</c> field of the measurement.</param>
        /// <param name="start">Exclusive lower bound of the window.</param>
        /// <param name="end">Exclusive upper bound of the window.</param>
        /// <returns>
        /// The aggregate as a string; "0" when the query returns no series or no row; null when the
        /// returned value itself is null.
        /// </returns>
        public async Task<string> GetMeanData(string id, DateTime start, DateTime end)
        {
            var row = await QueryFirstRowAsync(BuildAggregateQuery("mean", "fanyidev", id, start, end));
            if (row == null || row.Count < 2)
                return "0";

            Console.WriteLine($"GetMeanData from DevEvent: ${row[1]}");
            await CacheSpotValueAsync("fanyibox_devspot_mean_" + id, row[1]);
            return row[1]?.ToString();
        }

        /// <summary>
        /// Queries <c>Max(Val)</c> of measurement <c>fanyidev</c> for the given id and time window
        /// and caches the value under <c>fanyibox_devspot_max_{id}</c>.
        /// </summary>
        /// <param name="id">Value matched against the <c>Id</c> field of the measurement.</param>
        /// <param name="start">Exclusive lower bound of the window.</param>
        /// <param name="end">Exclusive upper bound of the window.</param>
        /// <returns>
        /// The aggregate as a string; "0" when the query returns no series or no row; null when the
        /// returned value itself is null.
        /// </returns>
        public async Task<string> GetDataMax(string id, DateTime start, DateTime end)
        {
            var row = await QueryFirstRowAsync(BuildAggregateQuery("Max", "fanyidev", id, start, end));
            if (row == null || row.Count < 2)
                return "0";

            Console.WriteLine($"GetMeanData from DevEvent: ${row[1]}");
            await CacheSpotValueAsync("fanyibox_devspot_max_" + id, row[1]);
            return row[1]?.ToString();
        }

        /// <summary>
        /// Queries <c>MIN(Val)</c> of measurement <c>fanyidev</c> for the given id and time window
        /// and caches the value under <c>fanyibox_devspot_min_{id}</c>.
        /// </summary>
        /// <param name="id">Value matched against the <c>Id</c> field of the measurement.</param>
        /// <param name="start">Exclusive lower bound of the window.</param>
        /// <param name="end">Exclusive upper bound of the window.</param>
        /// <returns>
        /// The aggregate as a string; "0" when the query returns no series or no row; null when the
        /// returned value itself is null.
        /// </returns>
        public async Task<string> GetDataMin(string id, DateTime start, DateTime end)
        {
            var row = await QueryFirstRowAsync(BuildAggregateQuery("MIN", "fanyidev", id, start, end));
            if (row == null || row.Count < 2)
                return "0";

            Console.WriteLine($"GetMeanData from DevEvent: ${row[1]}");
            await CacheSpotValueAsync("fanyibox_devspot_min_" + id, row[1]);
            return row[1]?.ToString();
        }

        /// <summary>
        /// Queries <c>Max(Val)</c> of measurement <c>fanyidevalarm</c> for the given id and time window.
        /// </summary>
        /// <param name="id">Value matched against the <c>Id</c> field of the measurement.</param>
        /// <param name="start">Exclusive lower bound of the window.</param>
        /// <param name="end">Exclusive upper bound of the window.</param>
        /// <returns>The aggregate converted to int; 0 when the query returns no series or no row.</returns>
        public async Task<int> GetDevAlarmData(string id, DateTime start, DateTime end)
        {
            var row = await QueryFirstRowAsync(BuildAggregateQuery("Max", "fanyidevalarm", id, start, end));
            if (row == null || row.Count < 2)
                return 0;

            Console.WriteLine($"GetMeanData from DevEvent: ${row[1]}");
            return Convert.ToInt32(row[1]);
        }

        /// <summary>
        /// Builds the InfluxQL text <c>SELECT {aggregate}(Val) FROM {measurement} where (Id ='…') and
        /// time &gt; '…' and time &lt; '…' TZ('Asia/Shanghai')</c>.
        /// </summary>
        /// <param name="aggregate">Aggregate function name; supplied only by this class.</param>
        /// <param name="measurement">Measurement name; supplied only by this class.</param>
        /// <param name="id">Id value; escaped because it can originate from a queue message.</param>
        /// <param name="start">Lower time bound.</param>
        /// <param name="end">Upper time bound.</param>
        private static string BuildAggregateQuery(string aggregate, string measurement, string id, DateTime start, DateTime end)
        {
            return "SELECT " + aggregate + "(Val) FROM " + measurement
                + " where (Id ='" + EscapeInfluxString(id) + "')"
                + " and time > '" + ToInfluxTime(start) + "'"
                + " and time < '" + ToInfluxTime(end) + "'"
                + " TZ('Asia/Shanghai')";
        }

        /// <summary>
        /// Formats a timestamp for the query: shifted back 8 hours and rendered as
        /// <c>yyyy-MM-ddTHH:mm:ssZ</c> using the invariant culture so the text does not depend on the
        /// thread's calendar or separators.
        /// </summary>
        /// <remarks>NOTE(review): the fixed 8-hour shift presumes the input is UTC+8 local time — confirm.</remarks>
        private static string ToInfluxTime(DateTime value)
        {
            return value.AddHours(-8).ToString("yyyy-MM-ddTHH:mm:ssZ", System.Globalization.CultureInfo.InvariantCulture);
        }

        /// <summary>
        /// Escapes backslashes and single quotes so the value cannot terminate the single-quoted
        /// InfluxQL string literal it is embedded in. Null is treated as empty.
        /// </summary>
        private static string EscapeInfluxString(string value)
        {
            return (value ?? string.Empty).Replace("\\", "\\\\").Replace("'", "\\'");
        }

        /// <summary>
        /// Runs one query against database <c>fanyidb</c> and returns the first row of the first
        /// returned series, or null when there is no series or the series has no rows.
        /// </summary>
        private async Task<IList<object>> QueryFirstRowAsync(string query)
        {
            var response = await clientDb.Client.QueryAsync(new[] { query }, "fanyidb");
            var firstSerie = response?.FirstOrDefault();
            return firstSerie?.Values?.FirstOrDefault();
        }

        /// <summary>
        /// Stores <paramref name="value"/> under <paramref name="key"/> through the redis service
        /// resolved from a fresh scope.
        /// </summary>
        private async Task CacheSpotValueAsync(string key, object value)
        {
            using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
            {
                // NOTE(review): type argument missing in the reviewed text — restore the redis/cache service type.
                var redisService = scope.ServiceProvider.GetService();
                await redisService.SetAsync(key, value);
            }
        }

        #endregion
    }
}