代码编织梦想

1. 死信

1.1 死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:

  • 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
    • 后续等到环境好了之后,再消费死信队列中的消息
  • 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
    • 当做延迟队列来处理,死信可以在指定时间内被消费者消费

1.2 死信的来源

  1. 消息TTL过期
  2. 队列达到最大长度(队列满了,无法再添加数据到mq中)
  3. 消息被拒绝(basci.reject或者basic.nack)并且requeue = false

1.3 死信队列也满了怎么办?

如果出现死信队列和普通队列都满的情况,此时考虑消费者消费能力不足,可以对消费者开多线程进行处理。

1.3 死信实战

1.3.1 TTL过期

架构图
在这里插入图片描述
2个交换机和2个队列都是C1创建。

1. 消费者1接口:
public static void main(String[] args) throws Exception {
    Channel channel = MqConnectUtil.getChannel();
    // 声明死信和普通交换机,类型为direct
    channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    // 声明普通队列
    Map<String, Object> arguments = new HashMap<>();
    // 过期时间
    arguments.put("dead-ttl", 10000);
    // 正常队列设置死信交换机
    arguments.put("dead-exchange", DEAD_EXCHANGE);
    // 设置死信routing-key
    arguments.put("dead-routingKey", "lisi");
    channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

    // 声明死信队列
    channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

    // 绑定普通的交换机与队列
    channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
    // 绑定死信的交换机与队列
    channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

    System.out.println("等待接收消息。。。。。");

    channel.basicConsume(NORMAL_QUEUE, true, (counsumerTag, message) -> {
        System.out.println("consumer01接收的消息是:" + new String(message.getBody(), "UTF-8"));
    }, (cancelCallback) -> {});
}
2. 生产者接口:
public static void main(String[] args) throws Exception {
    Channel channel = MqConnectUtil.getChannel();

    // 死信消息,设置TTL时间 time to live
    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

    for (int i = 0; i < 10; i++) {
        String msg = i + "";
        channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, msg.getBytes());
    }
}
3. 启动消费者

查看客户端数据:

队列:其中features中的数据是死信队列名称和死信队列routingKey
在这里插入图片描述

交换机:

在这里插入图片描述
查看交换机绑定:
在这里插入图片描述
在这里插入图片描述

4. 关闭消费者,启动生产者

普通队列中:

在这里插入图片描述
10秒后到了死信队列:
在这里插入图片描述

5. 消费者2接口
public static void main(String[] args) throws Exception {
    Channel channel = MqConnectUtil.getChannel();
    System.out.println("等待接收消息。。。。。");

    channel.basicConsume(DEAD_QUEUE, true, (counsumerTag, message) -> {
        System.out.println("consumer02接收的消息是:" + new String(message.getBody(), "UTF-8"));
    }, cancelCallback -> {});
}

启动消费者2:可以看到死信队列中的消息被消费者2给接收了

等待接收消息。。。。。
consumer02接收的消息是:0
consumer02接收的消息是:1
consumer02接收的消息是:2
consumer02接收的消息是:3
consumer02接收的消息是:4
consumer02接收的消息是:5
consumer02接收的消息是:6
consumer02接收的消息是:7
consumer02接收的消息是:8
consumer02接收的消息是:9

1.3.2 队列达到最大长度:

  1. 生产者代码中去掉TTL属性相关代码,即basicProperties设置为null
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes());
  1. C1消费者设置队列的最大接收消息数:
// 设置正常队列长度的限制
arguments.put("x-max-length", 6);
  1. 删除原有的队列,重新启动消费者
  2. 查看客户端:
    在这里插入图片描述
  3. 关闭消费者,启动生产者:
  4. 查看客户端:可以看到由于普通队列最大长度为6,所以有4个到了死信队列
    在这里插入图片描述

1.3.3 消息被拒

  1. 注释刚刚的队列长度,并把之前的消息消费掉
  2. 删除队列,不需要队列长度限制
  3. 修改消费消息的代码:将自动应答改为手动应答,并加msg=5的消息拒绝
