一、准备工作
1、集群规划:
机器名 | ip | nameserver部署节点 |
broker部署节点 |
rocketmq1 | 192.168.56.11 | nameserver | |
rocketmq2 | 192.168.56.12 | nameserver | broker-a,broker-b-s |
rocketmq3 | 192.168.56.13 | nameserver | broker-b,broker-a-s |
2、配置host:
# vim /etc/hosts 192.168.56.11 rocketmq1 192.168.56.12 rocketmq2 192.168.56.13 rocketmq3
3、配置rocketmq1机器对另外2台机器的免密登录:
yum install openssh # 很多机器可以已经安装 # 生成公私钥 ssh-keygen # 将公钥拷贝到其他机器 ssh-copy-id rocketmq1 ssh-copy-id rocketmq2 ssh-copy-id rocketmq3
4、关闭防火墙:
systemctl stop firewalld systemctl status firewalld
5、完成jdk环境的安装:
略!
二、RocketMQ的下载与集群部署
RocketMQ的下载地址:我本次选择的版本为:4.9.1
1、将下载好的RocketMQ压缩包解压到 /usr/local 目录下:
3台机器同时执行:
[root@rocketmq1 ~]# unzip rocketmq-all-4.9.1-bin-release.zip [root@rocketmq1 ~]# mv rocketmq-all-4.9.1-bin-release /usr/local/rocketmq4.9.1 [root@rocketmq1 ~]# cd /usr/local/rocketmq4.9.1/ [root@rocketmq1 rocketmq4.9.1]# ls benchmark bin conf lib LICENSE NOTICE README.md
2、添加rocketmq的环境变量:
# vim /etc/profile.d/myenv.sh export JAVA_HOME=/usr/local/jdk8 export ROCKETMQ_HOME=/usr/local/rocketmq4.9.1 export PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/jre/lib
3、rocketmq集群的核心配置文件都在 conf/ 目录下:
[root@rocketmq1 rocketmq4.9.1]# cd conf/ # 官方很人性地提供了常用场景的示例配置 ## 显然,我门应该选择 2m-2s-async “双主-双从-异步刷盘”的配置 [root@rocketmq1 conf]# ls 2m-2s-async 2m-2s-sync 2m-noslave broker.conf dledger logback_broker.xml logback_namesrv.xml logback_tools.xml plain_acl.yml tools.yml
2m-2s-async: 2主2从异步刷盘(吞吐量较大,但是消息可能丢失),
2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全),
2m-noslave:2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置。
而dleger就是用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。
4、分别在rocketmq2、rocketmq3上配置2主2从的broker配置:
2m-2s-async/broker-a.properties(rocketmq2机器):
#所属集群名字,名字一样的节点就在同一个集群内 brokerClusterName=rocketmq-cluster #broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-a #brokerid,0就表示是Master,>0的都是表示Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=rocketmq1:9876;rocketmq2:9876;rocketmq3:9876 #对于多网卡的机器,需要增加brokerIP1属性,指定所在机器的外网网卡地址 brokerIP1=192.168.56.12 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4 #是否允许Broker自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许Broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨4点 deleteWhen=04 #文件保留时间,默认48小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/data/rocketmq/store #commitLog存储路径 storePathCommitLog=/data/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/data/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/data/rocketmq/store/index #checkpoint文件存储路径 storeCheckpoint=/data/rocketmq/store/checkpoint #abort文件存储路径 abortFile=/data/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker的角色 #-ASYNC_MASTER异步复制Master #-SYNC_MASTER同步双写Master #-SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #-ASYNC_FLUSH异步刷盘 #-SYNC_FLUSH同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
2m-2s-async/broker-a-s.properties(rocketmq3机器):
#所属集群名字,名字一样的节点就在同一个集群内 brokerClusterName=rocketmq-cluster #broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-a #brokerid,0就表示是Master,>0的都是表示Slave brokerId=1 #nameServer地址,分号分割 namesrvAddr=rocketmq1:9876;rocketmq2:9876;rocketmq3:9876 #对于多网卡的机器,需要增加brokerIP1属性,指定所在机器的外网网卡地址 brokerIP1=192.168.56.13 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4 #是否允许Broker自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许Broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker对外服务的监听端口 listenPort=11011 #删除文件时间点,默认凌晨4点 deleteWhen=04 #文件保留时间,默认48小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/data/rocketmq/storeSlave #commitLog存储路径 storePathCommitLog=/data/rocketmq/storeSlave/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/data/rocketmq/storeSlave/consumequeue #消息索引存储路径 storePathIndex=/data/rocketmq/storeSlave/index #checkpoint文件存储路径 storeCheckpoint=/data/rocketmq/storeSlave/checkpoint #abort文件存储路径 abortFile=/data/rocketmq/storeSlave/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker的角色 #-ASYNC_MASTER异步复制Master #-SYNC_MASTER同步双写Master #-SLAVE brokerRole=SLAVE #刷盘方式 #-ASYNC_FLUSH异步刷盘 #-SYNC_FLUSH同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
2m-2s-async/broker-b.properties(rocketmq3机器):
#所属集群名字,名字一样的节点就在同一个集群内 brokerClusterName=rocketmq-cluster #broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-b #brokerid,0就表示是Master,>0的都是表示Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=rocketmq1:9876;rocketmq2:9876;rocketmq3:9876 #对于多网卡的机器,需要增加brokerIP1属性,指定所在机器的外网网卡地址 brokerIP1=192.168.56.13 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许Broker自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许Broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨4点 deleteWhen=04 #文件保留时间,默认48小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/data/rocketmq/store #commitLog存储路径 storePathCommitLog=/data/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/data/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/data/rocketmq/store/index #checkpoint文件存储路径 storeCheckpoint=/data/rocketmq/store/checkpoint #abort文件存储路径 abortFile=/data/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker的角色 #-ASYNC_MASTER异步复制Master #-SYNC_MASTER同步双写Master #-SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #-ASYNC_FLUSH异步刷盘 #-SYNC_FLUSH同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
2m-2s-async/broker-b-s.properties(rocketmq2机器):
#所属集群名字,名字一样的节点就在同一个集群内 brokerClusterName=rocketmq-cluster #broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-b #brokerid,0就表示是Master,>0的都是表示Slave brokerId=1 #nameServer地址,分号分割 namesrvAddr=rocketmq1:9876;rocketmq2:9876;rocketmq3:9876 #对于多网卡的机器,需要增加brokerIP1属性,指定所在机器的外网网卡地址 brokerIP1=192.168.56.12 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许Broker自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许Broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker对外服务的监听端口 listenPort=11011 #删除文件时间点,默认凌晨4点 deleteWhen=04 #文件保留时间,默认48小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/data/rocketmq/storeSlave #commitLog存储路径 storePathCommitLog=/data/rocketmq/storeSlave/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/data/rocketmq/storeSlave/consumequeue #消息索引存储路径 storePathIndex=/data/rocketmq/storeSlave/index #checkpoint文件存储路径 storeCheckpoint=/data/rocketmq/storeSlave/checkpoint #abort文件存储路径 abortFile=/data/rocketmq/storeSlave/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker的角色 #-ASYNC_MASTER异步复制Master #-SYNC_MASTER同步双写Master #-SLAVE brokerRole=SLAVE #刷盘方式 #-ASYNC_FLUSH异步刷盘 #-SYNC_FLUSH同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
5、创建对应的文件保存目录:
# 创建集群节点的日志相关文件路径 ## broker-a主节点 mkdir -p /data/rocketmq/store/commitlog mkdir -p /data/rocketmq/store/consumequeue mkdir -p /data/rocketmq/store/index ## broker-a-s从节点 mkdir -p /data/rocketmq/storeSlave/commitlog mkdir -p /data/rocketmq/storeSlave/consumequeue mkdir -p /data/rocketmq/storeSlave/index ## broker-b主节点 mkdir -p /data/rocketmq/store/commitlog mkdir -p /data/rocketmq/store/consumequeue mkdir -p /data/rocketmq/store/index ##broker-b-s从节点 mkdir -p /data/rocketmq/storeSlave/commitlog mkdir -p /data/rocketmq/storeSlave/consumequeue mkdir -p /data/rocketmq/storeSlave/index
6、启动3台nameserver服务:
[root@rocketmq1 rocketmq4.9.1]# nohup bin/mqnamesrv & [1] 3932 [root@rocketmq1 rocketmq4.9.1]# jps 3948 NamesrvStartup 24734 Jps
7、分别启动broker-a、broker-b、broker-a-s、broker-b-s四个broker服务:
[root@rocketmq2 rocketmq4.9.1]# nohup bin/mqbroker -c conf/2m-2s-async/broker-a.properties & [root@rocketmq3 rocketmq4.9.1]# nohup bin/mqbroker -c conf/2m-2s-async/broker-b.properties & [root@rocketmq3 rocketmq4.9.1]# nohup bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties & [root@rocketmq2 rocketmq4.9.1]# nohup bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &
8、查看节点运行情况:
[root@rocketmq1 rocketmq4.9.1]# bin/mqadmin clusterList -n rocketmq1:9876 RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap). RocketMQLog:WARN Please initialize the logger system properly. #Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE rocketmq-cluster broker-a 0 192.168.56.12:10911 V4_9_1 0.00(0,0ms) 0.00(0,0ms) 0 471302.31 0.0959 rocketmq-cluster broker-a 1 192.168.56.13:11011 V4_9_1 0.00(0,0ms) 0.00(0,0ms) 0 471302.31 0.0959 rocketmq-cluster broker-b 0 192.168.56.13:10911 V4_9_1 0.00(0,0ms) 0.00(0,0ms) 0 471302.31 0.0959 rocketmq-cluster broker-b 1 192.168.56.12:11011 V4_9_1 0.00(0,0ms) 0.00(0,0ms) 0 471302.31 0.0959
9、查看日志:
[root@rocketmq2 rocketmq4.9.1]# ls ~/logs/rocketmqlogs/ broker_default.log commercial.log lock.log namesrv.log remoting.log stats.log store.log tools.log watermark.log broker.log filter.log namesrv_default.log protection.log rocketmq_client.log storeerror.log tools_default.log transaction.log [root@rocketmq2 rocketmq4.9.1]# tail -f -n 100 ~/logs/rocketmqlogs/namesrv.log [root@rocketmq2 rocketmq4.9.1]# tail -f -n 100 ~/logs/rocketmqlogs/broker.log
10、使用官方提供的tools工具验证集群功能:
为了让tools正常运行,我门需要创建一个环境变量(因为在2个类中,都没有指定nameserver地址):
export NAMESRV_ADDR='rocketmq1:9876;rocketmq2:9876;rocketmq3:9876'
然后,在任意节点运行tools中的生产者程序:
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
再在另一个节点中运行tools中的消费者程序:
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
测试正常,集群运行正常!
三、为RocketMQ集群搭建管理控制台
1、我们将使用社区开源的rocketmq-dashboard:
地址:https://github.com/apache/rocketmq-dashboard
我门需要将代码下载到任意位置;
2、使用maven对代码进行编译打包:
mvn clean package -DskipTests
打包成功后,我门将得到一个jar包: rocketmq-dashboard-1.0.1-SNAPSHOT.jar
3、将此jar包拷贝到任意位置,并在同级创建配置文件:
[root@rocketmq1 rocketmq-dashboard]# ls application.yaml rocketmq-dashboard-1.0.1-SNAPSHOT.jar # application.yaml配置文件内容如下: rocketmq: config: namesrvAddrs: - rocketmq1:9876 - rocketmq2:9876 - rocketmq3:9876
4、后台运行此jar包程序:
[root@rocketmq1 rocketmq-dashboard]# nohup java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar >rocketmq-dashboard.log 2>&1 &
5、此时通过 ip:8080 即可访问此 dashboard 界面:
四、RocketMQ的事务消息介绍与使用场景
RocketMQ与Kafka、RabbitMQ等中间件的对比,最大的一个特点就是支持了事务消息,这个功能是其他消息中间件不具备的!
1、事务消息的定义:
-
事务消息是指:在分布式系统中保证最终一致性的两阶段提交的消息实现,他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。
-
事务消息只保证:消息发送者的本地事务与发消息这两个操作的原子性,因此,事务消息的示例只涉及到消息发送者,对于消息消费者来说,并没有什么特别;
简单一点:本地事务成功,才会将消息推送给消费者,本地事务失败,消息将被丢弃(进入死信队列)。
2、事务消息的实现核心:
事务消息的关键是在 TransactionMQProducer 中指定了一个 TransactionListener 事务监听器,这个事务监听器就是事务消息的关键控制器。
package org.apache.rocketmq.example.transaction; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; public class TransactionListenerImpl implements TransactionListener { // 在提交完事务消息后执行。 // 返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。 // 返回ROLLBACK_MESSAGE状态的消息会被丢弃。 // 返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String tags = msg.getTags(); //TagA的消息会立即被消费者消费到 if (StringUtils.contains(tags, "TagA")) { return LocalTransactionState.COMMIT_MESSAGE; //TagB的消息会被丢弃 } else if (StringUtils.contains(tags, "TagB")) { return LocalTransactionState.ROLLBACK_MESSAGE; //其他消息会等待Broker进行事务状态回查。 } else { return LocalTransactionState.UNKNOW; } } //在对UNKNOWN状态的消息进行状态回查时执行。返回的结果是一样的。 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String tags = msg.getTags(); //TagC的消息过一段时间会被消费者消费到 if (StringUtils.contains(tags, "TagC")) { return LocalTransactionState.COMMIT_MESSAGE; //TagD的消息也会在状态回查时被丢弃掉 } else if (StringUtils.contains(tags, "TagD")) { return LocalTransactionState.ROLLBACK_MESSAGE; //剩下TagE的消息会在多次状态回查后最终丢弃 } else { return LocalTransactionState.UNKNOW; } } }
3、事务消息的使用有哪些限制?
-
事务消息不支持 延迟消息 和 批量消息;
-
为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次;
-
但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制;
-
如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志;
-
用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
-
事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查;
-
当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于transactionMsgTimeout 参数。
-
事务性消息可能不止一次被检查或消费;
-
提交给用户的目标主题消息可能会失败,目前这依日志的记录而定;
-
它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制;
-
事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享;
-
与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 反向查询到生产者。
4、图解事务消息的流程(重点):
-
第1、2步,发送 half 消息并得到返回的目的,是为了确保 Broker 可用;
-
Broker 在接收到 half 消息时,不会立即对这些消息进行处理(发送给消费者 or 丢弃);
-
half 消息成功发送到 Broker 后,本地正常执行事务操作:TransactionListener#executeLocalTransaction() 方法中,该方法中可以执行业务操作,最终给 Broker 返回3种结果:
-
COMMIT_MESSAGE:事务正常提交,消息可推送给消费者;
-
ROLLBACK_MESSAGE:事务回滚,消息丢弃,发送到死信队列;
-
UNKNOW:是无状态未知,暂时还没有完成,之后Broker将会定期找生产者询问事务状态:TransactionListener#checkLocalTransaction();
-
后期,当 TransactionListener#checkLocalTransaction() 给与Broker明确答复时,Broker决定后续操作,如果还是UNKNOW,那么继续定期调用此方法,几个重要的参数:
-
默认Broker先等待6秒后(transactionTimeOut,也可以主动设置属性:CHECK_IMMUNITY_TIME_IN_SECONDS),开始主动检查事务状态;
-
默认检查的时间间隔 60秒(transactionCheckInterval)
-
默认最多询问 15次(transactionCheckMax)
-
事务消息在commit最前(half消息),对消费者是完全不可见的!
5、事务消息的使用场景(有助于深入理解此功能)
比如有如下场景(简化):用户下了个订单,需要等待5min之内完成支付,正常支付,则安排后期的物流等业务,如果超时不支付,我们就进行释放库存等操作!
-
1. 我们在用户下单后,我们往Broker中发送一个事务消息;
-
2. 然后我们在 TransactionListener#checkLocalTransaction() 中去查询此订单的支付情况;
-
3. 如果成功支付,则返回 COMMIT_MESSAGE,Broker就会将订单消息发送给下游物流等服务;
-
4. 如果用户取消支付,则返回 ROLLBACK_MESSAGE,Broker将会将订单丢弃到死信队列,释放库存的服务将消费此数据释放库存;
-
5. 如果用户支付超时,则执行和 4 中一样的后续流程;
此方案用其他中间件会比较麻烦,但是用RocketMQ的事务消息就变得非常得简单!