rabbitmq入门-爱代码爱编程
生产者:
就是投递消息的一方。
消息一般可以包含2个部分:消息体和标签(Label)。消息体可以称之为payload,在实际应用中,消息体一般是一个带有业务逻辑的结构的数据,比如一个Json字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表达这条消息,比如一个交换器的名称和一个路由键。生产都把消息将由RabbitMQ,RabbitMQ之后会根据标签把消息发送给感兴趣的消费者。
消费者:
就是接收消息的一方。
消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。
Broker:消息中间件的服务节点。
对于RabbitMQ来说,一个RabbitMQ Brokerr可以简单的看作一个RabbitMQ服务节点,或者RabbitMQ服务实例。也可看成为一台RabbitMQ服务器。
队列:是RabbitMQ的内部对象,用于存储消息。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
交换器:Exchange
生产者将消息发送到Exchange交换器,由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许或直接丢弃。
RabbitMQ运转流程
生产者发送消息的时候
1)生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
2)生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等。
3)生产者声明一个队列并设置相关属性,比如是否排他,是否持久化,是否自动删除等
4)生产者通过路由键将交换器和队列绑定起来。
5)生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息。
6)相应的交换器根据接收到的路由键查找相匹配的队列。
7)如果找到,则将从生产者发送过来的消息存入相应的队列中。
8)如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者。
9)关闭信道。
10)关闭连接。
消费者接收消息的过程:
1)消费者连接到RabbitMQ Broker,建立一个连接Connection,开启一个信道Channel。
2)消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及作一些准备工作。
3)等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接收消息。
4)消费者确认Ack接收到的消息。
5)RabbitMQ从队列中删除相应已经被确认的消息。
6)关闭信道。
7)关闭连接。
AMQP协议介绍
RabbitMQ就是AMQP协议的Erlang的实现。
AMQP的模型架构和RabbitMQ的模型架构是一样的,生产者将消息发送到交换器,交换器和队列绑定。当生产者发送消息时所携带的RoutingKey与绑定BindingKey相匹配时,消息即被存入相应的队列之中。消费者可以订阅相应的队列来获取消息。
AMQP协议包括三层
1)Module Layer:
位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用Queue.Declare命令声明一个队列或者使用Basic.Consume订阅消费一个队列中的消息。
2)Session Layer:
位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端子,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
3)Transport Layer:
位于最底翅,主要传输二进制数据流,提供提供帧的处理、信道复用、错误检测和数据表示等。
简洁版消费者代码:
Connection connection = factory .n ewConnection() // 创建连接
Channel channel = connection.createChannel() // 创建信道
String message = "Hello World! ";
channel.basicPublish(EXCHANGE NAME , ROUTING KEY ,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
//关闭资源
channel.close() ;
connection . close();
简洁版消费者代码:
Connection connection = factory.newConnection(addresses);// 创建连接
final Channel channel = connection createChannel() // 创建信道
Consumer consumer = new DefaultConsumer(channel) ()//_省略实现
channel . basicQos(64) ;
channel.basicConsume(QUEUE NAME , consumer) ;
//等待回调函数执行完毕之后,关闭资源
TimeUnit . SECONDS . sleep(5) ;
channel . close();
connection.close();