代码编织梦想

延迟队列

背:也就是给队列设置个过期时间,然后到时间消息变成死信,消费死信队列中的消息就行,再没什么玩意,演示队列优化就是不给队列这只TTL,再生产者代码中消息里面设置消息TTL,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。所以就是消费顺序问题要安装个插件

延迟队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定是按处理的元素的队列

延迟队列使用场景

1. 订单在十分钟之内未支付则自动取消
2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

设置TTL的两种方式:

1.队列设置TTL

在创建对别的收设置队列的x-message-ttl属性,例如

Map<String, Object> map = new HashMap<>();
//设置队列有效期为10秒
map.put("x-message-ttl",10000);
channel.queueDeclare(queueName,durable,exclusive,autoDelete,map);

消息设置TTL
对每条消息设置TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
 channel.basicPublish(exchangeName,routingKey,mandatory,properties,"msg body".getBytes());
两者之间的区别:
1.如果设置了队列的TTL属性,那么一旦消息过期,就会被队列的丢弃
2.如果是消息设置了TTL属性,那么即使消息过期,也不一定会马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息挤压情况,那么已
经过期的消息也许还能存活较长时间
3.如果我们没有设置TTL,就表示消息永远不会过期,如果TTL设置为0,则表示除非此时可以直接投递到消费者,否则该消息会被丢弃

整合springboot

添加依赖

 <!--RabbitMQ 依赖-->
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.47</version>
 </dependency>
 <dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 </dependency>
 <!--swagger-->
 <dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger2</artifactId>
 <version>2.9.2</version>
 </dependency>
 <dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger-ui</artifactId>
 <version>2.9.2</version>
 </dependency>
 <!--RabbitMQ 测试依赖-->
 <dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit-test</artifactId>
 <scope>test</scope>
 </dependency>
</dependencies>

 修改配置文件

spring.rabbitmq.host=182.92.234.71
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

添加swagger配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
 @Bean
 public Docket webApiConfig(){
 return new Docket(DocumentationType.SWAGGER_2)
 .groupName("webApi")
 .apiInfo(webApiInfo())
 .select()
 .build();
 }
 private ApiInfo webApiInfo(){
 return new ApiInfoBuilder()
 .title("rabbitmq 接口文档")
 .description("本文档描述了 rabbitmq 微服务接口定义")
 .version("1.0")
 .contact(new Contact("enjoy6288", "http://atguigu.com", 
"1551388580@qq.com"))
 .build();
 }
}

队列TTL

代码架构图

创建两个队列 QA QB ,两者队列 TTL 分别设置为 10S 40S ,然后在创建一个交换机 X 和死信交
换机 Y ,它们的类型都是 direct ,创建一个死信队列 QD ,它们的绑定关系如下:

配置文件类代码

@Configuration
public class TtlQueueConfig {
 public static final String X_EXCHANGE = "X";
 public static final String QUEUE_A = "QA";
 public static final String QUEUE_B = "QB";
 public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
 public static final String DEAD_LETTER_QUEUE = "QD";
 // 声明 xExchange
 @Bean("xExchange")
 public DirectExchange xExchange(){
 return new DirectExchange(X_EXCHANGE);
 }
 // 声明 xExchange
 @Bean("yExchange")
 public DirectExchange yExchange(){
 return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
 }
 //声明队列 A ttl 为 10s 并绑定到对应的死信交换机
 @Bean("queueA")
 public Queue queueA(){
 Map<String, Object> args = new HashMap<>(3);
 //声明当前队列绑定的死信交换机
 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
 //声明当前队列的死信路由 key
 args.put("x-dead-letter-routing-key", "YD");
 //声明队列的 TTL
 args.put("x-message-ttl", 10000);
 return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
 }
 // 声明队列 A 绑定 X 交换机
 @Bean
 public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
 @Qualifier("xExchange") DirectExchange xExchange){
 return BindingBuilder.bind(queueA).to(xExchange).with("XA");
 }
 //声明队列 B ttl 为 40s 并绑定到对应的死信交换机
 @Bean("queueB")
 public Queue queueB(){
 Map<String, Object> args = new HashMap<>(3);
 //声明当前队列绑定的死信交换机
 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
 //声明当前队列的死信路由 key
 args.put("x-dead-letter-routing-key", "YD");
 //声明队列的 TTL
 args.put("x-message-ttl", 40000);
 return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
 }
 //声明队列 B 绑定 X 交换机
 @Bean
 public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
 @Qualifier("xExchange") DirectExchange xExchange){
 return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
 }
 //声明死信队列 QD
 @Bean("queueD")
 public Queue queueD(){
 return new Queue(DEAD_LETTER_QUEUE);
 }
 //声明死信队列 QD 绑定关系
 @Bean
 public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
 @Qualifier("yExchange") DirectExchange yExchange){
 return BindingBuilder.bind(queueD).to(yExchange).with("YD");
 }
}

 消息生产类代码

