using log4net;
using Microsoft.AspNetCore.Connections;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Ropin.Inspection.Common.Helper
{
public static class RabbitMQHelper
{
///
/// 发送RabbitMQ消息
///
///
///
public static async Task SnedRabbitMQ(RabbitMQModel model)
{
if (model.IsOfficial == "false")
{
model.QueueName = model.QueueName + "Test";
}
try
{
var factory = new ConnectionFactory()
{
HostName = model.HostName,// "60.204.212.71",//IP地址
Port = model.Port,// 5672,//端口号
UserName = model.UserName,//"guest",//用户账号
VirtualHost = model.VirtualHost,// "/",
Password = model.Password,//"guest"//用户密码
};
using (IConnection con = factory.CreateConnection())//创建连接对象
{
using (IModel channel = con.CreateModel())//创建连接会话对象
{
//声明队列
var queue = channel.QueueDeclare(
queue: model.QueueName, //消息队列名称
durable: false, //是否缓存
exclusive: false,
autoDelete: false,
arguments: null
);
//var msg = new
//{
// devStoreCode = "8453d5ed-8a21-4880-88e7-f872e93551bf",
// devRunSpot = "295458455312930168",
// time = DateTime.Now
//};
//string msgStr = JsonConvert.SerializeObject(msg);
channel.ConfirmSelect();
var body = Encoding.UTF8.GetBytes(model.msgStr);
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 设置消息持久化
channel.BasicPublish("", model.QueueName, properties, body);
bool bol = channel.WaitForConfirms();
return bol;
}
}
}
catch (Exception ex)
{
return false;
}
}
///
/// 发送RabbitMQ消息-Direct交换机【Direct Exchange
/// 描述:直连交换机。消息只能被发送到那些与绑定键(binding key)完全匹配的队列。
/// 使用场景:适用于简单的路由规则,例如,基于一个特定的路由键(routing key)将消息发送到特定队列。】
///
///
///
public static async Task SnedRabbitMQ_ExchangeDirect(RabbitMQModel model)
{
if (model.IsOfficial == "false")
{
model.QueueName = model.QueueName + "Test";
}
model.ExchangeName = model.QueueName + ".DirectExchange";
model.RoutingKey = model.QueueName + ".key";
try
{
var factory = new ConnectionFactory()
{
HostName = model.HostName,// "60.204.212.71",//IP地址
Port = model.Port,// 5672,//端口号
UserName = model.UserName,//"guest",//用户账号
VirtualHost = model.VirtualHost,// "/",
Password = model.Password,//"guest"//用户密码
};
using (IConnection con = factory.CreateConnection())//创建连接对象
{
using (IModel channel = con.CreateModel())//创建连接会话对象
{
channel.ExchangeDeclare(model.ExchangeName, type: ExchangeType.Direct); // Direct 交换机示例
//声明队列
var queue = channel.QueueDeclare(
queue: model.QueueName, //消息队列名称
durable: false, //是否缓存
exclusive: false,
autoDelete: false,
arguments: null
);
//将队列与交换机进行绑定
channel.QueueBind(model.QueueName, model.ExchangeName,model.RoutingKey);
channel.ConfirmSelect();
var body = Encoding.UTF8.GetBytes(model.msgStr);
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 设置消息持久化
channel.BasicPublish(model.ExchangeName, model.RoutingKey, properties, body);
bool bol = channel.WaitForConfirms();
return bol;
}
}
}
catch (Exception ex)
{
return false;
}
}
///
/// 发送RabbitMQ消息-Topic交换机 【Topic Exchange
/// 描述:主题交换机。消息会根据路由键的模式匹配规则发送到队列。路由键可以是多个单词,由点分隔,例如 "stock.usd.nyse",队列可以绑定类似 "stock.*" 或 "*.usd.*" 的模式。
/// 使用场景:适用于需要基于更复杂模式匹配的路由规则的场景。】
///
///
///
public static async Task SnedRabbitMQ_ExchangeTopic(RabbitMQModel model)
{
if (model.IsOfficial == "false")
{
model.QueueName = model.QueueName + "Test";
}
try
{
var factory = new ConnectionFactory()
{
HostName = model.HostName,// "60.204.212.71",//IP地址
Port = model.Port,// 5672,//端口号
UserName = model.UserName,//"guest",//用户账号
VirtualHost = model.VirtualHost,// "/",
Password = model.Password,//"guest"//用户密码
};
using (IConnection con = factory.CreateConnection())//创建连接对象
{
using (IModel channel = con.CreateModel())//创建连接会话对象
{
channel.ExchangeDeclare(model.ExchangeName, type: ExchangeType.Topic, durable: true); // Topic 交换机示例
//声明队列
var queue = channel.QueueDeclare(
queue: model.QueueName, //消息队列名称
durable: false, //是否缓存
exclusive: false,
autoDelete: false,
arguments: null
);
//将队列与交换机进行绑定
channel.QueueBind(queue.QueueName, model.ExchangeName, model.RoutingKey);
channel.ConfirmSelect();
var body = Encoding.UTF8.GetBytes(model.msgStr);
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 设置消息持久化
properties.DeliveryMode = 2; // 持久化消息
channel.BasicPublish(model.ExchangeName, model.RoutingKey, properties, body);
bool bol = channel.WaitForConfirms();
return bol;
}
}
}
catch (Exception ex)
{
return false;
}
}
}
public class RabbitMQModel
{
//值是false 时,消息队列名称+Test
public string IsOfficial { get; set; }
//消息内容
public string msgStr { get; set; }
//消息队列名称
public string QueueName { get; set; }
//IP地址
public string HostName { get; set;}
//端口号
public int Port { get; set;}
//用户账号
public string UserName { get; set; }
//用户密码
public string Password { get; set; }
public string VirtualHost { get; set; }
// 获取或设置交换机名称。
public string ExchangeName { get; set; }
// 路由键
public string RoutingKey { get; set; }
}
}