channel.basicConsume(NORMAL_QUEUE, false, (counsumerTag, message) -> {
    String msg = new String(message.getBody(), "UTF-8");
    if (msg.equals("5")) {
        System.out.println("consumer02拒收此消息:" + msg);
        channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
    } else {
        System.out.println("consumer01接收的消息是:" + msg);
        // 改为手动应答
        channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
    }
}, (cancelCallback) -> {});
  1. 启动生产者生产消息
  2. 消费者消费结果:
等待接收消息。。。。。
consumer01接收的消息是:0
consumer01接收的消息是:1
consumer01接收的消息是:2
consumer01接收的消息是:3
consumer01接收的消息是:4
consumer02拒收此消息:5
consumer01接收的消息是:6
consumer01接收的消息是:7
consumer01接收的消息是:8
consumer01接收的消息是:9
  1. 客户端:死信队列中有一个
    在这里插入图片描述
  2. 启动消费端2消费此消息:
等待接收消息。。。。。
consumer02接收的消息是:5

2. 延迟队列

即死信队列的一种(TTL策略)。
在这里插入图片描述
此图中,生产者生产消息,同时设置过期时间为10s,此消息在normal队列中停留10到,超时被发送到死信交换机,再被发到dead队列,然后被C2消费。

我们不看normal队列,只看生产者和消费者链路,就会觉得此消息在经过10s后被消费。

2.1 概念

延时队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列

2.2 使用场景

  1. 订单在十分钟之内未支付则自动取消;
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒;
  3. 用户注册成功后,如果三天内没有登陆则进行短信提醒;
  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员;
  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭。那我们一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?

如果数据量比较少,确实可以这样做,比如:对于 “如果账单一周内未支付则进行自动结算” 这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。

但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭 “,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下
在这里插入图片描述

2.3 RabbitMQ中的TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。

换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为” 死信”。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL

  • 队列设置 TTL:在创建队列的时候设置队列的 “x-message-ttl” 属性
    在这里插入图片描述

  • 消息设置 TTL:是针对每条消息设置 TTL
    在这里插入图片描述
    两者的区别

  • 如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃 (如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

  • 另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃

2.3 代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
在这里插入图片描述
生产者根据不同的任务选择不同的routingkey来达到延迟不同时间的效果。

2.4 创建队列和交换机的配置类

@Configuration
public class TtlQueueConfig {
    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信交换机
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //死信队列
    public static final String DEAD_LETTER_QUEUE = "QD";

    // 声明 xExchange
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    // 声明 死信队列交换机
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //声明队列 A ttl 为 10s 并绑定到对应的死信交换机
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }

    // 声明队列 A 绑定 X 交换机
    @Bean
    public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //声明队列 B ttl 为 40s 并绑定到对应的死信交换机
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    //声明队列 B 绑定 X 交换机
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
    }

    //声明死信队列 QD
    @Bean("queueD")
    public Queue queueD() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    //声明死信队列 QD 绑定关系
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

2.5 延迟队列实战

2.5.3 生产者编写
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
    }
}
2.5.2 消费者编写
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列消息:{}", new Date().toString(), msg);
    }
}
2.5.3 测试结果

发送消息:
在这里插入图片描述
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。

2.5.4 存在的问题

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求

2.6 延迟队列优化

在这里新增了一个公用队列 QC,绑定关系如下,该队列不设置 TTL 时间

在这里插入图片描述
代码实战:

2.6.1 在配置类中添加QC队列
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_C = "QC";

//声明队列 C 死信交换机
@Bean("queueC")
public Queue queueC() {
    Map<String, Object> args = new HashMap<>(3);
    //声明当前队列绑定的死信交换机
    args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
    //声明当前队列的死信路由 key
    args.put("x-dead-letter-routing-key", "YD");
    //没有声明 TTL 属性
    return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}

//声明队列 B 绑定 X 交换机
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
                              @Qualifier("xExchange") DirectExchange xExchange) {
    return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
2.6.2 生产者编写
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
    rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
        correlationData.getMessageProperties().setExpiration(ttlTime);
        return correlationData;
    });
    log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);
}
2.6.3 结果:

http://localhost:8080/ttl/sendExpirationMsg/ 你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/ 你好 2/2000
在这里插入图片描述

2.6.4 存在的问题:

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时 “死亡 “。

因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。这也就是为什么第二个延时 2 秒,却后执行。

此外,我们还可以通过 Rabbitmq 插件实现延迟队列。

2.7Rabbitmq插件实现延迟队列

