123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- using log4net;
- using Microsoft.AspNetCore.Connections;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Channels;
- using System.Threading.Tasks;
- namespace Ropin.Inspection.Common.Helper
- {
- public static class RabbitMQHelper
- {
- /// <summary>
- /// 发送RabbitMQ消息
- /// </summary>
- /// <param name="model"></param>
- /// <returns></returns>
- public static async Task<bool> SnedRabbitMQ(RabbitMQModel model)
- {
- if (model.IsOfficial == "false")
- {
- model.QueueName = model.QueueName + "Test";
- }
- try
- {
- var factory = new ConnectionFactory()
- {
- HostName = model.HostName,// "60.204.212.71",//IP地址
- Port = model.Port,// 5672,//端口号
- UserName = model.UserName,//"guest",//用户账号
- VirtualHost = model.VirtualHost,// "/",
- Password = model.Password,//"guest"//用户密码
- };
- using (IConnection con = factory.CreateConnection())//创建连接对象
- {
- using (IModel channel = con.CreateModel())//创建连接会话对象
- {
- //声明队列
- var queue = channel.QueueDeclare(
- queue: model.QueueName, //消息队列名称
- durable: false, //是否缓存
- exclusive: false,
- autoDelete: false,
- arguments: null
- );
- //var msg = new
- //{
- // devStoreCode = "8453d5ed-8a21-4880-88e7-f872e93551bf",
- // devRunSpot = "295458455312930168",
- // time = DateTime.Now
- //};
- //string msgStr = JsonConvert.SerializeObject(msg);
- channel.ConfirmSelect();
- var body = Encoding.UTF8.GetBytes(model.msgStr);
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true; // 设置消息持久化
- channel.BasicPublish("", model.QueueName, properties, body);
- bool bol = channel.WaitForConfirms();
- return bol;
- }
- }
- }
- catch (Exception ex)
- {
- return false;
- }
- }
- /// <summary>
- /// 发送RabbitMQ消息-Direct交换机【Direct Exchange
- /// 描述:直连交换机。消息只能被发送到那些与绑定键(binding key)完全匹配的队列。
- /// 使用场景:适用于简单的路由规则,例如,基于一个特定的路由键(routing key)将消息发送到特定队列。】
- /// </summary>
- /// <param name="model"></param>
- /// <returns></returns>
- public static async Task<bool> SnedRabbitMQ_ExchangeDirect(RabbitMQModel model)
- {
- if (model.IsOfficial == "false")
- {
- model.QueueName = model.QueueName + "Test";
- }
- model.ExchangeName = model.QueueName + ".DirectExchange";
- model.RoutingKey = model.QueueName + ".key";
- try
- {
- var factory = new ConnectionFactory()
- {
- HostName = model.HostName,// "60.204.212.71",//IP地址
- Port = model.Port,// 5672,//端口号
- UserName = model.UserName,//"guest",//用户账号
- VirtualHost = model.VirtualHost,// "/",
- Password = model.Password,//"guest"//用户密码
- };
- using (IConnection con = factory.CreateConnection())//创建连接对象
- {
- using (IModel channel = con.CreateModel())//创建连接会话对象
- {
- channel.ExchangeDeclare(model.ExchangeName, type: ExchangeType.Direct); // Direct 交换机示例
- //声明队列
- var queue = channel.QueueDeclare(
- queue: model.QueueName, //消息队列名称
- durable: false, //是否缓存
- exclusive: false,
- autoDelete: false,
- arguments: null
- );
- //将队列与交换机进行绑定
- channel.QueueBind(model.QueueName, model.ExchangeName,model.RoutingKey);
- channel.ConfirmSelect();
- var body = Encoding.UTF8.GetBytes(model.msgStr);
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true; // 设置消息持久化
- channel.BasicPublish(model.ExchangeName, model.RoutingKey, properties, body);
- bool bol = channel.WaitForConfirms();
- return bol;
- }
- }
- }
- catch (Exception ex)
- {
- return false;
- }
- }
- /// <summary>
- /// 发送RabbitMQ消息-Topic交换机 【Topic Exchange
- /// 描述:主题交换机。消息会根据路由键的模式匹配规则发送到队列。路由键可以是多个单词,由点分隔,例如 "stock.usd.nyse",队列可以绑定类似 "stock.*" 或 "*.usd.*" 的模式。
- /// 使用场景:适用于需要基于更复杂模式匹配的路由规则的场景。】
- /// </summary>
- /// <param name="model"></param>
- /// <returns></returns>
- public static async Task<bool> SnedRabbitMQ_ExchangeTopic(RabbitMQModel model)
- {
- if (model.IsOfficial == "false")
- {
- model.QueueName = model.QueueName + "Test";
- }
- try
- {
- var factory = new ConnectionFactory()
- {
- HostName = model.HostName,// "60.204.212.71",//IP地址
- Port = model.Port,// 5672,//端口号
- UserName = model.UserName,//"guest",//用户账号
- VirtualHost = model.VirtualHost,// "/",
- Password = model.Password,//"guest"//用户密码
- };
- using (IConnection con = factory.CreateConnection())//创建连接对象
- {
- using (IModel channel = con.CreateModel())//创建连接会话对象
- {
- channel.ExchangeDeclare(model.ExchangeName, type: ExchangeType.Topic, durable: true); // Topic 交换机示例
- //声明队列
- var queue = channel.QueueDeclare(
- queue: model.QueueName, //消息队列名称
- durable: false, //是否缓存
- exclusive: false,
- autoDelete: false,
- arguments: null
- );
- //将队列与交换机进行绑定
- channel.QueueBind(queue.QueueName, model.ExchangeName, model.RoutingKey);
- channel.ConfirmSelect();
- var body = Encoding.UTF8.GetBytes(model.msgStr);
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true; // 设置消息持久化
- properties.DeliveryMode = 2; // 持久化消息
- channel.BasicPublish(model.ExchangeName, model.RoutingKey, properties, body);
- bool bol = channel.WaitForConfirms();
- return bol;
- }
- }
- }
- catch (Exception ex)
- {
- return false;
- }
- }
-
- }
- public class RabbitMQModel
- {
- //值是false 时,消息队列名称+Test
- public string IsOfficial { get; set; }
- //消息内容
- public string msgStr { get; set; }
- //消息队列名称
- public string QueueName { get; set; }
- //IP地址
- public string HostName { get; set;}
- //端口号
- public int Port { get; set;}
- //用户账号
- public string UserName { get; set; }
- //用户密码
- public string Password { get; set; }
- public string VirtualHost { get; set; }
- // 获取或设置交换机名称。
- public string ExchangeName { get; set; }
- // 路由键
- public string RoutingKey { get; set; }
- }
- }
|