代码编织梦想

前言

当消费者端接收消息处理业务时,如果出现异常或是拒收消息将消息又变更为等待投递再次推送给消费者,这样一来,则形成循环的条件。

9c173a811258da50f47de3fb8b151145.png

循环场景

生产者发送100条消息到RabbitMQ中,消费者设定读取到第50条消息时,设置拒收,同时设定是否还留存在当前队列中(当requeue为false时,设置了死信队列则进入死信队列,否则移除消息)。

consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        Console.WriteLine("拒收");
        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

当第50条消息拒收,则仍在队列中且处在队列头部,重新推送给消费者,再次拒收,再次推送,反反复复。
3da17d8ad67117656c9bbe7f905f9560.png

最终其他消息全部消费完毕,仅剩第50条消息往复间不断消费,拒收,消费,这将可能导致RabbitMQ出现内存泄漏问题。

c48b36723b19af3e4b3e091aed6c30bb.png

解决方案

RabbitMQ及AMQP协议本身没有提供这类重试功能,但可以利用一些已有的功能来间接实现重试限定(以下只考虑基于手动确认模式情况)。此处只想到或是只查到了如下几种方案解决消息循环消费问题。

  • 一次消费

    • 无论成功与否,消费者都对外返回ack,将拒收原因或是异常信息catch存入本地或是新队列中另作重试。

    • 消费者拒绝消息或是出现异常,返回Nack或Reject,消息进入死信队列或丢弃(requeue设定为false)。

  • 限定重试次数

    • 在消息的头中添加重试次数,并将消息重新发送出去,再每次重新消费时从头中判断重试次数,递增或递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃。

    • 可以在Redis、Memcache或其他存储中存储消息唯一键(例如Guid、雪花Id等,但必须在发布消息时手动设置它),甚至在mysql中连同重试次数一起存储,然后在每次重新消费时递增/递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃。

    • 队列使用Quorum类型,限制投递次数,超过次数消息被删除。

  • 队列消息过期

    • 设置过期时间,给队列或是消息设置TTL,重试一定次数消息达到过期时间后进入死信队列或丢弃(requeue设定为true)。

  • 也许还有更多好的方案...

一次消费

对外总是Ack

消息到达了消费端,可因某些原因消费失败了,对外可以发送Ack,而在内部走额外的方式去执行补偿操作,比如将消息转发到内部的RabbitMQ或是其他处理方式,终归是只消费一次。

var queueName = "alwaysack_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    try
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
        {
            throw new Exception("模拟异常");
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
    finally
    {
        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
    }
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

当消费端收到消息,处理时出现异常,可以另想办法去处理,而对外保持着ack的返回,以避免消息的循环消费。cd0ea57e2cff780eef889f05f146915c.png

消息不重入队列

在消费者端,因异常或是拒收消息时,对requeue设置为false时,如果设置了死信队列,则符合“消息被拒绝且不重入队列”这一进入死信队列的情况,从而避免消息反复重试。如未设置死信队列,则消息被丢失。

ceb1b8adf84fe90fa4d1265fb9c444f4.png

此处假定接收100条消息,在接收到第50条消息时设置拒收,并且设置了requeue为false。

var dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
var dlxQueueName = "dlx_queue";
channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");

var queueName = "nackorreject_queue";
var arguments = new Dictionary<string, object>
{
    { "x-dead-letter-exchange", dlxExchangeName }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        Console.WriteLine("拒收");
        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);//关键在于requeue=false
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

如此一来,拒收消息不会重入队列,并且现有队列绑定了死信交换机,因此,消息进入到死信队列中,如不绑定,则消息丢失。
e18412c831be19c6e1379ca5a0a05a6a.png

限定重试次数

设置重试次数,限定循环消费的次数,允许短暂的循环,但最终打破循环。

消息头设定次数

在消息头中设置次数记录作为标记,但是,消费端无法对接收到的消息修改消息头然后将原消息送回MQ,因此,需要将原消息内容重新发送消息到MQ,具体步骤如下

  1. 原消息设置不重入队列。

  2. 再发送新的消息其内容与原消息一致,可设置新消息的消息头来携带重试次数。

  3. 消费端再次消费时,便可从消息头中查看消息被消费的次数。
    a333716303b1339b4391176834253bef.png

此处假定接收10条消息,在接收到第5条消息时设置拒收, 当消息头中重试次数未超过设定的3次时,消息可以重入队列,再次被消费。

var queueName = "messageheaderretrycount_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("5"))
    {
        var maxRetryCount = 3;

        Console.WriteLine($"拒收 {DateTime.Now}");

        //初次消费
        if (ea.BasicProperties.Headers == null)
        {
            //原消息设置为不重入队列
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);

            //发送新消息到队列中
            RetryPublishMessage(channel, queueName, message.ToArray(), 1);
            return;
        }

        //获取重试次数
        var retryCount = ParseRetryCount(ea);
        if (retryCount < maxRetryCount)
        {
            //原消息设置为不重入队列
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);

            //发送新消息到队列中
            RetryPublishMessage(channel, queueName, message.ToArray(), retryCount + 1);
            return;
        }

        //到达最大次数,不再重试消息
        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

