【微服务全家桶】-爱代码爱编程
【【微服务全家桶】-实用篇-3-MQ
1 初识MQ
1.1 同步通讯和异步通信
1.2 同步调用的问题
1.3 异步调用
异步通信的优点:
-
耦合度低
-
吞吐量提升
-
故障隔离
-
流量削峰
异步通信的缺点:
-
依赖于Broker的可靠性、安全性、吞吐能力
-
架构复杂了,业务没有明显的流程线,不好追踪管理
通常一般选择同步通信,对时效性要求较高
1.4 MQ
MQ(MessageQueue),,中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
1.5 RabbitMQ快速入门
加载rabbitMQ
docker load -i mq.tar
安装MQ
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
-e 用户名和密码
–hostname 配置主机名,集群部署用到
-p端口映射 15672管理平台端口,ui界面,5672消息通信端口
浏览器打开管理平台端口 http://192.168.204.129:15672
RabbitMQ中的几个概念:
-
channel:操作MQ的工具
-
exchange:路由消息到队列中
-
queue:缓存消息
-
virtual host:虚拟主机,是对queue、exchanges等资源的逻辑分组
1.6 MQ常见消息模型
1.6.1 HelloWorld案例
基本消息队列的消息发送流程:
- 建立connection
- 创建channel
- 利用channeli声明队列
- 利用channell向队列发送消息
生产者代码
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.204.129");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
基本消息队列的消息接收流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
消费者代码
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.204.129");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
相应的创建连接和channel以及queue
消费者消费完后,阅后即焚,queue中为空
2 SpringAMQP
2.1 初始SpringAMQP
2.2 入门案例
2.2.1 消息发送
案例要求
1.在父工程中引入spring-amqp依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.1 在publisher服务中编写application.yml,配置MQ连接信息
spring:
rabbitmq:
host: 192.168.204.129
port: 5672
virtual-host: /
username: itcast
password: 123321
2.2 在publisher服务中新建一个测试类,编写测试方法
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
String queueName = "simple.queue";
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
System.out.println("发送消息成功:【" + message + "】");
}
}
浏览器打开管理平台端口 http://192.168.204.129:15672
2.2.2 消息接收-@RabbitListener
1.在consumer服务中编写application.yml,添加MQ连接信息
spring:
rabbitmq:
host: 192.168.204.129
port: 5672
virtual-host: /
username: itcast
password: 123321
2.在consumer中编写一个类,编写消费逻辑
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void receiveMessage(String message) {
System.out.println("接收到消息:【" + message + "】");
}
}
@Component在Spring中注册成Bean,@RabbitListener中参数queues设置监听队列。
成功接收到,且rabbit阅后即焚
2.3 工作队列
有多个消费者,称之为工作队列。提高消息处理速率,避免消息堆积
2.3.1 工作队列案例
在SpringAmqpTest中添加为生产者添加新的测试方法,Thread.sleep(20)
生产者每秒产生50条消息。
@Test
public void test2SimpleQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello, spring amqp!";
for(int count=0;count<=50;count++){
rabbitTemplate.convertAndSend(queueName, message+count);
Thread.sleep(20);
}
}
在SpringRabbitListener中添加新的消费者
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenWorkqueue1(String message) throws InterruptedException {
System.out.println("消费者1......接收到消息:【" + message + "】"+ LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkqueue2(String message) throws InterruptedException {
System.err.println("消费者2......接收到消息:【" + message + "】"+ LocalTime.now());
Thread.sleep(200);
}
}
预计:消费者1每秒处理50条,消费者2每秒处理5条。
启动ConsumerApplication,发现
消费者1处理所有的偶数,消费者2处理所有的奇数,这是因为在AMQP中有个预分配的策略,默认平均分配。
如果要更改AMQP的预取的分配策略,需要修改消费者中application.yml文件,设置preFetch的值,控制预取消息的上限
spring:
rabbitmq:
host: 192.168.204.129
port: 5672
virtual-host: /
username: itcast
password: 123321
listener:
simple:
prefetch: 1
修改过后重新启动,显示预分配策略正常
2.4 发布、订阅
2.4.1 Fanout广播
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue
2.4.1.1 设置配置类
在消费者中创建config.FanoutConfig,设置配置类
@Configuration
public class FanoutConfig {
//itcast.fanout
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
//fanout.queue1
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
//fanout.queue2
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
//绑定队列1到交换机
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
//绑定队列2到交换机
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
2.4.1.2 重新启动
发现RabbitMQ已经实现
2.4.1.3 发送/接收消息
在消费者中SpringRabbitListener中添加消费者
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutqueue1(String message) {
System.out.println("消费者接收到Fanoutqueue1消息:【" + message + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutqueue2(String message) {
System.out.println("消费者接收到Fanoutqueue2消息:【" + message + "】");
}
在生产者中SpringAmqpTest添加新的测试方法
@Test
public void testSendFanoutExchange() throws InterruptedException {
String exchangeName = "itcast.fanout";
String message = "hello, spring amqp!";
for(int count=0;count<=50;count++){
rabbitTemplate.convertAndSend(exchangeName, "", message+count);
Thread.sleep(20);
}
}
测试结果
两个队列收到相同的消息
2.4.2 Direct订阅
2.4.2.1 添加订阅者
既可以通过设置配置类来完成消费者绑定,也可以直接通过注解绑定。
在SpringRabbitListener中添加消费者,利用注解完成
绑定队列和交换机***@QueueBinding***
队列***@Queue***
交换机***@Exchange***
@RabbitListener(bindings =@QueueBinding(
value=@Queue(name="direct.queue1"),
exchange=@Exchange(name="itcast.direct",type= ExchangeTypes.DIRECT),
key={"red","blue"}
))
public void listenDirectQueue1(String message){
System.out.println("消费者接收到DirectQueue1消息:【" + message + "】");
}
@RabbitListener(bindings =@QueueBinding(
value=@Queue(name="direct.queue2"),
exchange=@Exchange(name="itcast.direct",type= ExchangeTypes.DIRECT),
key={"red","yellow"}
))
public void listenDirectQueue2(String message){
System.out.println("消费者接收到DirectQueue2消息:【" + message + "】");
}
2.4.2.2 重新启动
2.4.2.3 发送/接收消息
在生产者中SpringAmqpTest添加新的测试方法
@Test//blue是队列1,yellow是队列2,red是队列1和队列2
public void testSendDirectExchange() throws InterruptedException {
String exchangeName = "itcast.direct";
String message = "hello, spring amqp!";
String routingKey="";
for(int count=0;count<=50;count++){
if(count%2==0){
routingKey="blue";
}
else{
routingKey="yellow";
}
if(count%10==0){
routingKey="red";
}
rabbitTemplate.convertAndSend(exchangeName, routingKey, message+count);
Thread.sleep(20);
}
}
2.4.3 Topic订阅
2.4.3.1 添加订阅者
既可以通过设置配置类来完成消费者绑定,也可以直接通过注解绑定。
在SpringRabbitListener中添加消费者,利用注解完成
@RabbitListener(bindings =@QueueBinding(
value=@Queue(name="topic.queue1"),
exchange=@Exchange(name="itcast.topic",type= ExchangeTypes.TOPIC),
key={"china.#"}
))
public void listenTopicQueue1(String message){
System.err.println("消费者接收到TopicQueue1消息:【" + message + "】");
}
@RabbitListener(bindings =@QueueBinding(
value=@Queue(name="topic.queue2"),
exchange=@Exchange(name="itcast.topic",type= ExchangeTypes.TOPIC),
key={"#.news"}
))
public void listenTopicQueue2(String message){
System.err.println("消费者接收到TopicQueue2消息:【" + message + "】");
}
2.4.3.2 重新启动
2.4.3.3 发送/接收消息
在生产者中SpringAmqpTest添加新的测试方法
@Test
public void testSendTopicExchange() throws InterruptedException {
String exchangeName = "itcast.topic";
String message = "hello, spring amqp!";
String routingKey="";
for(int count=0;count<=50;count++){
if(count%2==0){
routingKey="china."+count;
}
else{
routingKey=count+".news";
}
if(count%10==0){
routingKey="china.news";
}
rabbitTemplate.convertAndSend(exchangeName, routingKey, message+count);
Thread.sleep(20);
}
}
2.5 消息转换器
在FanoutConfig中添加新的Bean对象
@Bean
public Queue objectQueue1(){
return new Queue("object.queue");
}
成功加入mq,为其添加测试方法
@Test
public void testSendObjectQueue() throws InterruptedException {
String queueName = "object.queue";
HashMap<String, Object> msg = new HashMap<>();
msg.put("name", "张三");
msg.put("age", 18);
rabbitTemplate.convertAndSend(queueName, msg);
}
但是并不是能接受所有的object,只是序列化后的数据
2.5.1 发送者-修改Spring的消息对象处理
2.5.1.1在父工程中引入依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2.5.1.2 声明MessageConverter
在publisher的启动类中声明bean对象
@SpringBootApplication
public class PublisherApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class);
}
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
2.5.1.3 重新启动
2.5.2 接收者-修改消息对象处理器
2.5.2.1 引入依赖
父工程引入过了,跳过
2.5.2.2 声明MessageConverter
在consumer的启动类中声明bean对象
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
2.5.2.3 添加接收者
直接绑定队列
@RabbitListener(queues = "object.queue")
public void listenObjectQueue1(Map<String,Object> msg){
System.out.println("消费者接收到ObjectQueue1消息:【" + msg + "】");
}
2.5.2.4 重新发送
成功接收到JSON格式对象