using log4net;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Ropin.Inspection.Common.Helper;
using Ropin.Inspection.Model.Entities;
using Ropin.Inspection.Repository;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
using Ropin.Inspection.Repository.VMC.Interface;
using Ropin.Inspection.Model.ViewModel.VMC;
using Ropin.Inspection.Model.SearchModel.VMC;
using Ropin.Inspection.Repository.MTN.Interface;
using System.IO;
using Renci.SshNet.Messages;
using Microsoft.Extensions.Configuration;
using System.Net.NetworkInformation;
using Ropin.Inspection.Repository.SYS.Interface;

namespace Ropin.Environmentally.VideoService.service
{
    /// <summary>
    /// Hosted service that consumes messages from a RabbitMQ queue and, for each message,
    /// records a short clip on every active camera of the referenced device and stores the
    /// clip location in the database.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments of this file were missing from the source as
    /// received. <c>IVmcCameraRepository</c>, <c>VmcCameraViewModel</c>,
    /// <c>ITsysMessageFileRepository</c> and <c>IMtnAlarmShadowRecordRepository</c> are best
    /// guesses derived from variable and namespace names; confirm them against the project
    /// before merging. Each use is marked with a TODO(review) comment.
    /// </remarks>
    public class RabbitMQVideoService : IHostedService, IDisposable
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(RabbitMQVideoService));

        /// <summary>Path segment used to turn an absolute download URL into a relative one.</summary>
        private const string SmsPathMarker = "sms";

        /// <summary>Pause kept after each device has been processed (was a 1000 ms sleep).</summary>
        private static readonly TimeSpan PostSaveDelay = TimeSpan.FromSeconds(1);

        private readonly IServiceProvider _provider;
        private readonly RabbitMQModel _rabbitMQModel;
        private readonly IConfiguration _configuration;

        private IConnection con;
        private IModel channel;
        private bool _disposed;

        /// <summary>Creates the service.</summary>
        /// <param name="provider">Root service provider; scopes are created from it per unit of work.</param>
        /// <param name="rabbitMQModel">RabbitMQ connection, exchange, queue and routing-key settings.</param>
        /// <param name="configuration">Application configuration (read for the "ServiceUrl" key).</param>
        public RabbitMQVideoService(IServiceProvider provider, RabbitMQModel rabbitMQModel, IConfiguration configuration)
        {
            _provider = provider;
            _rabbitMQModel = rabbitMQModel;
            _configuration = configuration;
        }

        /// <summary>
        /// Starts the consumer on a background task and returns immediately.
        /// </summary>
        /// <param name="cancellationToken">Not used; the subscription is torn down by <see cref="StopAsync"/>.</param>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            // Connection setup is synchronous, so it is pushed off the caller's thread.
            // AddRabbitMQ handles its own exceptions, so the task is deliberately not awaited.
            _ = Task.Run(() => AddRabbitMQ());
            return Task.CompletedTask;
        }

        /// <summary>
        /// Opens (or reuses) the connection and channel, declares the exchange, queue and
        /// binding, and starts consuming with manual acknowledgements.
        /// </summary>
        /// <returns>A completed task; failures are logged, not thrown.</returns>
        public Task AddRabbitMQ()
        {
            try
            {
                var factory = new ConnectionFactory
                {
                    HostName = _rabbitMQModel.HostName,
                    Port = _rabbitMQModel.Port,
                    UserName = _rabbitMQModel.UserName,
                    VirtualHost = _rabbitMQModel.VirtualHost,
                    Password = _rabbitMQModel.Password,
                    // Required for AsyncEventingBasicConsumer so the handler is awaited
                    // instead of running as an unobserved async-void callback.
                    DispatchConsumersAsync = true,
                };

                if (con == null || !con.IsOpen)
                {
                    con = factory.CreateConnection();
                }

                if (channel == null || !channel.IsOpen)
                {
                    channel = con.CreateModel();
                }

                channel.ExchangeDeclare(_rabbitMQModel.ExchangeName, type: ExchangeType.Direct);

                // Queue arguments are left exactly as before (non-durable, shared, not auto-deleted).
                var queueName = channel.QueueDeclare(
                    queue: _rabbitMQModel.QueueName,
                    durable: false,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null).QueueName;

                channel.QueueBind(queueName, _rabbitMQModel.ExchangeName, _rabbitMQModel.RoutingKey);

                // Deliver one message at a time; the next one is sent only after an ack/nack.
                channel.BasicQos(0, 1, false);

                var consumer = new AsyncEventingBasicConsumer(channel);
                consumer.Received += OnMessageReceivedAsync;

                // autoAck: false -> messages are acknowledged manually in the handler.
                channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
            }
            catch (Exception ex)
            {
                log.Error("【异常-RabbitMQ】" + ex.Message, ex);
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Handles one delivery: deserializes the payload, runs <see cref="VideoExecute"/>,
        /// then acks. On failure the message is nacked.
        /// </summary>
        private async Task OnMessageReceivedAsync(object sender, BasicDeliverEventArgs ea)
        {
            try
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                log.Info("【RabbitMQ】" + message);

                var receiveData = JsonConvert.DeserializeObject<TSYS_Message>(message);
                await VideoExecute(receiveData);

                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            catch (Exception ex)
            {
                log.Error("【RabbitMQ-接收消息处理异常了】" + ex.Message, ex);

                // A payload that cannot be parsed will never succeed; requeueing it would
                // redeliver it forever and, with prefetch 1, block the queue.
                bool requeue = !(ex is JsonException);
                TryNack(ea.DeliveryTag, requeue);
            }
        }

        /// <summary>Nacks a delivery; a failure to nack is logged instead of escaping the handler.</summary>
        private void TryNack(ulong deliveryTag, bool requeue)
        {
            try
            {
                channel.BasicNack(deliveryTag, false, requeue);
            }
            catch (Exception ex)
            {
                log.Error("【异常-RabbitMQ】" + ex.Message, ex);
            }
        }

        /// <summary>
        /// Looks up the active cameras of the device referenced by <paramref name="message"/>
        /// and records/saves a clip for each of them.
        /// </summary>
        /// <param name="message">The received message; <c>null</c> is ignored.</param>
        /// <exception cref="Exception">Any failure of the camera query or of <see cref="SaveData"/> is logged and rethrown.</exception>
        public async Task VideoExecute(TSYS_Message message)
        {
            if (message == null)
            {
                return;
            }

            // The scope stays open while SaveData runs because the query result may be
            // enumerated lazily against scoped services.
            using (var scope = _provider.CreateScope())
            {
                try
                {
                    var cameras = await GetActiveCamerasAsync(scope.ServiceProvider, message.C_DevStoreCode);
                    await SaveData(message, cameras);
                    await Task.Delay(PostSaveDelay);
                }
                catch (Exception ex)
                {
                    log.Error("【异常-VideoExecute】" + ex.Message, ex);
                    throw;
                }
            }
        }

        /// <summary>
        /// Records a clip on every camera in <paramref name="viewModelsList"/> whose status is
        /// "1" (cameras are processed concurrently) and stores one message-file row per clip.
        /// </summary>
        /// <param name="message">Message the clips belong to; <c>null</c> makes this a no-op.</param>
        /// <param name="viewModelsList">Cameras to record; <c>null</c> makes this a no-op.</param>
        /// <remarks>Per-camera failures are logged and do not fail the call.</remarks>
        // TODO(review): element type VmcCameraViewModel is a guess; the original type argument was lost.
        public async Task SaveData(TSYS_Message message, IEnumerable<VmcCameraViewModel> viewModelsList)
        {
            if (message == null || viewModelsList == null)
            {
                return;
            }

            var tasks = new List<Task>();
            foreach (var vmc in viewModelsList)
            {
                // Null entries are skipped up front; previously vmc.C_Status was read
                // before the null check.
                if (vmc == null || vmc.C_Status != "1")
                {
                    continue;
                }

                tasks.Add(Task.Run(() => SaveMessageFileAsync(message, vmc)));
            }

            await Task.WhenAll(tasks);
        }

        /// <summary>Records one clip and stores it as a <see cref="TSYS_MessageFile"/> row.</summary>
        private async Task SaveMessageFileAsync(TSYS_Message message, VmcCameraViewModel camera)
        {
            try
            {
                string videoUrl = await RecordClipAsync(camera);
                if (videoUrl == null)
                {
                    return;
                }

                using (var scope = _provider.CreateScope())
                {
                    // TODO(review): ITsysMessageFileRepository is a guess; the original type argument was lost.
                    var messageFileRepository = scope.ServiceProvider.GetRequiredService<ITsysMessageFileRepository>();

                    var messageFile = new TSYS_MessageFile
                    {
                        C_ID = Guid.NewGuid().ToString(),
                        C_MessageCode = message.C_ID,
                        C_Url = videoUrl,
                        C_Type = "FILE_TYP_004",
                        C_CreateBy = Guid.NewGuid().ToString(),
                        D_CreateOn = DateTime.Now,
                        C_Status = "1",
                    };

                    messageFileRepository.Create(messageFile);
                    await messageFileRepository.SaveAsync();
                }
            }
            catch (Exception ex)
            {
                log.Error("【异常-SaveData】" + ex.Message, ex);
            }
        }

        /// <summary>
        /// Batch variant of <see cref="VideoExecute"/>: processes every distinct device code
        /// found in <paramref name="list"/> concurrently.
        /// </summary>
        /// <param name="list">Push-message results to attach recordings to; <c>null</c> or empty is a no-op.</param>
        /// <exception cref="Exception">Any failure of a device's camera query or of <see cref="SaveData1"/> is logged and rethrown.</exception>
        public async Task VideoExecute1(List<TMTN_PushMsgResult> list)
        {
            if (list == null || list.Count == 0)
            {
                return;
            }

            List<string> devCodes = list.Select(t => t.C_DevStoreCode).Distinct().ToList();
            List<Task> tasks = devCodes
                .Select(devCode => Task.Run(() => ExecuteForDeviceAsync(list, devCode)))
                .ToList();

            await Task.WhenAll(tasks);
        }

        /// <summary>Queries the active cameras of one device and runs <see cref="SaveData1"/> for them.</summary>
        private async Task ExecuteForDeviceAsync(List<TMTN_PushMsgResult> list, string devCode)
        {
            using (var scope = _provider.CreateScope())
            {
                try
                {
                    var cameras = await GetActiveCamerasAsync(scope.ServiceProvider, devCode);
                    await SaveData1(list, cameras, devCode);
                    await Task.Delay(PostSaveDelay);
                }
                catch (Exception ex)
                {
                    log.Error("【异常-VideoExecute】" + ex.Message, ex);
                    throw;
                }
            }
        }

        /// <summary>
        /// Records a clip on every camera whose status is "1" and stores one alarm-shadow
        /// record per entry of <paramref name="list"/> that belongs to <paramref name="devCode"/>.
        /// </summary>
        /// <param name="list">All push-message results of the batch; <c>null</c> makes this a no-op.</param>
        /// <param name="viewModelsList">Cameras of the device; <c>null</c> makes this a no-op.</param>
        /// <param name="devCode">Device code the cameras belong to.</param>
        /// <remarks>Per-camera failures are logged and do not fail the call.</remarks>
        // TODO(review): element type VmcCameraViewModel is a guess; the original type argument was lost.
        public async Task SaveData1(List<TMTN_PushMsgResult> list, IEnumerable<VmcCameraViewModel> viewModelsList, string devCode)
        {
            if (list == null || viewModelsList == null)
            {
                return;
            }

            var tasks = new List<Task>();
            foreach (var vmc in viewModelsList)
            {
                if (vmc == null || vmc.C_Status != "1")
                {
                    continue;
                }

                tasks.Add(Task.Run(() => SaveAlarmShadowRecordsAsync(list, vmc, devCode)));
            }

            await Task.WhenAll(tasks);
        }

        /// <summary>Records one clip and stores it against every push-message result of the device.</summary>
        private async Task SaveAlarmShadowRecordsAsync(List<TMTN_PushMsgResult> list, VmcCameraViewModel camera, string devCode)
        {
            try
            {
                string videoUrl = await RecordClipAsync(camera);
                if (videoUrl == null)
                {
                    return;
                }

                using (var scope = _provider.CreateScope())
                {
                    // TODO(review): IMtnAlarmShadowRecordRepository is a guess; the original type argument was lost.
                    var mtnAlarmShadowRecordRepository = scope.ServiceProvider.GetRequiredService<IMtnAlarmShadowRecordRepository>();

                    foreach (var pushResult in list.Where(m => m.C_DevStoreCode == devCode))
                    {
                        var alarmShadowRecord = new TMTN_AlarmShadowRecord
                        {
                            C_ID = Guid.NewGuid().ToString(),
                            C_PushMsgResultCode = pushResult.C_ID,
                            C_RecordUrl = videoUrl,
                            C_Type = "1",
                            C_CameraServiceType = camera.C_CameraType,
                            C_CameraCode = camera.C_CameraCode,
                            C_DevStoreCode = devCode,
                        };

                        mtnAlarmShadowRecordRepository.Create(alarmShadowRecord);
                    }

                    await mtnAlarmShadowRecordRepository.SaveAsync();
                }
            }
            catch (Exception ex)
            {
                log.Error("【异常-SaveData】" + ex.Message, ex);
            }
        }

        /// <summary>Queries the cameras with status "1" of the given device, without pagination.</summary>
        private static async Task<IEnumerable<VmcCameraViewModel>> GetActiveCamerasAsync(IServiceProvider services, string devCode)
        {
            // TODO(review): IVmcCameraRepository is a guess; the original type argument was lost.
            var vmcCameraRepository = services.GetRequiredService<IVmcCameraRepository>();

            var searchModel = new VmcDevSearch
            {
                IsPagination = false,
                C_Status = "1",
                C_DevStoreCode = devCode,
            };

            return await vmcCameraRepository.GetConditionAsync(searchModel);
        }

        /// <summary>
        /// Starts a recording on the camera, waits <c>F_ShootingTime</c> seconds, stops it and
        /// returns the relative URL of the resulting clip.
        /// </summary>
        /// <returns>The relative clip URL, or <c>null</c> when no usable clip was produced (the reason is logged).</returns>
        private static async Task<string> RecordClipAsync(VmcCameraViewModel camera)
        {
            if (string.IsNullOrEmpty(camera.C_Serial) || string.IsNullOrEmpty(camera.Codes))
            {
                return null;
            }

            // NOTE(review): loginUrl is a static shared by every concurrently running camera
            // task. If cameras use different values, calls may presumably be made with the
            // wrong one; fixing this needs a change in LiveGBSHelper - confirm.
            LiveGBSHelper.loginUrl = camera.C_CameraTypeValue;

            VideoStartRecording started = await LiveGBSHelper.StartRecording(camera.C_Serial, camera.Codes);
            if (started == null || string.IsNullOrEmpty(started.DownloadURL))
            {
                // Checked before waiting: without a download URL the recording is never
                // stopped or stored, so waiting out the recording time would be pointless.
                log.Info("【开始实时录像错误,没有返回下载地址】");
                return null;
            }

            // Non-blocking wait for the recording duration; negative values are treated as 0.
            await Task.Delay(TimeSpan.FromSeconds(Math.Max(0, camera.F_ShootingTime)));

            VideoRecording stopped = await LiveGBSHelper.StopRecording(camera.C_Serial, camera.Codes);
            if (stopped == null || stopped.RecordList == null || stopped.RecordList.Count == 0)
            {
                log.Info("【停止实时录像错误,没有录像文件】");
                return null;
            }

            VideoRecordingMode recorded = stopped.RecordList.FirstOrDefault(v => v.DownloadURL == started.DownloadURL);
            if (recorded == null || string.IsNullOrEmpty(recorded.EndTime))
            {
                log.Info("【停止实时录像错误,没有录像文件或者录像停止时间为空下载地址不能使用】");
                return null;
            }

            string relativeUrl = ToRelativeSmsPath(started.DownloadURL);
            if (relativeUrl == null)
            {
                log.Info("【录像下载地址不包含sms路径】" + started.DownloadURL);
            }

            return relativeUrl;
        }

        /// <summary>
        /// Returns the part of <paramref name="downloadUrl"/> starting at the first "sms"
        /// segment, prefixed with "/", e.g. "http://host/sms/a/b.mp4" -> "/sms/a/b.mp4".
        /// </summary>
        /// <returns>The relative path, or <c>null</c> when the URL is empty or contains no "sms".</returns>
        private static string ToRelativeSmsPath(string downloadUrl)
        {
            if (string.IsNullOrEmpty(downloadUrl))
            {
                return null;
            }

            int markerIndex = downloadUrl.IndexOf(SmsPathMarker, StringComparison.Ordinal);
            if (markerIndex < 0)
            {
                return null;
            }

            // Everything after the first marker is kept, even if "sms" appears again later.
            return "/" + SmsPathMarker + downloadUrl.Substring(markerIndex + SmsPathMarker.Length);
        }

        /// <summary>
        /// Downloads a video into "temp/Video/{year}/{month}/" under the current directory and
        /// returns its public URL built from the "ServiceUrl" configuration value.
        /// Not referenced by any live code in this file.
        /// </summary>
        /// <param name="pathUrl">Source URL of the video.</param>
        /// <returns>The public URL of the downloaded file, or <c>null</c> when the download failed.</returns>
        private string DownLoad_Video(string pathUrl)
        {
            try
            {
                string serviceUrl = _configuration.GetSection("ServiceUrl").Value;
                DateTime time = DateTime.Now;
                string fileUrl = "/temp/Video/" + time.Year + "/" + time.Month + "/";
                string fileName = $"{time.ToString("yyyyMMddHHmmssfff")}.mp4";

                string directory = Directory.GetCurrentDirectory() + fileUrl;
                // CreateDirectory is a no-op when the directory already exists.
                Directory.CreateDirectory(directory);

                FileHelp.DownLoadVideo(pathUrl, directory + fileName);
                return serviceUrl + fileUrl + fileName;
            }
            catch (Exception ex)
            {
                log.Error("【异常-DownLoad_Video】" + ex.Message, ex);
                return null;
            }
        }

        /// <summary>Stops the service by releasing the channel and connection.</summary>
        /// <param name="cancellationToken">Not used.</param>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            Dispose();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Closes and disposes the channel and the connection. Safe to call more than once
        /// (it is called from <see cref="StopAsync"/> and may be called again by the owner
        /// of this instance); never throws.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            try
            {
                if (channel != null)
                {
                    if (channel.IsOpen)
                    {
                        channel.Close();
                    }

                    channel.Dispose();
                }
            }
            catch (Exception ex)
            {
                log.Error("【异常-RabbitMQ】" + ex.Message, ex);
            }

            try
            {
                if (con != null)
                {
                    if (con.IsOpen)
                    {
                        con.Close();
                    }

                    con.Dispose();
                }
            }
            catch (Exception ex)
            {
                log.Error("【异常-RabbitMQ】" + ex.Message, ex);
            }
        }
    }
}