重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

消息队列之RabbitMQ介绍与运用

RabbitMQ 说明
本章,我们主要从RabbitMQ简介RabbitMQ安装RabbitMQ常用命令RabbitMQ架构模式RabbitMQ使用Quick.RabbitMQPlus的使用RabbitMQ总结这几个方面对
RabbitMQ进行介绍!

目前创新互联建站已为上千的企业提供了网站建设、域名、虚拟空间、网站运营、企业网站设计、镇江网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。

1、????RabbitMQ 简介

RabbitMQ 是使用Erlang语言开发的开源消息队列系统,基于 AMQP 协议来实现。

AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性和安全。AMQP 协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

MQ 全称为 Message Queue,消息队列(MQ)是一种应用程序应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。

消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。


2、????RabbitMQ 安装

以下我们主要介绍 RabbitMQ 在 Windows 环境中的安装过程。

2.1、???? 下载 OTP

由于 RabbitMQ 使用 Erlang 技术开发,所以需要先安装 Erlang 运行环境后,才能安装消息队列服务。

我们到https://www.erlang.org/downloads下载相应版本的安装包,如这里我们下载https://github.com/erlang/otp/releases/download/OTP-25.0.4/otp_win64_25.0.4.exe这个版本,如下图所示:

2.2、???? 下载 RabbitMQ

我们到https://www.rabbitmq.com/download.html下载相应版本的安装包,如这里我们下载https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.7/rabbitmq-server-3.10.7.exe这个版本,如下图所示:

2.3、???? 安装 Erlang 和 RabbitMQ

我们在 D 盘分别创建目录D:\Net_Program\Net_RabbitMQErlangD:\Net_Program\Net_RabbitMQ实际环境中根据自己的需求新建目录即可),用于安装 Erlang 和 RabbitMQ。

双击下载下来的 otp_win64_25.0.4.exe 和 rabbitmq-server-3.10.7.exe 进行安装,安装过程中将安装目录选择刚才创建的目录即可,其他按照默认设置即可。

2.4、???? 激活 Rabbit MQ's Management Plugin

激活步骤如下所示:

  • 以管理员身份运行 CMD

  • 然后切换到 RabbitMQ 的安装目录 sbin 下,D:\Net_Program\Net_RabbitMQ\rabbitmq_server-3.10.7\sbin

  • 然后输入如下命令并执行

    rabbitmq-plugins.bat enable rabbitmq_management
    

    如出现如下所示的提示信息,原因是安装了 Erlang 没有重启电脑导致的环境变量没有生效,重启电脑即可:

  • 然后输入如下命令重启 RabbitMQ 服务

    net stop rabbitmq && net start rabbitmq
    
  • 最后即可访问 RabbitMQ 的管理控制台了,访问地址(默认账户和密码为 guest):http://localhost:15672

激活过程如下图所示:

2.5、???? 远程设置

如果希望 RabbitMQ 允许远程连接,比如在 Windows Service2012 服务器上安装了 RabbitMQ,其他客户端想连接此服务器的 RabbitMQ,则需要设置防火墙开放端口。

具体设置步骤(以 Windows Service2012 为例):

  • 打开防火墙 → 入站规则 → 新建规则 → 选择“端口”,下一步 → 选择 TCP,并在特定本地端口中填入 15672,5671-5672,下一步 → 选择“允许连接”,下一步 → 下一步 → 输入名称或描述 → 完成。

2.6、???? Docker 中安装 RabbitMQ

如果你电脑上没安装 Docker,请先安装 Docker,可参考:Docker 的安装

RabbitMQ 在 Docker 中的镜像地址:https://hub.docker.com/_/rabbitmq

2.6.1、拉取镜像容器并安装

  • 拉取 RabbitMQ 镜像

    以管理员身份运行 CMD,执行如下命令拉取 RabbitMQ 镜像:

    docker pull rabbitmq
    

    拉取完成后,我们可以打开 Docker Desktop 客户端查看就多了一个名称为 rabbitmq 的镜像了,如下图所示:

  • 新建目录

    D:\Net_Program\Net_Docker\RabbitMQ下分别新建DataLog文件夹,用于存放 RabbitMQ 数据和日志:

    D:\Net_Program\Net_Docker\RabbitMQ\Data

    D:\Net_Program\Net_Docker\RabbitMQ\Log

  • 创建并启动容器

    以管理员身份运行 CMD,执行如下命令创建并启动容器:

    docker run -d --volume D:/Net_Program/Net_Docker/RabbitMQ/Data:/var/lib/rabbitmq --volume D:/Net_Program/Net_Docker/RabbitMQ/Log:/var/log/rabbitmq --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq
    

    :::tip 参数说明:

    完整执行命令如下:

    docker run -d --volume D:/Net_Program/Net_Docker/RabbitMQ/Data:/var/lib/rabbitmq --volume D:/Net_Program/Net_Docker/RabbitMQ/Log:/var/log/rabbitmq --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=/ -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq
    
    • -d:表示在后台运行容器;

    • --volume D:/Net_Program/Net_Docker/RabbitMQ/Data:/var/lib/rabbitmq:映射 RabbitMQ 数据存储目录;

    • --volume D:/Net_Program/Net_Docker/RabbitMQ/Log:/var/log/rabbitmq:映射 RabbitMQ 日志存储目录;

    • --name rabbitmq:设置容器名称;

    • --hostname:指定主机名(RabbitMQ 的一个重要注意事项是它根据所谓的 节点名称 存储数据,默认为主机名);

    • -p:将容器的端口 5672(应用访问端口)和 15672 (控制台 Web 端口号)映射到主机中;

    • -e:指定环境变量:

      • RABBITMQ_DEFAULT_VHOST:默认虚拟机名;

      • RABBITMQ_DEFAULT_USER:默认的用户名;

      • RABBITMQ_DEFAULT_PASS:默认的用户密码;

    :::

  • 启动 Docker 的时候自动启动 RabbitMQ

    以管理员身份运行 CMD,执行如下命令:

    docker update rabbitmq --restart=always
    

