代码编织梦想

死信队列

死信就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,死信队列就是处理死信的。应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效


死信原因

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

代码架构图

在这里插入图片描述

模拟TTL过期

package cn.com.mq.dead;

import cn.com.mq.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列 - 消费者01
 *	消费正常队列,如果出现死信消息,要将它们从正常队列中通过死信交换机传送到死信队列
 * @author pangjian
 */
public class Consumer01 {

    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        // 声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 声明死信队列,并绑定:队列、交换机、路由键(routingKey)
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        // 正常队列绑定死信队列信息,为了消息不能被消费称为死信后传入死信交换机,并由交换机传入死信队列
        Map<String, Object> params = new HashMap<>();
        // 正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");

        // 正常队列,并绑定:队列、交换机、路由键(routingKey),并传入最后一个参数params
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到消息" + message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});
    }

}
package cn.com.mq.dead;

import cn.com.mq.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * @author pangjian
 * @ClassName Consumer02
 * @Description 消费死信队列的消息
 * @date 2022/8/7 16:20
 */

public class Consumer02 {

    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收死信消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收到消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }


}
package cn.com.mq.dead;

import cn.com.mq.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

/**
 * @author pangjian
 * @ClassName producer
 * @Description 生产者,要设置消息过期时间,模拟消息过期后消息变成死信
 * @date 2022/8/7 16:18
 */

public class Producer {

    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 设置消息的 TTL 时间 10s
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        // 该信息是用作演示队列个数限制
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
            System.out.println("生产者发送消息:" + message);
        }

    }

}

启动 C1 ,之后关闭消费者,模拟其接收不到消息。再启动 Producer

在这里插入图片描述

模拟队列达到最大长度

先删除刚刚存在的正常队列,因为参数不一样了,这里设置最大长度为6,也就是发10个,有4个会成为死信

  • 修改C1
// 正常队列绑定死信队列信息,为了消息不能被消费称为死信后传入死信交换机,并由交换机传入死信队列
Map<String, Object> params = new HashMap<>();
// 正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
// 设置正常队列长度,这里设置为5
params.put("x-max-length", 6);
  • 修改Producer,注释掉过期时间
// AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
  • 结果
    在这里插入图片描述

模拟消息被拒

package cn.com.mq.dead;

import cn.com.mq.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * @author pangjian
 * @ClassName Consumer03
 * @Description 模拟拒绝info5
 * @date 2022/8/7 16:54
 */

public class Consumer03 {

    // 普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        // 声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        // 死信队列绑定:队列、交换机、路由键(routingKey)
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        // 正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        // 正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");

        // 正常队列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("等待接收消息........... ");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            if (message.equals("info5")) {
                System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
                // requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01 接收到消息" + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }

        };
        // 开启手动应答
        channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {});
    }

}

延迟队列

延迟队列是基于死信队列TTL过期时间演变来的,延时队列中的元素是希望在指定时间到了以后或之前取出和处理

延迟队列使用场景

  1. 订单在十分钟之内未支付则自动取消

基于死信队列抽象说明就是用户下订单,订单信息(设置10分钟过期时间,这个有可能不是10分钟判定,建议设置队列过期)并进入正常队列(或者设置队列过期为10分钟),如果10分钟内付款了则正常处理,不然就进入死信队列,死信消费者对死信订单消息进行取消

  1. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  2. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  3. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  4. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议、

RabbitMQ 中的 TTL

TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

  • 队列设置TTL
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
// 声明队列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
  • 消息设置TTL
// 设置消息的 TTL 时间 10s
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

区别:如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间(因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。);另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

整合SpringBoot

依赖和配置

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--RabbitMQ 依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--swagger-->
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger2</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger-ui</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!--RabbitMQ 测试依赖-->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.publisher-confirm-type=correlated

代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

在这里插入图片描述

  • 配置类
package cn.com.pang.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author pangjian
 * @ClassName TtlQueueConfig
 * @Description MQ配置类
 * @date 2022/8/7 17:31
 */
@Configuration
public class TtlQueueConfig {

    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信交换机
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //死信队列
    public static final String DEAD_LETTER_QUEUE = "QD";

    // 声明 xExchange
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    // 声明 死信队列交换机
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    // 声明队列 A ttl 为 10s 并绑定到对应的死信交换机
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }

    // 声明队列 A 绑定 X 交换机
    @Bean
    public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    // 声明队列 B ttl 为 40s 并绑定到对应的死信交换机
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>(3);
        // 声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        // 声明队列的 TTL
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    //声明队列 B 绑定 X 交换机
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
    }

    // 声明死信队列 QD
    @Bean("queueD")
    public Queue queueD() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    // 声明死信队列 QD 绑定关系
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }

}
  • 生产者
package cn.com.pang.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
/**
 * @author pangjian
 * @ClassName SendMsgController
 * @Description TODO
 * @date 2022/8/7 19:54
 */
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
    }

}
  • 延时消费者(死信消费者)
package cn.com.pang.controller;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;
/**
 * @author pangjian
 * @ClassName DeadLetterQueueConsumer
 * @Description TODO
 * @date 2022/8/7 19:55
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
    }

}
# 发送消息
http://localhost:8080/ttl/sendMsg/嘻嘻嘻

在这里插入图片描述

延时队列TTL优化

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

优化后代码架构图

在这里插入图片描述

在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间,我们设置消息的过期时间

@Configuration
public class MsgTtlQueueConfig {
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    public static final String QUEUE_C = "QC";

    //声明队列 C 死信交换机
    @Bean("queueC")
    public Queue queueC() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //没有声明 TTL 属性
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }

    //声明队列 B 绑定 X 交换机
    @Bean
    public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
}
/**
 * 延时队列优化
 * @param message 消息
 * @param ttlTime 延时的毫秒
 */
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
    rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
        correlationData.getMessageProperties().setExpiration(ttlTime);
        return correlationData;
    });
    log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);
}
// 发请求
http://localhost:8080/ttl/sendExpirationMsg/你好1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好2/2000

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

