代码编织梦想

RabbitMQ(五)死信队列


6. 死信队列

​ 死信:无法被消费的消息,在某些时候由于特定的原因到导致了queue中某些消息无法被消费,这样的消息如果没有后续处理,就会成为死信,有了死信,就自然有了死信队列

​ 应用场景,为了保证订单业务的数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息发生消费异常时,将消息投入死信队列中。场景:用户在商城下单成功并点击支付后在指定时间未支付时自动失效。

6.1 死信产生的原因

1. 消息TTL过期
2. 队列达到了最大长度(队列已经排满了,无法再添加到MQ中)
3. 消息被拒绝(basic.reject 或 basic.nack)并且requeue = false

6.2 死信队列实战

6.2.1 代码架构图

在这里插入图片描述

6.2.2 消息TTL过期

​ 我们可以在交换机声明处,或者队列声明处设置消息过期时间TTL,当消息超过这个时间还没有被消费,则会转入死信队列。

​ 我们在这次实战中通过关闭消费端,由生产者发送十条信息,消息发送后没有被消费,超时后就会发送到死信队列中,由死信队列消费。

  1. 生产者 Producer (设置过期时间TTL,发送消息)

    public class Producer {
      public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtil.getChannel();
    
        //死信消息 设置TTL时间 单位是ms
        AMQP.BasicProperties properties =
          new AMQP.BasicProperties()
          .builder().expiration("10000").build();
    
        for (int i = 1; i <= 10; i++) {
          String message = "info"+i;
          channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",properties,message.getBytes("UTF-8"));
          System.out.println("发送消息 "+message+" 成功");
        }
      }
    }
    
  2. 普通消费者 Consumer01 (启动后关闭该消费者,模拟其接收不到消息)

    public class Consumer01 {
    
      public static void main(String[] args)throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明普通交换机和死信交换机
        //普通交换机
        channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
    
        HashMap<String, Object> arguments = new HashMap<>();
    
        //过期时间 10s 队列处也可以设置ttl 但是一般在交换机处设置
        //        arguments.put("x-message-ttl",10000);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName());
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
    
        //声明普通队列
        channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments);
    
        /
        //声明死信队列
        channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
    
        //绑定普通队列
        channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan");
    
        //绑定死信队列
        channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
    
        System.out.println("等待接收消息。。。。");
        //ttl接收消息
        channel.basicConsume(QueueName.Normal_Queue.getName(), true, (consumerTag, message) -> {
          String msg = new String(message.getBody(), "UTF-8");
          System.out.println("ConsumerO1接收到消息:" + msg);
          System.out.println("处理完成");
        }, (consumerTag) -> {
          System.out.println("Consumer01 错误消息被中断:" + consumerTag);
        });
    
      }
    }
    
    
  3. 死信队列消费者 Consumer02 (启动后关闭此队列,观察消息去向后打开消费掉死信)

    /**
     * 死信队列, 消费者2
     */
    public class Consumer02 {
    
        public static void main(String[] args)throws Exception {
            Channel channel = RabbitMqUtil.getChannel();
            //声明死信交换机
            channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
    
            //声明死信队列
            channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
    
            //绑定死信队列
            channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
    
            System.out.println("等待接收消息。。。。");
            //接收消息
            channel.basicConsume(QueueName.Dead_Queue.getName(), true,(consumerTag,message)->{
                System.out.println("Consumer02 接收到消息:"+new String(message.getBody(),"UTF-8"));
            },tag->{});
    
        }
    }
    
    

操作步骤

  1. 查看未发送消息状态下 队列状态

    在这里插入图片描述

  2. 启动生产者代码 发送十条消息 此时正常队列中有十条未消费信息

    在这里插入图片描述

  3. 等待时间过去10s,等待消息过期,正常队列中的消息由于没有被消费,消息进入死信队列

    在这里插入图片描述

  4. 启动死信队列,可以看到过期的消息被一条一条消费。

    在这里插入图片描述

6.2.3 队列达到最大长度

