代码编织梦想

死信队列

死信:顾名思义就是无法被消费的消息,一般情况下,product将消息投递到broker或者直接到queue里,consumer从queue取出消息,进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成死信,有死信自然就有死信队列。

一般应用场景:为了保证订单业务中的消息数据不丢失,需要使用rabbitMQ的死信队列机制,当消息消费发生异常的时候,将消息投入死信队列中,还有比如说:用户在商场下单成功并点击去支付后在指定时间未支付时自动失效。

死信的来源

  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)
  • 消息被拒绝(basic.reject或basic.nack)并且requeue= false;

死信实战

在这里插入图片描述

消息TTL过期
在这里插入图片描述

/*
死信队列
 之生产者代码
 */
public class Product {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();
        //死信消息,设置TTL时间
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties()
                        .builder().expiration("10000").build();

        for (int i = 0; i < 11; i++) {
            String message = "info "+ i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
    }
}

/*
死信
    消费者
 */
public class Consumer01 {

    //普通交换机的名称
    private static  final  String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    private  static  final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列名称
    private static  final  String NORMAL_QUEUE = "normal_queue";
    //死信队列名称
    private static  final  String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();

        //声明死信和普通交换机类型 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

        // 普通队列
        Map<String,Object> argument = new HashMap<>();
        //过期时间10秒过期   可以不设置,可以交给队列自动去
       // argument.put("x-message-ttl",100000);
        //正常队列设置死信交换机
        argument.put("x-dead-lettle-exchange",DEAD_EXCHANGE);
        //设置死信RoutingKey
        argument.put("x-dead-lettle-routing-key","lisi");

        channel.queueDeclare(NORMAL_QUEUE,false,false,false,argument);

        ///
        //声名死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通交换机和普通队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //绑定死信的交换机与死信队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接受......");

        //绑定死信的交换机和死信的对列
        DeliverCallback deliverCallback = (consumer,message)->{
            System.out.println("Consumer 01接受到消息是:" + new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumer->{});
    }
}
public class Consumer02 {

    //死信队列名称
    private static  final  String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();

        System.out.println("等待接受......");

        //绑定死信的交换机和死信的对列
        DeliverCallback deliverCallback = (consumer,message)->{
            System.out.println("Consumer 02接受到消息是:" + new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumer->{});
    }
}

解决进不去死信队列的原因了,就是我没给他把过期时间值传进去,导致出现问题。

队列达到最大长度
消息生成者代码去掉TTL属性。

public class Consumer01 {

    //普通交换机的名称
    private static  final  String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    private  static  final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列名称
    private static  final  String NORMAL_QUEUE = "normal_queue";
    //死信队列名称
    private static  final  String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();

        //声明死信和普通交换机类型 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

        // 普通队列
        Map<String,Object> argument = new HashMap<>();
        //过期时间10秒过期   可以不设置,可以交给队列自动去
       // argument.put("x-message-ttl",10000);
        //正常队列设置死信交换机
        argument.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutingKey
        argument.put("x-dead-letter-routing-key","lisi");

        //设置正常队列的长度的限制
        argument.put("x-max-length",6);


        channel.queueDeclare(NORMAL_QUEUE,false,false,false,argument);

        ///
        //声名死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通交换机和普通队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //绑定死信的交换机与死信队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接受......");

        //绑定死信的交换机和死信的对列
        DeliverCallback deliverCallback = (consumer,message)->{
            System.out.println("Consumer 01接受到消息是:" + new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumer->{});
    }

}
public class Product {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();
        //死信消息,设置TTL时间 单位是ms
     //   AMQP.BasicProperties properties =
      //          new AMQP.BasicProperties()
                        //.builder().expiration("10000").build();

        for (int i = 0; i < 11; i++) {
            String message = "info "+ i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
            System.out.println("生产者发送消息:"+message);
        }
    }
}

在这里插入图片描述
消息被拒绝
一旦消费者拒绝接受,就会成为死信队列。

