springcloudalibaba架构(21):rocketmq消息生产和消费_逆水行舟没有退路的博客-爱代码爱编程
前言
RocketMQ生产消息和消费消息
环境准备
安装RocketMQ: springboot基础(48): RocketMQ的安装
安装RocketMQ console :消息队列(3):RocketMQ控制台安装
第一节 生产消息
- 引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
- 编写测试用例
package com.lcz;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.junit.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RocketMqTest {
@Test
public void testSend() throws Exception {
// 创建消息生产者,设置生产者组名
DefaultMQProducer producer=new DefaultMQProducer("myproducergroup");
// 为生产者设置NameServer
producer.setNamesrvAddr("127.0.0.1:9876");
//启动生产者
producer.start();
//构建消息
Message message=new Message("mytopic","tag222","hello world".getBytes());
//发送消息
SendResult result = producer.send(message, 5000);
System.out.println(result);
//关闭生产者
producer.shutdown();
}
}
- 运行测试用例,生产消息(注意,RocketMQ console控制台显示两条消息是因为运行了两次)
查看RocketMQ console主题和消息。
第二节 消费消息
- 编写测试用例
@Test
public void test() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumergroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("mytopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println("收到的消息:" + list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回状态是消费成功还是消费失败
}
});
consumer.start();
System.out.println("启动consumer成功");
TimeUnit.HOURS.sleep(1);
}
运行启动,收到一条消息。
传送门
真实开发中,上面的代码显的很繁琐和不优雅。所以基于springboot整合RocketMq实现更加简单方便和便捷。
springboot基础(49): 整合RocketMQ