@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
 @Autowired
 private RabbitTemplate rabbitTemplate;
 @GetMapping("sendMsg/{message}")
 public void sendMsg(@PathVariable String message){
 log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
 rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: "+message);
 rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message);
 } 
}

消费者消费代码

@Component
public class DeadLetterQueueConsumer {
 @RabbitListener(queues = "QD")
 public void receiveD(Message message, Channel channel) throws IOException {
 String msg = new String(message.getBody());
 log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
 }
}

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是 每增加一个新的时间需求,就要新增一个队列 ,这里只有 10S 40S
两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然
后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
延迟队列优化(队列不设置TTL时间)
代码架构图:
在这里新增了一个队列 QC, 绑定关系如下 , 该队列不设置 TTL 时间
配置文件代码类:
@Component
public class MsgTtlQueueConfig {
 public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
 public static final String QUEUE_C = "QC";
 //声明队列 C 死信交换机
 @Bean("queueC")
 public Queue queueB(){
 Map<String, Object> args = new HashMap<>(3);
 //声明当前队列绑定的死信交换机
 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
 //声明当前队列的死信路由 key
 args.put("x-dead-letter-routing-key", "YD");
 //没有声明 TTL 属性
 return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
 }
 //声明队列 B 绑定 X 交换机
 @Bean
 public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
 @Qualifier("xExchange") DirectExchange xExchange){
 return BindingBuilder.bind(queueC).to(xExchange).with("XC");
 }
}

消息生产者代码

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
 rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
 correlationData.getMessageProperties().setExpiration(ttlTime);
 return correlationData;
 });
 log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);
}
发起请求
http://localhost:8080/ttl/sendExpirationMsg/ 你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/ 你好 2/2000
看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过
,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_61407147/article/details/130910642

rabbitmq 小白教程,从安装到使用-爱代码爱编程

主要内容 AMQP简介 RabbitMQ简介 RabbitMQ原理 Erlang安装 安装RabbitMQ RabbitMQ账户管理 交换器 学习目标 知识点要求AMQP简介掌握RabbmitMQ简介掌

浅谈spring结合rabbitmq的使用-爱代码爱编程

一、消息的持久化 确保消息在 RabbitMQ 中安全保存,必须开启消息持久化机制,即交换机持久化,队列持久化,消息持久化。 默认情况下,SpringAmqp 声明的交换机,队列,消息都是持久化的,并不需要我们特意指定,即 Durability 属性都为 Durable。 1.RabbitMQ 客户端持久化(三步缺一不可) 1.交换器的持久化 //

《微服务实战》 第十五章 rabbitmq 延迟队列-爱代码爱编程

前言 实际业务中,例如秒杀系统,秒杀商品成功会有截止时间,这时需要用到RabbitMQ延迟服务。 1、RabbitMQ延迟队列 1.1、方式1:RabbitMQ通过死信机制来实现延迟队列的功能 TTL ,即 Ti

《消息队列高手课》课程笔记(二)-爱代码爱编程

消息模型:主题和队列有什么区别? 两类消息模型 早期的消息队列,就是按照“队列”的数据结构来设计的。 生产者(Producer)发消息就是入队操作,消费者(Consumer)收消息就是出队也就是删除操作,服务端存放

rabbitmq有什么优缺点-爱代码爱编程

为什么使用MQ?MQ的优点 简答 异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。 应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。 流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。 日志处理 - 解决大量日志传输。 消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以

rabbitmq的一些问题-爱代码爱编程

什么是RabbitMQ? RabbitMQ是一款开源的,Erlang编写的,基于AMQP协议的消息中间件 rabbitmq 的使用场景 (1)服务间异步通信 (2)顺序消费 (3)定时任务 (4)请求削峰 RabbitMQ基本概念 Broker: 简单来说就是消息队列服务器实体Exchange: 消息交换

rabbitmq如何保证顺序性_rabbitmq怎么保证消息顺序消费-爱代码爱编程

1. RabbitMQ消息顺序性说明 顺序性: 消息的顺序性是指消费者消费到消息和发送者发布的消息的顺序是一致的 举个例子,不考虑消息重复的情况下,如果生产者发布的消息分别为msg1、msg2、msg3 那么消费者必然也是

学习rabbitmq高级特性_rabbittemplate.setmandatory-爱代码爱编程

目标: 了解熟悉RabbitMQ的高级特性 学习步骤: 高级特性主要分为以下几点, 官网介绍 1、消息可靠性投递 【confirm 确认模式、return 退回模式】 2、Consumer ACK 【ackn