WeatherForecastController.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. using FBoxClientDriver.Contract;
  2. using log4net;
  3. using Microsoft.AspNetCore.Mvc;
  4. using Microsoft.Extensions.Logging;
  5. using Newtonsoft.Json;
  6. using RabbitMQ.Client;
  7. using RabbitMQ.Client.Events;
  8. using System;
  9. using System.Collections;
  10. using System.Collections.Generic;
  11. using System.Drawing;
  12. using System.Linq;
  13. using System.Text;
  14. using System.Threading.Channels;
  15. using System.Threading.Tasks;
  16. namespace Ropin.Environmentally.LedgeService1.Controllers
  17. {
  /// <summary>
  /// Demo API controller: serves the template weather forecast and exposes
  /// test endpoints that publish to, and read from, a RabbitMQ broker.
  /// </summary>
  /// <remarks>
  /// A controller instance is normally created per request, so RabbitMQ
  /// connections and channels are opened and disposed inside each action
  /// instead of being cached in instance fields (which leaked one connection
  /// per call).
  /// </remarks>
  [ApiController]
  [Route("api/[controller]")]
  public class WeatherForecastController : ControllerBase
  {
      // Exchange, queue and routing-key names used by the test endpoints.
      private const string TopicExchangeName = "rabbit.test.changeTopic";
      private const string DirectExchangeName = "rabbit.test.DirectExchange";
      private const string SendQueueName = "rabbit.test";
      private const string SendRoutingKey = "rabbit.test.key";

      // NOTE(review): this is not the queue the send endpoints publish to
      // ("rabbit.test"); confirm which queue TestReceive is meant to read.
      private const string ReceiveQueueName = "rabbit@rabbitmq01_test";

      private static readonly string[] Summaries = new[]
      {
          "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
      };

      private static readonly ILog log = LogManager.GetLogger(typeof(WeatherForecastController));

      private readonly ILogger<WeatherForecastController> _logger;

      /// <summary>Creates the controller with the framework-provided logger.</summary>
      /// <param name="logger">Logger supplied by dependency injection.</param>
      public WeatherForecastController(ILogger<WeatherForecastController> logger)
      {
          _logger = logger;
      }

      /// <summary>
      /// Returns five randomly generated forecasts, one for each of the next five days.
      /// </summary>
      /// <returns>An array of five <see cref="WeatherForecast"/> items.</returns>
      [HttpGet]
      public IEnumerable<WeatherForecast> Get()
      {
          var rng = new Random();
          return Enumerable.Range(1, 5).Select(index => new WeatherForecast
          {
              Date = DateTime.Now.AddDays(index),
              TemperatureC = rng.Next(-20, 55),
              Summary = Summaries[rng.Next(Summaries.Length)]
          })
          .ToArray();
      }

      /// <summary>
      /// Publishes a test message through the durable topic exchange
      /// "rabbit.test.changeTopic".
      /// </summary>
      /// <param name="str">Text placed at the start of the message payload.</param>
      /// <returns>The payload that was published.</returns>
      /// <remarks>
      /// NOTE(review): the route segment is the literal text "str", not a
      /// "{str}" placeholder, so the value is presumably bound from the query
      /// string — confirm this is intended.
      /// </remarks>
      [HttpPost("TestSendTopic/str")]
      public string TestSendTopic(string str)
      {
          return PublishTestMessage(TopicExchangeName, ExchangeType.Topic, durableExchange: true, str: str);
      }

      /// <summary>
      /// Publishes a test message through the direct exchange
      /// "rabbit.test.DirectExchange".
      /// </summary>
      /// <param name="str">Text placed at the start of the message payload.</param>
      /// <returns>The payload that was published.</returns>
      /// <remarks>
      /// The queue is now bound to the direct exchange before publishing;
      /// previously no binding was created here, so messages were unroutable
      /// unless a binding already existed on the broker.
      /// </remarks>
      [HttpPost("TestSendDirect/str")]
      public string TestSendDirect(string str)
      {
          return PublishTestMessage(DirectExchangeName, ExchangeType.Direct, durableExchange: false, str: str);
      }

      /// <summary>
      /// Polls the receive queue once and returns the next message, if any.
      /// </summary>
      /// <returns>
      /// The UTF-8 decoded message body, or an empty string when the queue
      /// currently holds no message.
      /// </returns>
      /// <remarks>
      /// A single synchronous fetch is used instead of a push consumer: a
      /// consumer callback normally runs after the action has returned, so
      /// the response was empty unless a delivery happened to race in, and
      /// every call left another consumer registered.
      /// </remarks>
      [HttpPost("TestReceive")]
      public string TestReceive()
      {
          try
          {
              using (var connection = CreateConnectionFactory().CreateConnection())
              using (var session = connection.CreateModel())
              {
                  // Declare with the original arguments so the queue exists before polling.
                  var queue = session.QueueDeclare(
                      queue: ReceiveQueueName,
                      durable: false,
                      exclusive: false,
                      autoDelete: false,
                      arguments: null
                  );

                  // Second argument is autoAck: the message is acknowledged as soon as it is fetched.
                  BasicGetResult delivery = session.BasicGet(queue.QueueName, true);
                  if (delivery == null)
                  {
                      return "";
                  }

                  string message = Encoding.UTF8.GetString(delivery.Body.ToArray());
                  log.Info("【RabbitMQ】" + message);
                  Console.WriteLine("【RabbitMQ】" + message);
                  return message;
              }
          }
          catch (Exception ex)
          {
              // Log at error level with the exception attached so the stack trace is kept.
              log.Error("【异常-RabbitMQ-TestReceive】" + ex.Message, ex);
              throw;
          }
      }

      /// <summary>Builds the connection factory shared by all RabbitMQ test endpoints.</summary>
      private static ConnectionFactory CreateConnectionFactory()
      {
          // NOTE(review): broker address and credentials are hard-coded in source;
          // they should come from configuration / secret storage.
          return new ConnectionFactory()
          {
              HostName = "60.204.212.71",
              Port = 5672,
              UserName = "guest",
              VirtualHost = "/",
              Password = "guest"
          };
      }

      /// <summary>
      /// Declares the exchange and queue, binds them, publishes one persistent
      /// message with publisher confirms enabled, and returns the payload.
      /// </summary>
      /// <param name="exchangeName">Exchange to declare and publish to.</param>
      /// <param name="exchangeType">Exchange type, e.g. <c>ExchangeType.Topic</c>.</param>
      /// <param name="durableExchange">Durability flag passed to the exchange declaration.</param>
      /// <param name="str">Text placed at the start of the message payload.</param>
      /// <returns>The payload that was published.</returns>
      private static string PublishTestMessage(string exchangeName, string exchangeType, bool durableExchange, string str)
      {
          try
          {
              using (var connection = CreateConnectionFactory().CreateConnection())
              using (var session = connection.CreateModel())
              {
                  session.ExchangeDeclare(exchangeName, type: exchangeType, durable: durableExchange);

                  // Declaration arguments are kept exactly as before; changing them
                  // (e.g. durable) would not match a queue that already exists on the broker.
                  session.QueueDeclare(
                      queue: SendQueueName,
                      durable: false,
                      exclusive: false,
                      autoDelete: false,
                      arguments: null
                  );

                  // Without this binding the exchange has nowhere to route the message.
                  session.QueueBind(SendQueueName, exchangeName, SendRoutingKey);

                  // Enable publisher confirms so WaitForConfirms can report the broker's answer.
                  session.ConfirmSelect();

                  string payload = str + "你好!" + DateTime.Now;
                  var body = Encoding.UTF8.GetBytes(payload);

                  var properties = session.CreateBasicProperties();
                  properties.Persistent = true; // same effect as DeliveryMode = 2

                  session.BasicPublish(exchangeName, SendRoutingKey, properties, body);

                  bool confirmed = session.WaitForConfirms();
                  Console.WriteLine("返回消息:" + confirmed + ";数据:" + payload);
                  if (!confirmed)
                  {
                      // Surface a negative confirm in the log instead of only on the console.
                      log.Warn("【RabbitMQ-TestSend-未确认】" + payload);
                  }

                  return payload;
              }
          }
          catch (Exception ex)
          {
              // Log at error level with the exception attached so the stack trace is kept.
              log.Error("【异常-RabbitMQ-TestSend】" + ex.Message, ex);
              throw;
          }
      }
  }
  260. }