rabbitmq-爱代码爱编程
mq的使用
测试 一对一
生产者
//生产者
@Test
void testConsumer() {
Connection connection;
Channel channel ;
try {
//1 创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
//2 创建通道
//创建与RabbitMQ服务的TCP连接
connection=factory.newConnection();
//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
channel = connection.createChannel();
//3 声明队列
/**
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare("wzh",true,false,false,null);
//4 发送消息
/**
* 消息发布方法
* param1:Exchange的名称,如果没有指定,则使用DefaultExchange
* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
* param3:消息包含的属性
* param4:消息体
*/
channel.basicPublish("","wzh",null,"找个牢坐吧".getBytes(StandardCharsets.UTF_8));
System.out.println("消息已发送");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
生产者生产一个消息后 会在localhost:15673上面监控的到
消费者
//消费者
@Test
void testProducer() {
Connection connection;
Channel channel;
try {
//1 创建连接
ConnectionFactory factory=new ConnectionFactory();
//设置MQ所在服务器的ip和端口
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//2 创建通道
connection =factory.newConnection();
channel = connection.createChannel();
//3 声明队列
channel.queueDeclare("wzh",true,false,false,null);
//4 监听队列
DefaultConsumer consumer=new DefaultConsumer(channel){
/**
* 消费者接收消息调用此方法
* @param consumerTag 消费者的标签,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可从中获取消息id,消息routingKey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)
* @param properties 可为消息体赋予多种的功能特性
* @param body 接收的数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
String msg=new String(body, StandardCharsets.UTF_8);
System.out.println("receive message.."+msg);
System.out.println(properties.toString());
System.out.println(envelope.toString());
}
};
//5 接收消息
channel.basicConsume("wzh", true, consumer);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
消费后消息变为0
一对一的流程
这是点对点的消息队列
生产者
1 构建工厂
2 创建通道
3 创建队列
4 发送信息
消费者
1 构建工厂
2 创建通道
3 创建队列
5 回调函数 创建一个DefaultConsumer对象 实现handleDelivery方法
4 接收数据 执行回调
工作模式 发布与订阅
生产者
//订阅和发布
@Test
void diYueConsumer() throws IOException, TimeoutException {
//1创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2创建连接通道
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//3创建声明交换机
/**
* 参数明细
* 1、交换机名称
* 2、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare("jh", BuiltinExchangeType.FANOUT);
//4创建队列
/**
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare("dl1", true, false, false, null);
channel.queueDeclare("dl2", true, false, false, null);
//5交换机和队列绑定
/**
* 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key
*/
channel.queueBind("dl1", "jh", "");
channel.queueBind("dl2", "jh", "");
//6 发送
for (int i = 0; i < 10; i++) {
String message="今天是"+System.currentTimeMillis();
/**
* 参数明细
* 1、交换机名称,不指令使用默认交换机名称 DefaultExchange
* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消
息将发到此队列
* 3、消息属性
* 4、消息内容
*/
channel.basicPublish("jh","",null,message.getBytes());
System.out.println("SendMessageis:'"+message+"'");
}
}
生产者产生10个消息 所有订阅该主题的都要收到信息
消息查看
消费者1
//订阅与发布时的消费者1
@Test
void testDingYueProducer1() throws IOException, TimeoutException {
//1 连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//2 连接通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//3 与交换机连接
/**
*参数明细
*1、交换机名称
*2、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare("jh", BuiltinExchangeType.FANOUT);
//4 交换机与队列连接
/**
*参数明细
*1、队列名称
*2、交换机名称
*3、路由key
*/
channel.queueBind("dl1", "jh", "");
//6消费方法
DefaultConsumer consumer=new DefaultConsumer(channel){
/**
* 消费者接收消息调用此方法
* @param consumerTag 消费者的标签,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可从中获取消息id,消息routingKey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)
* @param properties 可为消息体赋予多种的功能特性
* @param body 接收的数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
String msg=new String(body, StandardCharsets.UTF_8);
System.out.println("receive message.."+msg);
System.out.println(properties.toString());
System.out.println(envelope.toString());
}
};
//监听队列
/**
*监听队列String queue, boolean autoAck,Consumer callback
*参数明细
*1、队列名称
*2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复
*3、消费消息的方法,消费者接收到消息后调用此方法
*/
channel.basicConsume("dl1", true, consumer);
}
一个用户消费后获取到信息 监控台信息
消费者2
//订阅与发布时的消费者2
@Test
void testDingYueProducer2() throws IOException, TimeoutException {
//1 连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//2 连接通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//3 与交换机连接
/**
*参数明细
*1、交换机名称
*2、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare("jh", BuiltinExchangeType.FANOUT);
//4 交换机与队列连接
/**
*参数明细
*1、队列名称
*2、交换机名称
*3、路由key
*/
channel.queueBind("dl2", "jh", "");
//6 消费方法
DefaultConsumer consumer=new DefaultConsumer(channel){
/**
* 消费者接收消息调用此方法
* @param consumerTag 消费者的标签,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可从中获取消息id,消息routingKey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)
* @param properties 可为消息体赋予多种的功能特性
* @param body 接收的数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
String msg=new String(body, StandardCharsets.UTF_8);
System.out.println("receive message.."+msg);
System.out.println(properties.toString());
System.out.println(envelope.toString());
}
};
//5 监听队列
/**
*监听队列String queue, boolean autoAck,Consumer callback
*参数明细
*1、队列名称
*2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复
*3、消费消息的方法,消费者接收到消息后调用此方法
*/
channel.basicConsume("dl2", true, consumer);
}
订阅与发布
订阅与发布 需要交换机的支持 且是一个消息 全体广播 都会收到该信息
生产者的流程
1 连接工厂
2 创建连接 Connection
3 创建通道 Channel
4 创建交换机 BuiltinExchangeType.FANOUT 交换机类型
5 创建队列
6 队列与交换机绑定
7 发送数据
消费者者的流程
1 连接工厂
2 创建连接 Connection
3 创建通道 Channel
4 创建交换机
5 交换机与队列绑定
7 回调函数
6 接收信息 监听队列 执行回调
mq 2 路由模式
1 生产者代码
@SpringBootTest
public class MqConsumerApplicationTests2 {
//交换机名称
private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
//队列名称
private static final String QUEUE_INFORM_EMAIL="queue_inform_email";
private static final String QUEUE_INFORM_SMS="queue_inform_sms";
//路由生产者
@Test
void lyModel() throws IOException, TimeoutException {
//连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false, null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false, null);
//交换机与队列绑定
/**
*参数明细
*1、队列名称
*2、交换机名称
*3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL);
channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS);
//发送短信
channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS,null,"找个牢坐吧".getBytes(StandardCharsets.UTF_8));
//发送邮件
channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL,null,"找个ban吧".getBytes(StandardCharsets.UTF_8));
}
}
2 消费者端
@SpringBootTest
public class MqProducerApplicationTests2 {
//交换机名称
private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
//队列名称
private static final String QUEUE_INFORM_EMAIL="queue_inform_email";
private static final String QUEUE_INFORM_SMS="queue_inform_sms";
//路由消费者1 邮件
@Test
void testLy1() throws IOException, TimeoutException {
//连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672); factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//绑定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL);
DefaultConsumer consumer=new DefaultConsumer(channel){
/**
* 消费者接收消息调用此方法
* @param consumerTag 消费者的标签,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可从中获取消息id,消息routingKey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)
* @param properties 可为消息体赋予多种的功能特性
* @param body 接收的数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
String msg=new String(body, StandardCharsets.UTF_8);
System.out.println("消息是 "+msg);
}
};
//接收信息
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
}
//路由消费者2 信息
@Test
void testLy2() throws IOException, TimeoutException {
//连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672); factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//绑定
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);
DefaultConsumer consumer=new DefaultConsumer(channel){
/**
* 消费者接收消息调用此方法
* @param consumerTag 消费者的标签,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可从中获取消息id,消息routingKey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)
* @param properties 可为消息体赋予多种的功能特性
* @param body 接收的数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
String msg=new String(body, StandardCharsets.UTF_8);
System.out.println("消息是 "+msg);
}
};
//接收信息
channel.basicConsume(QUEUE_INFORM_SMS, true, consumer);
}
}
路由
但从流程来看
生产者的流程
1 连接工厂
2 创建连接 Connection
3 创建通道 Channel
4 创建交换机 BuiltinExchangeType.DIRECT 交换机类型
5 创建队列
6 队列与交换机绑定
7 发送数据
消费者者的流程
1 连接工厂
2 创建连接 Connection
3 创建通道 Channel
4 创建交换机
5 交换机与队列绑定
7 回调函数
6 接收信息 监听队列 执行回调
两者的区别就在于 对于消费端而言 不需要创建队列 直接使用队列名绑定就好
topics(通配符)路由模式
生产者
//路由生产者
@Test
void lyModel() throws IOException, TimeoutException {
//连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("exchange_routing_TOPIC", BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare("queue.inform.email",true,false,false, null);
channel.queueDeclare("queue.inform.sms",true,false,false, null);
channel.basicPublish("exchange_routing_TOPIC","queue.inform.sms",null,"sms".getBytes(StandardCharsets.UTF_8));
//发送邮件
channel.basicPublish("exchange_routing_TOPIC","queue.inform.email",null,"email".getBytes(StandardCharsets.UTF_8));
}
消费者
@SpringBootTest
public class MqProducerApplicationTests3 {
//交换机名称
private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_TOPIC";
//队列名称
private static final String QUEUE_INFORM_EMAIL="queue.inform.email";
private static final String QUEUE_INFORM_SMS="queue.inform.sms";
//路由消费者1 邮件
@Test
void testLy1() throws IOException, TimeoutException {
//连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672); factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.TOPIC);
//绑定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"#.email");
DefaultConsumer consumer=new DefaultConsumer(channel){
/**
* 消费者接收消息调用此方法
* @param consumerTag 消费者的标签,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可从中获取消息id,消息routingKey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)
* @param properties 可为消息体赋予多种的功能特性
* @param body 接收的数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
String msg=new String(body, StandardCharsets.UTF_8);
System.out.println("消息是 "+msg);
}
};
//接收信息
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
}
//路由消费者2 信息
@Test
void testLy2() throws IOException, TimeoutException, InterruptedException {
//连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672); factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.TOPIC);
//绑定
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"#.sms");
DefaultConsumer consumer=new DefaultConsumer(channel){
/**
* 消费者接收消息调用此方法
* @param consumerTag 消费者的标签,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可从中获取消息id,消息routingKey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)
* @param properties 可为消息体赋予多种的功能特性
* @param body 接收的数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
String msg=new String(body, StandardCharsets.UTF_8);
System.out.println("消息是 "+msg);
}
};
//接收信息
channel.basicConsume(QUEUE_INFORM_SMS, true, consumer);
}
总结
生产者的流程
1 连接工厂
2 创建连接 Connection
3 创建通道 Channel
4 创建交换机 BuiltinExchangeType.TOPIC 交换机类型
5 发送数据
消费者者的流程
1 连接工厂
2 创建连接 Connection
3 创建通道 Channel
4 创建交换机 BuiltinExchangeType.TOPIC 交换机类型
5 创建队列
6 交换机与队列绑定
8 回调函数
7 接收信息 监听队列 执行回调
在topic模式下 生产者会先创建好通道 交换机 队列等等 但是此时交换机与队列没有关系
如果没有消费者去调用连接的情况下 此时的生产者所创建的信息会丢失
但是如果当消费者先去启动时 因为消费者不会去创建队列 自然也就无法绑定 会造成程序异常
所有 我们可以先启动一次生产者 创造出生产者所需要的队列 然后消费者去完成绑定 当绑定完成时再去启动生产者 这样就可以完成
注意 第一次生产者生产的消息会丢失 因为消费者未完成绑定队列与交换机
在topic模式下 主要是监控队列来完成信息的消费 我们一般在消费者端新建队列是使用全名称 监控时也使用全名称
只有在队列与交换机绑定时使用通配符来完成分拣信息的操作
# 代表0-n个
* 代表一个
mq与sb整合
生产者消费者依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
</dependencies>
<!--jdk 1.8-->
配置
生产者配置
server.port=9090
spring.application.name= mq-sb-produce
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtualHost=/
消费者配置
server.port=8080
spring.application.name= mq-sb-consume
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtualHost=/
配置类
生产者配置类
package com.wzh.mqsbproduce.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 用户:hahao
* 日期:2023/5/26
* 时间:16:08
* 类名:RabbitMQConfig
* 生产者
*/
@Configuration
public class RabbitMQConfig {
/**
* ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置
* 这是一个交换机配置类 且该类在注入Bean 中时使用了 名字注入
* @return 返回一个交换机
*/
@Bean("mq-sb")
public Exchange EXCHANGE_TOPICS_INFORM() {
//durable(true)持久化,消息队列重启后交换机仍然存在
return ExchangeBuilder.topicExchange("mq-sb").durable(true).build();
}
}
消费者配置类
package com.wzh.mqsbconsume.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 用户:hahao
* 日期:2023/5/26
* 时间:16:08
* 类名:RabbitMQConfig
* 消费端
*/
@Configuration
public class RabbitMQConfig {
/**
* ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置
* 这是一个交换机配置类 且该类在注入Bean 中时使用了 名字注入
* @return 返回一个交换机
*/
@Bean("mq-sb")
public Exchange EXCHANGE_TOPICS_INFORM() {
//durable(true)持久化,消息队列重启后交换机仍然存在
return ExchangeBuilder.topicExchange("mq-sb").durable(true).build();
}
/**
* 声明队列 纳入spring boot管理时也是以名字纳入的 所有在配置时 要按照名字来注入 按照名字注入可以保证队列名字确定性
* 因为sb默认是单例的
* @return 返回一个队列
*/
@Bean("queue.sms")
public Queue QUEUE_INFORM_SMS() {
return new Queue("queue.sms");
}
//声明队列
@Bean("queue.email")
public Queue QUEUE_INFORM_EMAIL() {
return new Queue("queue.email");
}
/** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");
*绑定队列到交换机 .
*@param queue 队列
*@param exchange 交换机
*@return the binding
*/
@Bean
public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier("queue.sms") Queue queue,
@Qualifier("mq-sb") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("#.sms.#").noargs();
}
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier("queue.email") Queue queue,
@Qualifier("mq-sb") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("*.*.email").noargs();
}
}
测试
生产端
package com.wzh.mqsbproduce;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqSbProduceApplicationTests {
//生产
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testSendByTopics(){
for (int i=0;i<5;i++){
String message = "sms inform to user"+i;
rabbitTemplate.convertAndSend("mq-sb","queue.sms",message);
System.out.println("Send Message is:'" + message + "'");
}
}
@Test
public void testSendByTopics2(){
for (int i=0;i<5;i++){
String message = "email inform to user"+i;
rabbitTemplate.convertAndSend("mq-sb","queue.email",message);
System.out.println("Send Message is:'" + message + "'");
}
}
}
消费者
package com.wzh.mqsbconsume.mq;
import com.rabbitmq.client.Channel;
import com.wzh.mqsbconsume.config.RabbitMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 用户:hahao
* 日期:2023/5/26
* 时间:17:20
* 类名:ReceiveHandle
*/
@Component
public class ReceiveHandle {
//监听email队列
@RabbitListener(queues = {"queue.email"})
public void receive_email(String msg, Message message, Channel channel){
System.out.println(msg);
}
//监听sms队列
@RabbitListener(queues = {"queue.sms"})
public void receive_sms(String msg,Message message,Channel channel){
System.out.println(msg);
}
}
总结
这个和上面单独使用单体开发一样
生产者只负责交换机的创建
消费端负责交换机和队列的创建以及两者的绑定
注意:
我们如果有多个队列 或者多个绑定关系 多个交换机时
应要求在注入spring管理时以名字来装配
因为spring默认是单例 且autowried是按照类型装配的
不然有可能造成某一个队列与交换机多次绑定
mq面试题
使用消息队列的优缺点
1.1、消息队列的优点:
(1)解耦:将系统按照不同的业务功能拆分出来,消息生产者只管把消息发布到 MQ 中而不用管谁来取,消息消费者只管从 MQ 中取消息而不管是谁发布的。消息生产者和消费者都不知道对方的存在;
(2)异步:主流程只需要完成业务的核心功能;对于业务非核心功能,将消息放入到消息队列之中进行异步处理,减少请求的等待,提高系统的总体性能;
(3)削峰/限流:将所有请求都写到消息队列中,消费服务器按照自身能够处理的请求数从队列中拿到请求,防止请求并发过高将系统搞崩溃;
1.2、消息队列的缺点:
(1)系统的可用性降低:系统引用的外部依赖越多,越容易挂掉,如果MQ 服务器挂掉,那么可能会导致整套系统崩溃。这时就要考虑如何保证消息队列的高可用了
(2)系统复杂度提高:加入消息队列之后,需要保证消息没有重复消费、如何处理消息丢失的情况、如何保证消息传递的有序性等问题;
(3)数据一致性问题:A 系统处理完了直接返回成功了,使用者都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,就会导致数据不一致了
Exchange交换器的类型
direct:消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式。
fanout:把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
topic:通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
headers:不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
如何保证消息不被重复消费?
正常情况下,消费者在消费消息后,会给消息队列发送一个确认,消息队列接收后就知道消息已经被成功消费了,然后就从队列中删除该消息,也就不会将该消息再发送给其他消费者了。不同消息队列发出的确认消息形式不同,RabbitMQ是通过发送一个ACK确认消息。但是因为网络故障,消费者发出的确认并没有传到消息队列,导致消息队列不知道该消息已经被消费,然后就再次消息发送给了其他消费者,从而造成重复消费的情况。
重复消费问题的解决思路是:保证消息的唯一性,即使多次传输,也不让消息的多次消费带来影响,也就是保证消息等幂性;幂等性指一个操作执行任意多次所产生的影响均与一次执行的影响相同。具体解决方案如下:
(1)改造业务逻辑,使得在重复消费时也不影响最终的结果。例如对SQL语句: update t1 set money = 150 where id = 1 and money = 100; 做了个前置条件判断,即 money = 100 的情况下才会做更新,更通用的是做个 version 即版本号控制,对比消息中的版本号和数据库中的版本号。
(2)基于数据库的的唯一主键进行约束。消费完消息之后,到数据库中做一个 insert 操作,如果出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
(3)通过记录关键的key,当重复消息过来时,先判断下这个key是否已经被处理过了,如果没处理再进行下一步。
如何保证消息的有序性?
针++++,解决方法就是保证生产者入队的顺序是有序的,出队后的顺序消费则交给消费者去保证。
(1)方法一:拆分queue,使得一个queue只对应一个消费者。由于MQ一般都能保证内部队列是先进先出的,所以把需要保持先后顺序的一组消息使用某种算法都分配到同一个消息队列中。然后只用一个消费者单线程去消费该队列,这样就能保证消费者是按照顺序进行消费的了。但是消费者的吞吐量会出现瓶颈。如果多个消费者同时消费一个队列,还是可能会出现顺序错乱的情况,这就相当于是多线程消费了
(2)方法二:对于多线程的消费同一个队列的情况,可以使用重试机制:比如有一个微博业务场景的操作,发微博、写评论、删除微博,这三个异步操作。如果一个消费者先执行了写评论的操作,但是这时微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行发微博的操作后,再执行,就可以成功。
如何保证消息不丢失,进行可靠性传输?
对于消息的可靠性传输,每种MQ都要从三个角度来分析:生产者丢数据、消息队列丢数据、消费者丢数据。以RabbitMQ为例:
1、生产者丢数据:
RabbitMQ提供事务机制(transaction)和确认机制(confirm)两种模式来确保生产者不丢消息。
(1)事务机制:
发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())
该方式的缺点是生产者发送消息会同步阻塞等待发送结果是成功还是失败,导致生产者发送消息的吞吐量降下降。
// 开启事务
channel.txSelect
try {
// 发送消息
} catch(Exception e){
// 回滚事务
channel.txRollback;
//再次重试发送这条消息
....
}
//提交事务
channel.txCommit;
(2)确认机制:
生产环境常用的是confirm模式。生产者将信道 channel 设置成 confirm 模式,一旦 channel 进入 confirm 模式,所有在该信道上发布的消息都将会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个确认给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确到达目的队列了。如果rabbitMQ没能处理该消息,也会发送一个Nack消息给你,这时就可以进行重试操作。
Confirm模式最大的好处在于它是异步的,一旦发布消息,生产者就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者便可以通过回调方法来处理该确认消息。
处理Ack和Nack的代码如下所示:
`channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("nack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("ack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
}
});`
2、消息队列丢数据:
处理消息队列丢数据的情况,一般是开启持久化磁盘。持久化配置可以和生产者的 confirm 机制配合使用,在消息持久化磁盘后,再给生产者发送一个Ack信号。这样的话,如果消息持久化磁盘之前,即使 RabbitMQ 挂掉了,生产者也会因为收不到Ack信号而再次重发消息。
持久化设置如下(必须同时设置以下 2 个配置):
(1)创建queue的时候,将queue的持久化标志durable在设置为true,代表是一个持久的队列,这样就可以保证 rabbitmq 持久化 queue 的元数据,但是不会持久化queue里的数据;
(2)发送消息的时候将 deliveryMode 设置为 2,将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
这样设置以后,RabbitMQ 就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)
3、消费者丢数据:
消费者丢数据一般是因为采用了自动确认消息模式。该模式下,虽然消息还在处理中,但是消费中者会自动发送一个确认,通知 RabbitMQ 已经收到消息了,这时 RabbitMQ 就会立即将消息删除。这种情况下,如果消费者出现异常而未能处理消息,那就会丢失该消息。
解决方案就是采用手动确认消息,设置 autoAck = False,等到消息被真正消费之后,再手动发送一个确认信号,即使中途消息没处理完,但是服务器宕机了,那 RabbitMQ 就收不到发的ack,然后 RabbitMQ 就会将这条消息重新分配给其他的消费者去处理。
但是 RabbitMQ 并没有使用超时机制,RabbitMQ 仅通过与消费者的连接来确认是否需要重新发送消息,也就是说,只要连接不中断,RabbitMQ 会给消费者足够长的时间来处理消息。另外,采用手动确认消息的方式,我们也需要考虑一下几种特殊情况:
如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被消费,然后重新分发给下一个订阅的消费者,所以存在消息重复消费的隐患
如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息
4 需要注意的点:
1、消息可靠性增强了,性能就下降了,因为写磁盘比写 RAM 慢的多,两者的吞吐量可能有 10 倍的差距。所以,是否要对消息进行持久化,需要综合考虑业务场景、性能需要,以及可能遇到的问题。若想达到单RabbitMQ服务器 10W 条/秒以上的消息吞吐量,则要么使用其他的方式来确保消息的可靠传输,要么使用非常快速的存储系统以支持全持久化,例如使用 SSD。或者仅对关键消息作持久化处理,且应该保证关键消息的量不会导致性能瓶颈。
2、当设置 autoAck = False 时,如果忘记手动 ack,那么将会导致大量任务都处于 Unacked 状态,造成队列堆积,直至消费者断开才会重新回到队列。解决方法是及时 ack,确保异常时 ack 或者拒绝消息。
3、启用消息拒绝或者发送 nack 后导致死循环的问题:如果在消息处理异常时,直接拒绝消息,消息会重新进入队列。这时候如果消息再次被处理时又被拒绝 。这样就会形成死循环。
费者丢数据:
消费者丢数据一般是因为采用了自动确认消息模式。该模式下,虽然消息还在处理中,但是消费中者会自动发送一个确认,通知 RabbitMQ 已经收到消息了,这时 RabbitMQ 就会立即将消息删除。这种情况下,如果消费者出现异常而未能处理消息,那就会丢失该消息。
解决方案就是采用手动确认消息,设置 autoAck = False,等到消息被真正消费之后,再手动发送一个确认信号,即使中途消息没处理完,但是服务器宕机了,那 RabbitMQ 就收不到发的ack,然后 RabbitMQ 就会将这条消息重新分配给其他的消费者去处理。
但是 RabbitMQ 并没有使用超时机制,RabbitMQ 仅通过与消费者的连接来确认是否需要重新发送消息,也就是说,只要连接不中断,RabbitMQ 会给消费者足够长的时间来处理消息。另外,采用手动确认消息的方式,我们也需要考虑一下几种特殊情况:
如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被消费,然后重新分发给下一个订阅的消费者,所以存在消息重复消费的隐患
如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息
4 需要注意的点:
1、消息可靠性增强了,性能就下降了,因为写磁盘比写 RAM 慢的多,两者的吞吐量可能有 10 倍的差距。所以,是否要对消息进行持久化,需要综合考虑业务场景、性能需要,以及可能遇到的问题。若想达到单RabbitMQ服务器 10W 条/秒以上的消息吞吐量,则要么使用其他的方式来确保消息的可靠传输,要么使用非常快速的存储系统以支持全持久化,例如使用 SSD。或者仅对关键消息作持久化处理,且应该保证关键消息的量不会导致性能瓶颈。
2、当设置 autoAck = False 时,如果忘记手动 ack,那么将会导致大量任务都处于 Unacked 状态,造成队列堆积,直至消费者断开才会重新回到队列。解决方法是及时 ack,确保异常时 ack 或者拒绝消息。
3、启用消息拒绝或者发送 nack 后导致死循环的问题:如果在消息处理异常时,直接拒绝消息,消息会重新进入队列。这时候如果消息再次被处理时又被拒绝 。这样就会形成死循环。