RocketMQ基础+集群搭建

一、准备工作

1、集群规划:

机器名 ip nameserver部署节点
broker部署节点
rocketmq1 192.168.56.11 nameserver
rocketmq2 192.168.56.12 nameserver broker-a,broker-b-s
rocketmq3 192.168.56.13 nameserver broker-b,broker-a-s

2、配置host:

# vim /etc/hosts

192.168.56.11 rocketmq1
192.168.56.12 rocketmq2
192.168.56.13 rocketmq3

3、配置rocketmq1机器对另外2台机器的免密登录:

yum install openssh # 很多机器可以已经安装

# 生成公私钥
ssh-keygen

# 将公钥拷贝到其他机器
ssh-copy-id rocketmq1
ssh-copy-id rocketmq2
ssh-copy-id rocketmq3

4、关闭防火墙:

systemctl stop firewalld
systemctl status firewalld

5、完成jdk环境的安装:

略!


二、RocketMQ的下载与集群部署

RocketMQ的下载地址:我本次选择的版本为:4.9.1

1、将下载好的RocketMQ压缩包解压到 /usr/local 目录下:

3台机器同时执行:

[root@rocketmq1 ~]# unzip rocketmq-all-4.9.1-bin-release.zip

[root@rocketmq1 ~]# mv rocketmq-all-4.9.1-bin-release /usr/local/rocketmq4.9.1

[root@rocketmq1 ~]# cd /usr/local/rocketmq4.9.1/

[root@rocketmq1 rocketmq4.9.1]# ls
benchmark  bin  conf  lib  LICENSE  NOTICE  README.md

2、添加rocketmq的环境变量:

# vim /etc/profile.d/myenv.sh

export JAVA_HOME=/usr/local/jdk8
export ROCKETMQ_HOME=/usr/local/rocketmq4.9.1
export PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/jre/lib

3、rocketmq集群的核心配置文件都在 conf/ 目录下:

[root@rocketmq1 rocketmq4.9.1]# cd conf/

# 官方很人性地提供了常用场景的示例配置
## 显然,我门应该选择 2m-2s-async “双主-双从-异步刷盘”的配置
[root@rocketmq1 conf]# ls
2m-2s-async  2m-2s-sync  2m-noslave  broker.conf  dledger  logback_broker.xml  logback_namesrv.xml  logback_tools.xml  plain_acl.yml  tools.yml
  • 2m-2s-async: 2主2从异步刷盘(吞吐量较大,但是消息可能丢失),

  • 2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全),

  • 2m-noslave:2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置。

  • 而dleger就是用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。

4、分别在rocketmq2、rocketmq3上配置2主2从的broker配置:

2m-2s-async/broker-a.properties(rocketmq2机器):

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq1:9876;rocketmq2:9876;rocketmq3:9876
#对于多网卡的机器,需要增加brokerIP1属性,指定所在机器的外网网卡地址
brokerIP1=192.168.56.12
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/rocketmq/store
#commitLog存储路径
storePathCommitLog=/data/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/data/rocketmq/store/consumequeue 
#消息索引存储路径
storePathIndex=/data/rocketmq/store/index
#checkpoint文件存储路径
storeCheckpoint=/data/rocketmq/store/checkpoint
#abort文件存储路径
abortFile=/data/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker的角色
#-ASYNC_MASTER异步复制Master
#-SYNC_MASTER同步双写Master
#-SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#-ASYNC_FLUSH异步刷盘
#-SYNC_FLUSH同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