static void RetryPublishMessage(IModel channel, string queueName, byte[] body, int retryCount)
{
    var basicProperties = channel.CreateBasicProperties();
    basicProperties.Headers = new Dictionary<string, object>();
    basicProperties.Headers.Add("retryCount", retryCount);
    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);
}

static int ParseRetryCount(BasicDeliverEventArgs ea)
{
    var existRetryRecord = ea.BasicProperties.Headers.TryGetValue("retryCount", out object retryCount);
    if (!existRetryRecord)
    {
        throw new Exception("没有设置重试次数");
    }

    return (int)retryCount;
}

消息被拒收后,再重新发送消息到原有交换机或是队列下中,以使得消息像是消费失败回到了队列中,如此来控制消费次数,但是这种场景下,新消息排在了队列的尾部,而不是原消息排在队列头部。
489786f6182ca7f5793437ffd7c49583.png

存储重试次数

在存储服务中存储消息的唯一标识与对应重试次数,消费消息前对消息进行判断是否存在。

d8d9b7b5477caad20e9ca55616063c27.png

与消息头判断一致,只是消息重试次数的存储从消息本身挪入存储服务中了。需要注意的是,消息发送端需要设置消息的唯一标识(MessageId属性)

//模拟外部存储服务
var MessageRetryCounts = new Dictionary<ulong, int>();

var queueName = "storageretrycount_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        var maxRetryCount = 3;
        Console.WriteLine("拒收");
    
        //重试次数判断
        var existRetryRecord = MessageRetryCounts.ContainsKey(ea.BasicProperties.MessageId);
        if (!existRetryRecord)
        {
            //重入队列,继续重试
            MessageRetryCounts.Add(ea.BasicProperties.MessageId, 1);
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
            return;
        }
    
        if (MessageRetryCounts[ea.BasicProperties.MessageId] < maxRetryCount)
        {
            //重入队列,继续重试
            MessageRetryCounts[ea.BasicProperties.MessageId] = MessageRetryCounts[ea.BasicProperties.MessageId] + 1;
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
            return;
        }
    
        //到达最大次数,不再重试消息
        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

除第一次拒收外,允许三次重试机会,三次重试完毕后,设置requeue为false,消息丢失或进入死信队列(如有设置的话)。
aa9183157b5fdefa3d6abefddfa5cc69.png

队列使用Quorum类型

第一种和第二种分别是消息自身、外部存储服务来管理消息重试次数,使用Quorum,由MQ来限定消息的投递次数,也就控制了重试次数。

99ce94032e4ba0e46f46cd4ff0dd7bc4.png

设置队列类型为quorum,设置投递最大次数,当超过投递次数后,消息被丢弃。

var queueName = "quorumtype_queue";
var arguments = new Dictionary<string, object>()
{
    { "x-queue-type", "quorum"},
    { "x-delivery-limit", 3 }
};
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        Console.WriteLine($"拒收 {DateTime.Now}");
        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

第一次消费被拒收重入队列后,经最大三次投递后,消费端不再收到消息,如此一来也限制了消息的循环消费。
ffbe228e80fa17aad41a459b0871b022.png

队列消息过期

当为消息设置了过期时间时,当消息没有受到Ack,且还在队列中,受到过期时间的限制,反复消费但未能成功时,消息将走向过期,进入死信队列或是被丢弃。

