代码编织梦想

前言

微服务的架构越来越流行, 很多老旧项目面临着解耦重构, 复杂项目的解耦通常会引入一些中间件来帮助我们更好的完成工作, 本章, 我们就来通过实例了解下消息中间件的用法.

市面上比较流行的消息中间件如下
image.png

因为鱼哥的项目上了阿里的云, 所以选择很简单, 就用RocketMQ即可, 看官们根据实际情况择优选择.

RocketMQ

注意, 本文使用了4.0sdk,截止到文章发表, ali已推出5.0SDK
关于阿里的RocketMQ的介绍官方有详细的文档, 这里就不啰嗦了

官方已SDK提供了简单的接入方式, 需要注意的是, 官方提供了两种协议的SDK

image.png

  • HTTP协议
    采用RESTful风格,方便易用,快速接入,跨网络能力强. 支持Java、C++、.NET、Go、Python、Node.js和PHP七种语言客户端

  • TCP协议
    区别于HTTP简单的接入方式,提供更为专业、可靠、稳定的TCP协议的SDK接入服务. 支持的语言包括Java、C/C++ 以及.NET

注意, 使用的话需要自费开通对应的服务, 通过后才能使用, 大部分管理功能都能在阿里云的控制台中找到.

实例

下面我们就springboot来实际使用下RocketMQ.
为了较好的使用, 我们同时演示tcp何http两种方式

pom引入依赖

<!-- tcp sdk -->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>${ali-mq-tcp.version}</version>
</dependency>
<!-- http sdk -->
<dependency>
    <groupId>com.aliyun.mq</groupId>
    <artifactId>mq-http-sdk</artifactId>
    <version>${ali-mq-http.version}</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

接入tcp方式

定义mq配置类

@Getter
@Setter
@Component
@ConfigurationProperties(prefix = "aliyun.rocketmq.tcp")
public class MqTcpProperties {

    //AccessKey ID,身份验证,在RAM控制台创建
    private String accessKeyId;

    //AccessKey Secret,身份验证,在RAM控制台创建
    private String accessKeySecret;

    //实例TCP协议接入地址(内网)
    private String nameSrvAddr;

    //发送超时时间: 3s
    private String sendMsgTimeoutMillis = "3000";

    //线程数目:默认20
    private String consumeThreadNums = "20";

    //订阅方式: 集群
    private String messageModel = "CLUSTERING";


    public Properties getMqProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKeyId);
        properties.setProperty(PropertyKeyConst.SecretKey, this.accessKeySecret);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        // 设置发送超时时间,单位毫秒
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.sendMsgTimeoutMillis);
        // 设置消费者线程数为20个(默认20)
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, this.consumeThreadNums);
        // 订阅方式设置
        properties.put(PropertyKeyConst.MessageModel, this.messageModel);
        return properties;
    }
}

定义Topic配置类

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "aliyun.rocketmq.tcp")
public class MqTcpTopicProperties {
    private String topic;
    private String groupId;
    private String tag;
}

封装一个工具类

@Slf4j
public class MqTcpUtil {

    @Autowired
    private ProducerBean producer;

    @Autowired
    private MqTcpProperties mqTcpProperties;

    @Autowired
    private MqTcpTopicProperties mqTcpTopicProperties;

    private ConsumerBean consumerBean;

    /**
     * 同步发送消息 - 配置默认topic
     *
     * @param msgTag      标签,可用于消息小分类标注
     * @param messageBody 消息body内容,生产者自定义内容
     * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
     * @return success:SendResult or error:null
     */
    public SendResult sendMsg(String msgTag, String messageBody, String msgKey) {
        Message msg = new Message(mqTcpTopicProperties.getTopic(), msgTag, msgKey, messageBody.getBytes());
        return this.send(msg, Boolean.FALSE);
    }

    /**
     * 同步发送消息 - 配置默认topic - 重试次数
     *
     * @param msgTag      标签,可用于消息小分类标注
     * @param messageBody 消息body内容,生产者自定义内容
     * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
     * @param retryTimes  重试次数,注意实际请求次数为 retryTimes + 1
     * @return success:SendResult or error:null
     */
    public SendResult sendMsg(String msgTag, String messageBody, String msgKey, Integer retryTimes) {
        Message msg = new Message(mqTcpTopicProperties.getTopic(), msgTag, msgKey, messageBody.getBytes());
        SendResult result = this.send(msg, Boolean.FALSE);
        if (ObjectUtil.isNotEmpty(result) || retryTimes == 0) {
            return result;
        }
        return this.sendMsg(msgTag, messageBody, msgKey, --retryTimes);
    }