2.7.1 安装

在官网下载:延迟队列插件
下载到rabbitmq中的plugins目录,然后进行安装:(注意不带后面的版本号)

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

安装成功:

[root@yhx plugins]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@yhx:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@yhx...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

started 1 plugins.

重启rabbitmq,可以看到交换机多了第五种类型:
在这里插入图片描述

2.7.2 基于延迟交换机的延迟队列:

生产者将消息直接分给延迟交换机,延迟交换机在经过延迟时间后将消息分发。

只需要延迟交换机和队列就可以实现。
在这里插入图片描述

2.7.3 基于插件的延迟消息代码编写:
  1. 新增了一个队列delayed.queue,一个自定义交换机delayed.exchange,绑定关系如下:
    • 在我们自定义的交换机中,这是一种新的 交换类型,该类型消息支持延迟投递机制:消息传递后并不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中
      在这里插入图片描述
  2. 配置文件代码:
@Configuration
public class DelayedQueueConfig {
    // 延迟交换机
    public static final String DELAYED_EXCHANGE = "delayed.exchange";
    //队列
    public static final String DELAYED_QUEUE = "delayed.queue";
    // 路由
    public static final String DELAYED_ROUTINGKEY = "delayed.routingKey";

    // 声明交换机
    @Bean("delayedExchange")
    public CustomExchange delayedExchange() {
        HashMap<String, Object> argument = new HashMap<>();
        argument.put("x-delayed-message""direct");
        /**
         * 1. 交换机名称
         * 2. 类型
         * 3. 是否需要持久化
         * 4. 是否需要自动删除
         * 5. 其他的参数
         */
        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, argument);
    }

    //声明队列
    @Bean("delayedQueue")
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE);
    }
    //声明队列 绑定 交换机
    @Bean
    public Binding delayedBinding(@Qualifier("delayedQueue") Queue delayedQueue,
                                  @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTINGKEY).noargs();
    }
}
  1. 生产者:
@GetMapping("sendDelayedMsg/{message}/{ttlTime}")
public void sendDelayedMsg(@PathVariable String message, @PathVariable Integer delayTime) {
    rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE, DelayedQueueConfig.DELAYED_ROUTINGKEY, message, correlationData -> {
        // 发送消息设置延迟时间
        correlationData.getMessageProperties().setDelay(delayTime);
        return correlationData;
    });
    log.info("当前时间:{},发送一条时长{}毫秒 延迟信息给队列 C:{}", new Date(), delayTime, message);
}
  1. 消费者:编写消息监听
// 监听消息
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE)
public void receiveD(Message message) {
    String msg = new String(message.getBody());
    log.info("当前时间:{},收到延迟队列消息:{}", new Date().toString(), msg);
}
  1. 结果:发送2个消息,可以看到虽然消息2后发,但是消息2先被接收了
    1. http://localhost:8091/ttl/sendDelayedMsg/%E4%BD%A0%E5%A5%BD1/20000
    2. http://localhost:8091/ttl/sendDelayedMsg/%E4%BD%A0%E5%A5%BD2/2000
SendMsgController   : 当前时间:Sun Mar 06 16:42:05 CST 2022,发送一条时长20000毫秒 延迟信息给队列 C:你好1
SendMsgController   : 当前时间:Sun Mar 06 16:42:11 CST 2022,发送一条时长2000毫秒 延迟信息给队列 C:你好2
DelayedLetterQueueConsumer     : 当前时间:Sun Mar 06 16:42:13 CST 2022,收到延迟队列消息:你好2
DelayedLetterQueueConsumer     : 当前时间:Sun Mar 06 16:42:25 CST 2022,收到延迟队列消息:你好1

2.8 延迟队列总结:

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_39724194/article/details/123270059

RabbitMQ-----------死信队列和延迟队列-爱代码爱编程

目录 死信队列延迟队列 死信队列 一、首先知道什么是死信消息 1.消息被决绝或者未签收,并且没有重新返回到队列中(requeue=false) 2.消息过期 3.队列达到最大长度 死信消息就会出现在死信队列里 二、消费端配置 与之前不同主要声明queue时arguments参数,x-dead-letter-exchange(交换机),x-de

RabbitMQ--过期时间TTL、死信队列、延迟队列和优先队列-爱代码爱编程

