代码编织梦想

一、什么是 Stream 消息驱动

官方文档:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/index.html

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。应用程序通过 inputs(消息消费者) 或者 outputs (消息提供者)来与 Spring Cloud Stream 中 binder 对象(绑定器)交互。通过配置来 binding(绑定) ,而 Spring Cloud Stream 的 binder 对象负责与消息中间件交互。所以,只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动。

Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

总的一句话:Spring Cloud Stream 是屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

同时,Spring Cloud Stream 目前仅支持 RabbitMQ、Kafka。

二、Spring Cloud Stream 消息驱动的设计思想

讲到这里就不得不提到 标准的 MQ即没有引入 Spring Cloud 之前的最原始的消息中间件 ActiveMQ、RabbitMQ等等:

Spring Cloud Stream 消息驱动

生产者/消费者之间靠消息媒介 Message 传递信息内容。

消息必须走特定的 消息通道 MessageChannel

消息通道 MessageChannel 里的子接口 Subscribable Channel 来消费消息,由 MessageHandler 负责收发处理。

Stream 的消息通信方式遵循的发布-订阅模式。(Outpu:消息生产者,Input:消息消费者)

为什么用 Spring Cloud Stream 消息驱动

比如我们所用到的 RabbitMQ 和 Kafka,由于这两个消息中间件的架构上的不同,像 RabbitMQ 有 exchange 交换机,kafka 有 Topic 主题和 Partitions 分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 springcloud Stream 给我们提供了一种解耦合的方式。

Stream 凭什么可以统一或屏蔽底层差异?

在没有 binder 绑定器这个概念下,我们的 SpringBoot 应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。而通过定义绑定器 binder 作为中间层,就完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种不同的消息中间件实现。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

Spring Cloud Stream 的处理架构

Spring Cloud Stream 消息驱动的工作流程

Stream 的工作流程

Source、Sink:可简单的理解为参照对象是 Spring Cloud Stream 自身,从 Stream 发布消息就是输出(作为 Source 源),接收消息就是输入(Sink)。

Channel:通道,是队列 Queue 的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,同时可通过 Channel 对队列进行配置。

Binder:用来连接 MQ 消息中间件(RabbitMQ、Kafka)屏蔽差异。

三、快速构建消息驱动生产者、消费者(集群)

先介绍一下常用的API、注解

Stream 消息驱动

组成说明
MiddleWare中间件,目前只支持 RabbitMQ 和 Kafka
BinderBinder是应用与消息中间件之间的封装,目前实现了 Kafka 和 RabbitMQ 的 Binder,
通过 Binder 可以很方便的连接中间件,可以动态的改变消息类型(对应 Kafka 的 Topic
,RabbitMQ 的 exchange),这些都可以通过配置文件来实现。
@Input注解标识输入通道,通过该输入通道接收到消息进入应用程序
@Output注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener监听队列,用于消费者的队列的消息接收
@EnableBinder指信道 exchange 和 channel 绑定在一起

1、构建消息驱动之生产者

1)创建一个 cloud-stream-rabbitmq-provider-8801 微服务模块

2)编写 pom.xml 文件

<dependencies>
    <!-- Stream 整合 RabbitMQ -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>

    <!--基础配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3)编写 application.yml 配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于 binding整合
          type: rabbit # 消息组件类型
          environment: # 设置 rabbitmq 的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:         # 服务的整合处理
        output:         # 这个名字是一个通道的名称
          destination: studyExchange        # 表示要使用的 Exchange名称定义
          content-type: application/json        # 设置消息类型,本次为 json,文本则设置“text/plain”
          binder: defaultRabbit         # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

4)编写主启动类 StreamMQProviderMain8801.class

/**
 * @Author Herz
 * @Date 2022/1/20 15:03
 */
@SpringBootApplication
public class StreamMQProviderMain8801 {

    public static void main(String[] args) {
        SpringApplication.run(StreamMQProviderMain8801.class, args);
    }
}

5)编写 service 发送消息接口

/**
 *
 * 发送消息的接口
 *
 * @Author Herz
 * @Date 2022/1/20 15:04
 */
public interface IMessageProvider {

    public String send();
}

6)编写 service 的实现类

/**
 *
 * 发送消息的接口的实现类,用于和 消息中间件打交道
 *
 *
 * @Author Herz
 * @Date 2022/1/20 15:04
 */
@EnableBinding(Source.class) // 可以理解为是一个消息的发送管道 消息源 的定义
public class IMessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output; // 消息的发送管道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();

        // 创建并发送消息
        this.output.send(MessageBuilder.withPayload(serial).build());

        System.out.println("***serial: "+serial);

        return serial;
    }
}

注意!!这里的 MessageChannel 的对象名必须和 yml 配置文件的通道名称相同否则就会报错:expected single matching bean but found 3: output,nullChannel,errorChannel

注意事项

配置注意事项

这两个名称必须相同!!!