    /**
     * 同步发送消息
     *
     * @param topic       topic名
     * @param msgTag      标签,可用于消息小分类标注
     * @param messageBody 消息body内容,生产者自定义内容
     * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
     * @return success:SendResult or error:null
     */
    public SendResult sendMsg(String topic, String msgTag, String messageBody, String msgKey) {
        Message msg = new Message(topic, msgTag, msgKey, messageBody.getBytes());
        return this.send(msg, Boolean.FALSE);
    }

    /**
     * 同步发送单向消息
     *
     * @param topic       topic名
     * @param msgTag      标签,可用于消息小分类标注
     * @param messageBody 消息body内容,生产者自定义内容
     * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
     */
    public void sendOneWayMsg(String topic, String msgTag, String messageBody, String msgKey) {
        Message msg = new Message(topic, msgTag, msgKey, messageBody.getBytes());
        this.send(msg, Boolean.TRUE);
    }


    /**
     * 发送普通消息
     *
     * @param msg      消息
     * @param isOneWay 是否单向发送
     */
    private SendResult send(Message msg, Boolean isOneWay) {
        try {
            if (isOneWay) {
                //由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。
                //若数据不可丢,建议选用同步或异步发送方式。
                producer.sendOneway(msg);
                success(msg, "单向消息MsgId不返回");
                return null;
            } else {
                //可靠同步发送
                SendResult sendResult = producer.send(msg);
                //获取发送结果,不抛异常即发送成功
                assert sendResult != null;
                success(msg, sendResult.getMessageId());
                return sendResult;
            }
        } catch (Exception e) {
            error(msg, e);
            return null;
        }
    }

    /**
     * 成功日志打印
     *
     * @param msg
     * @param messageId
     */
    private void success(Message msg, String messageId) {
        log.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}"
                , msg.getTopic(), messageId, msg.getKey(), msg.getTag(), new String(msg.getBody()));
    }

    /**
     * 异常日志打印
     *
     * @param msg
     * @param e
     */
    private void error(Message msg, Exception e) {
        log.error("发送MQ消息失败-- Topic:{}, Key:{}, tag:{}, body:{}"
                , msg.getTopic(), msg.getKey(), msg.getTag(), new String(msg.getBody()));
        log.error("errorMsg", e);
    }


    @PostConstruct
    public void init() {
        log.info("[Init] tcp consumerBean init");
        consumerBean = new ConsumerBean();
    }

    public ConsumerBean getConsumer() {
        return consumerBean;
    }

    public ConsumerBean getDefaultConsumer(MessageListener messageListener) {
        //配置文件
        Properties properties = mqTcpProperties.getMqProperties();
        //消费者
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqTcpTopicProperties.getGroupId());
        //设置消费者线程数为20个(默认20)
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, mqTcpProperties.getConsumeThreadNums());
        // 广播订阅方式设置
        properties.put(PropertyKeyConst.MessageModel, mqTcpProperties.getMessageModel());
        consumerBean.setProperties(properties);

        //订阅消息
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        //订阅消息
        Subscription smsSubscription = new Subscription();
        smsSubscription.setTopic(mqTcpTopicProperties.getTopic());
        smsSubscription.setExpression(mqTcpTopicProperties.getTag());
        subscriptionTable.put(smsSubscription, messageListener);
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
}

自动注入类

/**
 * rocketMq服务自动配置类
 * 业务系统要使用rocketMq服务服务,需要在配置文件中增加如下配置
 * 【
 * aliyun.rocketmq.tcp.enable=true
 * aliyun.rocketmq.tcp.accessKeyId=
 * aliyun.rocketmq.tcp.accessKeySecret=
 * aliyun.rocketmq.tcp.nameSrvAddr=
 * aliyun.rocketmq.tcp.default.topic=
 * aliyun.rocketmq.tcp.default.groupId=
 * aliyun.rocketmq.tcp.default.tag=
 */
@Slf4j
@Configuration
@EnableConfigurationProperties({MqTcpProperties.class, MqTcpTopicProperties.class})
@ConditionalOnClass({ProducerBean.class, ConsumerBean.class})
@ConditionalOnProperty(prefix = "aliyun.rocketmq.tcp", value = "enable", havingValue = "true")
public class MqTcpDefaultAutoConfiguration {