Rabbitmq 插件实现延迟队列

上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。

#可去官网下载  rabbitmq_delayed_message_exchange 插件,放置到 RabbitMQ 的插件目录。
#安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#重启服务
systemctl restart rabbitmq-server

在这里插入图片描述

架构图

在这里新增了一个队列delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

在这里插入图片描述

代码实现

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

在这里插入图片描述

第二条消息并不会受第一条TTL影响,第二个消息被先消费掉了,符合预期

总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/pmc0_0/article/details/126212561

rabbitmq实现死信队列/延迟队列/优先级队列_天瑕的博客-爱代码爱编程

1 死信队列 / 延迟队列         死信队列指的是因为某些原因,队列中的某些消息变成了死信(dead letter)后,它们被重新路由到死信交换器(DLX)绑定的队列上,该队列即为死信队列。我们可以监听该死信队列中的消息,以进行相应的处理。         消息变为死信的原因一般有以下三种: 消息被拒绝,并且设置requeue参数为false

springboot rabbitmq死信队列与延迟队列实战-爱代码爱编程

以下例子代码可在github或者在gitee下载 github:代码链接 gitee:代码链接在上一篇博文中提到,在消息确认消费的过程中,即消息处理过程中出现了异常,为避免消息重新归入队列又继续异常,也为了避免消息不归入队列而把消息丢弃掉,那么可以采用死信队列来处理该情况,当然这个也要结合实际场景,也不一定非要用死信队列,之前遇到过的场景就没采用死信队列,

五、RabbitMQ死信队列和延迟队列-爱代码爱编程

文章目录 RabbitMQ死信队列和延迟队列RabbitMQ死信队列死信队列及TTL使用RabbitMQ控制台做一个死信队列测试Rabbit的延迟队列Rabbit延时队列样例 RabbitMQ死信队列和延迟队列 RabbitMQ死信队列 死信队列及TTL 什么是TTL? time to live 消息存活时间如果消息在存活时间内未被消费

RabbitMQ死信队列和延时队列-爱代码爱编程

本文从本人博客搬运,原文格式更加美观,可以移步原文阅读:RabbitMQ死信队列和延时队列 死信队列 1.死信概念 死信,顾名思义就是无法被消费的消息。一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没

RabbitMQ 死信队列 延迟队列-爱代码爱编程

死信队列 死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死

rabbitMQ死信队列、延迟队列、消息积压、幂等性、集群-爱代码爱编程

死信队列 英文缩写 DLX 全称Dead Letter Exchange(读音 带的 来特 一可死陈只 死信交换机) 别的mq没有交换机这个概念 所以叫做死信队列 在tabbitMq中可以叫做死信交换机 根据名字可以看出 这个队列用来保存 被丢弃或者被删除的信息等 被丢弃被删除的信息跟死了一样 简称死信 把这些数据放到一个队列中 称为死信队列 当一个消息

rabbitmq 死信队列-爱代码爱编程

1.什么是死信队列  2.实现死信队列 2.1.下载死信队列插件 下载地址:https://www.rabbitmq.com/community-plugins.html 2.2 进入下载列表  2.3 选择版本    2.4 下载 我自己安装的3.8.19  所有我选择与我版本接近的3.8.17 2.5 将下载的文件上传

RabbitMq死信队列和延时队列-爱代码爱编程

  RabbitMq提供了一个死信队列的功能,可以在消息被拒绝或者过期后还没消费时,扔到死信队列里面,然后通过邮件告警等方式来手工处理. 进入死信队列的三种方式 1.消息被Basic.reject()或者Basic.nack().并且request设置为false,即不重试 2.消息设置了过期时间,并且过期时间到了还没有被消费 3.消息队列的长度达

RabbitMq 死信队列-爱代码爱编程

死信队列 死信,顾名思义就是无法被消费的信息。一般来说,producer将消息投递到broker或者直接到queue里,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信(Dead Letter),所有的死信都会放到死信队列中。      应用场景:为了

支付超时订单关单策略之RabbitMQ死信队列方式-爱代码爱编程

目录 一、需求背景之为什么要有超时关单 二、所以多数订单业务都是会有这个功能,那如何设计呢? ​三、RabbitMQ死信队列-延迟消息知识点回顾 3.1什么是rabbitmq的死信队列 3.2什么是rabbitmq的死信交换机  3.3消息有哪几种情况成为死信 3.4什么是延迟队列 四、代码环节  4.1 Rabbitmq死信队列配置  

springcloud:RabbitMQ死信队列与延迟交换机实现(四)-爱代码爱编程

0.引言 死信队列是消息队列中非常重要的概念,同时我们需要业务场景中都需要延迟发送的概念,比如12306中的30分钟后未支付订单取消。那么本期,我们就来讲解死信队列,以及如何通过延迟交换机来实现延迟发送的需求。 1. 死信队列 1.2 什么是死信? 理解死信队列前,我们先讲解什么是死信,所谓死信就是没有被成功消费的消息,但并不是所有未成功消费的消息

rabbitmq死信队列以及延迟_channel.queue_declare(queue='dead_letter_queue')-爱代码爱编程

死信队列介绍 死信(Dead Letter)是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况: 1)消息被拒绝 2)消息在队列的存活时间超过设置的TTL时间。 3)消息队列的消息数量已经

rabbitmq(五)死信队列、延迟队列_死信队列和延时队列的区别-爱代码爱编程

一、死信队列概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理 解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死