过期时间TTL TTL,Time to Live的简称,即过期时间。RabbitMQ 可以对消息和队列设置TTL。 设置消息的TTL: 目前有两种方法可以设置消息的TTL。 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。如果两种方法一起使用,则消息的TTL以两者之间较小

RabbitMq死信队列/延迟队列-爱代码爱编程

什么是死信队列(DLX) 死信队列又被称为延迟队列、延时队列,也是RabbitMq队列中的一种,利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange, 这个Exchange就是DLX。 死信队列能在任何的队列上被指定,实际上就是设置某个队列的属性,标识这个队列是否需要死信队列的存在。当

RabbitMq--延迟队列(死信实现)-爱代码爱编程

RabbitMq–延迟队列(死信实现) 1、 环境 java 8Spring Boot 2.26RabbitMq 3.5.42、架构图 3、代码 //配置 spring: rabbitmq: host: localhost port: 5672 username: xxx password: xxx

springboot整合rabbitmq消息可靠投递-死信队列-延迟队列-爱代码爱编程

第一步,添加依赖 消费端 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

RabbitMQ 学习笔记 -- 12 死信队列 DLX + TTL 方式实现延迟队列-爱代码爱编程

死信队列 DLX + TTL 方式实现延迟队列 延迟队列与死信队列时息息相关的,它具有特点: 队列,意味着内部的元素是有序的,元素的出队和入队是有方向性的,元素从一端进入,从另一端取出 延时,这是最重要的特性,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望等待特定时间后,消费者才能拿到这个消息进行消费。 TTL TTL 是

RabbitMQ(一)使用死信队列实现延迟队列消费-爱代码爱编程

在使用rabbitmq实现延迟消费时,需要先明白什么是死信队列,什么是延迟队列。 rabbitmq并没有直接支持延迟队列,延迟队列是通过死信队列来实现的。 死信队列 DLX(Dead Letter Exchange),死信交换器。当队列中的消息被拒绝、或者过期会变成死信,死信可以被重新发布到另一个交换器,这个交换器就是DLX,与DLX绑定的队列称为死信

RabbitMQ----死信队列和延迟队列-爱代码爱编程

RabbitMQ----死信队列和延迟队列   先想一想,如果你有一个快递,一直尝试给你派送,但是一直联系不上你,签收不了,那么此时这个快递该如何处理?是再次尝试派送,还是处理掉?   现在引入一个名词,死信,顾名思义:就是不能被消费的信息,字面意思可以这样理解,生产者(发送方)将信息传递到交换机,交换机将信息发送到队列中,消费者从队列中取出信息并消费

五、RabbitMQ死信队列和延迟队列-爱代码爱编程

文章目录 RabbitMQ死信队列和延迟队列RabbitMQ死信队列死信队列及TTL使用RabbitMQ控制台做一个死信队列测试Rabbit的延迟队列Rabbit延时队列样例 RabbitMQ死信队列和延迟队列 RabbitMQ死信队列 死信队列及TTL 什么是TTL? time to live 消息存活时间如果消息在存活时间内未被消费

Rabbitmq---死信、延迟、优先级队列-爱代码爱编程

什么是死信队列 ​ DLX,全称Dead-Letter-Exchange,可以称之为死信交换机,也可以称为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另外一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列。 ​ 消息变成死信一般由于以下几种情况: 1. 消息被拒绝,并且设置requeue

Rabbitmq高级特性-TTL-死信队列-延迟队列 基本认识-爱代码爱编程

TTL Time To Live,消息过期时间设置 配置文件 /*** TTL:过期时间 * 1. 队列统一过期 ** 2. 消息单独过期 ** * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。 * 队列过期后,会将队列所有消息全部移除。 * 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉) * */

rabbitmq的延迟队列和死信队列-爱代码爱编程

一、延迟队列和死信队列 死信队列:元素产生后没及时的被消费,一直存放在队列中。 延迟队列:延时队列,队列内部是有顺序的,其最重要的特性是延迟时间,是希望在指定时间到了以后或之前取出和处理,类似一个定时任务,但是比定时任务要节省资源。延时队列其实就是特殊的死信队列。 但是队列的特点是先进先出,如果对应不同的元素有不同的时间,那么如果存在延迟时间小的元素