using log4net; using Microsoft.AspNetCore.Connections; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Channels; using System.Threading.Tasks; namespace Ropin.Inspection.Common.Helper { public static class RabbitMQHelper { /// /// 发送RabbitMQ消息 /// /// /// public static async Task SnedRabbitMQ(RabbitMQModel model) { if (model.IsOfficial == "false") { model.QueueName = model.QueueName + "Test"; } try { var factory = new ConnectionFactory() { HostName = model.HostName,// "60.204.212.71",//IP地址 Port = model.Port,// 5672,//端口号 UserName = model.UserName,//"guest",//用户账号 VirtualHost = model.VirtualHost,// "/", Password = model.Password,//"guest"//用户密码 }; using (IConnection con = factory.CreateConnection())//创建连接对象 { using (IModel channel = con.CreateModel())//创建连接会话对象 { //声明队列 var queue = channel.QueueDeclare( queue: model.QueueName, //消息队列名称 durable: false, //是否缓存 exclusive: false, autoDelete: false, arguments: null ); //var msg = new //{ // devStoreCode = "8453d5ed-8a21-4880-88e7-f872e93551bf", // devRunSpot = "295458455312930168", // time = DateTime.Now //}; //string msgStr = JsonConvert.SerializeObject(msg); channel.ConfirmSelect(); var body = Encoding.UTF8.GetBytes(model.msgStr); var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 设置消息持久化 channel.BasicPublish("", model.QueueName, properties, body); bool bol = channel.WaitForConfirms(); return bol; } } } catch (Exception ex) { return false; } } /// /// 发送RabbitMQ消息-Direct交换机【Direct Exchange /// 描述:直连交换机。消息只能被发送到那些与绑定键(binding key)完全匹配的队列。 /// 使用场景:适用于简单的路由规则,例如,基于一个特定的路由键(routing key)将消息发送到特定队列。】 /// /// /// public static async Task SnedRabbitMQ_ExchangeDirect(RabbitMQModel model) { if (model.IsOfficial == "false") { model.QueueName = model.QueueName + "Test"; } model.ExchangeName = model.QueueName + ".DirectExchange"; model.RoutingKey = model.QueueName + ".key"; try { var factory = new ConnectionFactory() { HostName = model.HostName,// "60.204.212.71",//IP地址 Port = model.Port,// 5672,//端口号 UserName = model.UserName,//"guest",//用户账号 VirtualHost = model.VirtualHost,// "/", Password = model.Password,//"guest"//用户密码 }; using (IConnection con = factory.CreateConnection())//创建连接对象 { using (IModel channel = con.CreateModel())//创建连接会话对象 { channel.ExchangeDeclare(model.ExchangeName, type: ExchangeType.Direct); // Direct 交换机示例 //声明队列 var queue = channel.QueueDeclare( queue: model.QueueName, //消息队列名称 durable: false, //是否缓存 exclusive: false, autoDelete: false, arguments: null ); //将队列与交换机进行绑定 channel.QueueBind(model.QueueName, model.ExchangeName,model.RoutingKey); channel.ConfirmSelect(); var body = Encoding.UTF8.GetBytes(model.msgStr); var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 设置消息持久化 channel.BasicPublish(model.ExchangeName, model.RoutingKey, properties, body); bool bol = channel.WaitForConfirms(); return bol; } } } catch (Exception ex) { return false; } } /// /// 发送RabbitMQ消息-Topic交换机 【Topic Exchange /// 描述:主题交换机。消息会根据路由键的模式匹配规则发送到队列。路由键可以是多个单词,由点分隔,例如 "stock.usd.nyse",队列可以绑定类似 "stock.*" 或 "*.usd.*" 的模式。 /// 使用场景:适用于需要基于更复杂模式匹配的路由规则的场景。】 /// /// /// public static async Task SnedRabbitMQ_ExchangeTopic(RabbitMQModel model) { if (model.IsOfficial == "false") { model.QueueName = model.QueueName + "Test"; } try { var factory = new ConnectionFactory() { HostName = model.HostName,// "60.204.212.71",//IP地址 Port = model.Port,// 5672,//端口号 UserName = model.UserName,//"guest",//用户账号 VirtualHost = model.VirtualHost,// "/", Password = model.Password,//"guest"//用户密码 }; using (IConnection con = factory.CreateConnection())//创建连接对象 { using (IModel channel = con.CreateModel())//创建连接会话对象 { channel.ExchangeDeclare(model.ExchangeName, type: ExchangeType.Topic, durable: true); // Topic 交换机示例 //声明队列 var queue = channel.QueueDeclare( queue: model.QueueName, //消息队列名称 durable: false, //是否缓存 exclusive: false, autoDelete: false, arguments: null ); //将队列与交换机进行绑定 channel.QueueBind(queue.QueueName, model.ExchangeName, model.RoutingKey); channel.ConfirmSelect(); var body = Encoding.UTF8.GetBytes(model.msgStr); var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 设置消息持久化 properties.DeliveryMode = 2; // 持久化消息 channel.BasicPublish(model.ExchangeName, model.RoutingKey, properties, body); bool bol = channel.WaitForConfirms(); return bol; } } } catch (Exception ex) { return false; } } } public class RabbitMQModel { //值是false 时,消息队列名称+Test public string IsOfficial { get; set; } //消息内容 public string msgStr { get; set; } //消息队列名称 public string QueueName { get; set; } //IP地址 public string HostName { get; set;} //端口号 public int Port { get; set;} //用户账号 public string UserName { get; set; } //用户密码 public string Password { get; set; } public string VirtualHost { get; set; } // 获取或设置交换机名称。 public string ExchangeName { get; set; } // 路由键 public string RoutingKey { get; set; } } }