代码编织梦想

RabbitMQ------死信队列(六)

死信的概念

死信:无法被消费的消息,一般情况下:生产者将消息投递到broker或者直接到queue中,消费者从queue取出消息进行消费,但是某些时候,由于特定原因导致queue中的某些消息无法被消费,这样的消息如果没有后续处理,就会成为死信消息,有了死信消息就产生了死信队列。
当消息消费发生异常时,将消息投入死信队列中。比如:用户在商城下单成功并点击去支付后,在指定时间未支付时自动失效。

死信的来源

1.消息TTL(存活时间)过期
2.队列达到最大长度(队列满了,无法再添加数据到mq中)
3.消息被拒绝(basic.reject或basic.nack)并且requeue = false(不放回队列中)
在这里插入图片描述
示例代码:
消费者1是最为重要的,需要声明交换机1,队列1,以及消费者2,队列2,并且需要当消息异常时,将死信消息转发到交换机2,再由交换机2转发到队列2.
消费者2只需要消费队列2的消息。
生产者需要向交换机1发送消息。
消费者1:
和以前不一样,这里需要在队列1,就是普通队列中,添加对死信的操作,因此为arguments参数进行了定义。

/**
 * 死信
 * 消费者01
 */
public class Consumer01 {
    //正常交换机
    public static  final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机
    public static  final String DEAD_EXCHANGE = "dead_exchange";
    //正常队列
    public static  final String NORMAL_QUEUE = "normal_queue";
    //死信队列
    public static  final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtiles.getChannel();
        //声明普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明普通队列
        /**
         * 普通队列需要指定对应的参数
         * 才能够当消息成为死信后
         * 转发到死信交换机
         * 再发送到死信队列
         * arguments
         */
        Map<String, Object> arguments = new HashMap<>();
        //过期时间 10s  单位毫秒
        //也可以在生产者发送时,指定过期时间,一般使用生产者发送时指定
//        arguments.put("x-message-ttl",10000);
        //正常队列过期后设置死信交换机 x-dead-letter-exchange 固定格式
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信交换机的routingkey
        arguments.put("x-dead-letter-routing-key","roukey2");
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //绑定普通交换机与队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"roukey1");

        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //绑定死信交换机与死信队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"roukey2");


        //消息消费的时候如何处理消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            //消息有消息头、消息体
            System.out.println("消费者01接收到的消息"+new String(message.getBody(),"UTF-8"));
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消费被终断");
        };
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
    }
}

生产者:
生产者同样做了变动,设置了消息变为死信的过期时间,并且作为参数传递了进去。

public class ProducerLog {
    public static  final String EXCHANGE_NAME = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtiles.getChannel();
        //设置过期时间,变为死信 10s   单位毫秒
        AMQP.BasicProperties  properties = new AMQP.BasicProperties().
                builder().expiration("10000").
                build();
        for (int i = 0; i < 10; i++) {
            String message = "info:"+i;
            channel.basicPublish(EXCHANGE_NAME,"roukey1",properties,message.getBytes("UTF-8"));
        }

    }
}

消费者2
只消费死信队列中的消息

public class Consumer02 {
    //死信队列
    public static  final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtiles.getChannel();
        //消息消费的时候如何处理消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            //消息有消息头、消息体
            System.out.println("消费者02接收到的消息"+new String(message.getBody(),"UTF-8"));
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消费被终断");
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
    }
}

消息达到过期时间未被消费,变为死信,再被消费者2消费的场景

1.先启动消费者1,创建正常队列、死信队列、正常交换机、死信交换机
2.再关闭消费者1,交换机和队列创建后,依然存活,模拟消费者1,未能及时消费
3.启动生产者,生产者发送消息值正常交换机
4.现象:正常队列内信息条数为10,过了10秒后,变为0,死信队列消息条数由0变为10.
5.启动消费者2,死信队列消息被消费。

达到最大队列长度场景

需要在消费1,声明正常队列时,加入如下参数即可
设置最大长度为6,超过6个,多余的就为死信,进入死信队列。
生产者删除,最大存活时间的设置。
注,之前队列已经创建,属性改变,之前队列需要删除

        //设置正常队列长度的限制
        arguments.put("x-max—letter",6);

消息被拒绝场景