7)测试消息生产者是否配置成功:启动主启动类,启动 RabbitMQ,在地址栏输入 http://localhost:15672/并登录

如果在 exchange 里面发现与自己 yml 配置文件中 spring.cloud.stream.binders.output.destination 属性配置的名称一致的交换机名称说明已经配置成功。

RabbitMQ 交换机列表

同时,多次访问 http://localhost:8801/send/message,可在 RabbitMQ 首页波峰的变化。

RabbitMQ 监控界面

2、消息驱动之消费者

1)创建微服务模块 cloud-stream-rabbitmq-consumer-8802

2)编写 pom.xml 文件

<dependencies>
    <!-- Stream 整合 RabbitMQ -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>

    <!--基础配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3)编写 application.yml 配置文件

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于 binding整合
          type: rabbit # 消息组件类型
          environment: # 设置 rabbitmq 的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:         # 服务的整合处理
        input:         # 这个名字是一个通道的名称,input 代表消息输入
          destination: studyExchange        # 表示要使用的 Exchange名称定义
                                            # 一定要和 消息驱动生产者的 配置文件中的名称相同
          content-type: application/json        # 设置消息类型,本次为 json,文本则设置“text/plain”
          binder: defaultRabbit         # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

4)编写主启动类 StreamMQConsumerMain8802.class

/**
 * @Author Herz
 * @Date 2022/1/20 16:40
 */
@SpringBootApplication
public class StreamMQConsumerMain8802 {

    public static void main(String[] args) {
        SpringApplication.run(StreamMQConsumerMain8802.class, args);
    }
}

5)编写 用来接收消息的 service

/**
 * @Author Herz
 * @Date 2022/1/20 16:41
 */
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(
            // this.output.send(MessageBuilder.withPayload(serial).build()); 消息驱动生产者的 service 中创建并发送消息
            // Message<T>  的泛型就取决于 withPayload 的消息类型。
            Message<String> message){
        System.out.println("消费者1号,------->接收到的消息:" + message.getPayload()+"\t  port: "+serverPort);
    }
}

6)启动主启动类并测试:浏览器输入地址 http://localhost:8801/send/message,刷新 5 次,看控制台结果

消息驱动生产者:

消息驱动生产者的控制台打印

消息驱动消费者:

消息驱动消费者的控制台打印

3、故障1:重复消费问题(重点)

当消费者是集群的时候,面临一个很严重的问题:重复消费问题。

根据 cloud-stream-rabbitmq-consumer-8802 模块再构建一个 cloud-stream-rabbitmq-consumer-8803 消息驱动消费者 模块 构成消费者集群。

为什么说 重复消费 是一个很严重的问题呢???

比如,在以下场景:订单系统为集群部署,都需要从 RabbitMQ 中获取订单消息,如果在商城系统中下的一个订单信息同时被两个服务获取到,就会导致该订单在支付金额时重复扣款,关于钱这方面肯定是很严重的问题!!所以必须要避免这种情况。

商城订单案例

那么怎样避免这种情况呢?这个时候 Stream 中的消息分组就起到了作用。因为在 Stream 中处于同一个 Group 中的多个消费者是竞争关系,就能够保证同一条消息只会被其中一个应用消费一次。而不同组是可以全面消费的(即重复消费)。

Stream 消息分组(Group)解决重复消费问题

原理:微服务应用放置于同一个 group 中,就能够保证消息只会被其中一个应用消费一次。

配置实现:只需要在 yml 配置文件中添加如下一行配置即可(group 名相同即可实现同一分组)

group: testA

添加位置如图所示:

给 Stream消息驱动消费者(集群)添加分组

两个消费者的 yml 配置文件添加完后重启后,可在 RabbitMQ 中查看:

RabbitMQ 面板信息

4、故障2:消息错过

Stream 之消息持久化(消息分组实现)

一方面消息分组不仅可以解决消息的重复消费问题,另一方面也可以实现消息持久化,解决消息错过的问题。

那么,何为消息错过呢???即当消息驱动的消费者中途服务出现故障或者其他的原因暂停服务后,等待可重启后不会接收到这个期间消息驱动的生产者所提供的所有消息,这就是消息错过。(在未配置分组时)

解决办法:给消息驱动的消费者添加分组,这样也就实现了消息的持久化。

即在该微服务的 yml 配置文件中添加如下一行配置信息:

# 分组名可自定义
group: testA

添加的位置如图所示:
给 Stream 消息驱动消费者(集群)添加分组

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/tf835991342/article/details/122720850

springcloud stream消息驱动模块(使用kafka)_芸灵fly的博客-爱代码爱编程

说明 之前我们使用的是RabbitMQ与Stream的结合完成消息驱动模块,这次使用Kafka与Stream。 目标 本文的目的在于结合Kafka与Stream来处理消息通信,采取自定义编写Sink(input)和Source(output)来设置多通道消息和消费组、消费分区等操作实现基本的消息驱动的微服务架构。 快速开始 环境:服务器端java 1.8、