2.6.2、安装 Rabbit MQ's Management Plugin

  • 方式 1:

    以管理员身份运行 CMD,执行如下命令先进入 RabbitMQ 容器:

    docker exec -it rabbitmq /bin/bash
    

    再执行如下命令:

    rabbitmq-plugins enable rabbitmq_management
    

    这时候在浏览器中打开http://localhost:15672即可查看 RabbitMQ 的 Web 管理端了。

  • 方式 2:

    以管理员身份运行 CMD,执行如下命令即可:

    docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
    
  • 注意事项:

    当我们安装好插件后,打开http://localhost:15672Web 管理界面,使用 guest 账户登录进去后,点击Channels标签,会出现如下图所示的警告提醒:

    1. 以管理员身份运行 CMD,执行如下命令先进入 RabbitMQ 容器:

      docker exec -it rabbitmq /bin/bash
      
    2. 再执行如下命令切换到以下路径:

      cd /etc/rabbitmq/conf.d/
      
    3. 再执行如下命令修改management_agent.disable_metrics_collector = false

      echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
      
    4. 再执行如下命令退出容器:

      exit
      
    5. 再执行如下命令重启容器:

      docker restart rabbitmq
      

    执行命令过程:

    界面效果:


3、????RabbitMQ 常用命令

3.1、???? 用户管理

  • 增加用户:

    rabbitmqctl add_user user1 pwd1
    
  • 删除用户:

    rabbitmqctl delete_user user1
    
  • 修改密码:

    rabbitmqctl change_password user1 123456
    
  • 查看用户列表:

    rabbitmqctl list_users
    

3.2、???? 用户角色

  1. 设置用户角色:

    rabbitmqctl set_user_tags user1 Tag
    

    user1 为用户名称

    Tag 为角色名称,如:administrator、monitoring、policymaker、management、impersonator

  2. 设置多个角色:

    rabbitmqctl set_user_tags user1 Tag1 Tag2
    

执行命令如下图所示:

![ml]

![ml]

![ml]


4、????RabbitMQ 架构模式

4.1、????RabbitMQ 架构模式

架构模式说明:

  1. 首先建立生产者与 MQ 之间的连接;

  2. 然后生产者将消息发送到 MQ 的交换机中;

  3. 交换机将消息分别存储到与之绑定的队列中;

  4. 建立消费者与 MQ 之间的连接;

  5. 消费者指定消费哪个队列的消息;

  6. 最后队列将消息推送给对应的消费者。

4.2、????RabbitMQ 中几个核心概念

  1. Message(消息):消息是不透明的,是由一系列的可选属性组成,如:路由键(RoutingKey)、相对其他消息的优先权(Priority)、指出该消息是否需要永久存储(DeliveryMode)等;

  2. Producer(生产者):生产者是向交换机发布消息的客户端应用程序;

  3. Exchange(交换机):用来接受消息并将消息路由(存储)给服务器中的队列。交换机有四种类型,即决定消息发布到那个队列,具体有以下的类型:

    • Fanout:发布订阅(广播模式),每个发送到 Fanout 类型的交换器消息,交换器会将消息发送到它绑定的所有队列中,它转发消息是最快的,也是目前使用最多的类型。

    • Direct:路由模式,路由模式下,在发布消息时指定不同的 RouteKey,交换机会根据不同的 RouteKey 分发消息到不同的队列中,简单点说其实就是在 Fanout 基础上多增加了一个 RoutingKey 条件

    • Topic:通配符模式(主题),通配符模式和路由模式其实差不多,不同之处在于通配符模式中的路由可以声明为模糊查询。符号#匹配一个或多个词,符号*匹配一个词。RabbitMQ 中通配符的通配符是用.来分割字符串的,比如a.*只能匹配到 a.b、a.c,而a.#可以匹配到 a.a.c、a.a.b。

    • Headers根据消息内容中的 Headers 属性匹配(性能差,不实用,使用少),该模式基本不使用

  4. Queue(队列):消息的存放容器,一个消息可以放在一个或者多个队列中;

  5. Binding(绑定):如果想要将消息存放到具体的队列中,就需要先将队列和交换机进行绑定,交换机跟队列的绑定可以是多对多的关系;

  6. Connection(连接):如一个 Tcp 连接;

  7. Channel(通道):多路复用连接中的一条独立的双向数据流通道,通道是建立在真实的 TCP 连接内的虚拟通道,AMQP 命令都是通过通道发出去的,不管是发布消息、订阅队列还是接收消息,都是通过通道完成的,因为对于操作系统来说创建和销毁一个 TCP 连接都是很昂贵的开销,所以使用通道以实现复用一条 TCP 连接;

  8. Consumer(消费者):接收和消费消息的客户端应用程序;

  9. Virtual Host(虚拟主机):即小型的 RabbitMQ 服务器,它表示一批交换器,消息队列和相关对象,连接时必须指定,默认是:/(以路径区分);

  10. Broker:消息队列服务器实体。

4.3、????RabbitMQ 几种模式

4.3.1、简单队列模式

在该模式下,不用显示声明交换机,只需声明一个队列即可。

生产者指定队列名称发送消息给 MQ,然后会有一个默认的交换机将消息转发给这个队列。

消费者只需要监听这个队列,一有消息就会得到通知做出响应。

