spring的@rabbitlistener-爱代码爱编程
提示:该文档使用spring-amqp来集成rabbitMq
一、配置
maven配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
配置rabbitmq相关属性
spring:
#rabbit配置
rabbitmq:
addresses: 192.168.108.101:9999
username: admin
password: admin
virtual-host: /
template:
retry:
#开启重试,重试次数和重试间隔使用默认配置
enabled: true
listener:
#是否使用自定义线程池 ,默认的线程池是每次new一个线程,参见 SimpleAsyncTaskExecutor
tasks-executor: true
direct:
prefetch: 5
missing-queues-fatal: true
#每个队列的消费者数量
consumers-per-queue: 5
retry:
enabled: true
simple:
# 一次请求获取的消息数量,默认需要大于等于消费者数量
prefetch: 5
concurrency: 5
# 最大消费者数量,如果消息数量较多,simple类型容器会在concurrency基础上,缓慢增大消费者数量,消息消费完成后,缓慢减小到concurrency配置的数量
# simple类型的容器需要线程池的数量至少等于设置的消费者数量,如果线程池用尽,则不能增大消费者。
max-concurrency: 10
missing-queues-fatal: true
retry:
enabled: true
#消费者容器类型 支持 direct 和 simple
type: simple
二、定义消费者
RabbitListener:
public class RabbitMqTestService {
/**
* 队列不存在时,需要创建一个队列,并且与exchange绑定
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue.test", durable = "false", autoDelete = "true"),
exchange = @Exchange(value = "exchange.test", type = ExchangeTypes.TOPIC),
key = "k"), concurrency = "4")
public void consumers(Long idx) {
log.info("this idx is {}", idx);
}
}
一个注解,内部声明了队列,并建立绑定关系。
- value: @Queue 注解,用于声明队列,value 为 queueName, durable 表示队列是否持久化, autoDelete 表示没有消费者之后队列是否自动删除
- exchange: @Exchange 注解,用于声明 exchange,value 为交换机名称, type 指定消息投递策略,我们这里用的 topic 方式
- key: 表示接收消息时候的key,TOPIC模式必须配置上
- concurrency: 表示固定 4 个消费者
总结:消息发送到交换机,交换机通过路由key 发送到对应的队列。