2m-2s-async/broker-a-s.properties(rocketmq3机器):

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq1:9876;rocketmq2:9876;rocketmq3:9876
#对于多网卡的机器,需要增加brokerIP1属性,指定所在机器的外网网卡地址
brokerIP1=192.168.56.13
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/rocketmq/storeSlave
#commitLog存储路径
storePathCommitLog=/data/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/data/rocketmq/storeSlave/consumequeue 
#消息索引存储路径
storePathIndex=/data/rocketmq/storeSlave/index
#checkpoint文件存储路径
storeCheckpoint=/data/rocketmq/storeSlave/checkpoint
#abort文件存储路径
abortFile=/data/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker的角色
#-ASYNC_MASTER异步复制Master
#-SYNC_MASTER同步双写Master
#-SLAVE
brokerRole=SLAVE
#刷盘方式
#-ASYNC_FLUSH异步刷盘
#-SYNC_FLUSH同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

2m-2s-async/broker-b.properties(rocketmq3机器):

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq1:9876;rocketmq2:9876;rocketmq3:9876
#对于多网卡的机器,需要增加brokerIP1属性,指定所在机器的外网网卡地址
brokerIP1=192.168.56.13
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/rocketmq/store
#commitLog存储路径
storePathCommitLog=/data/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/data/rocketmq/store/consumequeue 
#消息索引存储路径
storePathIndex=/data/rocketmq/store/index
#checkpoint文件存储路径
storeCheckpoint=/data/rocketmq/store/checkpoint
#abort文件存储路径
abortFile=/data/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker的角色
#-ASYNC_MASTER异步复制Master
#-SYNC_MASTER同步双写Master
#-SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#-ASYNC_FLUSH异步刷盘
#-SYNC_FLUSH同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false #发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

2m-2s-async/broker-b-s.properties(rocketmq2机器):

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq1:9876;rocketmq2:9876;rocketmq3:9876
#对于多网卡的机器,需要增加brokerIP1属性,指定所在机器的外网网卡地址
brokerIP1=192.168.56.12
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/rocketmq/storeSlave
#commitLog存储路径
storePathCommitLog=/data/rocketmq/storeSlave/commitlog 
#消费队列存储路径存储路径
storePathConsumeQueue=/data/rocketmq/storeSlave/consumequeue 
#消息索引存储路径
storePathIndex=/data/rocketmq/storeSlave/index
#checkpoint文件存储路径
storeCheckpoint=/data/rocketmq/storeSlave/checkpoint
#abort文件存储路径
abortFile=/data/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker的角色
#-ASYNC_MASTER异步复制Master
#-SYNC_MASTER同步双写Master
#-SLAVE
brokerRole=SLAVE
#刷盘方式
#-ASYNC_FLUSH异步刷盘
#-SYNC_FLUSH同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

5、创建对应的文件保存目录:

# 创建集群节点的日志相关文件路径
## broker-a主节点
mkdir -p /data/rocketmq/store/commitlog
mkdir -p /data/rocketmq/store/consumequeue
mkdir -p /data/rocketmq/store/index

## broker-a-s从节点
mkdir -p /data/rocketmq/storeSlave/commitlog
mkdir -p /data/rocketmq/storeSlave/consumequeue
mkdir -p /data/rocketmq/storeSlave/index


## broker-b主节点
mkdir -p /data/rocketmq/store/commitlog
mkdir -p /data/rocketmq/store/consumequeue
mkdir -p /data/rocketmq/store/index

##broker-b-s从节点
mkdir -p /data/rocketmq/storeSlave/commitlog
mkdir -p /data/rocketmq/storeSlave/consumequeue
mkdir -p /data/rocketmq/storeSlave/index

6、启动3台nameserver服务:

[root@rocketmq1 rocketmq4.9.1]# nohup bin/mqnamesrv &
[1] 3932

[root@rocketmq1 rocketmq4.9.1]# jps
3948 NamesrvStartup
24734 Jps

7、分别启动broker-a、broker-b、broker-a-s、broker-b-s四个broker服务:

[root@rocketmq2 rocketmq4.9.1]# nohup bin/mqbroker -c conf/2m-2s-async/broker-a.properties &

[root@rocketmq3 rocketmq4.9.1]# nohup bin/mqbroker -c conf/2m-2s-async/broker-b.properties &