如下图所示:

4.3.2、工作队列模式(Work Queues)

和简单队列模式基本一样,不过有一点不同,该模式有多个消费者在监听队列。

RabbitMQ 会以轮询的方式将消息发给多个消费者确保一条消息只会被一个消费者消费,即:在该模式下一条消息只会被其中一个消费者消费

4.3.3、Exchange - 发布订阅模式(Fanout)

和上面 2 种模式默认提供交换机不同的是,该模式需要显示声明交换机,然后可以创建多个队列和这个交换机进行绑定。

生产者发消息给 MQ 时需要指定交换机,然后交换机将消息转发给与自己绑定的所有队列.

消费者监听指定的队列获得消息,每个队列可以有多个消费者监听,同样也是以轮询的机制发给消费者。

如下图所示:

该模式是目前使用最多的模式。

4.3.4、Exchange - 路由模式(Direct)

和发布订阅模式不同的是,队列绑定交换机时需要指定一个 RoutingKey。

那么生产者发送消息时不仅需要指定交换机还需要指定 RoutingKey。

这样的话交换机就会把消息转发给跟自己绑定并且 RoutingKey 相匹配的队列。

如下图所示:

PS:当生产者发送了一个消息且发送的 RoutingKey 为 success 时,交换机会根据该 RoutingKey 匹配并转发消息到 Queue1 和 Queue2,两个队列都满足了路由规则;当 RoutingKey 为 error 时,仅 Queue2 满足,则将消息转发给 Queue2。

4.3.5、Exchange - 通配符模式(Topic)

和路由模式唯一的不同就是可以设置带有通配符进行模糊匹配的 RoutingKey。

设定的 RoutingKey(不论是 BindingKey 还是 RoutingKey)都需要为带.的字符串。比如 a.b、c.d.e、fff.gggg.hhhh 等,最多为 255 个字节.

在交换机和队列绑定时,给定的 RoutingKey 可以依照如下来设置:

  • :匹配 0~N 个单词;

  • *:匹配 1 个单词。

比如两个 RoutingKey 分别为 index.和#.crt,当生产者发送消息时给定的 RoutingKey 为 index.a、index.b 或 index.c 等都满足 index.的规则,a.crt、aa.crt 或是 b.crt 等都满足#.crt 的规则。

如下图所示:


5、????RabbitMQ 使用

针对 RabbitMQ 的使用,这里我们主要介绍在.Net Core(.Net6)中的简单使用,其他平台或语言类似,仅作参考。

5.1、???? 安装 RabbitMQ.Client 包

我们将使用RabbitMQ.Client这个包来实现,当然也可以使用其他包,如:EasyNetQ

使用如下命令安装 RabbitMQ.Client 包即可:

Install-Package RabbitMQ.Client -Version 6.4.0

5.2、???? 生产者实现

首先我们定义 RabbitMQHelper.cs 帮助类,该类需要实现泛型 T 的定义(T 为发送和接收的消息实体)。

然后在该类中定义消息通道消息连接交换机名称队列名称集合路由规则变量。

定义构造函数 RabbitMQHelper,并实现连接工厂的定义、消息连接的初始化、消息通道的初始化以及交换机类型(此处我们以发布订阅模式 Fanout 的实现为例)的定义。

注意:以下实现代码仅仅为了展示使用说明,没有进一步进行封装。

具体帮助类和调用示例代码如下所示:

using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text;

namespace Quick.RabbitMQPlus
{
    public class RabbitMQHelper where T : class
    {
        /// 
        /// 消息通道
        /// 
        readonly IModel _channel;

        /// 
        /// 消息连接
        /// 
        readonly IConnection _connection;

        /// 
        /// 交换机名称
        /// 
        readonly string _exchangeName = "TestExchangeName";

        /// 
        /// 队列名称集合
        /// 
        readonly List _queueNames = new() { "Queue1", "Queue2" };

        /// 
        /// 路由规则
        /// 
        string _routeKey = "TestRouteName";

        /// 
        /// 构造函数
        /// 
        public RabbitMQHelper()
        {
            //创建连接工厂
            var connectionFactory = new ConnectionFactory
            {
                HostName = "192.168.1.1",
                UserName = "admin",
                Password = "123456",
                Port = 5672
            };

            //创建连接
            _connection = connectionFactory.CreateConnection();

            //创建通道
            _channel = _connection.CreateModel();

            /*
             * 定义一个Fanout类型交换机:
             * 参数1:交换机名称
             * 参数2:交换机类型
             * 参数3:是否开启消息持久化
             * 参数4:是否设置如果这个队列没有其他消费者消费,队列自动删除
             * 参数5:指定队列携带的信息
             */
            _channel.ExchangeDeclare(_exchangeName, ExchangeType.Fanout, false, false, null);
        }