​ 我们可以在队列声明处设置队列的长度,当队列中消息超过这个长度时,则会转入死信队列。

​ 我们在这次实战中通过给消费者设置队列长度,然后将它关闭,不消费信息,消息发送后超过这个长度,超过这个长度的消息就会发送到死信队列中,由死信队列消费。

  1. 生产者 Producer

    public class Producer {
      public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtil.getChannel();
    
        for (int i = 1; i <= 10; i++) {
          String message = "info"+i;
          channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",null,message.getBytes("UTF-8"));
          System.out.println("发送消息 "+message+" 成功");
        }
      }
    }
    
  2. 普通消费者 Consumer01 (启动后关闭该消费者,模拟其接收不到消息)

    public class Consumer01 {
    
      public static void main(String[] args)throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明普通交换机和死信交换机
        //普通交换机
        channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
    
        HashMap<String, Object> arguments = new HashMap<>();
    
        //设置队列最大长度,达到这个长度将后续消息转发给c2消费者
        arguments.put("x-max-length",6);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName());
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
    
        //声明普通队列
        channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments);
    
        /
        //声明死信队列
        channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
    
        //绑定普通队列
        channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan");
    
        //绑定死信队列
        channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
    
        System.out.println("等待接收消息。。。。");
        //ttl接收消息
        channel.basicConsume(QueueName.Normal_Queue.getName(), true, (consumerTag, message) -> {
          String msg = new String(message.getBody(), "UTF-8");
          System.out.println("ConsumerO1接收到消息:" + msg);
          System.out.println("处理完成");
        }, (consumerTag) -> {
          System.out.println("Consumer01 错误消息被中断:" + consumerTag);
        });
    
      }
    }
    
    
  3. 死信队列消费者 Consumer02 (启动后开启此队列,观察消息是否被死信队列消费)

    /**
     * 死信队列, 消费者2
     */
    public class Consumer02 {
    
        public static void main(String[] args)throws Exception {
            Channel channel = RabbitMqUtil.getChannel();
            //声明死信交换机
            channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
    
            //声明死信队列
            channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
    
            //绑定死信队列
            channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
    
            System.out.println("等待接收消息。。。。");
            //接收消息
            channel.basicConsume(QueueName.Dead_Queue.getName(), true,(consumerTag,message)->{
                System.out.println("Consumer02 接收到消息:"+new String(message.getBody(),"UTF-8"));
            },tag->{});
    
        }
    }
    
    

操作步骤

​ 注意:因为我们修改了队列的参数,所以需要将原来的队列删除后才能重新创建。

  1. 启动两个消费者 然后关闭,打开生产者(可以发现超出长度的消息自动转入死信队列中)

    在这里插入图片描述

  2. 启动两个消费者,可以看到死信队列消费了其中四条消息

    在这里插入图片描述

6.2.5 消息被拒绝

​ 我们可以不使用自动应答,而采用手动应答,在手动应答中的拒绝应答,并且拒绝消息重新入队,若存在死信队列,则消息会转入死信队列。