聚焦于过期时间的限制,因此在消费者端,因异常或是拒收消息时,需要对requeue设置为true,将消息再次重入到原队列中。

ce4492d837b6fb4eb07e548df2780a3d.png

设定消费者端第五十条消息会被拒收,且队列的TTL设置为5秒。

//死信交换机和死信队列
var dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
var dlxQueueName = "dlx_queue";
channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");

//常规队列
var queueName = "normalmessage_queue";
var arguments = new Dictionary<string, object>
{
    { "x-message-ttl", 5000},
    { "x-dead-letter-exchange", dlxExchangeName }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        Console.WriteLine($"拒收 {DateTime.Now}");

        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

当消费者端拒收消息后消息重入队列,再次消费,反复进行超过5秒后,消息在队列中达到了过期时间,则被挪入到死信队列中。
feb2b3b42080098057e19e59f7b52c88.png

从Web管理中死信队列中可查看该条过期的消息。

40a22dced2d8e107703837dfbcba1733.png

参考资料

  1. https://www.jianshu.com/p/f77a0b10c140

  2. https://www.jianshu.com/p/4904c609632f

  3. https://stackoverflow.com/questions/23158310/how-do-i-set-a-number-of-retry-attempts-in-rabbitmq

2022-10-29,望技术有成后能回来看见自己的脚步

技术群:添加小编微信并备注进群
小编微信:mm1552923   公众号:dotNet编程大全

rabbitmq之五种消息模型_-arthur-的博客-爱代码爱编程_rabbitmq的五种消息模型

首先什么是MQ MQ全称是Message Queue,即消息对列!消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接

使用.net core 2.1,rabbitmq,signalr,ef core 2.1和angular 6开发微服务-爱代码爱编程

目录 介绍 单一软件 微服务架构 微服务设计与规划 示例应用程序 示例应用程序的微服务 微服务进程间通信 微服务与消息队列之间的消息传递 RabbitMQ消息代理 消息队列体系结构目标和决策 帐户管理登录Web API  JSON Web令牌生成 ASP.NET Core 2.1 Web API配置和启动 配置ASP.NET C

rabbitmq实战(三)-消息者消费消息之拉取模式_wenqi365的博客-爱代码爱编程_rabbitmq主动拉取消息

「扫码关注我,面试、各种技术(mysql、zookeeper、微服务、redis、jvm)持续更新中~」 RabbitMQ学习列表: RabbitMQ实战(一)-消息通信基本概念 RabbitMQ实战(二)-消息持久化策

rabbitmq学习(一)-爱代码爱编程

// 之前项目有一个发送邮件的功能要去做,但由于是海外的服务器,所以发送总是不稳定。 // 于是想到采用消息队列的形式异步去发送邮件 消息队列分类 mq的实现方式很多,比如:RabbitMQ、Kafka、ActiveMQ、ZeroMQ和RocketMQ,以及redis 其中kafka擅长处理大数据,追求高吞吐量,常用于日志收集和传输 rabbitmq可

5.rabbitMQ的高级特性:限制消费者拉取个数,设置消息过期时间,死信交换机,延迟队列-爱代码爱编程

rabbitMQ的高级特性 目录 rabbitMQ的高级特性1. 限制消费者每秒从队列拉取的消息的数量2. 设置队列/消息的过期时间3. 死信交换机4. 延迟队列 使用这些特性解决一些特定的问题 1. 限制消费者每秒从队列拉取的消息的数量 如果并发数量很高,那么这个时候队列中就会有很多消息等待处理,如果不限制消费者的拉取数量,消费者就会每

rabbitmq 集群保证顺序消费_RabbitMQ消息投递、可靠性传输、重复消费、消息的顺序性等问题...-爱代码爱编程

1、什么是RabbitMQ?为什么要使用RabbitMQ? RabbitMQ是一款开源的、Erlang语言编写的、基于AMQP协议的消息中间件。 解耦:实现消费者和生产者之间的解耦 异步:将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度 削峰:将高并发时的同步访问变为串行访问达到一定量的限流,利于数据库的操作 2、Rabbi

rabbitmq取消自动重连_.net/c# RabbitMQ 连接断开处理-断线重连(转载)-爱代码爱编程

Rabbitmq 官方给的NET consumer示例代码如下,但使用过程,会遇到connection断开的问题,一旦断开,这个代码就会报错,就会导致消费者或者生产者挂掉。 下图是生产者发送消息,我手动停止了rabbitmq,然后又重新启动了rabbitmq,大概等启动成功以后,为了防止服务没有完全启动,我又等待了10秒钟 服务完全启动成功以后,我

C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率-爱代码爱编程

 一、引言     使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。本例使用多线程来创建多信道并绑定队列,达到多workers的目的。     二、示例     2.1、环境准备     在NuGet上安装RabbitMQ.Client。     2.2、工

关于.net高并发商品秒杀方案(RabbitMQ)-爱代码爱编程

五、Rabbit事件总线写法: 引用CAP框架,使用RabbitMQ来管理消息队列替代redis,采用SQLServer进行本地消息表的存储进行发布和订阅 RabbitMQ安装方法:https://blog.csdn.net/zhm3023/article/details/82217222 压力测试: 线程数为10,100,1000三种情况进行测试,R

分布式技术(下)-Redis&FastDFS&RabbitMQ-爱代码爱编程

第七阶段模块二 Redis 1. 概述 1.1 互联网架构的演变历程 第1阶段:数据访问量不大,简单的架构即可搞定! [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4ntkevuE-1633005745103)(E:\MarkDown\拉勾笔记\redis 架构第一阶段)] 第2阶段:数据访问量大,使用缓存技术来