        /// 
        /// 发送消息
        /// 
        /// 
        /// 
        public async Task<(bool, string)> Send(T data)
        {
            return await Task.Run(() =>
            {

                if (_channel == null)
                {
                    return (false, "RabbitMQ初始化错误,请检查连接配置!");
                }

                var ret = true;
                var errMsg = string.Empty;

                try
                {
                    if (string.IsNullOrWhiteSpace(_routeKey))
                    {
                        _routeKey = _exchangeName;
                    }

                    //将多个队列绑定到交换机上
                    foreach (var item in _queueNames)
                    {
                        /*
                         * 定义队列:
                         * 参数1:队列名称
                         * 参数2:是否持久化,true为持久化。队列会存储到磁盘,服务器重启时可以保证不丢失信息
                         * 参数3:是否排他,true为排他。如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
                         * 参数4:是否自动删除,true为自动删除。自动删除的前提是:致少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除
                         * 参数5:指定队列携带的信息
                         */
                        _channel.QueueDeclare(item, true, false, false, null);

                        //将队列绑定到交换机
                        _channel.QueueBind(item, _exchangeName, _routeKey, null);
                    }

                    //将实体序列化为字符串,该方法(ObjectToJson)需自己实现
                    var msg = ObjectToJson(data);

                    //将字符串转换为byte[]
                    var msgBody = Encoding.UTF8.GetBytes(msg);

                    /*
                     * 发布消息:
                     * 参数1:交换机名称,如果传"",将使用RabbitMQ默认的交换机名称
                     * 参数2:指定路由的规则,使用具体的队列名称,交换机为""时,消息直接发送到队列中
                     * 参数3:指定传递的消息携带的properties
                     * 参数4:指定传递的消息,byte[]类型
                     */
                    _channel.BasicPublish(_exchangeName, _routeKey, null, msgBody);
                }
                catch (Exception e)
                {
                    ret = false;
                    errMsg = e.Message;
                }

                return (ret, errMsg);
            });
        }

        /// 
        /// 关闭通道和连接
        /// 
        public void Close()
        {
            _channel.Close();
            _connection.Close();
        }
    }
}
//消息实体
var msgModel = new TestRabbitMQModel
{
    UserId = rand.Next(1, 9999),
    UserName = "Quber",
    UserAge = rand.Next(20, 80),
    CreateTime = DateTime.Now
};

//定义发送对象
var sendInstance = new RabbitMQHelper();

//发送消息
var sendRet = await sendInstance.Send(msgModel);

if (sendRet.Item1)
{
    //发送成功
    //……

    //消息发送完成后,关闭通道(根据实际情况自行决定要不要调用关闭方法)
    sendInstance.Close();
}
else
{
    //发送失败
    var errMsg = $"失败原因:{sendRet.Item2}";
}

5.3、???? 消费者实现

消费者的实现和生产者的实现基本一样,直接上代码,如下所示:

using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text;

namespace Quick.RabbitMQPlus
{
    public class RabbitMQHelper where T : class
    {
        /// 
        /// 消息通道
        /// 
        readonly IModel _channel;

        /// 
        /// 消息连接
        /// 
        readonly IConnection _connection;

        /// 
        /// 交换机名称
        /// 
        readonly string _exchangeName = "TestExchangeName";

        /// 
        /// 队列名称集合
        /// 
        readonly List _queueNames = new() { "Queue1", "Queue2" };

        /// 
        /// 路由规则
        /// 
        string _routeKey = "TestRouteName";

        /// 
        /// 构造函数
        /// 
        public RabbitMQHelper()
        {
            //创建连接工厂
            var connectionFactory = new ConnectionFactory
            {
                HostName = "192.168.1.1",
                UserName = "admin",
                Password = "123456",
                Port = 5672
            };

            //创建连接
            _connection = connectionFactory.CreateConnection();

            //创建通道
            _channel = _connection.CreateModel();

            /*
             * 定义一个Fanout类型交换机:
             * 参数1:交换机名称
             * 参数2:交换机类型
             * 参数3:是否开启消息持久化
             * 参数4:是否设置如果这个队列没有其他消费者消费,队列自动删除
             * 参数5:指定队列携带的信息
             */
            _channel.ExchangeDeclare(_exchangeName, ExchangeType.Fanout, false, false, null);
        }

        /// 
        /// 接收消息
        /// 
        /// 队列名称
        /// 回调方法
        /// 设置RabbitMQ一次最多推送多少条消息给消费者,默认为10
        public async Task<(bool, string)> Receive(string queueName, Func> received, ushort prefetchCount = 10)
        {
            return await Task.Run(() =>
            {
                if (_channel == null)
                {
                    return (false, "RabbitMQ初始化错误,请检查连接配置!");
                }

                var ret = true;
                var errMsg = string.Empty;

                try
                {
                    /*
                     * 设置限流机制
                     * 参数1:消息本身的大小,如果设置为0,那么表示对消息本身的大小不限制
                     * 参数2:设置RabbitMQ一次最多推送多少条消息给消费者
                     * 参数3:是否将上面的设置应用于整个通道,false表示只应用于当前消费者
                     */
                    _channel.BasicQos(0, prefetchCount, false);

                    //创建消费者对象
                    var consumer = new EventingBasicConsumer(_channel);

                    //接收到消息事件
                    consumer.Received += async (_, ea) =>
                    {
                        //获取消息以及反序列化为实体(JsonToObject方法需自己实现)
                        var msg = Encoding.UTF8.GetString(ea.Body.ToArray());
                        var data = JsonToObject(msg);

                        var retRec = true;

                        try
                        {
                            //接收消费事件,如果返回true则代表处理成功,以便告知RabbitMQ该消息已消费并处理成功
                            retRec = await received(data, msg);
                        }
                        catch (Exception e)
                        {
                            retRec = false;
                        }

                        //业务处理成功的时候
                        if (retRec)
                        {
                            //告知RabbitMQ该消息成功处理,可以从队列中删除该消息了
                            _channel.BasicAck(ea.DeliveryTag, false);
                        }
                        else
                        {
                            /*
                             * 告知RabbitMQ该消息处理失败,重新加入队列,以便后续可再次消费该消息
                             * 参数1:
                             * 参数2:是否将该消息重新加入队列,true为重新加入队列
                             *
                             * 需要注意的是:
                             *     假设await received(data, msg);一直对某些消息都处理失败(即retRec=false),
                             *     那么这些数据(这一批次的所有数据)会重新进入队列,并在下次重新消费,
                             *     如果业务方法received不做处理的话,有可能会造成一直循环消费该批次的消息
                             */
                            _channel.BasicReject(ea.DeliveryTag, true);
                        }
                    };

                    //启动消费者,设置为手动应答消息
                    /*
                     * 启动消费者:
                     * 参数1:指定要消费哪个队列的名称
                     * 参数2:指定是否自动告诉RabbitMQ该消息已收到
                     * 参数3:指定消息的回调
                     */
                    _channel.BasicConsume(queueName, false, consumer);
                }
                catch (Exception e)
                {
                    ret = false;
                    errMsg = e.Message;
                }

                return (ret, errMsg);
            });
        }