[root@rocketmq3 rocketmq4.9.1]# nohup bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &

[root@rocketmq2 rocketmq4.9.1]# nohup bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &

8、查看节点运行情况:

[root@rocketmq1 rocketmq4.9.1]# bin/mqadmin clusterList -n rocketmq1:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name     #Broker Name            #BID  #Addr                  #Version                #InTPS(LOAD)       #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
rocketmq-cluster  broker-a                0     192.168.56.12:10911    V4_9_1                   0.00(0,0ms)         0.00(0,0ms)          0 471302.31 0.0959
rocketmq-cluster  broker-a                1     192.168.56.13:11011    V4_9_1                   0.00(0,0ms)         0.00(0,0ms)          0 471302.31 0.0959
rocketmq-cluster  broker-b                0     192.168.56.13:10911    V4_9_1                   0.00(0,0ms)         0.00(0,0ms)          0 471302.31 0.0959
rocketmq-cluster  broker-b                1     192.168.56.12:11011    V4_9_1                   0.00(0,0ms)         0.00(0,0ms)          0 471302.31 0.0959

9、查看日志:

[root@rocketmq2 rocketmq4.9.1]# ls ~/logs/rocketmqlogs/
broker_default.log  commercial.log  lock.log             namesrv.log     remoting.log         stats.log       store.log          tools.log        watermark.log
broker.log          filter.log      namesrv_default.log  protection.log  rocketmq_client.log  storeerror.log  tools_default.log  transaction.log

[root@rocketmq2 rocketmq4.9.1]# tail -f -n 100 ~/logs/rocketmqlogs/namesrv.log

[root@rocketmq2 rocketmq4.9.1]# tail -f -n 100 ~/logs/rocketmqlogs/broker.log

10、使用官方提供的tools工具验证集群功能:

为了让tools正常运行,我门需要创建一个环境变量(因为在2个类中,都没有指定nameserver地址):

export NAMESRV_ADDR='rocketmq1:9876;rocketmq2:9876;rocketmq3:9876'

然后,在任意节点运行tools中的生产者程序:

bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

再在另一个节点中运行tools中的消费者程序:

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

测试正常,集群运行正常!


三、为RocketMQ集群搭建管理控制台

1、我们将使用社区开源的rocketmq-dashboard:

地址:https://github.com/apache/rocketmq-dashboard

我门需要将代码下载到任意位置;

2、使用maven对代码进行编译打包:

mvn clean package -DskipTests

打包成功后,我门将得到一个jar包: rocketmq-dashboard-1.0.1-SNAPSHOT.jar 

3、将此jar包拷贝到任意位置,并在同级创建配置文件:

[root@rocketmq1 rocketmq-dashboard]# ls
application.yaml  rocketmq-dashboard-1.0.1-SNAPSHOT.jar

# application.yaml配置文件内容如下:
rocketmq:
  config:
    namesrvAddrs:
    - rocketmq1:9876
    - rocketmq2:9876
    - rocketmq3:9876

4、后台运行此jar包程序:

[root@rocketmq1 rocketmq-dashboard]# nohup java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar >rocketmq-dashboard.log 2>&1 &

5、此时通过 ip:8080 即可访问此 dashboard 界面:

1696691105585669.png


四、RocketMQ的事务消息介绍与使用场景

RocketMQ与Kafka、RabbitMQ等中间件的对比,最大的一个特点就是支持了事务消息,这个功能是其他消息中间件不具备的!

1、事务消息的定义:

  • 事务消息是指:在分布式系统中保证最终一致性的两阶段提交的消息实现,他可以保证本地事务执行消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。

  • 事务消息只保证:消息发送者的本地事务发消息这两个操作的原子性,因此,事务消息的示例只涉及到消息发送者,对于消息消费者来说,并没有什么特别;

简单一点:本地事务成功,才会将消息推送给消费者,本地事务失败,消息将被丢弃(进入死信队列)。

2、事务消息的实现核心:

