rabbitmq之死信队列_codepanda@gpf的博客-爱代码爱编程
1. 死信队列概念
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列
死信来源:
- 消息 TTL 过期
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false,不回放到消息队列中)
2. 消息 TTL 过期
生产者代码:
package cn.edu.xd.five;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private static final String NORMAL_EXCHANGE="normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel= RabbitMQUtils.getChannerl();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//设置消息的TTL时间
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for(int i=1;i<=10;i++){
String msg="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,msg.getBytes());
System.out.println("生产者发送消息:"+msg);
}
}
}
消费者1代码。消费者1代码中将正常交换机和死信交换机建立了联系,当正常队列中的消息无法被消费超时时,消息就会被移动到死信队列中
package cn.edu.xd.five;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
private static final String NORMAL_EXCHANGE="normal_exchange";
private static final String DEAD_EXCHANGE="dead_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel= RabbitMQUtils.getChannerl();
//声明死信交换机和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列
String deadQueue="dead_queue";
channel.queueDeclare(deadQueue,false,false,false,null);
channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
//声明普通队列
Map<String, Object> params = new HashMap<>();
//正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("Consumer01等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("Consumer01从Q1接受到的消息:"+message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = s -> {
System.out.println("消息被中断");
};
channel.basicConsume(normalQueue,true,deliverCallback,cancelCallback);
}
}
运行:先运行Consumer1, 然后将其关闭,此时打开web管理界面可以看到产生了两个队列,然后开启Consumer02, 然后再开启Producer, 等待10s钟,消息由正常队列移动到了死信队列中
3. 队列达到最大长度
生产者代码修改:
消费者1代码修改:
运行:启动消费者1,关闭,然后依次启动消费者2和生产者,从运行结果中可以看到消费者2从死信队列中消费了4条消息
4. 消息被拒
生产者代码和3中一样
修改生产者1代码,取消自动应答,并修改为拒绝消息info5, 同时取消3中设置的队列长度为6
运行:启动消费者1,然后启动消费者2,然后启动生产者