MqttWorkService.cs 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. using Autofac.Core;
  2. using LinqKit;
  3. using Microsoft.Extensions.Hosting;
  4. using Ropin.Inspection.Model.Entities;
  5. using Microsoft.Extensions.Logging;
  6. using Ropin.Inspection.Repository;
  7. using System;
  8. using System.Collections.Generic;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. using Microsoft.EntityFrameworkCore.Internal;
  12. using System.Linq;
  13. using System.Data;
  14. using Microsoft.Extensions.DependencyInjection;
  15. using Quartz.Impl.AdoJobStore.Common;
  16. using Ropin.Core.Extensions.Redis;
  17. using AutoMapper.Configuration;
  18. using Newtonsoft.Json;
  19. using System.Security.Cryptography;
  20. using log4net;
  21. using Ropin.Inspection.Model;
  22. namespace Ropin.IOT.MqttService
  23. {
  24. public class MqttWorkService : IHostedService, IDisposable
  25. {
  26. private readonly ILogger _logger;
  27. private readonly IServiceProvider _provider;
  28. private readonly IRedisBasketRepository _redisService;
  29. private readonly Func<InspectionDbContext> _dbFuncContextFactory;
  30. public readonly ITdevBoxDevSpotRepository _tdevBoxDevSpotRepository;
  31. public readonly MqttOptions _mqttOptions;
  32. private static readonly ILog log = LogManager.GetLogger(typeof(MqttWorkService));
  33. public MqttWorkService(ILogger<MqttWorkService> logger,
  34. IServiceProvider provider,
  35. MqttOptions mqttOptions,
  36. IRedisBasketRepository redisService,
  37. Func<InspectionDbContext> dbContextFactory
  38. //, ITdevBoxDevSpotRepository tdevBoxDevSpotRepository
  39. )
  40. {
  41. _logger = logger;
  42. _provider = provider;
  43. using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
  44. {
  45. _tdevBoxDevSpotRepository = scope.ServiceProvider.GetService<ITdevBoxDevSpotRepository>();
  46. };
  47. _mqttOptions = mqttOptions;
  48. _redisService = redisService;
  49. _dbFuncContextFactory = dbContextFactory;
  50. //_tdevBoxDevSpotRepository = tdevBoxDevSpotRepository;
  51. _redisService.Set("mqtt_BoxDevSpot_IsUpdate", "1", TimeSpan.FromDays(7));
  52. }
  53. public void Dispose()
  54. {
  55. //throw new NotImplementedException();
  56. }
  57. public Task StartAsync(CancellationToken cancellationToken)
  58. {
  59. Task.Run(async () =>
  60. {
  61. while (true)
  62. {
  63. try
  64. {
  65. var bol = await _redisService.Exist("mqtt_BoxDevSpot_IsUpdate");
  66. log.Info($"[开始] MqttWorkService.【mqtt_BoxDevSpot_IsUpdate是否存在:{bol}】");
  67. if (bol)
  68. {
  69. string val = await _redisService.GetValue("mqtt_BoxDevSpot_IsUpdate");
  70. log.Info($"[开始] MqttWorkService.【mqtt_BoxDevSpot_IsUpdate是否修改值:{bol}】");
  71. if (val=="1")
  72. {
  73. IList<TDEV_BoxDevSpot> boxDevSpot = null;
  74. List <MqttDevicePoint> devicePointList = null;
  75. using (var dbContext = _dbFuncContextFactory())
  76. {
  77. //boxDevSpot = dbContext.TDEV_BoxDevSpot.AsQueryable().ToList();
  78. //if (boxDevSpot != null && boxDevSpot.Count > 0)
  79. //{
  80. // boxDevSpot = boxDevSpot.Where(t => t.C_Status == "1").ToList();
  81. //}
  82. var query = from a in dbContext.TDEV_Box
  83. join b in dbContext.TDEV_BoxDevSpot
  84. on a.C_ID equals b.C_BoxCode
  85. where a.C_Status == "1" && b.C_Status == "1"
  86. //group b by b.C_ID into g
  87. select new MqttDevicePoint
  88. {
  89. Id = b.C_ID,
  90. ChineseName = b.C_ChineseName,
  91. EnglishName = b.C_EnglishName,
  92. BoxId = a.C_BoxNo,
  93. GroupName = b.C_GroupName,
  94. };
  95. devicePointList = query.ToList();
  96. }
  97. log.Info($"[开始] MqttWorkService.TDEV_BoxDevSpot数据【{JsonConvert.SerializeObject(boxDevSpot)}】");
  98. if (devicePointList != null&& devicePointList.Count>0)
  99. {
  100. MqttClientService.AddMqttClient1(_mqttOptions, devicePointList);
  101. await _redisService.Set("mqtt_BoxDevSpot_IsUpdate", "0", TimeSpan.FromDays(7));
  102. log.Info($"[修改] mqtt_BoxDevSpot_IsUpdate=0");
  103. }
  104. }
  105. }
  106. await Task.Delay(6000);
  107. }
  108. catch (Exception ex)
  109. {
  110. log.Error("[MqttWorkService]Task1:" + ex.Message);
  111. await Task.Delay(5000);//5秒
  112. }
  113. }
  114. });
  115. return Task.CompletedTask;
  116. }
  117. public Task StopAsync(CancellationToken cancellationToken)
  118. {
  119. Dispose();
  120. _logger.LogInformation("内部任务计划结束");
  121. return Task.CompletedTask;
  122. }
  123. }
  124. }