分布式开发之浅谈rabbitmq-爱代码爱编程
一、何为RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。Erlang是为电话交换机编写的语言,天然对分布式和高并发支持良好。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现非常优异。
二、管理WEB页面安装
rabbitmq-plugins.bat enable rabbitmq_management
登录URL:
用户名密码:guest/guest
问题:
登录用户是中文解决方案:
1、创建用户为英文,再安装相关环境
2、修改相应的目录
用管理员执行CMD
rabbitmq-service.bat remove set RABBITMQ_BASE=D:\rabbitmq_server\data rabbitmq-service.bat install rabbitmq-plugins enable rabbitmq_management
三. RabbitMQ快速入门
3.1 生产方工程搭建
1.添加相关依赖
修改pom.xml文件内容为如下:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
2.启动类
package com.woniu.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class);
}
}
3.配置RabbitMQ
创建application.yml,内容如下:
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /woniu
username: woniu
password: woniu
创建队列参数说明:
参数 | 说明 |
---|---|
name | 字符串值,queue的名称。 |
durable | 布尔值,表示该 queue 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 queue 是否会恢复/仍存在。 另外,需要注意的是,queue 的持久化不等于其中的消息也会被持久化。 |
exclusive | 布尔值,表示该 queue 是否排它式使用。排它式使用意味着仅声明他的连接可见/可用,其它连接不可见/不可用。 |
autoDelete | 布尔值,表示当该 queue 没“人”(connection)用时,是否会被自动删除。 |
不指定 durable、exclusive 和 autoDelete 时,默认为 true 、 false 和 false 。表示持久化、非排它、不用自动删除。
创建交换机参数说明
参数 | 说明 |
---|---|
name | 字符串值,exchange 的名称。 |
durable | 布尔值,表示该 exchage 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 exchange 是否会恢复/仍存在。 |
autoDelete | 布尔值,表示当该 exchange 没“人”(queue)用时,是否会被自动删除。 |
不指定 durable 和 autoDelete 时,默认为
true
和false
。表示持久化、不用自动删除
4.创建RabbitMQ队列与交换机绑定的配置类com..rabbitmq.config.RabbitMQConfig
package com.woniu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//队列名称
public static final String ITEM_QUEUE = "item_queue";
//声明队列
@Bean("itemQueue")
public Queue itemQueue(){
return QueueBuilder.durable(ITEM_QUEUE).build();
}
}
5.创建ProducerController类,发送消息到消息队列
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/senMq/{msg}")
public String senMq(@PathVariable String msg){
rabbitTemplate.convertAndSend(RabbitMQConfig.WONIU_QUEUE, msg);
return "OK";
}
}
6.创建ConsumerController类,接收消息到消息队列
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ConsumerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/getMq")
public Object getMq(){
return rabbitTemplate.receiveAndConvert(RabbitMQConfig.WONIU_QUEUE);
}
}
或者用消息监听处理类
编写消息监听器com.woniu.rabbitmq.listener.MyListener
package com..rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyListener {
@RabbitListener(queues = RabbitMQConfig.WONIU_QUEUE)
public void myListener1(String message){
System.out.println("消费者接收到的消息为:" + message);
}
}
方式二
@Component
@RabbitListener(queues = RabbitMQConfig.WONIU_QUEUE)
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println(hello);
}
}
上述的入门案例中中其实使用的是如下的简单模式:
在上图的模型中,有以下概念:
-
P:生产者,也就是要发送消息的程序
-
C:消费者:消息的接受者,会一直等待消息到来。
-
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
四、RabbitMQ工作模式
1. Work queues工作队列模式
Work Queues
与入门程序的简单模式
相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。它们处于竞争者的关系,一条消息只会被一个消费者接收,rabbit采用轮询
的方式将消息是平均发送给消费者的;消费者在处理完某条消息后,才会收到下一条消息。应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。如生产者生产一千条消息,那么c1和c2各消费500条,队列消费消息是均衡分配
1、生产者
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/senMq/{msg}")
public String senMq(@PathVariable String msg){
rabbitTemplate.convertAndSend(RabbitMQConfig.WONIU_QUEUE, msg);
return "OK";
}
}
2、消费者
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ConsumerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/getMq")
public Object getMq(){
return rabbitTemplate.receiveAndConvert(RabbitMQConfig.WONIU_QUEUE);
}
}
复制消费方代码,重新编写一个消费端,然后启动两个消费端,进行测试
2. 订阅模式类型
订阅模式示例图:
在订阅模型中,多了一个exchange角色,而且过程略有变化:
-
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
-
C:消费者,消息的接受者,会一直等待消息到来。
-
Queue:消息队列,接收消息、缓存消息。
-
Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
-
Fanout:广播,将消息交给所有绑定到交换机的队列
-
Direct:定向,把消息交给符合指定routing key 的队列
-
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
-
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
5.2.1 广播模式
1.创建RabbitMQ队列与交换机绑定的配置类com..rabbitmq.config.RabbitMQConfig
package com.woniu.springconsumer.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
// 广播模式
@Bean
public Queue gb(){
return new Queue("gb");
}
@Bean
public Queue gb01(){
return new Queue("gb01");
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("gbe");
}
// gb01队列绑定到广播交换机
@Bean
public Binding getGb01ed(){
return BindingBuilder.bind(gb()).to(fanoutExchange());
}
// gb02队列绑定到广播交换机
@Bean
public Binding getGb02ed(){
return BindingBuilder.bind(gb02()).to(fanoutExchange());
}
}
1、实现生产者
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendFM/{msg}")
public String sendFM(@PathVariable String msg){
rabbitTemplate.convertAndSend("gbe", "", msg);
return "消息发送成功";
}
}
创建交换机参数说明:
参数 | 说明 |
---|---|
exchange | 字符串值,交换机名称 |
type | 交换机的类型,有三种类型:FANOUT、DIRECT、TOPIC |
durable | 交换机是否持久化,表示当rabbitmq重启时或者意外宕机,这个交换机还在不在 |
autoDelete | 是否自动删除,表示当该交换机没人发消息时,是否会被自动删除。 |
internal | 内部使用,一般为false |
arguments | 其它参数 |
发送消息参数说明
参数 | 说明 |
---|---|
exchange | 字符串值,交换机名称 |
routingKey | 如果交换机类型是fanout,则routingKey为"" |
props | 消息基本属性配置 |
body | 要发送的消息的内容 |
2、消费方实现
GbListener类
package com.woniu.rabbitmq.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "gb")
public class GbListener {
@RabbitHandler
public void getMsg(String msg){
System.out.println("广播消息:" + msg);
}
}
Gb01Listener 类
package com.woniu.rabbitmq.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "gb01")
public class Gb01Listener {
@RabbitHandler
public void getMsg(String msg){
System.out.println("广播01消息:" + msg);
}
}
发布订阅模式与工作队列模式的区别
1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
5.2.2 Routing路由模式
路由模式特点:
-
队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) -
消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 -
Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
在编码上与 Publish/Subscribe发布与订阅模式
的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。
1、创建RabbitMQ队列与交换机绑定的配置类com..rabbitmq.config.RabbitMQConfig
package com.woniu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// Routing路由模式
@Bean
public Queue zl01(){
return new Queue("zl01");
}
@Bean
public Queue zl02(){
return new Queue("zl02");
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange("zle");
}
@Bean
public Binding zlBinding01(){
return BindingBuilder.bind(zl01()).to(directExchange()).with("01");
}
@Bean
public Binding zlBinding02(){
return BindingBuilder.bind(zl02()).to(directExchange()).with("02");
}
}
2、生产方实现
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendD1M/{msg}")
public String sendD1M(@PathVariable String msg){
rabbitTemplate.convertAndSend("zle", "01", msg);
return "success";
}
@RequestMapping("/sendD2M/{msg}")
public String sendD2M(@PathVariable String msg){
rabbitTemplate.convertAndSend("zle", "02", msg);
return "success";
}
}
3.消费方实现
创建2个消费方并启动,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果
Zl01Listener类
package com.woniu.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RabbitListener(queues = "zl01")
public class Zl01Listener {
@RabbitHandler
public void getMsg(String msg, Message message, Channel channel) throws IOException {
try {
System.out.println("zl01消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
Zl02Listener类
package com.woniu.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RabbitListener(queues = "zl02")
public class Zl02Listener {
@RabbitHandler
public void getMsg(String msg, Message message, Channel channel) throws IOException {
try {
System.out.println("zl02消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
}
}
5.2.3. Topics通配符模式
Topic
类型与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配0个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.insert.abc
或者 item.insert
item.*
:只能匹配item.insert
创建RabbitMQ队列与交换机绑定的配置类com..rabbitmq.config.RabbitMQConfig
package com.woniu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// Topics通配符模式
@Bean
public Queue tt01(){
return new Queue("tt01");
}
@Bean
public Queue tt02(){
return new Queue("tt02");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("tte");
}
@Bean
public Binding ttBinding01(){
return BindingBuilder.bind(tt01()).to(topicExchange()).with("#.error");
}
@Bean
public Binding ttBinding02(){
return BindingBuilder.bind(tt02()).to(topicExchange()).with("order.*");
}
}
1、生产方代码实现
使用topic类型的Exchange
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@RequestMapping("/sendT1MT1/{msg}")
public String sendT1MT1(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "11.error", msg);
return "success";
}
@RequestMapping("/sendT1MT2/{msg}")
public String sendT1MT2(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "11.22.error", msg);
return "success";
}
@RequestMapping("/sendT1MF/{msg}")
public String sendT1MF(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "11.error.22", msg);
return "success";
}
@RequestMapping("/sendT2MF/{msg}")
public String sendT2MF(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "order.11.22", msg);
return "success";
}
@RequestMapping("/sendT2MT/{msg}")
public String sendT2MT(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "order.1", msg);
return "success";
}
}
2、消费方实现
package com.woniu.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RabbitListener(queues = "tt01")
public class Tt01Listener {
@RabbitHandler
public void getMsg(String msg, Message message, Channel channel) throws IOException {
try {
System.out.println("tt01消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
}
}
package com.woniu.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RabbitListener(queues = "tt02")
public class Tt02Listener {
@RabbitHandler
public void getMsg(String msg, Message message, Channel channel) throws IOException {
try {
System.out.println("tt02消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
}
}
创建2个消费方并启动,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。