    @Autowired
    private MqTcpProperties mqTcpProperties;

    @PostConstruct
    public void init() {
        log.info("[Auto Config] MqTcpDefaultAutoConfiguration loading......");
    }

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    @ConditionalOnMissingBean
    public ProducerBean buildProducer() {
        ProducerBean producer = new ProducerBean();
        producer.setProperties(mqTcpProperties.getMqProperties());
        return producer;
    }

    @Bean
    public MqTcpUtil mqTcpUtil() {
        return new MqTcpUtil();
    }

}

核心功能基本完成, 我们可以通过MqTcpUtil执行消息的发送了
新增个测试类

@Test
public void mqTcpTest() {
    // 自定义一条body内容
    JSONObject body = new JSONObject();
    UUID uuid = UUID.randomUUID();
    body.put("notice", "这是一条tcp通知类信息");
    //同步发送消息-不带返回值的(一般使用该方法)
    log.info(String.valueOf(mqTcpUtil.sendMsg("topic", "tag_dev", body.toJSONString(), String.valueOf(uuid))));
}

最后我们加上Ali RocketMq的配置即可

## AK
aliyun.rocketmq.tcp.enable=true
aliyun.rocketmq.tcp.accessKeyId=
aliyun.rocketmq.tcp.accessKeySecret=

## TCP
aliyun.rocketmq.tcp.nameSrvAddr=
aliyun.rocketmq.tcp.sendMsgTimeoutMillis=3000
aliyun.rocketmq.tcp.consumeThreadNums=20
aliyun.rocketmq.tcp.messageModel=CLUSTERING

## topic
aliyun.rocketmq.tcp.topic=
aliyun.rocketmq.tcp.groupId=
aliyun.rocketmq.tcp.tag=

消费者

以上完成了核心功能的接入与开发, 我们已经能正常发送MQ消息了, 通俗点讲就是生产者就绪了, 我们还需要消费者去消费这些消息.

合理的拆分是生产者和消费者分服务部署, 这里为了演示, 鱼哥就直接自产自销了.

定义消费者监听器MqMessageListener

@Slf4j
@Component
public class MqMessageListener implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext context) {
        log.info("接收到MQ详细信息:{}", message);
        log.info("解析MQ-Body自定义内容:{}", new String(message.getBody()));
        try {
            //do something..
            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("消费MQ消息失败,msgId:" + message.getMsgID() + ",ExceptionMsg:" + e.getMessage());
            return Action.ReconsumeLater;
        }
    }
}

申明消费者类, 这里我们定义两种消费者, 一种是默认配置的, 一种是主定义配置的,两者选一即可

@Component
public class MqConsumerClient {

//    @Autowired
//    private MqTcpProperties mqTcpProperties;

    @Autowired
    private MqMessageListener mqMessageListener;

//    @Autowired
//    private MqTcpTopicProperties mqTcpTopicProperties;

    @Autowired
    private MqTcpUtil mqTcpUtil;

//    //自定义消费者
//    @Bean(initMethod = "start", destroyMethod = "shutdown")
//    public ConsumerBean messageBuildConsumer() {
//        ConsumerBean consumerBean = mqTcpUtil.getConsumer();
//        //配置文件
//        Properties properties = mqTcpProperties.getMqProperties();
//        //消费者
//        properties.setProperty(PropertyKeyConst.GROUP_ID, mqTopicProperties.getGroupId());
//        //设置消费者线程数为20个(默认20)
//        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
//        consumerBean.setProperties(properties);
//        //订阅消息
//        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
//        // 广播订阅方式设置
//        properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
//        //订阅消息
//        Subscription smsSubscription = new Subscription();
//        smsSubscription.setTopic(mqTopicProperties.getTopic());
//        smsSubscription.setExpression(mqTopicProperties.getTag());
//        subscriptionTable.put(smsSubscription, mqMessageListener);
//        consumerBean.setSubscriptionTable(subscriptionTable);
//        return consumerBean;
//    }

    // 默认消费者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean defaultConsumer() {
        ConsumerBean consumerBean = mqTcpUtil.getDefaultConsumer(mqMessageListener);
        return consumerBean;
    }
}

最后启动服务, 即可看到消息消费的输出信息

