代码编织梦想

1:死信队列架构图

 

2:业务描述:

发10条消息,到固定长度为6的普通队列,普通队列有哪些消息?死信队列有哪些消息?

3:结论

普通队列满了之后继续添加消息,前面的消息会挤出到死信队列中。如:
普通队列长度为6,添加10个进去后,先进去的4个会到死信队列中,最后6个消息还在普通队列中。
发送的消息
普通队列的消息
死信队列的消息
9876543210
987654
3210

4:代码实现

1、工具类代码

public class RabbitmqUtils {
    private static Logger   logger          = LoggerFactory.getLogger(RabbitmqUtils.class);

    // 死信相关队列
    public  static String   NORMAL_EXCHANGE_NAME        = "normal_exchange";
    public  static String   NORMAL_EXCHANGE_ROUTING_KEY = "normal_routing_key";
    public  static String   NORMAL_EXCHANGE_QUEUE_NAME  = "normal_queue";
    public  static String   DEAD_EXCHANGE_NAME          = "dead_exchange";
    public  static String   DEAD_EXCHANGE_QUEUE_NAME    = "dead_queue";
    public  static String   DEAD_EXCHANGE_ROUTING_KEY   = "dead_routing_key";

    // 队列持久化
    public static boolean   DURABLE = true;
    public static Connection connection;
    public static Channel   channel;

    static {
        logger.info("getChannel begin...");
        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        logger.info("getChannel factory:{}", factory.toString());

        factory.setHost("192.168.6.8");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123");

        // 创建链接
        try {
            connection = factory.newConnection();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
        logger.info("getChannel connection:{}", connection.toString());

        try {
            channel = connection.createChannel();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        logger.info("getChannel channel:{}", channel.toString());
        logger.info("getChannel success!");
    }

    /**
     * 死信队列实战
     *
     * @return
     * @throws IOException
     */
    public static Channel createDeadExchangeAndQueue() throws IOException {
        // 删除已存在的交换机
        channel.exchangeDelete(RabbitmqUtils.NORMAL_EXCHANGE_NAME);
        channel.exchangeDelete(RabbitmqUtils.DEAD_EXCHANGE_NAME);
        // 删除已存在的队列
        channel.queueDelete(RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME);
        channel.queueDelete(RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME);

        // 创建普通交换机
        channel.exchangeDeclare(
                RabbitmqUtils.NORMAL_EXCHANGE_NAME,
                BuiltinExchangeType.DIRECT,
                RabbitmqUtils.DURABLE);
        // 正常队列绑定死信队列信息并设置队列的长度
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange",RabbitmqUtils.DEAD_EXCHANGE_NAME);
        arguments.put("x-dead-letter-routing-key",RabbitmqUtils.DEAD_EXCHANGE_ROUTING_KEY);
        arguments.put("x-max-length",6);

        // 创建普通队列
        channel.queueDeclare(
                RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME,
                RabbitmqUtils.DURABLE,
                false,false,arguments);
        // 将不同队列绑定到普通交换机上
        channel.queueBind(
                RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME,
                RabbitmqUtils.NORMAL_EXCHANGE_NAME,
                RabbitmqUtils.NORMAL_EXCHANGE_ROUTING_KEY);


        // 创建死信交换机
        channel.exchangeDeclare(
                RabbitmqUtils.DEAD_EXCHANGE_NAME,
                BuiltinExchangeType.DIRECT,
                RabbitmqUtils.DURABLE);
        // 创建死信队列
        channel.queueDeclare(
                RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME,
                RabbitmqUtils.DURABLE,
                false,false,null);
        // 将死信队列绑定到交换机上
        channel.queueBind(
                RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME,
                RabbitmqUtils.DEAD_EXCHANGE_NAME,
                RabbitmqUtils.DEAD_EXCHANGE_ROUTING_KEY);
        return channel;
    }

}

2、生产者代码

public class DeadProducer {
    private static Logger logger = LoggerFactory.getLogger(DeadProducer.class);
    public static void main(String[] args) throws IOException {

        Channel channel = RabbitmqUtils.createDeadExchangeAndQueue();

        // 发送消息
        for (int i=0; i< 10; i++) {
            String message = "info"+i;  
            channel.basicPublish(
                RabbitmqUtils.NORMAL_EXCHANGE_NAME,
                RabbitmqUtils.NORMAL_EXCHANGE_ROUTING_KEY,
                null,
                message.getBytes());
            logger.info("发送的消息内容为:{}",message);
        }

    }
}

生产者输出日志:

发送的消息内容为:info0
发送的消息内容为:info1
发送的消息内容为:info2
发送的消息内容为:info3
发送的消息内容为:info4
发送的消息内容为:info5
发送的消息内容为:info6
发送的消息内容为:info7
发送的消息内容为:info8
发送的消息内容为:info9

生产者发送消息后的结果

3、普通消费者1代码

public class OrdinaryConsumer1 {

    private static Logger logger = LoggerFactory.getLogger(OrdinaryConsumer1.class);

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitmqUtils.channel;

        DeliverCallback deliverCallback = (consumerTag, message) ->{
            String msg = new String(message.getBody(), "UTF-8");
            logger.info("普通队列接受到的消息是:{}", msg);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            logger.info(consumerTag + "消息被中断");
        };
        channel.basicConsume(
                RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME,
                true,
                deliverCallback,
                cancelCallback);
    }
}

普通消费者消费的日志:

普通队列接受到的消息是:info4
普通队列接受到的消息是:info5
普通队列接受到的消息是:info6
普通队列接受到的消息是:info7
普通队列接受到的消息是:info8
普通队列接受到的消息是:info9

4、死信消费者2代码

public class DeadConsumer2 {