        /// 
        /// 关闭通道和连接
        /// 
        public void Close()
        {
            _channel.Close();
            _connection.Close();
        }
    }
}
//定义接收对象
var recInstance = new RabbitMQHelper();

//接收队列Queue1的消息
var retRec = await recInstance.Receive("Queue1", async (data, msg) =>
{
    await Task.Delay(5000);

    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"\r\n队列1消息:{msg}");

    //返回true代表业务逻辑处理成功,会告知MQ这条消息已经接收成功,会从MQ队列中删除
    //返回false代表业务逻辑处理失败,会告知MQ这条消息没有处理成功,则MQ会继续推送这条消息
    return true;
}, 1);

//接收消息失败
if (!retRec.Item1)
{
    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine($"\r\n队列1接收失败:{retRec1.Item2}");
}

//关闭通道(根据实际情况自行决定要不要调用关闭方法)
recInstance.Close();

6、????Quick.RabbitMQPlus.Furion 的使用

为了更好更简单的在.Net Core 中使用 RabbitMQ,特此基于RabbitMQ.Client封装了Quick.RabbitMQPlus.FurionQuick.RabbitMQPlus组件。

  • Quick.RabbitMQPlus.Furion:依赖于.Net6+、Furion

  • Quick.RabbitMQPlus:依赖于.Net6+

Quick.RabbitMQPlus.Furion 包地址为:https://www.nuget.org/packages/Quick.RabbitMQPlus.Furion

关于 Quick.RabbitMQPlus.Furion 的详细使用说明,如下所示:

6.1、???? 更新日志

  • 1.0.9

    • 在路由模式消费的时候,去掉了调用实例化方法Instance()必须传入参数true的参数;

    • 提供了两种使用方式,第一种就是v1.0.8之前的通过实例化的方式进行初始化使用,第二种就是v1.0.9之后增加了可以通过依赖注入的方式进行初始化使用(推荐使用依赖注入的方式);

    • 新增加了AddRabbitMQPlusGetInstance方法,这两个方法分别有 2 个重载,具体说明参见Quick.RabbitMQPlus方法

    • 增加了 IQuickRabbitMQPlus 接口,如果使用依赖注入的方式,就可以实现该接口并使用其中的各个方法。

  • 1.0.8

    • 去掉了通过实体特性去配置各个属性,目的在于简化各个配置,都通过配置文件进行设置;

    • 在接收数据的时候,可以通过实体特性QuickRabbitMQPlusReceive去设置接收的消息队列(如[QuickRabbitMQPlusReceive("TestRabbitMQName1")]),如果不要实体特性,那么就会通过配置中的QueueNames属性去控制;

    • 对接收数据的方法进行了重载;

    • 配置中增加了Default属性,用于设置默认连接配置(如果配置中有多个连接配置,并且没有一个配置中设置 Default 为 true,那么默认会使用第一个配置);

    • 配置中增加了PrefetchCount属性,用于全局设置 RabbitMQ 一次最多推送多少条消息给消费者,默认为 10;

    • 配置中去掉了RouteKey配置,如果使用的是路由模式,请使用RouteKeys属性进行配置,前提是RouteKeysQueueNames集合的数量需要保持一致;

    • 实例化组件对象的时候,去掉了泛型的定义,如var sendInstance = QuickRabbitMQPlusInstance.Instance();

    • 发送和接收的方法,增加了泛型的定义,这样做的目的是不受限于在组件实例化的时候指定只能是某个泛型,将泛型的定义设置到方法上更灵活,如同一个组件实例化对象可发送不同的泛型实体数据。

  • 1.0.7

    • 去掉了必须要设置实体特性的控制(如果没有实体特性,那么就需要在配置文件中将相关属性配置齐全),默认使用的是配置中的第一个配置;

    • 在路由模式下,可指定将消息发送到对应的队列中,需要配置QueueNamesRouteKeys的集合数量保持一一对应的关系;

    • Send 方法增加了第二个参数,路由 Key 名称;

    • 实例化对象的方法 Instance 增加了参数Instance(bool isReceive = false),当前实例化对象是否为接收消息。

  • 1.0.6

    • 新增加了可动态切换连接的方法ChangeConn

    • 去掉了Furion的依赖;

    • 去掉了Newtonsoft.Json的依赖;

    • 同时将原来的Quick.RabbitMQPlus分为了Quick.RabbitMQPlusQuick.RabbitMQPlus.Furion这两个版本。

6.2、???? Quick.RabbitMQPlus.Furion 使用说明

该组件是基于RabbitMQ.ClientFurion组件进行封装使用的,目的在于结合.Net Core 更快、更简单和更灵活的使用 RabbitMQ!!!