​ 我们在这次实战中通过在消息应答里拒绝一部分应答,并阻止消息重新入队,这条消息就会发送到死信队列中,由死信队列消费。

  1. 生产者 Producer

    public class Producer {
      public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtil.getChannel();
    
        for (int i = 1; i <= 10; i++) {
          String message = "info"+i;
          channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",null,message.getBytes("UTF-8"));
          System.out.println("发送消息 "+message+" 成功");
        }
      }
    }
    
  2. 普通消费者 Consumer01

    public class Consumer01 {
    
      public static void main(String[] args)throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明普通交换机和死信交换机
        //普通交换机
        channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
    
        HashMap<String, Object> arguments = new HashMap<>();
    
        //设置队列最大长度,达到这个长度将后续消息转发给c2消费者
        arguments.put("x-max-length",6);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName());
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
    
        //声明普通队列
        channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments);
    
        /
        //声明死信队列
        channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
    
        //绑定普通队列
        channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan");
    
        //绑定死信队列
        channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
    
        System.out.println("等待接收消息。。。。");
        channel.basicConsume(QueueName.Normal_Queue.getName(), false,(consumerTag,message)->{
          String msg = new String(message.getBody(),"UTF-8");
          if ("info5".equals(msg)){
            System.out.println(msg+"此消息是被拒绝的!");
            //拒绝应答 1为消息标签,2为是否放回普通队列
            channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
          }else {
            System.out.println("Consumer01 接收到消息:"+new String(message.getBody(),"UTF-8"));
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
          }
    
          System.out.println("处理完成");
        },(consumerTag)->{
          System.out.println("Consumer01 错误消息被中断:"+consumerTag);
        });
      }
    }
    
    
  3. 死信队列消费者 Consumer02 (启动后开启此队列,观察消息是否被死信队列消费)

    /**
     * 死信队列, 消费者2
     */
    public class Consumer02 {
    
        public static void main(String[] args)throws Exception {
            Channel channel = RabbitMqUtil.getChannel();
            //声明死信交换机
            channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
    
            //声明死信队列
            channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
    
            //绑定死信队列
            channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
    
            System.out.println("等待接收消息。。。。");
            //接收消息
            channel.basicConsume(QueueName.Dead_Queue.getName(), true,(consumerTag,message)->{
                System.out.println("Consumer02 接收到消息:"+new String(message.getBody(),"UTF-8"));
            },tag->{});
    
        }
    }
    
    

操作步骤

​ 注意:因为我们修改了队列的参数,所以需要将原来的队列删除后才能重新创建。

  1. 启动两个消费者 ,打开生产者(可以发现 info5 的消息自动转入死信队列中)

    在这里插入图片描述

思考:如果拒绝了某一条消息,但是并没有拒绝它重新入队会导致什么?

if ("info5".equals(msg)) {
  System.out.println(msg + "此消息是被拒绝的!");
  //拒绝应答 1为消息标签,2为是否放回普通队列
  //channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
channel.basicReject(message.getEnvelope().getDeliveryTag(), true); //拒绝但是允许重新入队
}

答案是:如果允许重新入队,队列会一直循环处理这一条消息,这条消息也会被循环入队

在这里插入图片描述

在这里插入图片描述

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_27331467/article/details/126150718

rabbitmq死信队列详解与使用_小码农叔叔的博客-爱代码爱编程_rabbitmq死信队列

什么是死信队列 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消

RabbitMQ设置死信队列-爱代码爱编程

学习死信队列,首先要理解死信产生的原因或条件: 消息被拒 ( basic.reject or basic.nack ) 并且没有重新入队 ( requeue=false ); 消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL ( Time To Live ) 时间; 当前队列中的消息数量已经超过最大长度。 一旦消息变成了死信

php mq死信队列,RabbitMQ的死信队列详解-爱代码爱编程

死信队列介绍 死信队列:DLX,dead-letter-exchange 利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX 消息变成死信有以下几种情况 消息被拒绝(basic.reject / basic.nack),并且requeue

RabbitMQ的死信队列机制-爱代码爱编程

一、死信队列 死信指的是无法被消费的消息,由于消息TTL过期、队列达到最大长度、消息被拒绝等原因,导致队列中一些消息无法被消费,这样的消息如果没有进行后续的处理,就会变成死信。为了保证消息数据不丢失,需要使用死信队列机制。 二、死信队列的实现 1、消息 TTL 过期 普通队列消费者Consumer01 public class Consume

RabbitMQ的死信队列-爱代码爱编程

什么是死信 在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。 死信就是消息在特定场景下的一种表现形式,这些场景包括: 消息被拒绝访问,即 RabbitMQ返回 nack 的信号时消息的 TTL 过期时消息队列达到最大长度消息不能入队时。上述场景经常产生死信,即消息在这些场景中时,被称为死信。 什么是死信队列 死信队列就

