消息驱动Stream

一、基础概要

1、消息驱动Stream是什么?

屏蔽底层消息中间件MQ的差异,降低切换成本,统一消息的编程模型;

SpringCloud Stream是一个构件消息驱动的微服务框架;

应用程序通过inputs或者outputs来与SpringCloud Stream中binder对象交互

通过我们配置来binding(绑定),而SpringCLoud Stream的binder对象负责与消息中间件交互

所以,我们只需要搞清楚如何与SpringCloud Stream交互就可以方便使用消息驱动的方式;

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动,

SpringCloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念;

目前仅支持RabbitMQ、Kafka!

2、SpringCloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,他建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组及消息分区这三个核心概念!

以下是一个SpringCloud Stream的中文指导手册:

https://m.wang1314.com/doc/webapp/topic/20971999.html

3、传统标准MQ设计思想:

51.jpg

  • Message: 生产者/消费者之间靠消息媒介传递信息内容;

  • MessageChannel:消息必须走特定的通道;

  • 消息通道里的消息如何被消费呢,谁负责收发消息?——消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅!

4、为什么引入Cloud Stream?

比方我们系统中同时用到了两个MQ,RabbitMQ和Kafka,由于这两种消息中间件的架构上有所不同,

比如RabbitMQ有exchange,kafka有topic和Partitions分区;

52.jpg

这些中间件的差异性,给我们实际项目开发造成了一定的困扰,我们如果用了两个消息中间件中的一种,后面的业务需求,我们想往另外一种消息中间件迁移时,这时候无疑是灾难性的,一大堆东西都要推倒重做,因为它与我们系统高耦合了,而Springcloud Stream就给我们提供一种解耦合的方式!

5、Stream凭什么可以屏蔽底层差异?

在没有绑定器这个概念的时候,我们的SpringBoot应用要直接与消息中间件进行消息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性;

通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离;

通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现;

6、Binder(绑定)

  • Input    对应于消费者;

  • Output 对应于生产者

Stream对消息中间件的进一步封装,可以做到代码层对中间件的无感知,甚至于动态的切换中间件(RabbitMQ切换为Kafka),使得微服务开发的高度解耦,服务可以更多地关注自己的业务流程;

53.jpg

通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离

7、消息通讯方式:

Stream中的消息通讯方式遵循了 发布-订阅 模式

使用topic主题进行广播在RabbitMQ中就是Exchange,在Kafka中就是Topic;


二、SpringCloud Stream的标准流程套路及常用API、注解

54.jpg

  • Binder:很方便的连接中间件,屏蔽差异;

  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置;

  • Source和Sink:简单的可理解为:参照对象是SpringCloud Stream自身,从Stream发布消息就是输出,接收消息即使输入;

常用的注解:

  • Middleware:中间件,目前只支持RabbitMQ和Kafka;

  • Binder:是应用与消息中间件之间的封装,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应Kafka的topic、RabbitMQ的exchange),这些都是通过配置文件来实现的;

  • @Input:标识输入通道;

  • @Output:标识输出通道;

  • @StreamListener:监听队列,用于消费者从队列接收消息;

  • @EnableBinding:指信道Channel和exchange绑定在一起;


三、项目实战——消息生产者

1、创建消息生产者模块 cloud-stream-rabbitmq-provider8801

pom.xml:

<dependencies>
    <!--stream-rabbitmq-->    
    <dependency>    
        <groupId>org.springframework.cloud</groupId>    
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>    
    </dependency>    

    <!--eureka-client-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <!--公共服务模块依赖-->
    <dependency>
        <groupId>com.jiguiquan.springcloud</groupId>
        <artifactId>cloud-api-commons</artifactId>
        <version>${project.version}</version>
    </dependency>

    <!--springboot项目web和actuator最好一起走-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!--热部署devtools-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>

    <!--lombok-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!--测试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

2、配置文件application.yml:

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: #在此处配置要绑定的RabbitMQ的服务信息
        defaultRabbit: #表示定义的名称,用于与binding整合
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        output:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2   #心跳时间间隔
    lease-expiration-duration-in-seconds: 5   #服务断开时间
    instande-id: send-8801.com     #在信息列表显示主机名称
    prefer-ip-address: true        #访问路径变为IP地址

3、启动类:

@SpringBootApplication
@EnableEurekaClient
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}

4、业务类之Service层:IMessageProvider和IMessageProviderImpl