功能说明:

  • 支持发布订阅模式路由模式,通配符模式Headers属性模式

  • 可根据配置文件读取 RabbitMQ 连接的各个配置(如:RabbitMQ 服务地址、账号、密码和交换机名称等);

  • 支持配置多个 RabbitMQ 的连接配置;

  • 支持动态切换 RabbitMQ 的连接配置;

  • 可根据实体定义的特性发布和订阅消息(已废弃),目前只针对接收消息定义了实体特性,并且只能指定接收消息的队列(v1.0.8 调整)

  • 支持配置将多个队列绑定到交换机;

  • 一个消费端支持可以同时消费多个多列的消息等;

  • 支持使用同一个实体,将不同的消息发送到不同的队列中(使用路由模式,同时在发送的时候将路由 Key 传入);

  • 支持全局设置接收消息一次性接收多少条的配置(v1.0.8 新增)

  • 支持两种使用方式,第一种就是v1.0.8之前的通过实例化的方式进行初始化使用,第二种就是v1.0.9之后增加了可以通过依赖注入的方式进行初始化使用(推荐使用依赖注入的方式)

6.3、???? 安装

安装命令如下所示:

Install-Package Quick.RabbitMQPlus.Furion

该组件的命名空间为:Quick.RabbitMQPlus,包括Quick.RabbitMQPlus原生组件。

6.4、???? 生产端

6.4.1、配置appsettings.json

appsettings.json配置文件中创建节点QuickRabbitMQPlus>PrefetchCountQuickRabbitMQPlusConfigs,PrefetchCount 为设置 RabbitMQ 一次最多推送多少条消息给消费者(默认为 10),QuickRabbitMQPlusConfigs 为数组类型(即可配置多个 RabbitMQ 服务地址),具体配置如下所示:

{
    "QuickRabbitMQPlus": {
        "PrefetchCount": 1,
        "QuickRabbitMQPlusConfigs": [
            {
                "Default": false,
                "ConnId": 1,
                "UserName": "quber",
                "Password": "0807_quberONE",
                "HostName": "127.0.0.1",
                "Port": 5672,
                "ExchangeType": "direct",
                "ExchangeName": "TestExchangeName",
                "QueueNames": ["TestRabbitMQName1", "TestRabbitMQName2"],
                "RouteKeys": ["TestRouteKey1", "TestRouteKey2"] //ExchangeType=direct才起作用,并且和QueueNames一一对应
                //"ExchangeDurable": true,
                //"QueueDurable": true,
                //"MessageDurable": true
            },
            //fanout模式
            {
                "Default": true,
                "ConnId": 2,
                "UserName": "quber",
                "Password": "0807_quberONE",
                "HostName": "127.0.0.1",
                "Port": 5672,
                "ExchangeType": "fanout",
                "ExchangeName": "TestExchangeName",
                "QueueNames": ["TestRabbitMQName1", "TestRabbitMQName2"],
                "RouteKeys": ["TestRouteKey1", "TestRouteKey2"] //ExchangeType=direct才起作用,并且和QueueNames一一对应
                //"ExchangeDurable": true,
                //"QueueDurable": true,
                //"MessageDurable": true
            }
        ]
    }
}

配置说明(消费端通用):

属性名称 属性说明 是否必填 备注
PrefetchCount 全局设置 RabbitMQ 一次最多推送多少条消息给消费者,默认为 10 消费端才使用的属性
Default 是否为默认连接 默认为 false
ConnId 连接 Id(请确保该 Id 的唯一性) 如果要动态切换连接配置,请确保该 Id 有值并且唯一
UserName RabbitMQ 连接账户
Password RabbitMQ 连接密码
HostName RabbitMQ 连接 IP
Port RabbitMQ 连接端口 不填就是默认端口5672
ExchangeType 交换机类型(fanout:发布订阅模式、direct:路由模式、topic:通配符模式、headers:属性匹配模式)
ExchangeName 交换机名称
QueueNames 队列名称集合(与交换机 ExchangeName 进行绑定) 此处为集合,目的是在发布消息时将消息存储到该队列集合中去
RouteKeys 路由名称集合(或通配符名称集合) ExchangeType=direct才起作用,并且和 QueueNames 是一一对应的关系,这样配置目的是可以实现将消息 1 发送到队列 1,将消息 2 发送到队列 2
ExchangeDurable 交换机是否持久化,默认为 true 如果采用默认的设置,配置文件可以不要该属性
QueueDurable 队列是否持久化,默认为 true 如果采用默认的设置,配置文件可以不要该属性
MessageDurable 消息是否持久化,默认为 true 如果采用默认的设置,配置文件可以不要该属性

6.4.2、配置 Program.cs