rabbitmq实现死信队列(springboot+rabbitmq)-爱代码爱编程

1.什么是死信队列(专门用来存放死信的队列) 当一个队列中的消息变成死信以后,该消息被重新publish到另一个交换机(该交换机叫做死信交换机)上,交换机将消息发送到指定的队列(该队列叫死信队列)中。 2. 消息什么时候会变成死信 1)消息被拒绝(basic.reject / basic.nack),并且requeue = false

rabbitmq实现死信队列-爱代码爱编程

所谓的死信队列,也就是我们说的延迟队列。其实现方式就是给普通队列绑定一个所谓的死信队列,给消息设置一个过期时间,在该时间内如果消息没有被消费,那么则会进入死信队列。我这里以Direct模式处理,fanout模式也是类似 下面开始整活。(前提是你已经会基本使用springboot+rabbitmq) <dependency>

RabbitMQ——死信队列-爱代码爱编程

1. 死信队列 1.1 概念 死信就是无法被消费的消息,消费者从队列中取消息时,由于某些特定原因导致消息无法被消费,即没有了后续的处理,就变成了死信继而有了死信队列。 1.2 应用场景 可以保证消息不会消失,如果消费者在进行消费时发送异常,可以先放到死信队列中,等后面运行环境好了之后再进行消费。 1.3 死信来源 消息TTL

【尚硅谷 RabbitMQ】5、图文详解 死信队列-爱代码爱编程

1、概述 死信:就是 无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因,导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。应用场景: 为了保证订单业务的消息

RabbitMq——死信队列-爱代码爱编程

死信:由于某些原因导致队列中的消息无法被消费者消费,消息就会成为死信。 死信来源:消息ttl过期、消息被拒绝·,队列已达最大长度。 死信队列:为保护系统订单业务数据不丢失,当消息成为死信时,会将消息保存到死信队列中。 其架构图如下: 案例:一个生产者发送10条消息,消息5被拒收,进入死信队列,其他消息正常消费: 1、创建连接获取信道工具类

rabbitmq创建死信队列_一个头发贼多的小火鸡的博客-爱代码爱编程

向Queue_TTL队列里发送消息,10秒后当消息没有被正常消费掉则消息过期被转发至 Queue_DLX死信队列 const amqp = require('amqplib'); async function TTL_production() { var conn = await amqp.connect("amqp://localhost

springboot:整合rabbitmq之死信队列_yololee_的博客-爱代码爱编程

文章目录 springboot:整合rabbitmq之死信队列一、项目准备依赖配置类yaml配置文件二、死信队列介绍三、案例创建生产者和消费者测试结果修改消费者、手动确认重启测试四、总结 springboot:整合rabbitmq之死信队列 一、项目准备 依赖 <dependency>

rabbitmq的死信队列详解_知难行难1985的博客-爱代码爱编程

1.基础知识 1.死信队列介绍 死信交换机:DLX,dead-letter-exchange利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,进而推送到死信队列,这个Exchange就是DLX2.消息变成死信有以下几种情况 消息被拒绝(basic.reject / basic

rabbitmq(三)持久化与发布确认_lvhaoit的博客-爱代码爱编程

RabbitMQ(三)持久化与发布确认 文章目录 **RabbitMQ(三)持久化与发布确认**3.3 RabbitMQ 持久化3.3.1 队列持久化3.3.2 消息持久化3.3.3 不公平分发3.3.4 预取值

rabbitmq的高可用方案_倔强100%的博客-爱代码爱编程

RabbitMQ的高可用方案 集群方案cluster镜像Federation插件Shovel插件 部署方式多机多节点单机多节点 集群方案 cluster Cluster普通模式 如图所

rabbitmq之死信队列_rabbitmq 死信队列-爱代码爱编程

一、死信队列概念 顾名思义,(死去的消息)即无法被消费的消息,指的是消费者在消费生产者生产的消息时发生了某些特殊情况(下文会说),导致消息无法被正常消费,存放这些未被消费的消息的队列即为死信队列。 二、死信队列应用场景