public class Product {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();
        //死信消息,设置TTL时间 单位是ms
      //演示长度  AMQP.BasicProperties properties =
         //   new AMQP.BasicProperties()
           //             .builder().expiration("10000").build();

        for (int i = 0; i < 11; i++) {
            String message = "info "+ i;                        //properties
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
            System.out.println("生产者发送消息:"+message);
        }
    }
}
public class Consumer01 {

    //普通交换机的名称
    private static  final  String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    private  static  final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列名称
    private static  final  String NORMAL_QUEUE = "normal_queue";
    //死信队列名称
    private static  final  String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();

        //声明死信和普通交换机类型 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

        // 普通队列
        Map<String,Object> argument = new HashMap<>();
        //过期时间10秒过期   可以不设置,可以交给队列自动去
       // argument.put("x-message-ttl",10000);
        //正常队列设置死信交换机
        argument.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutingKey
        argument.put("x-dead-letter-routing-key","lisi");

        //设置正常队列的长度的限制
      //演示拒绝  argument.put("x-max-length",6);


        channel.queueDeclare(NORMAL_QUEUE,false,false,false,argument);

        ///
        //声名死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通交换机和普通队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //绑定死信的交换机与死信队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接受......");

        //绑定死信的交换机和死信的对列
        DeliverCallback deliverCallback = (consumer,message)->{
       //拒绝消息在这里写
            String message1 = new String(message.getBody(),"UTF-8");
           //指定你要拒绝的消息
            if(message1.equals("info 5")){
                System.out.println("此消息是被拒绝的"+ message1);
                //拒接接收,且不放回队列,避免队列重新发送
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            }else {

                System.out.println("Consumer 01接受到消息是:" + new String(message.getBody(),"UTF-8"));
               //   接收且不放回
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }

        };
        //开启手动应答,                      //true是自动应答。
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumer->{});
    }

}
public class Consumer02 {

    private  static  final String DEAD_EXCHANGE = "dead_exchange";

    //死信队列名称
    private static  final  String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接受......");

        //绑定死信的交换机和死信的对列
        DeliverCallback deliverCallback = (consumer,message)->{
            System.out.println("Consumer 02接受到消息是:" + new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumer->{});
    }
}

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_45922256/article/details/127797219

java面试官:程序员,请你告诉我是谁把公司面试题泄露给你的?_跟着我学java的博客-爱代码爱编程

前情提要: 面试官:你好!请先做一下自我介绍! 程序员:balabalabala... 前戏先过了.... 面试官:先介绍SpringCloud核心组件及其作用 程序员:SpringCloud由以下5个核心组件构成...另外,SpringCloud的工作流程是这样子的↓ 面试官(疑惑的低头看了看自己的问题):嗯,不错,工作流程都说出

rabbitmq第二个实操小案例——workqueue_keeling1720的博客-爱代码爱编程

文章目录 RabbitMQ第二个实操小案例——WorkQueue RabbitMQ第二个实操小案例——WorkQueue 讲第二个案例之前,我们先看下前面第一个案例的模型: 可以看到,我们只有一个发布者和一

rabbitmq学习-发布和确认_子非吾喵的博客-爱代码爱编程

发布和确认 生产者 - - 发送消息 – 队列hello 🍎必须保存在磁盘上才能达到持久化操作。 设置要去队列必须持久化设置要求队列中的消息必须持久化发布确认 单个确认发布: 每次生产者生产一个消息他都会确认一次,这

rabbitmq学习-交换机(exchanges)_子非吾喵的博客-爱代码爱编程