@EnableBinding(Source.class)  //消息的推送管道(接收管道)
public class IMessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output;   //消息发送管道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("*********serial:" + serial);
        return serial;
    }
}

5、业务类之Controller层:SendMessageController

@RestController
public class SendMessageController {
    @Resource
    private IMessageProvider provider;

    @GetMapping("/sendMessage")
    public String sendMessage(){
        return provider.send();
    }
}

6、启动测试——看看是不是每调用一次,都会向RabbitMQ发送一条消息

首先,如果启动顺利,在exchange中就能看到我们新增的一个exchange:

55.jpg

然后我们多访问几次: http://localhost:8801/sendMessage

56.jpg

我们可以看到RabbitMQ的Overview界面出现了以上波形,说明有消息被推送到了RabbitMQ,消息生产者测试成功;


四、项目实战——消息接受者2个

1、创建消息接收者模块 cloud-stream-rabbitmq-consumer8802 和 cloud-stream-rabbitmq-consumer8803

pom.xml和8801一模一样:

2、配置文件 application.yml:

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2   #心跳时间间隔
    lease-expiration-duration-in-seconds: 5   #服务断开时间
    instande-id: receive-8802.com     #在信息列表显示主机名称
    prefer-ip-address: true        #访问路径变为IP地址

3、主启动类:

4、业务类之ReceiveMessageListenerController

@RestController
@EnableBinding(Sink.class)  //输出通道
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消费者1号——————>接收到的消息为:" + message.getPayload()+ "\t port:" + serverPort);
    }
}

5、启动测试,先只启动一个8802即可:

测试的期望结果是,我们访问8801接口,会在8801的控制台打印出UUID,同时8802控制台也能打印出对应内容,且两者UUID相同,同时RabbitMQ的监视器能观察到消息的出现;

访问: http://localhost:8801/sendMessage

58.jpg

57.jpg

59.jpg

显然,测试成功,8802消息消费者通过RabbitMQ消费了生产者生产的消息!

五、Stream之消息重复消费

1、首先启动7001/8801/8802/8803服务

60.jpg

2、问题一:重复消费:

使用8801生产者发生2条消息,但是8802消费者消费了2条消息,8803也消费了2条消息;这就是所谓的重复消费问题:

生产案例,有如下场景:

订单系统我们做集群部署,都会从RabbitMQ中获取订单信息进行处理,那如果一个订单同时被两个服务获取到,那么必然造成数据错误,这样问题就严重了;

所以我们必须要处理重复消费的问题,这时我们就可以使用Stream中的消息分组来解决:

61.jpg

注意:在Stream中处于同一个Group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用只消费一次,不同组是可以全面消费的(即不同组是可以重复消费的——这点也是很有用的)

默认情况下,集群(8802和8803)是属于不同的分组的,即他们是可以全面消费(重复消费的);

62.jpg


六、Stream之Group解决消息重复消费问题

1、自定义分组,将8802和8803的分组设置为相同分组,配置文件增加 group:分组配置

bindings:
  input:
    destination: studyExchange
    content-type: application/json
    binder: defaultRabbit
    group: jgqGroup

2、重启,查看RabbitMQ中的分组:

63.jpg

只有我们自定义的分组jgqGroup了;

3、继续使用8801生产4条消息:  http://localhost:8801/sendMessage 

64.jpg

然后查看8802和8803的消费情况:

65.jpg

66.jpg

很显然,集群间以类似于负载均衡(轮询)的方式,在平均消费着消息;


七、Stream之消息持久化

1、我们先做一个测试:

停掉8802/8803服务并且去掉8802配置文件中的分组 group:jgqGroup,保留8803配置文件中的 group:jgqGroup

然后使用8801生产4条消息到RabbitMQ;

67.jpg

2、接着,我们启动服务8802,注意8802的分组group已经被我们去掉了;

我们发现,从头到尾,8802并没有去消费8801生产的那4条消息,他们还乖乖地在RabbitMQ中躺着呢;

68.jpg

3、然后我们再启动服务8803看看:

69.jpg

显然8803即使睡了一觉,但是由于它有配置分组信息,所以它重启后,依然会去将对应的Exchange中的消息进行消费处理;

由此,我们得到一个很重要的结论就是,group分组配置信息是一个非常重要的配置,它既可以解决重复消费问题,也可以解决消息持久化的问题!

到这里,消息驱动SpringCloud Stream就基本结束了!

个人此项目代码地址(持续更新):

https://github.com/jiguiquan/cloud2020

jiguiquan@163.com

文章作者信息...

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