rabbitmq之死信队列-爱代码爱编程
1 概念
死信,就是无法被消费的消息。
一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,如果没有后续的处理,就变成了死信。
应用场景:为了保证订单业务的数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。或者用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
2 来源
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue = false
3 实战
3.1 代码架构图
3.2 消息TTL过期
import com.lv.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
/**
* @Author:晓风残月Lx
* @Date: 2022/10/25 20:31
*
* 死信队列 之 生产者
*/
public class Producer {
// 普通交换机名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 死信消息 设置TTL时间 10s
AMQP.BasicProperties properties =
new AMQP.BasicProperties()
.builder().expiration("10000").build();
}
}
public class Consumer01 {
//普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
//声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定死信交换机与 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
//正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息" + message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)
import com.lv.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* @Author:晓风残月Lx
* @Date: 2022/10/25 20:53
* 死信队列
* 消费者02
*
*/
public class Consumer02 {
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("等待接收消息。。。。");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Consumer02接受的消息是:"+ new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});
}
}
6.3.3 队列达到最大长度
-
消息生产者代码去掉TTL属性
import com.lv.rabbitmq.util.RabbitMqUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; /** * @Author:晓风残月Lx * @Date: 2022/10/25 20:31 * * 死信队列 之 生产者 */ public class Producer { // 普通交换机名称 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 死信消息 设置TTL时间 10s // AMQP.BasicProperties properties = // new AMQP.BasicProperties() // .builder().expiration("10000").build(); for (int i = 0; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes()); } } }
-
C1 消费者修改后的代码(启动后关闭消费者,模拟其接收不到消息)
import com.lv.rabbitmq.util.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.util.HashMap; import java.util.Map; /** * @Author:晓风残月Lx * @Date: 2022/10/25 19:39 * <p> * 死信队列 * 消费者 1 */ public class Consumer01 { //普通交换机名称 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机名称 private static final String DEAD_EXCHANGE = "dead_exchange"; // 普通队列的名称 private static final String NORMAL_QUEUE = "normal_queue"; // 死信队列的名称 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] argv) throws Exception { Channel channel = RabbitUtils.getChannel(); //声明死信和普通交换机 类型为 direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //死信队列绑定死信交换机与 routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(); //正常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信 routing-key 参数 key 是固定值 params.put("x-dead-letter-routing-key", "lisi"); // 设置正常队列的长度的限制 params.put("x-max-length",6); channel.queueDeclare(NORMAL_QUEUE, false, false, false, params); channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer01 接收到消息" + message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); } }
注意此时需要把原先队列删除 因为参数改变了
-
C2 不变
import com.lv.rabbitmq.util.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; /** * @Author:晓风残月Lx * @Date: 2022/10/25 20:53 * 死信队列 * 消费者02 * */ public class Consumer02 { public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("等待接收消息。。。。"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer02接受的消息是:"+ new String(message.getBody(),"UTF-8")); }; channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {}); } }
6.3.4 消息被拒
-
消息生产者代码不变
import com.lv.rabbitmq.util.RabbitMqUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; /** * @Author:晓风残月Lx * @Date: 2022/10/25 20:31 * * 死信队列 之 生产者 */ public class Producer { // 普通交换机名称 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 死信消息 设置TTL时间 10s // AMQP.BasicProperties properties = // new AMQP.BasicProperties() // .builder().expiration("10000").build(); for (int i = 0; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes()); } } }
-
C1消费者代码(启动后关闭该消费者 模拟其接收不到消息)
import com.lv.rabbitmq.util.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.util.HashMap; import java.util.Map; /** * @Author:晓风残月Lx * @Date: 2022/10/25 19:39 * <p> * 死信队列 * 消费者 1 */ public class Consumer01 { // 普通交换机名称 public static final String NORMAL_EXCHANGE = "normal_exchange"; // 死信交换机名称 public static final String DEAD_EXCHANGE = "dead_exchange"; // 普通队列的名称 public static final String NORMAL_QUEUE = "normal_queue"; // 死信队列的名称 public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明死信和普通交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明普通队列 Map<String, Object> params = new HashMap<>(); // 过期时间 10s //arguments.put("x-message-ttl",10000); 由生产者设置过期时间 更加灵活 // 普通队列设置死信交换机 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 设置死信RoutingKey params.put("x-dead-letter-routing-key", "lisi"); // 设置正常队列的长度的限制 // params.put("x-max-length",6); channel.queueDeclare(NORMAL_QUEUE, false, false, false, params); // 声明死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); // 绑定普通的交换机与普通的队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); // 绑定死信的交换机与死信的队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("等待接收消息。。。。。"); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), "UTF-8"); if (msg.equals("info5")) { System.out.println("Consumer01接收的消息是:" + msg + ":此消息是被C1拒绝的"); // 被拒到死信队列 channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } System.out.println("Consumer01接收的消息是:" + msg); }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> { }); } }
-
C2不变(先启动 1 在启动 2)
import com.lv.rabbitmq.util.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; /** * @Author:晓风残月Lx * @Date: 2022/10/25 20:53 * 死信队列 * 消费者02 * */ public class Consumer02 { public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("等待接收消息。。。。"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer02接受的消息是:"+ new String(message.getBody(),"UTF-8")); }; channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {}); } }