由于我们使用的是Furion,因此,我们可在程序启动文件中配置如下代码(具体可参考Furion 入门指南),目的是注册RabbitMQ 服务配置选项 QuickRabbitMQPlusOptions

  1. 依赖注入方式-WinForm 中使用:

    [STAThread]
    static void Main()
    {
    	ApplicationConfiguration.Initialize();
    
    	//初始化Furion
    	Serve.Run(GenericRunOptions.DefaultSilence);
    }
    public void ConfigureServices(IServiceCollection services)
    {
    	//注册FrmMain窗体类
    	services.AddScoped();
    
    	//注入IQuickRabbitMQPlus的方式
    	//通过AddRabbitMQPlus添加依赖注入
    	services.AddRabbitMQPlus();
    
    	////使用构造函数获取实例的方式:
    	////通过AddRabbitMQPlus添加依赖注入,并注册TestConsumerClassForDI类
    	//services.AddRabbitMQPlus()
    
    	//DI容器生成serviceProvider
    	var serviceProvider = services.BuildServiceProvider();
    
    	//通过serviceProvider获取MainForm的注册实例
    	var frmMain = serviceProvider.GetRequiredService();
    	//var frmMain = (FrmMain)serviceProvider.GetService(typeof(FrmMain));
    
    	Application.Run(frmMain);
    }

    说明:上述的关键点就在于调用.AddRabbitMQPlus()或者.AddRabbitMQPlus()方法对服务进行注册。

  2. 实例化方式-WinForm 中使用:

    [STAThread]
    static void Main()
    {
    	//初始化Furion
    	Serve.Run(GenericRunOptions.DefaultSilence);
    
    	//或者
    	//Serve.Run(RunOptions.DefaultSilence.ConfigureBuilder(builder =>
    	//{
    		//注册RabbitMQ连接配置对象
    		//builder.Services.AddConfigurableOptions();
    	//}).Configure(app =>
    	//{
    	//}));
    
    	ApplicationConfiguration.Initialize();
    	Application.Run(new FrmMain());
    }
    public void ConfigureServices(IServiceCollection services)
    {
    	//注册RabbitMQ连接配置对象
    	services.AddConfigurableOptions();
    }
  3. Quick.RabbitMQPlus 组件,依赖注入方式-WinForm 中使用:

    Program.cs 的 Main 方法:

    ApplicationConfiguration.Initialize();
    
    using IHost host = Host.CreateDefaultBuilder()
    	.ConfigureServices((_, services) =>
    		{
    			//注册FrmMain窗体类
    			services.AddScoped();
    
    			//注入IQuickRabbitMQPlus的方式
    			//通过AddRabbitMQPlus添加依赖注入
    			services.AddRabbitMQPlus();
    
    			////使用构造函数获取实例的方式:
    			////通过AddRabbitMQPlus添加依赖注入,并注册TestConsumerClassForDI类
    			//services.AddRabbitMQPlus()
    
    			//DI容器生成serviceProvider
    			var serviceProvider = services.BuildServiceProvider();
    
    			//通过serviceProvider获取MainForm的注册实例
    			var frmMain = serviceProvider.GetRequiredService();
    			//var frmMain = (FrmMain)serviceProvider.GetService(typeof(FrmMain));
    
    			Application.Run(frmMain);
    		}
    	)
    	.Build();
    
    host.RunAsync();
  4. Quick.RabbitMQPlus 组件,实例化方式-WinForm 中使用:

    Program.cs 的 Main 方法:

    [STAThread]
    static void Main()
    {
    	ApplicationConfiguration.Initialize();
    	Application.Run(new FrmMain());
    }
    

其他库的使用方式也基本类似,就不一一介绍了。

6.4.3、定义发送消息实体

如下所示我们可以定义一个消息实体:

namespace Quick.RabbitMQPlus.Publisher
{
    public class TestRabbitMQModel
    {
        public int UserId { get; set; }

        public string UserName { get; set; }

        public int UserAge { get; set; }

        public DateTime CreateTime { get; set; }
    }
}

6.4.4、发送消息 Demo

定义发送对象:

public partial class FrmMain : Form
{
	private readonly IQuickRabbitMQPlus _quickRabbitMqPlus;

	public FrmMain(IQuickRabbitMQPlus quickRabbitMqPlus)
	{
		InitializeComponent();

		//定义发送对象
		_quickRabbitMqPlus = quickRabbitMqPlus;
	}
}
//定义发送对象
var sendInstance = QuickRabbitMQPlusInstance.Instance();

发送单条消息:

//发送10条数据
for (int i = 0; i < 10; i++)
{
	var msgModel = new TestRabbitMQModel
	{
		UserId = rand.Next(1, 9999),
		UserName = "Quick" + (i + 1),
		UserAge = rand.Next(20, 80),
		CreateTime = DateTime.Now
	};

	var sendRet = await _quickRabbitMqPlus.Send(msgModel);

	if (sendRet.Item1)
	{
		//发送成功
	}
	else
	{
		//发送失败
		var errMsg = $"失败原因:{sendRet.Item2}";
	}

	//间隔2秒发送一次
	await Task.Delay(2000);
}

//消息发送完成后,关闭通道
_quickRabbitMqPlus.Close();
//当i % 2为0时,发送给路由TestRouteKey1对应的队列TestRabbitMQName1,否则发送给路由TestRouteKey2对应的队列TestRabbitMQName2
//此处就实现了在路由模式下,将不同的消息发送给不同的队列
//需要注意的时候,此方式需要将交换机类型配置为direct路由模式,同时需要设置配置的QueueNames和RouteKeys属性(这两属性的集合数量需要保持一致,一一对应的关系)
var sendRet = await _quickRabbitMqPlus.Send(msgModel, i % 2 == 0 ? "TestRouteKey1" : "TestRouteKey2");

发送多条消息:

var sendList = new List{
	new TestRabbitMQModel(),
	new TestRabbitMQModel()
};

var sendRet = await _quickRabbitMqPlus.Send(sendList);

切换连接:

//切换到connId=2的配置
_quickRabbitMqPlus.ChangeConn(2);

var sendRetConn2 = await _quickRabbitMqPlus.Send(msgModel);

//切换到connId=3的配置
_quickRabbitMqPlus.ChangeConn(3);

var sendRetConn3 = await _quickRabbitMqPlus.Send(msgModel);

6.5、???? 消费端

