RabbitMq(五) -- 死信队列和延迟队列-爱代码爱编程
1. 死信
1.1 死信的概念
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:
- 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
- 后续等到环境好了之后,再消费死信队列中的消息
- 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
- 当做延迟队列来处理,死信可以在指定时间内被消费者消费
1.2 死信的来源
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
- 消息被拒绝(basci.reject或者basic.nack)并且requeue = false
1.3 死信队列也满了怎么办?
如果出现死信队列和普通队列都满的情况,此时考虑消费者消费能力不足,可以对消费者开多线程进行处理。
1.3 死信实战
1.3.1 TTL过期
架构图
2个交换机和2个队列都是C1创建。
1. 消费者1接口:
public static void main(String[] args) throws Exception {
Channel channel = MqConnectUtil.getChannel();
// 声明死信和普通交换机,类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明普通队列
Map<String, Object> arguments = new HashMap<>();
// 过期时间
arguments.put("dead-ttl", 10000);
// 正常队列设置死信交换机
arguments.put("dead-exchange", DEAD_EXCHANGE);
// 设置死信routing-key
arguments.put("dead-routingKey", "lisi");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
// 声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
// 绑定普通的交换机与队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
// 绑定死信的交换机与队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收消息。。。。。");
channel.basicConsume(NORMAL_QUEUE, true, (counsumerTag, message) -> {
System.out.println("consumer01接收的消息是:" + new String(message.getBody(), "UTF-8"));
}, (cancelCallback) -> {});
}
2. 生产者接口:
public static void main(String[] args) throws Exception {
Channel channel = MqConnectUtil.getChannel();
// 死信消息,设置TTL时间 time to live
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i < 10; i++) {
String msg = i + "";
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, msg.getBytes());
}
}
3. 启动消费者
查看客户端数据:
队列:其中features中的数据是死信队列名称和死信队列routingKey
交换机:
查看交换机绑定:
4. 关闭消费者,启动生产者
普通队列中:
10秒后到了死信队列:
5. 消费者2接口
public static void main(String[] args) throws Exception {
Channel channel = MqConnectUtil.getChannel();
System.out.println("等待接收消息。。。。。");
channel.basicConsume(DEAD_QUEUE, true, (counsumerTag, message) -> {
System.out.println("consumer02接收的消息是:" + new String(message.getBody(), "UTF-8"));
}, cancelCallback -> {});
}
启动消费者2:可以看到死信队列中的消息被消费者2给接收了
等待接收消息。。。。。
consumer02接收的消息是:0
consumer02接收的消息是:1
consumer02接收的消息是:2
consumer02接收的消息是:3
consumer02接收的消息是:4
consumer02接收的消息是:5
consumer02接收的消息是:6
consumer02接收的消息是:7
consumer02接收的消息是:8
consumer02接收的消息是:9
1.3.2 队列达到最大长度:
- 生产者代码中去掉TTL属性相关代码,即basicProperties设置为null
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes());
- C1消费者设置队列的最大接收消息数:
// 设置正常队列长度的限制
arguments.put("x-max-length", 6);
- 删除原有的队列,重新启动消费者
- 查看客户端:
- 关闭消费者,启动生产者:
- 查看客户端:可以看到由于普通队列最大长度为6,所以有4个到了死信队列
1.3.3 消息被拒
- 注释刚刚的队列长度,并把之前的消息消费掉
- 删除队列,不需要队列长度限制
- 修改消费消息的代码:将自动应答改为手动应答,并加msg=5的消息拒绝
channel.basicConsume(NORMAL_QUEUE, false, (counsumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
if (msg.equals("5")) {
System.out.println("consumer02拒收此消息:" + msg);
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("consumer01接收的消息是:" + msg);
// 改为手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
}, (cancelCallback) -> {});
- 启动生产者生产消息
- 消费者消费结果:
等待接收消息。。。。。
consumer01接收的消息是:0
consumer01接收的消息是:1
consumer01接收的消息是:2
consumer01接收的消息是:3
consumer01接收的消息是:4
consumer02拒收此消息:5
consumer01接收的消息是:6
consumer01接收的消息是:7
consumer01接收的消息是:8
consumer01接收的消息是:9
- 客户端:死信队列中有一个
- 启动消费端2消费此消息:
等待接收消息。。。。。
consumer02接收的消息是:5
2. 延迟队列
即死信队列的一种(TTL策略)。
此图中,生产者生产消息,同时设置过期时间为10s,此消息在normal队列中停留10到,超时被发送到死信交换机,再被发到dead队列,然后被C2消费。
我们不看normal队列,只看生产者和消费者链路,就会觉得此消息在经过10s后被消费。
2.1 概念
延时队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列
2.2 使用场景
- 订单在十分钟之内未支付则自动取消;
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒;
- 用户注册成功后,如果三天内没有登陆则进行短信提醒;
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员;
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭。那我们一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?
如果数据量比较少,确实可以这样做,比如:对于 “如果账单一周内未支付则进行自动结算” 这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。
但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭 “,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下
2.3 RabbitMQ中的TTL
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。
换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为” 死信”。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL
-
队列设置 TTL:在创建队列的时候设置队列的 “x-message-ttl” 属性
-
消息设置 TTL:是针对每条消息设置 TTL
两者的区别 -
如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃 (如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。
-
另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃
2.3 代码架构图
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
生产者根据不同的任务选择不同的routingkey来达到延迟不同时间的效果。
2.4 创建队列和交换机的配置类
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
//死信交换机
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//死信队列
public static final String DEAD_LETTER_QUEUE = "QD";
// 声明 xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
// 声明 死信队列交换机
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//声明队列 A ttl 为 10s 并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//声明队列 B ttl 为 40s 并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
//声明队列 B 绑定 X 交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
//声明死信队列 QD
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}
//声明死信队列 QD 绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
2.5 延迟队列实战
2.5.3 生产者编写
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
}
}
2.5.2 消费者编写
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列消息:{}", new Date().toString(), msg);
}
}
2.5.3 测试结果
发送消息:
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。
2.5.4 存在的问题
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求
2.6 延迟队列优化
在这里新增了一个公用队列 QC,绑定关系如下,该队列不设置 TTL 时间
代码实战:
2.6.1 在配置类中添加QC队列
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_C = "QC";
//声明队列 C 死信交换机
@Bean("queueC")
public Queue queueC() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//没有声明 TTL 属性
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
//声明队列 B 绑定 X 交换机
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
2.6.2 生产者编写
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);
}
2.6.3 结果:
http://localhost:8080/ttl/sendExpirationMsg/ 你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/ 你好 2/2000
2.6.4 存在的问题:
看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时 “死亡 “。
因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。这也就是为什么第二个延时 2 秒,却后执行。
此外,我们还可以通过 Rabbitmq 插件实现延迟队列。
2.7Rabbitmq插件实现延迟队列
2.7.1 安装
在官网下载:延迟队列插件
下载到rabbitmq中的plugins目录,然后进行安装:(注意不带后面的版本号)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
安装成功:
[root@yhx plugins]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@yhx:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@yhx...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
重启rabbitmq,可以看到交换机多了第五种类型:
2.7.2 基于延迟交换机的延迟队列:
生产者将消息直接分给延迟交换机,延迟交换机在经过延迟时间后将消息分发。
只需要延迟交换机和队列就可以实现。
2.7.3 基于插件的延迟消息代码编写:
- 新增了一个队列delayed.queue,一个自定义交换机delayed.exchange,绑定关系如下:
- 在我们自定义的交换机中,这是一种新的 交换类型,该类型消息支持延迟投递机制:消息传递后并不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中
- 在我们自定义的交换机中,这是一种新的 交换类型,该类型消息支持延迟投递机制:消息传递后并不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中
- 配置文件代码:
@Configuration
public class DelayedQueueConfig {
// 延迟交换机
public static final String DELAYED_EXCHANGE = "delayed.exchange";
//队列
public static final String DELAYED_QUEUE = "delayed.queue";
// 路由
public static final String DELAYED_ROUTINGKEY = "delayed.routingKey";
// 声明交换机
@Bean("delayedExchange")
public CustomExchange delayedExchange() {
HashMap<String, Object> argument = new HashMap<>();
argument.put("x-delayed-message""direct");
/**
* 1. 交换机名称
* 2. 类型
* 3. 是否需要持久化
* 4. 是否需要自动删除
* 5. 其他的参数
*/
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, argument);
}
//声明队列
@Bean("delayedQueue")
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE);
}
//声明队列 绑定 交换机
@Bean
public Binding delayedBinding(@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTINGKEY).noargs();
}
}
- 生产者:
@GetMapping("sendDelayedMsg/{message}/{ttlTime}")
public void sendDelayedMsg(@PathVariable String message, @PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE, DelayedQueueConfig.DELAYED_ROUTINGKEY, message, correlationData -> {
// 发送消息设置延迟时间
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info("当前时间:{},发送一条时长{}毫秒 延迟信息给队列 C:{}", new Date(), delayTime, message);
}
- 消费者:编写消息监听
// 监听消息
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE)
public void receiveD(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到延迟队列消息:{}", new Date().toString(), msg);
}
- 结果:发送2个消息,可以看到虽然消息2后发,但是消息2先被接收了
- http://localhost:8091/ttl/sendDelayedMsg/%E4%BD%A0%E5%A5%BD1/20000
- http://localhost:8091/ttl/sendDelayedMsg/%E4%BD%A0%E5%A5%BD2/2000
SendMsgController : 当前时间:Sun Mar 06 16:42:05 CST 2022,发送一条时长20000毫秒 延迟信息给队列 C:你好1
SendMsgController : 当前时间:Sun Mar 06 16:42:11 CST 2022,发送一条时长2000毫秒 延迟信息给队列 C:你好2
DelayedLetterQueueConsumer : 当前时间:Sun Mar 06 16:42:13 CST 2022,收到延迟队列消息:你好2
DelayedLetterQueueConsumer : 当前时间:Sun Mar 06 16:42:25 CST 2022,收到延迟队列消息:你好1
2.8 延迟队列总结:
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景