image.png
image.png

接入http方式

定义mq配置类

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "aliyun.rocketmq.http")
public class MqHttpProperties {

    //AccessKey ID,身份验证,在RAM控制台创建
    private String accessKeyId;

    //AccessKey Secret,身份验证,在RAM控制台创建
    private String accessKeySecret;

    //实例TCP协议接入地址(内网)
    private String nameSrvAddr;

    //实例TCP协议接入地址(内网)
    private String sendMsgTimeoutMillis = "3000";

    //线程数目:默认20
    private String consumeThreadNums = "20";

    //线程数目:默认20
    private String messageModel = "CLUSTERING";

    public Properties getMqProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKeyId);
        properties.setProperty(PropertyKeyConst.SecretKey, this.accessKeySecret);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        // 设置发送超时时间,单位毫秒
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.sendMsgTimeoutMillis);
        // 设置消费者线程数为20个(默认20)
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, this.consumeThreadNums);
        // 广播订阅方式设置
        properties.put(PropertyKeyConst.MessageModel, this.messageModel);
        return properties;
    }

定义Topic配置类

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "aliyun.rocketmq.http")
public class MqHttpTopicProperties {
    private String topic;
    private String groupId;
    private String tag;
}

定义操作工具类

@Slf4j
public class MqHttpUtil {

    @Autowired
    private MqHttpProperties mqHttpProperties;

    @Autowired
    private MqHttpTopicProperties mqHttpTopicProperties;

    private MQProducer producer;

    private MQConsumer consumer;

    private MQClient mqClient;

    @PostConstruct
    public void init() {
        log.info("[Init] http producer init");
        mqClient = new MQClient(
                // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
                mqHttpProperties.getNameSrvAddr(),
                // AccessKey ID,身份验证,在阿里云RAM控制台创建。
                mqHttpProperties.getAccessKeyId(),
                // AccessKey Secret,身份验证,在阿里云RAM控制台创建。
                mqHttpProperties.getAccessKeySecret()
        );
    }

    /**
     * 同步发送消息
     *
     * @param topic       topic名
     * @param msgTag      标签,可用于消息小分类标注
     * @param messageBody 消息body内容,生产者自定义内容
     * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
     * @return success:SendResult or error:null
     */
    public TopicMessage sendMsg(String topic, String msgTag, String messageBody, String msgKey) {
        producer = mqClient.getProducer(topic);
        TopicMessage msg = new TopicMessage(
                messageBody.getBytes(),
                msgTag);
        msg.setMessageId(msgKey);

        try {
            //可靠同步发送
            TopicMessage sendResult = producer.publishMessage(msg);
            //获取发送结果,不抛异常即发送成功
            assert sendResult != null;
            success(topic, msg);
            return sendResult;

        } catch (Exception e) {
            error(topic, msg, e);
            return null;
        }
    }

    public MQClient getMqClient() {
        return mqClient;
    }

    public MQConsumer getDefaultConsumer() {
        consumer = mqClient.getConsumer(mqHttpTopicProperties.getTopic(), mqHttpTopicProperties.getGroupId(), mqHttpTopicProperties.getTag());
        return consumer;
    }


    /**
     * 成功日志打印
     *
     * @param msg
     */
    private void success(String topic, TopicMessage msg) {
        log.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , tag:{}, body:{}"
                , topic, msg.getMessageId(), msg.getMessageTag(), msg.getMessageBodyString());
    }

    /**
     * 异常日志打印
     *
     * @param msg
     * @param e
     */
    private void error(String topic, TopicMessage msg, Exception e) {
        log.error("发送MQ消息失败-- Topic:{}, msgId:{}, tag:{}, body:{}"
                , topic, msg.getMessageId(), msg.getMessageTag(), msg.getMessageBodyString());
        log.error("errorMsg", e);
    }

自动注入

/**
 * rocketMq服务自动配置类
 * 业务系统要使用rocketMq服务服务,需要在配置文件中增加如下配置
 * 【
 * aliyun.rocketmq.http.enable=true
 * aliyun.rocketmq.http.accessKeyId=
 * aliyun.rocketmq.http.accessKeySecret=
 * aliyun.rocketmq.http.nameSrvAddr=
 * aliyun.rocketmq.http.default.topic=
 * aliyun.rocketmq.http.default.groupId=
 * aliyun.rocketmq.http.default.tag=
 */
