|
- using FBoxClientDriver.Contract;
- using log4net;
- using Microsoft.AspNetCore.Mvc;
- using Microsoft.Extensions.Logging;
- using Newtonsoft.Json;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Drawing;
- using System.Linq;
- using System.Text;
- using System.Threading.Channels;
- using System.Threading.Tasks;
namespace Ropin.Environmentally.LedgeService1.Controllers
{
    /// <summary>
    /// Sample controller exposing a weather-forecast endpoint plus test endpoints that
    /// publish to and consume from a RabbitMQ broker.
    /// </summary>
    /// <remarks>
    /// ASP.NET Core creates a new controller instance for every request, so the RabbitMQ
    /// connection and channel are kept in static fields (guarded by <see cref="RabbitGate"/>)
    /// to actually be reused between requests instead of opening, and never closing, a new
    /// connection on every call.
    /// </remarks>
    [ApiController]
    [Route("api/[controller]")]
    public class WeatherForecastController : ControllerBase
    {
        // Queue and routing key shared by both publishing test endpoints.
        private const string TestQueueName = "rabbit.test";
        private const string TestRoutingKey = "rabbit.test.key";

        private static readonly string[] Summaries = new[]
        {
            "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
        };

        private static readonly ILog log = LogManager.GetLogger(typeof(WeatherForecastController));

        // Serializes every use of the shared channel: a channel must not be used by
        // several threads at the same time.
        private static readonly object RabbitGate = new object();

        // Long-lived broker connection/channel shared by all requests; lazily (re)created
        // by EnsureChannel whenever they are missing or closed.
        private static IConnection sharedConnection;
        private static IModel sharedChannel;

        private readonly ILogger<WeatherForecastController> _logger;

