RabbitMQHelper.cs 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. using log4net;
  2. using Microsoft.AspNetCore.Connections;
  3. using RabbitMQ.Client;
  4. using RabbitMQ.Client.Events;
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Linq;
  8. using System.Text;
  9. using System.Threading.Channels;
  10. using System.Threading.Tasks;
  11. namespace Ropin.Inspection.Common.Helper
  12. {
  13. public static class RabbitMQHelper
  14. {
  15. /// <summary>
  16. /// 发送RabbitMQ消息
  17. /// </summary>
  18. /// <param name="model"></param>
  19. /// <returns></returns>
  20. public static async Task<bool> SnedRabbitMQ(RabbitMQModel model)
  21. {
  22. if (model.IsOfficial == "false")
  23. {
  24. model.QueueName = model.QueueName + "Test";
  25. }
  26. try
  27. {
  28. var factory = new ConnectionFactory()
  29. {
  30. HostName = model.HostName,// "60.204.212.71",//IP地址
  31. Port = model.Port,// 5672,//端口号
  32. UserName = model.UserName,//"guest",//用户账号
  33. VirtualHost = model.VirtualHost,// "/",
  34. Password = model.Password,//"guest"//用户密码
  35. };
  36. using (IConnection con = factory.CreateConnection())//创建连接对象
  37. {
  38. using (IModel channel = con.CreateModel())//创建连接会话对象
  39. {
  40. //声明队列
  41. var queue = channel.QueueDeclare(
  42. queue: model.QueueName, //消息队列名称
  43. durable: false, //是否缓存
  44. exclusive: false,
  45. autoDelete: false,
  46. arguments: null
  47. );
  48. //var msg = new
  49. //{
  50. // devStoreCode = "8453d5ed-8a21-4880-88e7-f872e93551bf",
  51. // devRunSpot = "295458455312930168",
  52. // time = DateTime.Now
  53. //};
  54. //string msgStr = JsonConvert.SerializeObject(msg);
  55. channel.ConfirmSelect();
  56. var body = Encoding.UTF8.GetBytes(model.msgStr);
  57. var properties = channel.CreateBasicProperties();
  58. properties.Persistent = true; // 设置消息持久化
  59. channel.BasicPublish("", model.QueueName, properties, body);
  60. bool bol = channel.WaitForConfirms();
  61. return bol;
  62. }
  63. }
  64. }
  65. catch (Exception ex)
  66. {
  67. return false;
  68. }
  69. }
  70. /// <summary>
  71. /// 发送RabbitMQ消息-Direct交换机【Direct Exchange
  72. /// 描述:直连交换机。消息只能被发送到那些与绑定键(binding key)完全匹配的队列。
  73. /// 使用场景:适用于简单的路由规则,例如,基于一个特定的路由键(routing key)将消息发送到特定队列。】
  74. /// </summary>
  75. /// <param name="model"></param>
  76. /// <returns></returns>
  77. public static async Task<bool> SnedRabbitMQ_ExchangeDirect(RabbitMQModel model)
  78. {
  79. if (model.IsOfficial == "false")
  80. {
  81. model.QueueName = model.QueueName + "Test";
  82. }
  83. model.ExchangeName = model.QueueName + ".DirectExchange";
  84. model.RoutingKey = model.QueueName + ".key";
  85. try
  86. {
  87. var factory = new ConnectionFactory()
  88. {
  89. HostName = model.HostName,// "60.204.212.71",//IP地址
  90. Port = model.Port,// 5672,//端口号
  91. UserName = model.UserName,//"guest",//用户账号
  92. VirtualHost = model.VirtualHost,// "/",
  93. Password = model.Password,//"guest"//用户密码
  94. };
  95. using (IConnection con = factory.CreateConnection())//创建连接对象
  96. {
  97. using (IModel channel = con.CreateModel())//创建连接会话对象
  98. {
  99. channel.ExchangeDeclare(model.ExchangeName, type: ExchangeType.Direct); // Direct 交换机示例
  100. //声明队列
  101. var queue = channel.QueueDeclare(
  102. queue: model.QueueName, //消息队列名称
  103. durable: false, //是否缓存
  104. exclusive: false,
  105. autoDelete: false,
  106. arguments: null
  107. );
  108. //将队列与交换机进行绑定
  109. channel.QueueBind(model.QueueName, model.ExchangeName,model.RoutingKey);
  110. channel.ConfirmSelect();
  111. var body = Encoding.UTF8.GetBytes(model.msgStr);
  112. var properties = channel.CreateBasicProperties();
  113. properties.Persistent = true; // 设置消息持久化
  114. channel.BasicPublish(model.ExchangeName, model.RoutingKey, properties, body);
  115. bool bol = channel.WaitForConfirms();
  116. return bol;
  117. }
  118. }
  119. }
  120. catch (Exception ex)
  121. {
  122. return false;
  123. }
  124. }
  125. /// <summary>
  126. /// 发送RabbitMQ消息-Topic交换机 【Topic Exchange
  127. /// 描述:主题交换机。消息会根据路由键的模式匹配规则发送到队列。路由键可以是多个单词,由点分隔,例如 "stock.usd.nyse",队列可以绑定类似 "stock.*" 或 "*.usd.*" 的模式。
  128. /// 使用场景:适用于需要基于更复杂模式匹配的路由规则的场景。】
  129. /// </summary>
  130. /// <param name="model"></param>
  131. /// <returns></returns>
  132. public static async Task<bool> SnedRabbitMQ_ExchangeTopic(RabbitMQModel model)
  133. {
  134. if (model.IsOfficial == "false")
  135. {
  136. model.QueueName = model.QueueName + "Test";
  137. }
  138. try
  139. {
  140. var factory = new ConnectionFactory()
  141. {
  142. HostName = model.HostName,// "60.204.212.71",//IP地址
  143. Port = model.Port,// 5672,//端口号
  144. UserName = model.UserName,//"guest",//用户账号
  145. VirtualHost = model.VirtualHost,// "/",
  146. Password = model.Password,//"guest"//用户密码
  147. };
  148. using (IConnection con = factory.CreateConnection())//创建连接对象
  149. {
  150. using (IModel channel = con.CreateModel())//创建连接会话对象
  151. {
  152. channel.ExchangeDeclare(model.ExchangeName, type: ExchangeType.Topic, durable: true); // Topic 交换机示例
  153. //声明队列
  154. var queue = channel.QueueDeclare(
  155. queue: model.QueueName, //消息队列名称
  156. durable: false, //是否缓存
  157. exclusive: false,
  158. autoDelete: false,
  159. arguments: null
  160. );
  161. //将队列与交换机进行绑定
  162. channel.QueueBind(queue.QueueName, model.ExchangeName, model.RoutingKey);
  163. channel.ConfirmSelect();
  164. var body = Encoding.UTF8.GetBytes(model.msgStr);
  165. var properties = channel.CreateBasicProperties();
  166. properties.Persistent = true; // 设置消息持久化
  167. properties.DeliveryMode = 2; // 持久化消息
  168. channel.BasicPublish(model.ExchangeName, model.RoutingKey, properties, body);
  169. bool bol = channel.WaitForConfirms();
  170. return bol;
  171. }
  172. }
  173. }
  174. catch (Exception ex)
  175. {
  176. return false;
  177. }
  178. }
  179. }
  180. public class RabbitMQModel
  181. {
  182. //值是false 时,消息队列名称+Test
  183. public string IsOfficial { get; set; }
  184. //消息内容
  185. public string msgStr { get; set; }
  186. //消息队列名称
  187. public string QueueName { get; set; }
  188. //IP地址
  189. public string HostName { get; set;}
  190. //端口号
  191. public int Port { get; set;}
  192. //用户账号
  193. public string UserName { get; set; }
  194. //用户密码
  195. public string Password { get; set; }
  196. public string VirtualHost { get; set; }
  197. // 获取或设置交换机名称。
  198. public string ExchangeName { get; set; }
  199. // 路由键
  200. public string RoutingKey { get; set; }
  201. }
  202. }