spring cloud 应用篇 之 spring cloud stream(消息驱动)_hubo_88的博客-爱代码爱编程_spring.cloud.stream.bindings

(一)简介 Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通过使用 Spring C

[stream发送] springcloudstream消息驱动的消息发送者配置_朱同学的博客-爱代码爱编程_根据配置查到消息发送者

搭配使用 logback日志配置: https://blog.csdn.net/a755199443/article/details/92208902 Eureka单机服务端配置: https://blog.csdn.ne

[stream接收] springcloudstream消息驱动的消息接收者配置_朱同学的博客-爱代码爱编程

搭配使用 logback日志配置: https://blog.csdn.net/a755199443/article/details/92208902 Eureka单机服务端配置: https://blog.csdn.ne

Spring Cloud Stream消息驱动之RocketMQ入门(一)-爱代码爱编程

SpringCloudStream目前支持的中间件有RabbitMQ、Kafka,还有我最近在学习的RocketMQ,以下是我学习的笔记学习Spring cloud Stream 可以先学习一下了解 Spring Messaging 和 Spring Integration, 先看看Spring Message 消息的模型 Messaging 对应

Spring Cloud Stream消息驱动@SendTo和消息降级-爱代码爱编程

参考程序员DD大佬的文章,自己新建demo学习学习,由于需要消息回执,看到了@SendTo这个注解能够实现,下面开始学习demo,新建两个项目 cloud-stream-consumer 消费端 和 cloud-stream-consumer 生产端 public interface StreamReceive { @Input("MQRe

SpringCloud Stream消息驱动之生产者-爱代码爱编程

所有代码都在github上:https://github.com/demonruin/cloud2020/tree/master   本文演示是的SpringCloud Stream的消息生产者,和RabbitMQ进行配合 1、构建生产者项目cloud-stream-rabbitmq-provider8801,添加pom文件依赖 <depen

SpringCloud Stream消息驱动之消费者-爱代码爱编程

所有代码都在github上:https://github.com/demonruin/cloud2020/tree/master   本文演示是的SpringCloud Stream的消息生产者,和RabbitMQ进行配合,基本上与生产者一样 1、构建生产者项目cloud-stream-rabbitmq-consumer8802,添加pom文件依赖

SpringCloud Stream消息驱动之分组消费与持久化-爱代码爱编程

所有代码都在github上:https://github.com/demonruin/cloud2020/tree/master   为了演示分组消费和持久化,现在clone一份消息驱动消费者consumer8003,是基于consumer8802来clone的,只需要该端口号等信息即可。等consumer8803构建完毕后,启动如下项目: Rabb

SpringCloud2020学习笔记13——SpringCloud Stream消息驱动-爱代码爱编程

目录 一、消息驱动概述1、简介2、官网2、设计思想① 标准MQ② 为什么用Cloud Stream③ Stream中的消息通信方式遵循了发布-订阅模式3、Spring Cloud Stream标准流程套路① Binder② Channel③ Source和Sink4、编码API和常用注解二、案例说明1、工程中新建三个子模块① cloud-strea

SpringCloud核心组件之 Spring Cloud Stream消息驱动组件-爱代码爱编程

SpringCloud核心组件之 Spring Cloud Stream消息驱动组件 在实际开发过程中,服务与服务之间通信经常会使用到消息中间件,而以往使用了中间件比如RabbitMQ,那么该中间件和系统的耦合性就会非常高,如果我们要替换为Kafka那么变动会比较大,这时我们可以使用SpringCloudStream来整合我们的消息中间件,来降低系统和中

【SpringCloud】Spring Cloud Stream消息驱动-爱代码爱编程

文章目录 Spring Cloud Stream是什么设计思想Spring Cloud Stream的使用消息驱动之生产者消息驱动之消费者分组消费与持久化 Spring Cloud Stream是什么 官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。 应用程序通过inputs或者 outputs来与Sprin

SpringCloud 2020-SpringCloud Stream消息驱动(笔记)-爱代码爱编程

上一篇:SpringCloud 2020-SpringCloud config分布式配置中心(笔记) SpringCloud Stream消息驱动 1、消息驱动概述1.1 简介1.2 设计思想1.3 Spring Cloud Stream标准流程套路1.4 编码API和常用注解2、案例说明3、消息驱动之生产者4、消息驱动之消费者5、分组消费与持久化

SpringCloud-SpringCloud Stream消息驱动之多消费者(Day10)-爱代码爱编程

架构解析 我们目前的架构是一个消费者对应一个生产者,我们需要设置多个消费者,其中需要确定的问题是,是否会有消息重复消费的问题以及消息持久化的问题。由于我们之前已经创建了一个信息消费者cloud-stream-rabbitmq-consumer8802,我们参照这个消费模块创建cloud-stream-rabbitmq-consumer8803。由于过程不