        /// <summary>Creates the controller.</summary>
        /// <param name="logger">Framework logger supplied by dependency injection.</param>
        public WeatherForecastController(ILogger<WeatherForecastController> logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// Returns five randomly generated forecasts, one for each of the next five days.
        /// </summary>
        /// <returns>An array of five <see cref="WeatherForecast"/> items.</returns>
        [HttpGet]
        public IEnumerable<WeatherForecast> Get()
        {
            var rng = new Random();
            return Enumerable.Range(1, 5).Select(index => new WeatherForecast
            {
                Date = DateTime.Now.AddDays(index),
                TemperatureC = rng.Next(-20, 55),
                Summary = Summaries[rng.Next(Summaries.Length)]
            })
            .ToArray();
        }

        /// <summary>
        /// Publishes a persistent test message to the durable topic exchange
        /// <c>rabbit.test.changeTopic</c> and waits for the broker's publisher confirm.
        /// </summary>
        /// <param name="str">Text placed at the start of the published message.</param>
        /// <returns>The message text that was published.</returns>
        /// <remarks>
        /// NOTE(review): the route segment is the literal text "str", not a "{str}" route
        /// parameter; it is left unchanged so existing callers keep working.
        /// Any exception is logged and rethrown.
        /// </remarks>
        [HttpPost("TestSendTopic/str")]
        public string TestSendTopic(string str)
        {
            const string exchangeName = "rabbit.test.changeTopic";
            try
            {
                lock (RabbitGate)
                {
                    var model = EnsureChannel();

                    // Declare the exchange, then the queue, then bind them together.
                    model.ExchangeDeclare(exchangeName, type: ExchangeType.Topic, durable: true);
                    DeclareAndBindTestQueue(model, exchangeName);

                    return PublishAndConfirm(model, exchangeName, str);
                }
            }
            catch (Exception ex)
            {
                log.Error("【异常-RabbitMQ-TestSend】" + ex.Message, ex);
                throw;
            }
        }

        /// <summary>
        /// Publishes a persistent test message to the direct exchange
        /// <c>rabbit.test.DirectExchange</c> and waits for the broker's publisher confirm.
        /// </summary>
        /// <param name="str">Text placed at the start of the published message.</param>
        /// <returns>The message text that was published.</returns>
        /// <remarks>
        /// The queue is now bound to the exchange, matching <see cref="TestSendTopic"/>;
        /// without a binding nothing routes from the exchange to the queue.
        /// NOTE(review): the route segment is the literal text "str", not a "{str}" route
        /// parameter; it is left unchanged so existing callers keep working.
        /// Any exception is logged and rethrown.
        /// </remarks>
        [HttpPost("TestSendDirect/str")]
        public string TestSendDirect(string str)
        {
            const string exchangeName = "rabbit.test.DirectExchange";
            try
            {
                lock (RabbitGate)
                {
                    var model = EnsureChannel();

                    model.ExchangeDeclare(exchangeName, type: ExchangeType.Direct);
                    DeclareAndBindTestQueue(model, exchangeName);

                    return PublishAndConfirm(model, exchangeName, str);
                }
            }
            catch (Exception ex)
            {
                log.Error("【异常-RabbitMQ-TestSend】" + ex.Message, ex);
                throw;
            }
        }

        /// <summary>
        /// Registers an auto-acknowledging consumer on the queue
        /// <c>rabbit@rabbitmq01_test</c> that logs every message it receives.
        /// </summary>
        /// <returns>
        /// The value of the local message variable at the time the method returns. The
        /// consumer callback runs later, when a delivery arrives, so this is normally an
        /// empty string.
        /// </returns>
        /// <remarks>
        /// Every call registers one more consumer on the shared channel.
        /// NOTE(review): this queue name differs from the queue used by the publishing
        /// endpoints ("rabbit.test") - confirm that is intended.
        /// Any exception raised while setting up the consumer is logged and rethrown.
        /// </remarks>
        [HttpPost("TestReceive")]
        public string TestReceive()
        {
            string message = "";
            try
            {
                lock (RabbitGate)
                {
                    var model = EnsureChannel();

                    var queue = model.QueueDeclare(
                        queue: "rabbit@rabbitmq01_test",
                        durable: false,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null
                    );

                    // Ask the broker for at most one unacknowledged delivery at a time.
                    model.BasicQos(0, 1, false);

                    var consumer = new EventingBasicConsumer(model);

                    // Plain synchronous handler: there is nothing to await, so the
                    // delegate must not be an async (async void) lambda.
                    consumer.Received += (sender, ea) =>
                    {
                        try
                        {
                            var body = ea.Body.ToArray();
                            message = Encoding.UTF8.GetString(body);
                            log.Info("【RabbitMQ】" + message);
                            Console.WriteLine("【RabbitMQ】" + message);
                        }
                        catch (Exception ex)
                        {
                            log.Error("【RabbitMQ-接收消息处理异常了】" + ex.Message, ex);
                        }
                    };

                    // autoAck: true - deliveries are acknowledged automatically, so the
                    // handler never has to call BasicAck.
                    model.BasicConsume(queue: queue.QueueName, autoAck: true, consumer: consumer);
                }
            }
            catch (Exception ex)
            {
                log.Error("【异常-RabbitMQ-TestReceive】" + ex.Message, ex);
                throw;
            }
            return message;
        }

        /// <summary>Builds the connection settings for the test broker.</summary>
        /// <remarks>
        /// NOTE(review): host and credentials are hard-coded in source; they should be
        /// moved to configuration.
        /// </remarks>
        private static ConnectionFactory CreateConnectionFactory()
        {
            return new ConnectionFactory()
            {
                HostName = "60.204.212.71",
                Port = 5672,
                UserName = "guest",
                VirtualHost = "/",
                Password = "guest"
            };
        }

        /// <summary>
        /// Returns the shared channel, opening the connection and/or channel first when
        /// they are missing or closed. Callers must hold <see cref="RabbitGate"/>.
        /// </summary>
        private static IModel EnsureChannel()
        {
            if (sharedConnection == null || !sharedConnection.IsOpen)
            {
                sharedConnection = CreateConnectionFactory().CreateConnection();
            }
            if (sharedChannel == null || !sharedChannel.IsOpen)
            {
                sharedChannel = sharedConnection.CreateModel();
            }
            return sharedChannel;
        }

        /// <summary>
        /// Declares the non-durable test queue and binds it to <paramref name="exchangeName"/>
        /// with the test routing key.
        /// </summary>
        private static void DeclareAndBindTestQueue(IModel model, string exchangeName)
        {
            model.QueueDeclare(
                queue: TestQueueName,
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null
            );
            model.QueueBind(TestQueueName, exchangeName, TestRoutingKey);
        }

        /// <summary>
        /// Publishes the test message as persistent with the test routing key, blocks until
        /// the broker confirms it, writes the outcome to the console and returns the text.
        /// </summary>
        private static string PublishAndConfirm(IModel model, string exchangeName, string str)
        {
            // Enable publisher confirms on the channel so WaitForConfirms can be used.
            model.ConfirmSelect();

            string msgStr = str + "你好!" + DateTime.Now;
            var body = Encoding.UTF8.GetBytes(msgStr);

            var properties = model.CreateBasicProperties();
            properties.Persistent = true; // Persistent delivery (delivery mode 2).

            model.BasicPublish(exchangeName, TestRoutingKey, properties, body);
            bool confirmed = model.WaitForConfirms();
            Console.WriteLine("返回消息:" + confirmed + ";数据:" + msgStr);
            return msgStr;
        }
    }
}
|