using FBoxClientDriver.Contract; using log4net; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections; using System.Collections.Generic; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Channels; using System.Threading.Tasks; namespace Ropin.Environmentally.LedgeService1.Controllers { [ApiController] [Route("api/[controller]")] public class WeatherForecastController : ControllerBase { private static readonly string[] Summaries = new[] { "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching" }; private readonly ILogger _logger; private static readonly ILog log = LogManager.GetLogger(typeof(WeatherForecastController)); private IConnection con; private IModel channel; public WeatherForecastController(ILogger logger) { _logger = logger; } [HttpGet] public IEnumerable Get() { var rng = new Random(); return Enumerable.Range(1, 5).Select(index => new WeatherForecast { Date = DateTime.Now.AddDays(index), TemperatureC = rng.Next(-20, 55), Summary = Summaries[rng.Next(Summaries.Length)] }) .ToArray(); } [HttpPost("TestSendTopic/str")] public string TestSendTopic(string str) { try { var factory = new ConnectionFactory() { HostName = "60.204.212.71",//IP地址 Port = 5672,//端口号 UserName = "guest",//用户账号 VirtualHost = "/", Password = "guest"//用户密码 }; if (con == null || con.IsOpen == false) { con = factory.CreateConnection(); } if (channel == null || channel.IsOpen == false) { channel = con.CreateModel();//创建连接会话对象 } string change = "rabbit.test.changeTopic"; string queueName = "rabbit.test"; string keyName = "rabbit.test.key"; //声明交换机 //channel.ExchangeDeclare(change, "fanout", durable: true); //channel.ExchangeDeclare(change, type: ExchangeType.Direct); // Direct 交换机示例 channel.ExchangeDeclare(change, type: ExchangeType.Topic, durable: true); // Topic 交换机示例 //声明队列 var queue = channel.QueueDeclare( queue: queueName, //消息队列名称 durable: false, //是否缓存 exclusive: false, autoDelete: false, arguments: null ); //将队列与交换机进行绑定 channel.QueueBind(queueName, change, keyName); channel.ConfirmSelect(); //var msg = new //{ // devStoreCode = "8453d5ed-8a21-4880-88e7-f872e93551bf", // devRunSpot = "295458455312930168", // time = DateTime.Now //}; string msgStr = str+"你好!" + DateTime.Now;//JsonConvert.SerializeObject(msg); //var ReceiveData = JsonConvert.DeserializeObject(msgStr); var body = Encoding.UTF8.GetBytes(msgStr); var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 设置消息持久化 properties.DeliveryMode = 2; // 持久化消息 channel.BasicPublish(change, keyName, properties, body); bool bol = channel.WaitForConfirms(); Console.WriteLine("返回消息:" + bol + ";数据:" + msgStr); return msgStr; //告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息 //channel.BasicQos(0, 1, false); //创建消费者对象 //var consumer = new EventingBasicConsumer(channel); //consumer.Received += (model, ea) => //{ // var body = ea.Body.ToArray(); // var message = Encoding.UTF8.GetString(body); // logger.LogInformation("【RabbitMQ】"+message); // Console.WriteLine(" [x] Received shutdown signal: '{0}'", message); // // 在这里处理关机事件 // //返回消息确认 // //channel.BasicAck(ea.DeliveryTag, multiple: false); //}; //消费者开启监听 将autoAck设置false 关闭自动确认 //channel.BasicConsume(queue: queue.QueueName, autoAck: true, consumer: consumer); //发送消息 //channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body); } catch (Exception ex) { log.Info("【异常-RabbitMQ-TestSend】" + ex.Message); throw; } } [HttpPost("TestSendDirect/str")] public string TestSendDirect(string str) { try { var factory = new ConnectionFactory() { HostName = "60.204.212.71",//IP地址 Port = 5672,//端口号 UserName = "guest",//用户账号 VirtualHost = "/", Password = "guest"//用户密码 }; if (con == null || con.IsOpen == false) { con = factory.CreateConnection(); } if (channel == null || channel.IsOpen == false) { channel = con.CreateModel();//创建连接会话对象 } string change = "rabbit.test.DirectExchange"; string queueName = "rabbit.test"; string keyName = "rabbit.test.key"; //声明交换机 //channel.ExchangeDeclare(change, "fanout", durable: true); channel.ExchangeDeclare(change, type: ExchangeType.Direct); // Direct 交换机示例 //声明队列 var queue = channel.QueueDeclare( queue: queueName, //消息队列名称 durable: false, //是否缓存 exclusive: false, autoDelete: false, arguments: null ); channel.ConfirmSelect(); string msgStr = str+"你好!" + DateTime.Now;//JsonConvert.SerializeObject(msg); //var ReceiveData = JsonConvert.DeserializeObject(msgStr); var body = Encoding.UTF8.GetBytes(msgStr); var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 设置消息持久化 properties.DeliveryMode = 2; // 持久化消息 channel.BasicPublish(change, keyName, properties, body); bool bol = channel.WaitForConfirms(); Console.WriteLine("返回消息:" + bol + ";数据:" + msgStr); return msgStr; //告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息 //channel.BasicQos(0, 1, false); //创建消费者对象 //var consumer = new EventingBasicConsumer(channel); //consumer.Received += (model, ea) => //{ // var body = ea.Body.ToArray(); // var message = Encoding.UTF8.GetString(body); // logger.LogInformation("【RabbitMQ】"+message); // Console.WriteLine(" [x] Received shutdown signal: '{0}'", message); // // 在这里处理关机事件 // //返回消息确认 // //channel.BasicAck(ea.DeliveryTag, multiple: false); //}; //消费者开启监听 将autoAck设置false 关闭自动确认 //channel.BasicConsume(queue: queue.QueueName, autoAck: true, consumer: consumer); //发送消息 //channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body); } catch (Exception ex) { log.Info("【异常-RabbitMQ-TestSend】" + ex.Message); throw; } } [HttpPost("TestReceive")] public string TestReceive() { string message = ""; try { var factory = new ConnectionFactory() { HostName = "60.204.212.71",//IP地址 Port = 5672,//端口号 UserName = "guest",//用户账号 VirtualHost = "/", Password = "guest"//用户密码 }; if (con==null||con.IsOpen==false) { con = factory.CreateConnection(); } if (channel==null||channel.IsOpen==false) { channel = con.CreateModel();//创建连接会话对象 } //声明队列 var queue = channel.QueueDeclare( queue: "rabbit@rabbitmq01_test", //消息队列名称 durable: false, //是否缓存 exclusive: false, autoDelete: false, arguments: null ); //告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息 channel.BasicQos(0, 1, false); //创建消费者对象 var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => { try { var body = ea.Body.ToArray(); message = Encoding.UTF8.GetString(body); log.Info("【RabbitMQ】" + message); Console.WriteLine("【RabbitMQ】" + message); // Console.WriteLine(" [x] Received shutdown signal: '{0}'", message); // 在这里处理关机事件 // //返回消息确认 // channel.BasicAck(ea.DeliveryTag, multiple: true); } catch (Exception ex) { log.Info("【RabbitMQ-接收消息处理异常了】" + ex.Message); } }; //消费者开启监听 将autoAck设置false 手动确认关闭;true:自动关闭; channel.BasicConsume(queue: queue.QueueName, autoAck: true, consumer: consumer); } catch (Exception ex) { log.Info("【异常-RabbitMQ-TestReceive】" + ex.Message); throw; } return message; } } }