需要在消费者1接收消息的回调函数中,进行改造,增加拒绝操作

        //消息消费的时候如何处理消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            String mes = new String(message.getBody(), "UTF-8");
            if (mes.equals("info:5")){
                System.out.println("消费者01拒绝消息"+mes);
                /**
                 * 拒绝消息
                 * 消息的标签
                 * 是否放回原队列
                 */
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            }else {
                /**
                 * 应答消息
                 * 消息的标签
                 * 是否批量应答
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                //消息有消息头、消息体
                System.out.println("消费者01接收到的消息"+mes);
            }
        };
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/cz_chen_zhuo/article/details/127823306

springboot rabbitmq 之死信队列(延迟消费消息)_weixin_34288121的博客-爱代码爱编程

2019独角兽企业重金招聘Python工程师标准>>> 之前探讨了springboot 集成 rabbitmq  以及开启ack模式    传送门:https://my.oschina.net/u/2948566/blog/1624963 接着该篇 搞一下 死信队列 概念 死信队列 听上去像 消息

rabbitmq实现消息的消费确认,死信队列、延迟重试队列_小黑小黑白的博客-爱代码爱编程

消息的消费确认实现原理: 当消费者的消息消费异常时,消息进入延迟重试队列,待超时后重新发送到重试队列指定的死信队列,死信队列重新消费信息,如果又出现死信情况,继续进入延时重试队列,依次循环,当重试超过3次后,消息进入失败队

rabbitmq死信队列,消息延迟消费-爱代码爱编程

rabbitmq中如果消息被拒绝,或消息过期,或极端情况队列达到最大长度,这时消息就变成了死信,死信消息可以通过队列绑定死信交换器,从而将消息路由到死信交换器绑定的队列里。 public static void main(String[] argv) throws Exception { ConnectionFactory factory = n

RabbitMQ的死信队列实现消息的延时消费-爱代码爱编程

文章目录 前言RabbitMQ 延时消息说明Spring Boot + RabbitMQ 实现消息的延时消费1、导入依赖2、编写配置3、定义交换机、队列、路由key4、配置 RabbitMQ 绑定关系5、延时消息生产者6、延时消息的消费者7、编写接口测试发送消息8、测试结果 前言 在日常的开发中我们常常会遇到需要在一个事情完成之后的一段时间后

死信队列-爱代码爱编程

死信队列 什么是死信队列 一般来说,producer将消息投递到queue中,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信(Dead Letter),所有的死信都会放到死信队列中。 “死信”消息会被RabbitMQ进行特殊处理,如果配置了

RabbitMQ(一)使用死信队列实现延迟队列消费-爱代码爱编程

在使用rabbitmq实现延迟消费时,需要先明白什么是死信队列,什么是延迟队列。 rabbitmq并没有直接支持延迟队列,延迟队列是通过死信队列来实现的。 死信队列 DLX(Dead Letter Exchange),死信交换器。当队列中的消息被拒绝、或者过期会变成死信,死信可以被重新发布到另一个交换器,这个交换器就是DLX,与DLX绑定的队列称为死信

RabbitMQ中的死信队列-爱代码爱编程

文章目录 死信队列概念死信的来源用一个例子来演示死信队列写消费者1写生产者写消费者2测试 死信队列 概念 当queue消息队列中的消息由于一些原因没办法被消费,如果这些消息一直没办法被处理,就会从这个正常的消息队列转移到死信消息队列中。 应用场景:当用户下单之后,如果一直不支付,那么用户下单的这条消息就会被存放到死信队列中去。 死信的来

RabbitMq延时消费(死信队列)-爱代码爱编程

业务背景,创建订单后进行延迟回调确认订单 思想:定义AExchange、BExchange;AQueue、BQueue;AKey、BKey。没有消费者消费AQueue, 有消费者消费BQueue,当AQueue队列中的消息到了过期时间后,就自动转到私信队列BQueue中,且对用的key为Bkey,由BQueue的消费者去消费消息。具体设置见下代码: A

基于死信队列实现RabbitMQ消息消费失败后的处理方案-爱代码爱编程

死信队列的使用:处理失败的消息 一般生产环境中,如果你有丰富的架构设计经验,都会在使用MQ的时候设计两个队列:一个是核心业务队列,一个是死信队列。 核心业务队列,就是比如上面专门用来让订单系统发送订单消息的,然后另外一个死信队列就是用来处理异常情况的。 之所以我们这篇文章抛出一个面试题,结果先长篇大论说一个生产实践案例和业务场景,就是因为面试

RabbitMQ的死信队列-爱代码爱编程

什么是死信 在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。 死信就是消息在特定场景下的一种表现形式,这些场景包括: 消息被拒绝访问,即 RabbitMQ返回 nack 的信号时消息的 TTL 过期时消息队列达到最大长度消息不能入队时。上述场景经常产生死信,即消息在这些场景中时,被称为死信。 什么是死信队列 死信队列就

RabbitMQ之死信队列-爱代码爱编程

死信队列: 死信队列用于处理无法被正常消费的消息。 一条消息初次消费失败会被重试消费,若重试次数达到最大值(默认16次,在客户端可配置)时,依然消费失败,则其将被投递到该消费者对应的特殊队列(即死信队列)中,这种消息被称为死信消息。 应用场景: 为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息出现异常时,将消息投入

rabbitmq 死信队列详解_web18484626332的博客-爱代码爱编程

一、死信的概念 死信,顾名思义就是无法被消费的消息。一般来说,Producer 将消息投递到 Broker 或者直接到 Queue 里了,Consumer 从 Queue 取出消息进行消费,但某些时候由于特定的原因导致 Q

rabbitmq(五)死信队列_lvhaoit的博客-爱代码爱编程

RabbitMQ(五)死信队列 文章目录 RabbitMQ(五)死信队列6. 死信队列6.1 死信产生的原因6.2 死信队列实战6.2.1 代码架构图6.2.2 消息TTL过期6.2.3 队列达到最大长度6.2.5

rocketmq的死信队列_刘java的博客-爱代码爱编程

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该

【rocketmq 二十六】rocketmq应用之死信队列_笨基乙胺的博客-爱代码爱编程

1 什么是死信队列 当一条消息初次消费失败,消息队列会自动进行消费重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队