RabbitMQ消息Ack确认机制+消息延迟队列-爱代码爱编程
目录
一:RabbitMQ消息Ack确认机制
1.确认种类
RabbitMQ的消息确认有两种。
- 消息发送确认:这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。
发送确认分为两步:
一是确认是否到达交换器,
二是确认是否到达队列。
- 消费接收确认:这种是确认消费者是否成功消费了队列中的消息。
2.消息发送确认
2.1 声明队列及交换机
//声明队列
@Bean(name = "topic-queue1")
public Queue topicQueue1(){
return new Queue("topic-queue1");
}
@Bean(name = "topic-queue2")
public Queue topicQueue2(){
return new Queue("topic-queue2");
}
@Bean(name = "topic-queue3")
public Queue topicQueue3(){
return new Queue("topic-queue3");
}
//声明交换机
//通配符模式下的交换机
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topic-exchange");
}
@Bean
public Binding bindQueue1ToTopicExchange(@Qualifier("topic-queue1")Queue queue,TopicExchange topicExchange){
//* 代表一个词
//# 代表零个或者多个词
return BindingBuilder.bind(queue).to(topicExchange).with("ex.123.123");
}
@Bean
public Binding bindQueue2ToTopicExchange(@Qualifier("topic-queue2")Queue queue,TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("ex.*");
}
@Bean
public Binding bindQueue3ToTopicExchange(@Qualifier("topic-queue3")Queue queue,TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("ex.#");
}
2.2 生产者配置application
#消息发送到交换机的确认
publisher-confirms: true
#消息由交换机转发到队列的确认
publisher-returns: true
2.3.生产者MyConfirmCallBack
通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调。
通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发
@Component
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback
{
@Autowired
RabbitTemplate rabbitTemplate;
//初始化加载方法
@PostConstruct
public void rabbit(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
//通过实现confirmCallback接口,确认消息是否投递到了交换机中
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("消息投递的结果:"+correlationData);
System.out.println("消息投递是否成功"+b);
System.out.println("失败的原因"+s);
}
//交换机转发到队列的确认
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("消息主体:"+message);
System.out.println("应答码:"+i);
System.out.println("原因描述:"+s);
System.out.println("交换机:"+s1);
System.out.println("路由健:"+s2);
}
}
3.生产者SendMessageController
@RestController
public class SendMessageController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMail/{message}")
public String sendMail(@PathVariable("message")String message){
rabbitTemplate.convertAndSend("topic-exchange","ex.123",message);
return "发送";
}
}
3.消费者监听
//消息确认 /sendMail/{message}
@RabbitListener(queues = "topic-queue3")
@RabbitHandler
public void topicQueue3(String str, Channel channel, Message message){
try{
System.out.println("监听到了错误消息"+str);
//1.当前消息的唯一标识。deliveryTag
//2.消息的确认,false 代表确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception ex){
}
}
二:消息延迟队列
1.什么是延迟队列
延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
- 订单超时关闭:在支付场景中,一般上订单在创建后30分钟或1小时内未支付的,会自动取消订单。
- 短信或者邮件通知:在一些注册或者下单业务时,需要在1分钟或者特定时间后进行短信或者邮件发送相关资料的。本身此类业务于主业务是无关联性的,一般上的做法是进行异步发送。
- 重试场景:比如消息通知,在第一次通知出现异常时,会在隔几分钟之后进行再次重试发送。
2.使用要求
RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过过期时间TTL和死信队列来模拟延迟队列。
2.1 过期时间TTL
RabbitMQ
中可以对队列和消息分别设置TTL,TTL表明了一条消息可在队列中存活的最大时间。当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在TTL时间后死亡成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。
2.2 死信队列
设置了TTL
的消息或队列最终会成为Dead Letter
,当消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定此DLX的队列就是死信队列。
2.3 具体实现
延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列
3.实例
3.1 rabbitConfig配置文件
//延迟队列
//正常的交换机
@Bean(name = "topic-Exchange")
public TopicExchange topicExchange(){
return new TopicExchange("topic-exchange");
}
//死信的交换机
@Bean(name="dead-topic-Exchange")
public TopicExchange DeadTopicExchange(){
return new TopicExchange("dead-topic-exchange");
}
//正常的队列
@Bean(name = "topic-queue")
public Queue queue(){
return new Queue("topic-queue");
}
//死信的队列
@Bean(name = "dead-topic-queue")
public Queue DeadQueue(){
Map map = new HashMap<>();
map.put("x-dead-letter-exchange","topic-exchange");
map.put("x-dead-letter-routing-key","ex.123");
//下面变量的含义:1.死信队列的名称,2.是否持久化,3.是否独享,排外,4.是否自动删除
return new Queue("dead-topic-queue",true,false,false,map);
}
//绑定正常的交换机与队列
@Bean
public Binding bindQueueToExchange(@Qualifier("topic-queue")Queue queue,@Qualifier("topic-Exchange")TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("ex.*");
}
//绑定死信交换机与死信队列
@Bean
public Binding bindDeadQueueToDeadExchange(@Qualifier("dead-topic-queue")Queue queue,@Qualifier("dead-topic-Exchange")TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("any");
}
3.2 生产者SendMessageController
@RequestMapping("/sendDeadMail/{message}")
public String sendDeadMail(@PathVariable("message")String message){
rabbitTemplate.convertAndSend("dead-topic-exchange", "any", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置30s过期,过期转发到指定路由 message.getMessageProperties().setExpiration("30000");
return message;
}
});
return "发送";
}
3.3 消费者监听
//监听延迟消息
@RabbitListener(queues = "topic-queue")
public void DeadTopicQueue(String str, Channel channel, Message message){
try {
System.out.println("监听到了延迟消息"+str);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
}
}
等死信队列过期后,转发到正常队列,监听器监听正常队列
30s过后,监听到消息
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接: https://blog.csdn.net/qq_45145809/article/details/109329795