事务消息的关键是在 TransactionMQProducer 中指定了一个 TransactionListener 事务监听器,这个事务监听器就是事务消息的关键控制器。

package org.apache.rocketmq.example.transaction;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    // 在提交完事务消息后执行。
    // 返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。
    // 返回ROLLBACK_MESSAGE状态的消息会被丢弃。
    // 返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg,
                                                         Object arg) {
        String tags = msg.getTags();
        //TagA的消息会立即被消费者消费到
        if (StringUtils.contains(tags, "TagA")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        //TagB的消息会被丢弃
        } else if (StringUtils.contains(tags, "TagB")) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        //其他消息会等待Broker进行事务状态回查。
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }

    //在对UNKNOWN状态的消息进行状态回查时执行。返回的结果是一样的。
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String tags = msg.getTags();
        //TagC的消息过一段时间会被消费者消费到
        if (StringUtils.contains(tags, "TagC")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        //TagD的消息也会在状态回查时被丢弃掉
        } else if (StringUtils.contains(tags, "TagD")) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        //剩下TagE的消息会在多次状态回查后最终丢弃
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }
}

3、事务消息的使用有哪些限制?

  • 事务消息不支持 延迟消息 和 批量消息

  • 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次

    • 但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制;

    • 如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志;

    • 用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。

  • 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查;

    • 当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于transactionMsgTimeout 参数。

  • 事务性消息可能不止一次被检查或消费;

  • 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定;

    • 它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制;

  • 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享

    • 与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 反向查询到生产者

4、图解事务消息的流程(重点):

1696752250504996.png

  • 第1、2步,发送 half 消息并得到返回的目的,是为了确保 Broker 可用;

  • Broker 在接收到 half 消息时,不会立即对这些消息进行处理(发送给消费者 or 丢弃);

  • half 消息成功发送到 Broker 后,本地正常执行事务操作:TransactionListener#executeLocalTransaction() 方法中,该方法中可以执行业务操作,最终给 Broker 返回3种结果:

    • COMMIT_MESSAGE:事务正常提交,消息可推送给消费者;

    • ROLLBACK_MESSAGE:事务回滚,消息丢弃,发送到死信队列;

    • UNKNOW:是无状态未知,暂时还没有完成,之后Broker将会定期找生产者询问事务状态:TransactionListener#checkLocalTransaction();

  • 后期,当 TransactionListener#checkLocalTransaction() 给与Broker明确答复时,Broker决定后续操作,如果还是UNKNOW,那么继续定期调用此方法,几个重要的参数:

    • 默认Broker先等待6秒后(transactionTimeOut,也可以主动设置属性:CHECK_IMMUNITY_TIME_IN_SECONDS),开始主动检查事务状态;

    • 默认检查的时间间隔 60秒(transactionCheckInterval)

    • 默认最多询问 15次(transactionCheckMax)

  • 事务消息在commit最前(half消息),对消费者是完全不可见的!

5、事务消息的使用场景(有助于深入理解此功能)

比如有如下场景(简化):用户下了个订单,需要等待5min之内完成支付,正常支付,则安排后期的物流等业务,如果超时不支付,我们就进行释放库存等操作!

  • 1. 我们在用户下单后,我们往Broker中发送一个事务消息;

  • 2. 然后我们在 TransactionListener#checkLocalTransaction() 中去查询此订单的支付情况;

  • 3. 如果成功支付,则返回 COMMIT_MESSAGE,Broker就会将订单消息发送给下游物流等服务;

  • 4. 如果用户取消支付,则返回 ROLLBACK_MESSAGE,Broker将会将订单丢弃到死信队列,释放库存的服务将消费此数据释放库存;

  • 5. 如果用户支付超时,则执行和 4 中一样的后续流程;

此方案用其他中间件会比较麻烦,但是用RocketMQ的事务消息就变得非常得简单!

jiguiquan@163.com

文章作者信息...

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