    private static Logger logger = LoggerFactory.getLogger(DeadConsumer2.class);

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitmqUtils.channel;

        DeliverCallback deliverCallback = (consumerTag, message) ->{
            String msg = new String(message.getBody(), "UTF-8");
            logger.info("死信队列接受到的消息是:{}", msg);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            logger.info(consumerTag + "消息被中断");
        };
        channel.basicConsume(
                RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME,
                true,
                deliverCallback,
                cancelCallback);
    }
}

死信消费者消费的日志:

死信队列接受到的消息是:info0
死信队列接受到的消息是:info1
死信队列接受到的消息是:info2
死信队列接受到的消息是:info3

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_39519454/article/details/129671251

Java-RabbitMq-死信队列-爱代码爱编程

RabbitMq死信队列有三种发生情况: 消息 TTL 过期队列达到最大长度(队列满了,无法再添加数据到 mq 中)消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false. 生产者: import com.cn.mq.utils.ConnectionMq; import com.rabbitmq.client.

07-rabbitMQ-死信队列-爱代码爱编程

一、死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死

RabbitMq(五) -- 死信队列和延迟队列-爱代码爱编程

1. 死信 1.1 死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,

RabbitMQ:死信队列+延迟队列-爱代码爱编程

文章目录 1、死信队列1.1、概念1.2、死信的来源1.3、死信实战1.3.1、消息TTL过期1.3.2、队列达到最大长度1.3.3、消息被拒2、延迟队列2.1、概念2.2、延迟队列使用场景2.3、整合SpringBoot2.3.1、添加依赖2.3.2、修改配置文件2.3.3、添加Swagger配置类2.4、队列TTL2.4.1、配置文件类2.4.

rabbitmq之死信队列_codepanda@gpf的博客-爱代码爱编程

文章目录 1. 死信队列概念2. 消息 TTL 过期3. 队列达到最大长度4. 消息被拒 1. 死信队列概念 死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue

7.rabbitmq死信和死信队列_machoul的博客-爱代码爱编程

rabbitmq死信和死信队列 概述 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理 解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有

rabbitmq的死信队列和延迟队列_pmc0_0的博客-爱代码爱编程

文章目录 死信队列死信原因代码架构图模拟TTL过期模拟队列达到最大长度模拟消息被拒 延迟队列延迟队列使用场景RabbitMQ 中的 TTL整合SpringBoot依赖和配置 代码架构图延时队列TTL优化优化

rabbitmq-死信队列、延迟队列(原生+springboot+插件实现)_萌萌虎儿的博客-爱代码爱编程

目录 一、死信队列 1.1 概念 1.2 来源 1.3 演示 二、延迟队列 2.1 TTL-消息最大存活时间 2.2 在SpringBoot中演示延迟队列与死信队列 2.2.1 基本演示 2.2.2 优化-动态设置TTL 2.2.3 使用插件实现延迟队列 2.3 总结 一、死信队列 1.1 概念        死信顾名思义

rabbitmq-死信队列_四哥,喔喔喔的博客-爱代码爱编程

死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取

rabbitmq学习-死信队列_子非吾喵的博客-爱代码爱编程

死信队列 死信:顾名思义就是无法被消费的消息,一般情况下,product将消息投递到broker或者直接到queue里,consumer从queue取出消息,进行消费,但某些时候由于特定的原因导致queue中的某些消息无法

rabbitmq------死信队列(消息超时、达到最大长度、消费拒绝)(六)_诗与猿方的博客-爱代码爱编程

RabbitMQ------死信队列(六) 死信的概念 死信:无法被消费的消息,一般情况下:生产者将消息投递到broker或者直接到queue中,消费者从queue取出消息进行消费,但是某些时候,由于特定原因导致queu

rabbitmq初步到精通-第六章-rabbitmq之死信队列_mr-昊哥的博客-爱代码爱编程

目录 第六章-RabbitMQ之死信队列 1. 死信概念 2. 死信架构 3. 死信来源 3.1 消息 TTL 过期 3.2 队列达到最大长度(队列满了,无法再添加数据到 mq 中) 3.3 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false(不再重新入队) 4. 验证代码 4.1 TTL

rabbitmq死信队列_乔木先生i的博客-爱代码爱编程

死信,就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被

rabbitmq-死信队列-爱代码爱编程

  一、介绍          死信,顾名思义就是无法被消费的消息,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。        应用场景:为

谈谈rabbitmq中的死信队列-爱代码爱编程

那什么消息是死信(dead message)呢? 被拒绝(basic.reject或basic.nack)并且requeue=false的消息 TTL过期消息的消息 队列达到最大长度(队列满了,无法再添加数据到mq中)的消息 DLX(Dead Letter Exchanges)死信队列,本身也是一个普通的消息队列,在创建队列的时候,通过设置一些

rabbit之服务异步通讯-爱代码爱编程

目录 1. 同步通讯和异步通讯 1.1 同步通信与异步通信区别: 1.2 同步调用的问题  1.2.1 同步调用的缺点 1.2.2 总结  1.3 异步调用方案 1.3.1 异步调用常见实现就是事件驱动模式  1.3.2 事件驱动优势 1.3.3 总结 1.4 快速入门MQ  1.4.1 RabbitMQ概述 1.4.2 总结