@Slf4j
@Configuration
@ConditionalOnProperty(prefix = "aliyun.rocketmq.http", value = "enable", havingValue = "true")
public class MqHttpDefaultAutoConfiguration {

    @PostConstruct
    public void init() {
        log.info("[Auto Config] MqHttpDefaultAutoConfiguration loading......");
    }

    @Bean
    public MqHttpUtil mqHttpUtil() {
        return new MqHttpUtil();
    }

}

最后测试下

    @Test
    public void mqHttpTest() {
        // 自定义一条body内容
        JSONObject body = new JSONObject();
        UUID uuid = UUID.randomUUID();
        body.put("notice", "这是一条http通知类信息");
        //同步发送消息-不带返回值的(一般使用该方法)
        log.info(String.valueOf(mqHttpUtil.sendMsg("topic", "tag_dev", body.toJSONString(), String.valueOf(uuid))));
    }

最后增加配置

#***********************MQ-http*********************************
## AK
aliyun.rocketmq.http.enable=true
aliyun.rocketmq.http.accessKeyId=
aliyun.rocketmq.http.accessKeySecret=

## HTTP
aliyun.rocketmq.http.nameSrvAddr=
aliyun.rocketmq.http.sendMsgTimeoutMillis=3000
aliyun.rocketmq.http.consumeThreadNums=20
aliyun.rocketmq.http.messageModel=CLUSTERING

## topic
aliyun.rocketmq.http.topic=topic
aliyun.rocketmq.http.groupId=group_dev
aliyun.rocketmq.http.tag=tag_dev

运行测试
image.png
image.png

项目地址

注意: 案例需要配置你自己申请的RocketMQ配置,否则无法启动.

https://github.com/MrCoderStack/SpringBootDemo/tree/master/sb-alibaba-rocketmq

请关注我的订阅号

订阅号.png

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/MrCoderStack/article/details/126287453

springboot的学习记录_3075763007的博客-爱代码爱编程_http error 500 开发 localhost

微服务的介绍 源码链接 更多整合 微服务:每一个功能元素最终都是一个可独立替换和独立升级的软件单元; 开始的helloworld: 1创建一个maven工程: 2添加依赖 <parent> <groupId>org.springframework.boot</groupId> <ar

33.1从零开始学springboot-一文读懂消息队列-发布订阅(附redis实现)-爱代码爱编程

前言 实际生产中,我们经常会碰到这样的场景: 业务方触发了某些预料之中的bug,(比如项目中调用了第三方的服务,但是第三方的服务出问题导致无法访问,这类错,我们往往不会直接提示用户,而是选择屏蔽此类错误,写入错误日志),我们处理此类bug往往需要去生产导出日志记录,然后排查,最后找到第三方服务的提供者去解决问题. 那么,与其等“被动”业务反馈,能不能让

RocketMQ4.X消息队列详细笔记-爱代码爱编程

人不能没有批评和自我批评 那样一个人就不能进步。 目录 JMS和消息中间件介绍JMS消息服务和使用场景消息中间件常见概念和编程模型主流消息队列和技术选型讲解基础介绍和阿里云服务器快速部署RocketMQ4.x消息队列介绍阿里云Linux服务器安装RocketMQ步骤整合Springboot2.X实战实战发送消息实战接受消息高级篇幅之集群

【ReactJs+springBoot项目——租房】第7章:RocketMQ核心概念+RocketMQ的api使用+producer和consumer进行详解-爱代码爱编程

了解什么RocketMQ 了解RocketMQ的核心概念 动手安装RocketMQ服务 快速入门,掌握RocketMQ的api使用 对producer、consumer进行详解 了解RocketMQ的存储特点 1、RocketMQ简介与安装 1.1、RocketMQ简介 Apache RocketMQ是一个采用Java语言开发的分布式的消息系统,

SpringCloudAlibaba看的某马视频笔记-爱代码爱编程

----------------------------SpringCloudAlibaba---------------------------- SpringCloudAlibaba组件说明: ·服务注册与发现: 适配SpringCloud服务注册与发现标准,默认集成了Ribbon的支持 ·服务限流降级: 默认支持Servlet、Feign、RestT

视频教程-用Java从零开始开发一个物联网项目-物联网技术-爱代码爱编程