交换机(exchanges) 当使用到交换机的时候,我们用的就不是普通的模式了,而是发布订阅模式了。 生产者生成的消息不会直接发送到队列,而是直接将消息先发送到交换机,并且只能发送到交换机,之钱的我们可以直接发送到队列(

rabbitmq学习-延迟队列_子非吾喵的博客-爱代码爱编程

延迟队列 延迟队列,对列内部是有序的,最重要的特性就是体现在他的延时属性上,延时队列中的元素时希望在指定时间到了之后或者之前取出和处理,简单的来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列的使用场

rabbitmq学习---了解同步异步通讯,rabbitmq的安装,创建第一个用户,以及虚拟主机的权限划分_头发掉完键盘砸烂的博客-爱代码爱编程

消息队列的认识 同步异步通讯 微服务间通讯有同步和异步两种方式 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 同步调用的优点: 时效性较强,可以立即得到结果

【一文带你详细学习rocketmq存储设计方案、rocketmq中消息文件存储结构、过期文件删除机制、零拷贝与mmap内存映射】_硕风和炜的博客-爱代码爱编程

一.知识回顾 【0.RocketMQ专栏的内容在这里哟,帮你整理好了,更多内容持续更新中】 【1.Docker安装部署RocketMQ消息中间件详细教程】 【2.RocketMQ生产者发送消息的三种方式:发送同步消息

常见消息队列分析对比_娜布其 20224016017的博客-爱代码爱编程

消息队列 应用场景 应用解耦,比如,用户下单后,订单系统需要通知库存系统,假如库存系统无法访问,则订单减库存将失败,从而导致订单失败。订单系统与库存系统耦合,这个时候如果使用消息队列,可以返回给用户成功,先把消息持久化,等库存系统恢复后,就可以正常消费减去库存了。削峰填谷,比如,秒杀活动,一般会因为流量过大,从而导致流量暴增,应用挂掉,这个时候加上消息

rabbitmq_l10711097061的博客-爱代码爱编程

RabbitMQ 1.MQ引言 MessageQueue: 消息队列 模块之间的耦合度多高,导致一个模块宕机后,全部功能都不能用了,并且同步通讯的成本过高,用户体验差。 1.1什么是MQ MQ(Me

rabbitmq_windows系统下安装rabbitmq详细教程_mudrock__的博客-爱代码爱编程

在安装RabbitMQ之前,需要先确认当前计算机上是否安装了Erlang(RabbitMQ的运行需要Erlang环境);未安装Erlang也不要紧,RabbitMQ官网提供了Erlang的下载链接,下文会一同说明 说明:该博客选择的RabbitMQ版本为该博客发布时的最新版本(3.11.3),若想选择其余版本,可在官网首页右侧选择;3.11.3

rabbitmq------死信队列(消息超时、达到最大长度、消费拒绝)(六)_诗与猿方的博客-爱代码爱编程

RabbitMQ------死信队列(六) 死信的概念 死信:无法被消费的消息,一般情况下:生产者将消息投递到broker或者直接到queue中,消费者从queue取出消息进行消费,但是某些时候,由于特定原因导致queu

rabbitmq---springamqp的使用,五种消息模型的示例_头发掉完键盘砸烂的博客-爱代码爱编程

SpringAMQP的使用: SpringAMQP 提供了三个功能: 自动声明队列、交换机及其绑定关系 基于注解的监听器模式,异步接收消息 封装了 RabbitTemplate 工具,用于发送消息 第一步引入

rabbitmq第三个实操小案例——发布者/订阅者(publish/subscribe)_keeling1720的博客-爱代码爱编程

文章目录 RabbitMQ第三个实操小案例——发布者/订阅者(Publish/Subscribe) RabbitMQ第三个实操小案例——发布者/订阅者(Publish/Subscribe) 发布者/订阅者 模

rabbitmq-爱代码爱编程

RabbitMQ------交换机(五) 交换机是rabbitMq的一个重要组成部分,生产者先将消息发送到交换机,再由交换机路由到不同的队列当中。 之前都没有指定交换机。 传一个空串,默认会走AMQP default默认交

rabbitmq-爱代码爱编程

RabbitMq简介 RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。 RabbitMQ 基础架构如下图: Rab

rabbitmq-爱代码爱编程

Rabbitmq消息确认机制 https://blog.csdn.net/yorsola/article/details/108436276 官网:https://www.rabbitmq.com/confirms.htm