// RabbitMQVideoService.cs
  1. using log4net;
  2. using Microsoft.Extensions.DependencyInjection;
  3. using Microsoft.Extensions.Hosting;
  4. using Newtonsoft.Json;
  5. using RabbitMQ.Client;
  6. using RabbitMQ.Client.Events;
  7. using Ropin.Inspection.Common.Helper;
  8. using Ropin.Inspection.Model.Entities;
  9. using Ropin.Inspection.Repository;
  10. using System;
  11. using System.Collections.Generic;
  12. using System.Text;
  13. using System.Threading;
  14. using System.Threading.Tasks;
  15. using System.Linq;
  16. using Ropin.Inspection.Repository.VMC.Interface;
  17. using Ropin.Inspection.Model.ViewModel.VMC;
  18. using Ropin.Inspection.Model.SearchModel.VMC;
  19. using Ropin.Inspection.Repository.MTN.Interface;
  20. using System.IO;
  21. using Renci.SshNet.Messages;
  22. using Microsoft.Extensions.Configuration;
  23. using System.Net.NetworkInformation;
  24. using Ropin.Inspection.Repository.SYS.Interface;
  25. namespace Ropin.Environmentally.VideoService.service
  26. {
  27. public class RabbitMQVideoService : IHostedService, IDisposable
  28. {
  /// <summary>Root service provider; the worker methods create one DI scope per unit of work from it.</summary>
  private readonly IServiceProvider _provider;

  private static readonly ILog log = LogManager.GetLogger(typeof(RabbitMQVideoService));

  /// <summary>RabbitMQ settings: host, port, credentials, virtual host, exchange, queue and routing key.</summary>
  private readonly RabbitMQModel _rabbitMQModel;

  /// <summary>Application configuration; <c>DownLoad_Video</c> reads the "ServiceUrl" section from it.</summary>
  private readonly IConfiguration _configuration;

  /// <summary>
  /// Creates the hosted service.
  /// </summary>
  /// <param name="provider">Service provider used to create scopes for repositories.</param>
  /// <param name="rabbitMQModel">RabbitMQ connection and topology settings.</param>
  /// <param name="configuration">Application configuration.</param>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public RabbitMQVideoService(IServiceProvider provider, RabbitMQModel rabbitMQModel, IConfiguration configuration)
  {
      // Fail at construction time instead of with a NullReferenceException
      // deep inside a background task where it would only be logged.
      _provider = provider ?? throw new ArgumentNullException(nameof(provider));
      _rabbitMQModel = rabbitMQModel ?? throw new ArgumentNullException(nameof(rabbitMQModel));
      _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
  }

  /// <summary>RabbitMQ connection; created in <c>AddRabbitMQ</c>, closed in <c>Dispose</c>.</summary>
  private IConnection con;

  /// <summary>RabbitMQ channel; created in <c>AddRabbitMQ</c>, closed in <c>Dispose</c>.</summary>
  private IModel channel;
  /// <summary>
  /// Starts the RabbitMQ consumer on a background task and returns immediately,
  /// so host start-up is not blocked while the connection is being established.
  /// </summary>
  /// <param name="cancellationToken">
  /// Host start-up token; if it is already cancelled the background task is not started.
  /// </param>
  /// <returns>An already completed task.</returns>
  public Task StartAsync(CancellationToken cancellationToken)
  {
      // Deliberate fire-and-forget: AddRabbitMQ catches and logs its own failures,
      // so there is no exception to observe on the discarded task.
      _ = Task.Run(() => AddRabbitMQ(), cancellationToken);
      return Task.CompletedTask;
  }
  /// <summary>
  /// Connects to RabbitMQ (reusing an open connection/channel if one exists), declares
  /// the direct exchange and the queue, binds them with the configured routing key and
  /// starts a manual-ack consumer that feeds every message to <c>VideoExecute</c>.
  /// </summary>
  /// <remarks>
  /// Failure policy of the consumer:
  /// a message that cannot be deserialized is rejected without requeue (it can never succeed);
  /// any other processing failure is requeued once and dropped if it fails again after
  /// redelivery, so a poison message cannot loop forever.
  /// All set-up errors are logged and swallowed, as in the original implementation.
  /// </remarks>
  /// <returns>A completed task; the method itself performs no asynchronous work.</returns>
  public Task AddRabbitMQ()
  {
      try
      {
          var factory = new ConnectionFactory
          {
              HostName = _rabbitMQModel.HostName,       // broker address
              Port = _rabbitMQModel.Port,               // broker port
              UserName = _rabbitMQModel.UserName,       // account
              VirtualHost = _rabbitMQModel.VirtualHost,
              Password = _rabbitMQModel.Password,       // account password
              // Needed so the connection dispatches to AsyncEventingBasicConsumer;
              // the handler then returns a Task instead of running as async void.
              DispatchConsumersAsync = true,
          };

          if (con == null || !con.IsOpen)
          {
              con = factory.CreateConnection();
          }

          if (channel == null || !channel.IsOpen)
          {
              channel = con.CreateModel();
          }

          // Capture the channel this consumer is bound to, so acks always go to the
          // channel the delivery arrived on even if the field is replaced later.
          IModel consumerChannel = channel;

          consumerChannel.ExchangeDeclare(_rabbitMQModel.ExchangeName, type: ExchangeType.Direct);

          // Declare the queue.
          string queueName = consumerChannel.QueueDeclare(
              queue: _rabbitMQModel.QueueName,
              durable: false,
              exclusive: false,
              autoDelete: false,
              arguments: null).QueueName;

          consumerChannel.QueueBind(queueName, _rabbitMQModel.ExchangeName, _rabbitMQModel.RoutingKey);

          // Prefetch 1: the broker sends this consumer one message at a time and
          // waits for its acknowledgement before sending the next.
          consumerChannel.BasicQos(0, 1, false);

          // Publisher confirms, kept from the original; this method itself does not publish.
          consumerChannel.ConfirmSelect();

          var consumer = new AsyncEventingBasicConsumer(consumerChannel);
          consumer.Received += async (sender, ea) =>
          {
              try
              {
                  string message = Encoding.UTF8.GetString(ea.Body.ToArray());
                  log.Info("【RabbitMQ】" + message);

                  var receiveData = JsonConvert.DeserializeObject<TSYS_Message>(message);
                  await VideoExecute(receiveData);

                  // Processing finished: acknowledge this delivery only.
                  consumerChannel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
              }
              catch (Exception ex)
              {
                  log.Error("【RabbitMQ-接收消息处理异常了】" + ex.Message, ex);

                  // Malformed JSON can never succeed, and a message that already failed
                  // once (Redelivered) is not requeued again to avoid an endless loop.
                  bool requeue = !(ex is JsonException) && !ea.Redelivered;
                  try
                  {
                      consumerChannel.BasicNack(ea.DeliveryTag, false, requeue);
                  }
                  catch (Exception nackEx)
                  {
                      // E.g. the channel was closed meanwhile; nothing more can be done here.
                      log.Error("【RabbitMQ-接收消息处理异常了】" + nackEx.Message, nackEx);
                  }
              }
          };

          // autoAck: false -> messages are acknowledged manually by the handler above.
          consumerChannel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
      }
      catch (Exception ex)
      {
          log.Error("【异常-RabbitMQ】" + ex.Message, ex);
      }

      return Task.CompletedTask;
  }
  /// <summary>
  /// Looks up the enabled cameras (status "1") of the device store referenced by
  /// <paramref name="message"/> and records a clip for each of them via <c>SaveData</c>.
  /// </summary>
  /// <param name="message">The received message; <c>null</c> is ignored.</param>
  /// <returns>A task that completes when all cameras have been processed.</returns>
  /// <remarks>
  /// Exceptions from the camera lookup are logged and rethrown so the caller can
  /// decide what to do with the message.
  /// </remarks>
  public async Task VideoExecute(TSYS_Message message)
  {
      if (message == null)
      {
          return;
      }

      using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
      {
          try
          {
              // GetRequiredService gives a descriptive error instead of a later
              // NullReferenceException when the repository is not registered.
              var vmcCameraRepository = scope.ServiceProvider.GetRequiredService<IVMCDevCameraRepository>();

              var searchModel = new VmcDevSearch
              {
                  IsPagination = false,
                  C_Status = "1",
                  C_DevStoreCode = message.C_DevStoreCode,
              };

              IEnumerable<VmcDevCameraViewModel> viewModelsList = await vmcCameraRepository.GetConditionAsync(searchModel);
              await SaveData(message, viewModelsList);

              // One-second pause kept from the original (its purpose is not evident
              // from this file); awaited so no thread is blocked while waiting.
              await Task.Delay(1000);
          }
          catch (Exception ex)
          {
              log.Error("【异常-VideoExecute】" + ex.Message, ex);
              throw;
          }
      }
  }
  /// <summary>
  /// Records a clip on every enabled camera (status "1") in
  /// <paramref name="viewModelsList"/> in parallel and stores one
  /// <c>TSYS_MessageFile</c> row per finished recording, linked to <paramref name="message"/>.
  /// </summary>
  /// <param name="message">Message the recordings belong to; <c>null</c> makes the call a no-op.</param>
  /// <param name="viewModelsList">Cameras to record; <c>null</c> makes the call a no-op.</param>
  /// <returns>A task that completes when every camera task has finished.</returns>
  /// <remarks>Per-camera failures are logged and swallowed; they do not fail the whole call.</remarks>
  public async Task SaveData(TSYS_Message message, IEnumerable<VmcDevCameraViewModel> viewModelsList)
  {
      if (message == null || viewModelsList == null)
      {
          return;
      }

      // Null entries are skipped before C_Status is read (the original dereferenced first).
      List<Task> tasks = viewModelsList
          .Where(vmc => vmc != null && vmc.C_Status == "1")
          .Select(vmc => Task.Run(() => RecordAndSaveMessageFileAsync(message, vmc)))
          .ToList();

      await Task.WhenAll(tasks);
  }

  /// <summary>
  /// Starts a recording on one camera, waits the camera's shooting time, stops the
  /// recording and saves the resulting relative "/sms…" URL as a message file.
  /// </summary>
  private async Task RecordAndSaveMessageFileAsync(TSYS_Message message, VmcDevCameraViewModel vmc)
  {
      using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
      {
          try
          {
              // NOTE(review): loginUrl is a static member written from parallel tasks. If the
              // cameras of one device have different C_CameraTypeValue values, one task can
              // overwrite the URL another task still needs — confirm against LiveGBSHelper.
              LiveGBSHelper.loginUrl = vmc.C_CameraTypeValue;

              // Resolved up front so a missing registration fails before any recording starts.
              var messageFileRepository = scope.ServiceProvider.GetRequiredService<ITsysMessageFileRepository>();

              if (string.IsNullOrEmpty(vmc.C_Serial) || string.IsNullOrEmpty(vmc.Codes))
              {
                  return;
              }

              VideoStartRecording videoUrlModel = await LiveGBSHelper.StartRecording(vmc.C_Serial, vmc.Codes);
              if (videoUrlModel == null || string.IsNullOrEmpty(videoUrlModel.DownloadURL))
              {
                  // Nothing was started, so there is no point in waiting the shooting time.
                  log.Info("【开始实时录像错误,没有返回录像下载地址】");
                  return;
              }

              // Let the camera record for F_ShootingTime seconds without blocking a pool
              // thread; a negative configuration value is treated as zero.
              await Task.Delay(TimeSpan.FromSeconds(Math.Max(0, vmc.F_ShootingTime)));

              VideoRecording channelVideo = await LiveGBSHelper.StopRecording(vmc.C_Serial, vmc.Codes);
              if (channelVideo == null || channelVideo.RecordList == null || channelVideo.RecordList.Count == 0)
              {
                  log.Info($"【停止实时录像错误,没有录像文件】");
                  return;
              }

              // The stop response lists recordings; pick the one that was started above.
              VideoRecordingMode videoRecording = channelVideo.RecordList
                  .FirstOrDefault(v => v.DownloadURL == videoUrlModel.DownloadURL);
              if (videoRecording == null || string.IsNullOrEmpty(videoRecording.EndTime))
              {
                  log.Info($"【停止实时录像错误,没有录像文件或者录像停止时间为空下载地址不能使用】");
                  return;
              }

              // Keep only the part of the URL after the first "sms" and re-prefix it with
              // "/sms". IndexOf/Substring gives the same result as Split("sms")[1] for a
              // single occurrence, but does not throw when "sms" is missing and does not
              // truncate when it appears again later in the path.
              string downloadUrl = videoUrlModel.DownloadURL;
              int smsIndex = downloadUrl.IndexOf("sms", StringComparison.Ordinal);
              if (smsIndex < 0)
              {
                  log.Info("【录像下载地址格式错误,缺少sms路径】" + downloadUrl);
                  return;
              }

              string videoUrl = "/sms" + downloadUrl.Substring(smsIndex + "sms".Length);

              var messageFile = new TSYS_MessageFile
              {
                  C_ID = Guid.NewGuid().ToString(),
                  C_MessageCode = message.C_ID,
                  C_Url = videoUrl,
                  C_Type = "FILE_TYP_004",
                  C_CreateBy = Guid.NewGuid().ToString(),
                  D_CreateOn = DateTime.Now,
                  C_Status = "1",
              };

              messageFileRepository.Create(messageFile);
              await messageFileRepository.SaveAsync();
          }
          catch (Exception ex)
          {
              log.Error("【异常-SaveData】" + ex.Message, ex);
          }
      }
  }
  /// <summary>
  /// Batch variant of <c>VideoExecute</c>: for every distinct device store code in
  /// <paramref name="list"/>, looks up the enabled cameras (status "1") and records
  /// a clip for each through <c>SaveData1</c>. Devices are processed in parallel.
  /// </summary>
  /// <param name="list">Push results to process; <c>null</c> or empty is a no-op.</param>
  /// <returns>A task that completes when all devices have been processed.</returns>
  /// <remarks>Exceptions from the camera lookup are logged and rethrown.</remarks>
  public async Task VideoExecute1(List<TMTN_PushMsgResult> list)
  {
      if (list == null || list.Count == 0)
      {
          return;
      }

      List<Task> tasks0 = list
          .Where(t => t != null)
          .Select(t => t.C_DevStoreCode)
          .Distinct()
          .Select(code => Task.Run(() => RecordForDeviceAsync(list, code)))
          .ToList();

      await Task.WhenAll(tasks0);
  }

  /// <summary>
  /// Queries the enabled cameras of one device store and hands them to <c>SaveData1</c>.
  /// </summary>
  private async Task RecordForDeviceAsync(List<TMTN_PushMsgResult> list, string devCode)
  {
      using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
      {
          try
          {
              // GetRequiredService gives a descriptive error instead of a later
              // NullReferenceException when the repository is not registered.
              var vmcCameraRepository = scope.ServiceProvider.GetRequiredService<IVMCDevCameraRepository>();

              var searchModel = new VmcDevSearch
              {
                  IsPagination = false,
                  C_Status = "1",
                  C_DevStoreCode = devCode,
              };

              IEnumerable<VmcDevCameraViewModel> viewModelsList = await vmcCameraRepository.GetConditionAsync(searchModel);
              await SaveData1(list, viewModelsList, devCode);

              // One-second pause kept from the original (its purpose is not evident
              // from this file); awaited so no thread is blocked while waiting.
              await Task.Delay(1000);
          }
          catch (Exception ex)
          {
              log.Error("【异常-VideoExecute】" + ex.Message, ex);
              throw;
          }
      }
  }
  /// <summary>
  /// Records a clip on every enabled camera (status "1") in
  /// <paramref name="viewModelsList"/> in parallel and, for each finished recording,
  /// stores one <c>TMTN_AlarmShadowRecord</c> per entry of <paramref name="list"/>
  /// whose device store code equals <paramref name="devCode"/>.
  /// </summary>
  /// <param name="list">Push results the recordings are attached to; <c>null</c> is treated as empty.</param>
  /// <param name="viewModelsList">Cameras to record; <c>null</c> makes the call a no-op.</param>
  /// <param name="devCode">Device store code the cameras belong to.</param>
  /// <returns>A task that completes when every camera task has finished.</returns>
  /// <remarks>Per-camera failures are logged and swallowed; they do not fail the whole call.</remarks>
  public async Task SaveData1(List<TMTN_PushMsgResult> list,IEnumerable<VmcDevCameraViewModel> viewModelsList, string devCode)
  {
      if (viewModelsList == null)
      {
          return;
      }

      // Null entries are skipped before C_Status is read (the original dereferenced first).
      List<Task> tasks = viewModelsList
          .Where(vmc => vmc != null && vmc.C_Status == "1")
          .Select(vmc => Task.Run(() => RecordAndSaveAlarmShadowAsync(list, vmc, devCode)))
          .ToList();

      await Task.WhenAll(tasks);
  }

  /// <summary>
  /// Starts a recording on one camera, waits the camera's shooting time, stops the
  /// recording and saves the resulting relative "/sms…" URL as alarm shadow records.
  /// </summary>
  private async Task RecordAndSaveAlarmShadowAsync(List<TMTN_PushMsgResult> list, VmcDevCameraViewModel vmc, string devCode)
  {
      using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
      {
          try
          {
              // NOTE(review): loginUrl is a static member written from parallel tasks. If
              // cameras have different C_CameraTypeValue values, one task can overwrite the
              // URL another task still needs — confirm against LiveGBSHelper.
              LiveGBSHelper.loginUrl = vmc.C_CameraTypeValue;

              // Resolved up front so a missing registration fails before any recording starts.
              var mtnAlarmShadowRecordRepository = scope.ServiceProvider.GetRequiredService<ImtnAlarmShadowRecordRepository>();

              if (string.IsNullOrEmpty(vmc.C_Serial) || string.IsNullOrEmpty(vmc.Codes))
              {
                  return;
              }

              VideoStartRecording videoUrlModel = await LiveGBSHelper.StartRecording(vmc.C_Serial, vmc.Codes);
              if (videoUrlModel == null || string.IsNullOrEmpty(videoUrlModel.DownloadURL))
              {
                  // Nothing was started, so there is no point in waiting the shooting time.
                  log.Info("【开始实时录像错误,没有返回录像下载地址】");
                  return;
              }

              // Let the camera record for F_ShootingTime seconds without blocking a pool
              // thread; a negative configuration value is treated as zero.
              await Task.Delay(TimeSpan.FromSeconds(Math.Max(0, vmc.F_ShootingTime)));

              VideoRecording channelVideo = await LiveGBSHelper.StopRecording(vmc.C_Serial, vmc.Codes);
              if (channelVideo == null || channelVideo.RecordList == null || channelVideo.RecordList.Count == 0)
              {
                  log.Info($"【停止实时录像错误,没有录像文件】");
                  return;
              }

              // The stop response lists recordings; pick the one that was started above.
              VideoRecordingMode videoRecording = channelVideo.RecordList
                  .FirstOrDefault(v => v.DownloadURL == videoUrlModel.DownloadURL);
              if (videoRecording == null || string.IsNullOrEmpty(videoRecording.EndTime))
              {
                  log.Info($"【停止实时录像错误,没有录像文件或者录像停止时间为空下载地址不能使用】");
                  return;
              }

              // Keep only the part of the URL after the first "sms" and re-prefix it with
              // "/sms". IndexOf/Substring gives the same result as Split("sms")[1] for a
              // single occurrence, but does not throw when "sms" is missing and does not
              // truncate when it appears again later in the path.
              string downloadUrl = videoUrlModel.DownloadURL;
              int smsIndex = downloadUrl.IndexOf("sms", StringComparison.Ordinal);
              if (smsIndex < 0)
              {
                  log.Info("【录像下载地址格式错误,缺少sms路径】" + downloadUrl);
                  return;
              }

              string videoUrl = "/sms" + downloadUrl.Substring(smsIndex + "sms".Length);

              // A null list is treated as empty instead of failing after the recording was made.
              IEnumerable<TMTN_PushMsgResult> source = list ?? Enumerable.Empty<TMTN_PushMsgResult>();
              foreach (var m in source.Where(r => r != null && r.C_DevStoreCode == devCode))
              {
                  var alarmShadowRecord = new TMTN_AlarmShadowRecord
                  {
                      C_ID = Guid.NewGuid().ToString(),
                      C_PushMsgResultCode = m.C_ID,
                      C_RecordUrl = videoUrl,
                      C_Type = "1",
                      C_CameraServiceType = vmc.C_CameraType,
                      C_CameraCode = vmc.C_CameraCode,
                      C_DevStoreCode = devCode,
                  };
                  mtnAlarmShadowRecordRepository.Create(alarmShadowRecord);
              }

              // Persist all records created for this camera in one call.
              await mtnAlarmShadowRecordRepository.SaveAsync();
          }
          catch (Exception ex)
          {
              log.Error("【异常-SaveData】" + ex.Message, ex);
          }
      }
  }
  /// <summary>
  /// Downloads the video at <paramref name="pathUrl"/> into
  /// <c>{current directory}/temp/Video/{year}/{month}/{timestamp}.mp4</c> and returns
  /// the public URL of the stored file, built from the "ServiceUrl" configuration value.
  /// </summary>
  /// <param name="pathUrl">Source URL of the video to download.</param>
  /// <returns>The URL of the downloaded file, or <c>null</c> if anything failed.</returns>
  private string DownLoad_Video(string pathUrl)
  {
      try
      {
          string serviceUrl = _configuration.GetSection("ServiceUrl").Value;

          DateTime time = DateTime.Now;
          string fileUrl = "/temp/Video/" + time.Year + "/" + time.Month + "/";
          string fileName = $"{time.ToString("yyyyMMddHHmmssfff")}.mp4";

          // CreateDirectory is a no-op when the directory already exists,
          // so no separate existence check is needed.
          string directory = Directory.GetCurrentDirectory() + fileUrl;
          Directory.CreateDirectory(directory);

          FileHelp.DownLoadVideo(pathUrl, directory + fileName);
          return serviceUrl + fileUrl + fileName;
      }
      catch (Exception ex)
      {
          // Still returns null like the original, but no longer hides the reason.
          log.Error("【异常-DownLoad_Video】" + ex.Message, ex);
          return null;
      }
  }
  /// <summary>
  /// Stops the service by releasing the RabbitMQ channel and connection.
  /// </summary>
  /// <param name="cancellationToken">Not used; shutdown is performed synchronously.</param>
  /// <returns>An already completed task.</returns>
  public Task StopAsync(CancellationToken cancellationToken)
  {
      Dispose();
      return Task.CompletedTask;
  }

  /// <summary>
  /// Closes and disposes the channel and the connection. Safe to call more than once
  /// (it is invoked by <see cref="StopAsync"/> and may be invoked again by whoever owns
  /// this instance) and never throws; close failures are only logged.
  /// </summary>
  public void Dispose()
  {
      // Detach the fields first so a second call finds nothing left to release.
      IModel channelToClose = channel;
      channel = null;
      IConnection connectionToClose = con;
      con = null;

      // Channel first, then the connection it belongs to.
      if (channelToClose != null)
      {
          try
          {
              if (channelToClose.IsOpen)
              {
                  channelToClose.Close();
              }

              channelToClose.Dispose();
          }
          catch (Exception ex)
          {
              log.Warn("【异常-RabbitMQ】" + ex.Message, ex);
          }
      }

      if (connectionToClose != null)
      {
          try
          {
              if (connectionToClose.IsOpen)
              {
                  connectionToClose.Close();
              }

              connectionToClose.Dispose();
          }
          catch (Exception ex)
          {
              log.Warn("【异常-RabbitMQ】" + ex.Message, ex);
          }
      }
  }
  396. }
  397. }