用Java从零开始开发一个物联网项目 多年的产品设计和开发经验,带领团队完成多个知名产品。历任多家大型公司的Java架构师,对知名框架的源码均有深入研究。拥有IT一线开发、教学10多年的实战经验,能充分利用自己的优势,把学员应该掌握的知识和企业需求人

视频教程-SpringBoot2.X版本优惠券实战整合Dubbo+Rocketmq+Redis-其他-爱代码爱编程

SpringBoot2.X版本优惠券实战整合Dubbo+Rocketmq+Redis 7年的开发架构经验,曾就职于国内一线互联网公司,开发工程师,现在是某创业公司技术负责人, 擅长语言有node/java/python,专注于服务端研发,人工智能相关

消息队列 ~ RocketMQ ~ 从入门到入坑。-爱代码爱编程

消息队列 ~ RocketMQ ~ 从入门到入坑。 文章目录 消息队列 ~ RocketMQ ~ 从入门到入坑。下载。使用。shutdown。测试发送消息。集群。集群模式。单 Master模式。多 Master 模式。多 Master 多 Slave 模式(异步)。多 Master 多 Slave 模式(同步)。双主双从集群搭建。集群特点。集群工

RocketMQ详细配置与使用-爱代码爱编程

文章目录 一、MQ介绍1.1 为什么要用MQ1)应用解耦2)流量削峰3)数据分发1.2 MQ的优点和缺点1.3 各种MQ产品的比较二、RocketMQ快速入门2.1 准备工作2.1.1 下载RocketMQ2.2.2 环境要求2.2 安装RocketMQ2.2.1 安装步骤2.2.2 目录介绍2.3 启动RocketMQ2.4 测试RocketMQ

消息中间件ActiveMQ/RabbitMQ/RocketMQ/Kafka从入门到精通-爱代码爱编程

消息队列是一种系统间相互协作的通信机制,使用消息队列的场景一般有异步处理、解耦、流量削峰、日志收集、事务最终一致性这几种,主要解决诸如消息堆积、消息持久化、可靠投递、消息重复、严格有序、集群等各种问题,目前的消息中间件种类有很多,比如ActiveMQ/RabbitMQ/RocketMQ/Kafka,我司使用的中间件为包裹RockerMQ的ZMQ,以及

rocketmq 消息指定_消息队列之 RocketMQ-爱代码爱编程

简介 RocketMQ 特点 RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。其主要特

消息队列:基于 RocketMQ 实现服务异步通信-爱代码爱编程

消息队列与 RocketMQ 消息队列 MQ 消息队列(Message Queue)简称 MQ,是一种跨进程的通信机制,通常用于应用程序间进行数据的异步传输,MQ 产品在架构中通常也被叫作“消息中间件”。它的最主要职责就是保证服务间进行可靠的数据传输,同时实现服务间的解耦。 看一个项目的实际案例,假设市级税务系统向省级税务系统上报本年度税务汇总数据,

RocketMQ(分布式消息队列)-爱代码爱编程

RocketMQ MQ(Message Queue):消息队列 基本概念 消息模型(Message Model): RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个

SpringBoot讲义-爱代码爱编程

SpringBoot 文档更新日志 版本更新日期操作描述v1.02021/11/14A基础篇前言 ​ 很荣幸有机会能以这样的形式和互联网上的各位小伙伴一起学习交流技术课程,这次给大家带来的是Spring家族中比较重要的一门技术课程——SpringBoot。一句话介绍这个技术,应该怎么说呢?现在如果开发Spring程序不用SpringBoot那就是给自

黑马程序员SpringBoot2全套教程学习笔记-爱代码爱编程

介绍 代码仓库地址:https://gitee.com/CandyWall/spring-boot-study 跟着黑马程序员spring boot教程做的学习笔记,本笔记跟视频内容的项目名称和代码略有不同,都是基于我自己的考量,代码都已经过运行验证过的,仅供参考。 视频教程地址:https://www.bilibili.com/video/BV15b

面试java高级工程师之rocketmq总结_code space的博客-爱代码爱编程

一、rocketmq是什么?为什么要用? 1、消息中间件,具有高性能、低延迟和高可靠特性 2、主要用来系统解耦、流量削锋、异步处理以提升系统性能 二、使用rocketmq有哪些缺点? 1、系统可用性降低 2、系统复杂度提高 3、事务一致性问题 三、rocketmq和kafka有什么区别? 1、单机吞吐量:两个都是10万级别,高吞吐量