三、RabbitMQ之工作队列模式 以及(多个消费者时的消费策略、消费者消息应答机制、RabbitMQ 持久化操作)-爱代码爱编程

文章目录 RabbitMQ之工作队列模式-Work Queues1、有多个消费者时的消费策略:轮询分发消息1.1、抽取工具类1.2、启动两个工作线程(消费者)1.3、 启动一个发送线程(生产者)1.4、结果展示2、消息应答机制—保证消息在消费过程中不丢失2.1、概念2.2、自动应答2.3、手动应答2.3.1、手动应答相关函数2.3.2、手动应答的好

rabbitmq 一次发送消息,多个模块消费_马力2020的博客-爱代码爱编程

@Bean(name = "test1Queue") public Queue test1Queue() { return new Queue("test1Queue", true); } @Bean(name = "test2Queue") public Queue test2Queue() { return new Queue("te

【.net 6】rabbitmq延迟消息指南_人生短几个秋的博客-爱代码爱编程

背景 最近遇到一个比较特殊需求,需要修改一个的RabbitMQ消费者,以实现在消费某种特定的类型消息时,延迟1小时再处理,几个需要注意的点: 延迟是以小时为单位不是所有消息都延迟消费,只延迟特定类型的消息只在第一次消费时

rabbitmq高级特性(三):rabbitmq实现消费端限流qos_rabbitmq qos-爱代码爱编程

RabbitMQ高级特性(三):RabbitMQ实现消费端限流Qos 一、生产者工程 (1)RabbitMQ配置文件(rabbitmq.properties) rabbitmq.host=192.168.116.1

rabbitmq 消费预取限制_rabbitmq每次只取一条-爱代码爱编程

上文我们讲解了rabbitmq基本的使用方法下面我开始介绍RabbitMQ 消费预取限制 源码在文章末尾👇🏻 1. 案例场景 work queue,工作队列,可以提高消息处理速度,避免队列消息堆积 为了避免消息堆积, 现在我们有两个消费者分别是孙悟空和猪八戒, 孙悟空的速度快, 自然工作效率更高, 而猪八戒呢自然效率不如

rabbitmq学习(六).net client之rpc_mq .net cpre-爱代码爱编程

6 RPC Remote procedure call implementation Python | Java | Ruby | PHP| C# 转载请注明出处:jiq•钦's technical Blog Remote procedure call (RPC) (using the .NET client) 在第二个教程

asp.net core websocket集群实现思路详解-爱代码爱编程

前言 提到WebSocket相信大家都听说过,它的初衷是为了解决客户端浏览器与服务端进行双向通信,是在单个TCP连接上进行全双工通讯的协议。在没有WebSocket之前只能通过浏览器到服务端的请求应答模式比如轮询,来实现服务端的变更响应到客户端,现在服务端也可以主动发送数据到客户端浏览器。WebSocket协议和Http协议平行,都属于TCP/IP四层模