rabbitmq(五)死信队列_lvhaoit的博客-爱代码爱编程
RabbitMQ(五)死信队列
6. 死信队列
死信:无法被消费的消息,在某些时候由于特定的原因到导致了queue中某些消息无法被消费,这样的消息如果没有后续处理,就会成为死信,有了死信,就自然有了死信队列
应用场景,为了保证订单业务的数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息发生消费异常时,将消息投入死信队列中。场景:用户在商城下单成功并点击支付后在指定时间未支付时自动失效。
6.1 死信产生的原因
1. 消息TTL过期
2. 队列达到了最大长度(队列已经排满了,无法再添加到MQ中)
3. 消息被拒绝(basic.reject 或 basic.nack)并且requeue = false
6.2 死信队列实战
6.2.1 代码架构图
6.2.2 消息TTL过期
我们可以在交换机声明处,或者队列声明处设置消息过期时间TTL,当消息超过这个时间还没有被消费,则会转入死信队列。
我们在这次实战中通过关闭消费端,由生产者发送十条信息,消息发送后没有被消费,超时后就会发送到死信队列中,由死信队列消费。
-
生产者 Producer (设置过期时间TTL,发送消息)
public class Producer { public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtil.getChannel(); //死信消息 设置TTL时间 单位是ms AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000").build(); for (int i = 1; i <= 10; i++) { String message = "info"+i; channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",properties,message.getBytes("UTF-8")); System.out.println("发送消息 "+message+" 成功"); } } }
-
普通消费者 Consumer01 (启动后关闭该消费者,模拟其接收不到消息)
public class Consumer01 { public static void main(String[] args)throws Exception { Channel channel = RabbitMqUtil.getChannel(); //声明普通交换机和死信交换机 //普通交换机 channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT); channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT); HashMap<String, Object> arguments = new HashMap<>(); //过期时间 10s 队列处也可以设置ttl 但是一般在交换机处设置 // arguments.put("x-message-ttl",10000); //正常队列设置死信交换机 arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName()); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","lisi"); //声明普通队列 channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments); / //声明死信队列 channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null); //绑定普通队列 channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan"); //绑定死信队列 channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi"); System.out.println("等待接收消息。。。。"); //ttl接收消息 channel.basicConsume(QueueName.Normal_Queue.getName(), true, (consumerTag, message) -> { String msg = new String(message.getBody(), "UTF-8"); System.out.println("ConsumerO1接收到消息:" + msg); System.out.println("处理完成"); }, (consumerTag) -> { System.out.println("Consumer01 错误消息被中断:" + consumerTag); }); } }
-
死信队列消费者 Consumer02 (启动后关闭此队列,观察消息去向后打开消费掉死信)
/** * 死信队列, 消费者2 */ public class Consumer02 { public static void main(String[] args)throws Exception { Channel channel = RabbitMqUtil.getChannel(); //声明死信交换机 channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT); //声明死信队列 channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null); //绑定死信队列 channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi"); System.out.println("等待接收消息。。。。"); //接收消息 channel.basicConsume(QueueName.Dead_Queue.getName(), true,(consumerTag,message)->{ System.out.println("Consumer02 接收到消息:"+new String(message.getBody(),"UTF-8")); },tag->{}); } }
操作步骤
-
查看未发送消息状态下 队列状态
-
启动生产者代码 发送十条消息 此时正常队列中有十条未消费信息
-
等待时间过去10s,等待消息过期,正常队列中的消息由于没有被消费,消息进入死信队列
-
启动死信队列,可以看到过期的消息被一条一条消费。
6.2.3 队列达到最大长度
我们可以在队列声明处设置队列的长度,当队列中消息超过这个长度时,则会转入死信队列。
我们在这次实战中通过给消费者设置队列长度,然后将它关闭,不消费信息,消息发送后超过这个长度,超过这个长度的消息就会发送到死信队列中,由死信队列消费。
-
生产者 Producer
public class Producer { public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtil.getChannel(); for (int i = 1; i <= 10; i++) { String message = "info"+i; channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",null,message.getBytes("UTF-8")); System.out.println("发送消息 "+message+" 成功"); } } }
-
普通消费者 Consumer01 (启动后关闭该消费者,模拟其接收不到消息)
public class Consumer01 { public static void main(String[] args)throws Exception { Channel channel = RabbitMqUtil.getChannel(); //声明普通交换机和死信交换机 //普通交换机 channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT); channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT); HashMap<String, Object> arguments = new HashMap<>(); //设置队列最大长度,达到这个长度将后续消息转发给c2消费者 arguments.put("x-max-length",6); //正常队列设置死信交换机 arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName()); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","lisi"); //声明普通队列 channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments); / //声明死信队列 channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null); //绑定普通队列 channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan"); //绑定死信队列 channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi"); System.out.println("等待接收消息。。。。"); //ttl接收消息 channel.basicConsume(QueueName.Normal_Queue.getName(), true, (consumerTag, message) -> { String msg = new String(message.getBody(), "UTF-8"); System.out.println("ConsumerO1接收到消息:" + msg); System.out.println("处理完成"); }, (consumerTag) -> { System.out.println("Consumer01 错误消息被中断:" + consumerTag); }); } }
-
死信队列消费者 Consumer02 (启动后开启此队列,观察消息是否被死信队列消费)
/** * 死信队列, 消费者2 */ public class Consumer02 { public static void main(String[] args)throws Exception { Channel channel = RabbitMqUtil.getChannel(); //声明死信交换机 channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT); //声明死信队列 channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null); //绑定死信队列 channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi"); System.out.println("等待接收消息。。。。"); //接收消息 channel.basicConsume(QueueName.Dead_Queue.getName(), true,(consumerTag,message)->{ System.out.println("Consumer02 接收到消息:"+new String(message.getBody(),"UTF-8")); },tag->{}); } }
操作步骤
注意:因为我们修改了队列的参数,所以需要将原来的队列删除后才能重新创建。
-
启动两个消费者 然后关闭,打开生产者(可以发现超出长度的消息自动转入死信队列中)
-
启动两个消费者,可以看到死信队列消费了其中四条消息
6.2.5 消息被拒绝
我们可以不使用自动应答,而采用手动应答,在手动应答中的拒绝应答,并且拒绝消息重新入队,若存在死信队列,则消息会转入死信队列。
我们在这次实战中通过在消息应答里拒绝一部分应答,并阻止消息重新入队,这条消息就会发送到死信队列中,由死信队列消费。
-
生产者 Producer
public class Producer { public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtil.getChannel(); for (int i = 1; i <= 10; i++) { String message = "info"+i; channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",null,message.getBytes("UTF-8")); System.out.println("发送消息 "+message+" 成功"); } } }
-
普通消费者 Consumer01
public class Consumer01 { public static void main(String[] args)throws Exception { Channel channel = RabbitMqUtil.getChannel(); //声明普通交换机和死信交换机 //普通交换机 channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT); channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT); HashMap<String, Object> arguments = new HashMap<>(); //设置队列最大长度,达到这个长度将后续消息转发给c2消费者 arguments.put("x-max-length",6); //正常队列设置死信交换机 arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName()); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","lisi"); //声明普通队列 channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments); / //声明死信队列 channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null); //绑定普通队列 channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan"); //绑定死信队列 channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi"); System.out.println("等待接收消息。。。。"); channel.basicConsume(QueueName.Normal_Queue.getName(), false,(consumerTag,message)->{ String msg = new String(message.getBody(),"UTF-8"); if ("info5".equals(msg)){ System.out.println(msg+"此消息是被拒绝的!"); //拒绝应答 1为消息标签,2为是否放回普通队列 channel.basicReject(message.getEnvelope().getDeliveryTag(),false); }else { System.out.println("Consumer01 接收到消息:"+new String(message.getBody(),"UTF-8")); channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } System.out.println("处理完成"); },(consumerTag)->{ System.out.println("Consumer01 错误消息被中断:"+consumerTag); }); } }
-
死信队列消费者 Consumer02 (启动后开启此队列,观察消息是否被死信队列消费)
/** * 死信队列, 消费者2 */ public class Consumer02 { public static void main(String[] args)throws Exception { Channel channel = RabbitMqUtil.getChannel(); //声明死信交换机 channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT); //声明死信队列 channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null); //绑定死信队列 channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi"); System.out.println("等待接收消息。。。。"); //接收消息 channel.basicConsume(QueueName.Dead_Queue.getName(), true,(consumerTag,message)->{ System.out.println("Consumer02 接收到消息:"+new String(message.getBody(),"UTF-8")); },tag->{}); } }
操作步骤
注意:因为我们修改了队列的参数,所以需要将原来的队列删除后才能重新创建。
-
启动两个消费者 ,打开生产者(可以发现 info5 的消息自动转入死信队列中)
思考:如果拒绝了某一条消息,但是并没有拒绝它重新入队会导致什么?
if ("info5".equals(msg)) {
System.out.println(msg + "此消息是被拒绝的!");
//拒绝应答 1为消息标签,2为是否放回普通队列
//channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
channel.basicReject(message.getEnvelope().getDeliveryTag(), true); //拒绝但是允许重新入队
}
答案是:如果允许重新入队,队列会一直循环处理这一条消息,这条消息也会被循环入队