6.5.1、配置appsettings.json与实体特性QuickRabbitMQPlusReceive

  • 配置说明:

    具体配置请参见生产端(和生产端完全一致)。

    需要注意的是,消费端中,增加了PrefetchCount配置,目的用于全局设置 RabbitMQ 一次最多推送多少条消息给消费者,默认为 10。

    需要注意的是,如果消费端中的 QueueNames 属性设置了多个队列,就代表该消费端同时接收多个队列的消息

  • 实体特性配置说明(消费端使用):

    属性名称 属性说明 是否必填 备注
    queueName 队列名称(多个队列名称请使用英文逗号,分隔) 如果同时设置了实体特性的队列名称和配置中的QueueNames属性,那么会优先采用实体的队列名称

    如下所示:

    namespace Quick.RabbitMQPlus.Publisher
    {
        [QuickRabbitMQPlusReceive("TestRabbitMQName1")]
        //[QuickRabbitMQPlusReceive("TestRabbitMQName1,TestRabbitMQName2")]
        public class TestRabbitMQModel
        {
            public int UserId { get; set; }
    
            public string UserName { get; set; }
    
            public int UserAge { get; set; }
    
            public DateTime CreateTime { get; set; }
        }
    }

6.5.2、配置 Program.cs

由于我们使用的是Furion,因此,我们可在程序启动文件中配置如下代码(具体可参考Furion 入门指南),目的是注册RabbitMQ 服务配置选项 QuickRabbitMQPlusOptions

  1. 依赖注入方式-Worker Service 中使用:

    //初始化Furion
    Serve.Run(GenericRunOptions.Default);
    
    public void ConfigureServices(IServiceCollection services)
    {
    	//通过AddRabbitMQPlus添加依赖注入
    	services.AddRabbitMQPlus();
    }

    说明:上述的关键点就在于调用.AddRabbitMQPlus()或者.AddRabbitMQPlus()方法对服务进行注册。

  2. 依赖注入方式-控制台中使用:

    //初始化Furion
    Serve.Run(GenericRunOptions.DefaultSilence);
    
    public void ConfigureServices(IServiceCollection services)
    {
    	////通过AddRabbitMQPlus添加依赖注入
    	//services.AddRabbitMQPlus();
    
    	//使用构造函数获取实例的方式:
    	//通过AddRabbitMQPlus添加依赖注入,并注册TestConsumerClassForDI类
    	services.AddRabbitMQPlus();
    }
  3. Quick.RabbitMQPlus 组件,依赖注入方式-Worker Service 中使用:

    Program.cs 的 Main 方法:

    IHost host = Host.CreateDefaultBuilder(args)
    	.ConfigureServices(services =>
    	{
    		services.AddHostedService();
    
    		//通过AddRabbitMQPlus添加依赖注入
    		services.AddRabbitMQPlus();
    	})
    	.Build();
    
    await host.RunAsync();
  4. Quick.RabbitMQPlus 组件,依赖注入方式-控制台中使用:

    Program.cs 的 Main 方法:

    using IHost host = Host.CreateDefaultBuilder(args)
    	.ConfigureServices((_, services) =>
    	//注入IQuickRabbitMQPlus的方式
    	//通过AddRabbitMQPlus添加依赖注入
    	services.AddRabbitMQPlus()
    
    	////使用构造函数获取实例的方式:
    	////通过AddRabbitMQPlus添加依赖注入,并注册TestConsumerClassForDI类
    	//services.AddRabbitMQPlus()
    )
    .Build();

6.5.3、定义接收消息实体

如下所示我们可以定义 3 个消息实体(第一个用于接收队列TestRabbitMQName1的消息,第二个用于接收队列TestRabbitMQName2的消息,第三个用于接收队列TestRabbitMQName1TestRabbitMQName2):

namespace Quick.RabbitMQPlus.Consumer
{
    [QuickRabbitMQPlusReceive("TestRabbitMQName1")]
    public class TestRabbitMQModel1
    {
        public int UserId { get; set; }

        public string UserName { get; set; }

        public int UserAge { get; set; }

        public DateTime CreateTime { get; set; }
    }
}
namespace Quick.RabbitMQPlus.Consumer
{
    [QuickRabbitMQPlusReceive("TestRabbitMQName2")]
    public class TestRabbitMQModel2
    {
        public int UserId { get; set; }

        public string UserName { get; set; }

        public int UserAge { get; set; }

        public DateTime CreateTime { get; set; }
    }
}
namespace Quick.RabbitMQPlus.Consumer
{
    [QuickRabbitMQPlusReceive("TestRabbitMQName1,TestRabbitMQName2")]
    public class TestRabbitMQModel3
    {
        public int UserId { get; set; }

        public string UserName { get; set; }

        public int UserAge { get; set; }

        public DateTime CreateTime { get; set; }
    }
}

6.5.4、接收消息 Demo

定义接收对象(依赖注入方式):

public class Worker : BackgroundService
{
	private readonly ILogger _logger;
	private readonly IQuickRabbitMQPlus _quickRabbitMqPlus;

	public Worker(ILogger logger, IQuickRabbitMQPlus quickRabbitMqPlus)
	{
		_logger = logger;
		_quickRabbitMqPlus = quickRabbitMqPlus;
	}
}
//获取IQuickRabbitMQPlus的实例(App是Furion中的静态类)
var _quickRabbitMqPlus = App.GetService();
//获取IQuickRabbitMQPlus的实例(其中的host为IHost对象,GetInstance方法为封装的扩展方法)
//var _quickRabbitMqPlus = host.Services.GetInstance();

//获取IQuickRabbitMQPlus的实例(其中的host为IHost对象,GetInstance方法为封装的扩展方法)
var _quickRabbitMqPlus = host.GetInstance();

定义接收对象(实例化方式):

//定义接收对象
var recInstance = QuickRabbitMQPlusInstance.Instance();

定义两个消费端,一个消费端消费一个队列,具体的接收消息代码如下所示(接收单条消息):


            本文题目:消息队列之RabbitMQ介绍与运用            
网页网址:http://cqcxhl.com/article/dsoidds.html
在线咨询
服务热线
服务热线:028-86922220
TOP