RocketMQ高性能原理剖析和可靠性保证

一、RocketMQ的读写队列

首先,我们要知道:

当我们生产者将消息发送到RocketMQ的某个Topic时,Topic只是一个逻辑概念,实际上是会分为几个MessageQueue存放的;

当我们消费者从RocketMQ的某个Topic消费数据时,也是从MessageQueue进行消费的;

这里存放时用到几个队列,就是我们配置的“写队列数”;读取时用到几个队列就是我们配置的“读队列数”

1、直观感受下RocketMQ的读写队列:

当我们在配置RocketMQ的Broker的参数时,有一个默认值:

defaultTopicQueueNums=4

很显然,这是为自动创建的 Topic 设置了队列数;

当然我们也可以手动修改:

1696901297789103.png

我们需要一个最直观的感受,队列数到底是什么?

1696901770841819.png

原来,队列数决定了,我们一个Topic(逻辑存储单元)在真正存储时,使用了几个MessageQueue!

对应的,在存储位置上,我可以看到存储了几个目录:

[root@rocketmq2 store]# pwd
/data/rocketmq/store

# 这里的4个目录,也就是对应的4个MessageQueue
[root@rocketmq2 store]# ls consumequeue/TestTopic/
0  1  2  3

2、读写队列数量配置不一致时的影响:

我们最理想的情况,肯定就是配置 writeQueueNums = readQueueNums,因为:

当 writeQueueNums > readQueueNums 时:有一部分被写入数据的队列,不能被分配个消费者,导致消息丢失!

当 writeQueueNums < readQueueNums 时:有一部分消费者,被分配的队列,其实是一个空队列,导致消费者资源浪费!

3、读写队列数量不一致的使用场景:

主要的使用场景是用于队列扩缩容,以队列缩容举个栗子:

某个Topic当前读写队列数都为128,现在需要将队列缩容到64个,怎么做才能100%不会丢失消息,并且无需重启应用程序?

最佳实践:先缩容写队列128->64,写队列由0 1 2 ……127缩至 0 1 2 ……..63。等到64 65 66……127中的消息全部消费完后,再缩容读队列128->64;

(同时缩容写队列和读队列可能会导致部分消息未被消费)。

  • 缩容场景:先缩“写队列”,再缩“读队列”,防止消息丢失;

  • 扩容场景:先扩“写队列”,再扩“读队列”,防止资源浪费,但是反向,问题也不大。


二、消息数据的持久化(重点)

直观感受下数据存储目录结构:

[root@rocketmq2 store]# pwd
/data/rocketmq/store
[root@rocketmq2 store]# ll
total 8
drwxr-xr-x.  2 root root   34 commitlog       # 消息写入时的第一站
drwxr-xr-x. 11 root root  232 consumequeue    # 消息消费队列存储目录
drwxr-xr-x.  2 root root   31 index           # 消息索引文件存储目录
drwxr-xr-x.  2 root root  280 config          # 运行期间一些配置信息
-rw-r--r--.  1 root root    0 abort           # 如果存在该文件说明Broker非正常关闭
-rw-r--r--.  1 root root    4 lock            # 表明该目录已被使用,防止同一个目录被多个Broker使用
-rw-r--r--.  1 root root 4096 checkpoint      # 文件检查点,记录CommitLog文件/ConsumeQueue文件/index索引文件最后一次刷盘时间戳

深入理解下面这张图:

1696903016941275.png

1、CommitLog文件:

CommitLog文件存储所有消息实体,所有生产者发过来的消息,都会无差别的依次存储到Commitlog文件当中。

这样的好处是可以减少查找目标文件的时间,让消息以最快的速度落盘。

对比Kafka存文件时,需要寻找消息所属的Partition文件,再完成写入,当Topic比较多时,这样的Partition寻址就会浪费比较多的时间,所以Kafka不太适合多Topic的场景。而RocketMQ的这种快速落盘的方式在多Topic场景下,优势就比较明显。

CommitLog的文件结构:CommitLog的文件大小是固定的,但是其中存储的每个消息单元长度是不固定的,具体格式可以参考org.apache.rocketmq.store.CommitLog

protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
    int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
    int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
    final int msgLen = 4 //TOTALSIZE
        + 4 //MAGICCODE
        + 4 //BODYCRC
        + 4 //QUEUEID
        + 4 //FLAG
        + 8 //QUEUEOFFSET
        + 8 //PHYSICALOFFSET
        + 4 //SYSFLAG
        + 8 //BORNTIMESTAMP
        + bornhostLength //BORNHOST
        + 8 //STORETIMESTAMP
        + storehostAddressLength //STOREHOSTADDRESS
        + 4 //RECONSUMETIMES
        + 8 //Prepared Transaction Offset
        + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
        + 1 + topicLength //TOPIC
        + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
        + 0;
    return msgLen;
}

正因为消息的记录大小不固定,所以RocketMQ在每次存CommitLog文件时,都会去检查当前CommitLog文件空间是否足够,如果不够的话,就重新创建一个CommitLog文件。文件名为当前消息的偏移量。

随着数据量的增加,必然会存在多个CommitLog文件,但是同一个时刻,只有文件名最大的CommitLog文件才允许被写入!

2、ConsumeQueue文件 + config/consumerOffset.json文件:

ConsumeQueue文件主要是加速消费者的消息索引,它是以Topic ——> MessageQueue 进行管理的,每个文件夹对应RocketMQ中的一个MessageQueue;

文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样,消费者通过ComsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录。

[root@rocketmq2 store]# cd consumequeue/
[root@rocketmq2 consumequeue]# ll
total 0
drwxr-xr-x. 6 root root 42 Oct  8 06:16 OrderTopicTest
drwxr-xr-x. 3 root root 15 Oct  8 07:06 RMQ_SYS_TRANS_HALF_TOPIC
drwxr-xr-x. 3 root root 15 Oct  8 07:06 RMQ_SYS_TRANS_OP_HALF_TOPIC
drwxr-xr-x. 3 root root 15 Oct  8 05:44 SCHEDULE_TOPIC_XXXX
drwxr-xr-x. 6 root root 42 Oct  8 06:49 SqlFilterTest
drwxr-xr-x. 6 root root 42 Oct  8 06:45 TagFilterTest
drwxr-xr-x. 6 root root 42 Oct  9 03:29 TestTopic
drwxr-xr-x. 6 root root 42 Oct  7 14:27 TopicTest
drwxr-xr-x. 3 root root 15 Oct  9 03:44 TRANS_CHECK_MAX_TIME_TOPIC

# 注意,每一个consumerqueue文件,都是固定的
[root@rocketmq2 consumequeue]# cd TestTopic/
[root@rocketmq2 TestTopic]# ll 0
total 4
-rw-r--r--. 1 root root 6000000 Oct  9 04:26 00000000000000000000
[root@rocketmq2 TestTopic]# ll 1
total 4
-rw-r--r--. 1 root root 6000000 Oct  9 04:26 00000000000000000000
[root@rocketmq2 TestTopic]# ll 2
total 4
-rw-r--r--. 1 root root 6000000 Oct  9 04:26 00000000000000000000
[root@rocketmq2 TestTopic]# ll 3
total 4
-rw-r--r--. 1 root root 6000000 Oct  9 04:26 00000000000000000000

每个消费者在ConsumeQueue文件当中的消费进度,会保存在config/consumerOffset.json文件当中

(记录着每个Topic下,每个消费者组ConsumerGroup,在每个MessageQueue上的消费进度)

[root@rocketmq2 store]# cd config/
[root@rocketmq2 config]# cat consumerOffset.json
{
	"offsetTable":{
		"OrderTopicTest@please_rename_unique_group_name_3":{0:12,1:12,2:6,3:6
		},
		"%RETRY%please_rename_unique_group_name@please_rename_unique_group_name":{0:0
		},
		"TopicTest@please_rename_unique_group_name_4":{0:351,1:350,2:351,3:352
		},
		"%RETRY%MyConsumerGroup@MyConsumerGroup":{0:0
		},
		"RMQ_SYS_TRANS_HALF_TOPIC@CID_RMQ_SYS_TRANS":{0:145
		},
		"%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
		},
		"RMQ_SYS_TRANS_OP_HALF_TOPIC@CID_RMQ_SYS_TRANS":{0:29
		},
		"%RETRY%please_rename_unique_group_name_3@please_rename_unique_group_name_3":{0:0
		},
		"SqlFilterTest@please_rename_unique_group_name":{0:0,1:0,2:0,3:0
		},
		"TestTopic@MyConsumerGroup":{0:10,1:12,2:10,3:9
		},
		"TagFilterTest@please_rename_unique_group_name":{0:2,1:2,2:2,3:2
		}
	}
}

ConsumeQueue文件结构:每个ConsumeQueue文件固定由30万个固定大小20byte的数据块组成(正好就是600w字节),数据块的内容包括:msgPhyOffset(8byte,消息在文件中的起始位置)+msgSize(4byte,消息在文件中占用的长度)+msgTagCode(8byte,消息的tag的Hash值)。

在ConsumeQueue.java当中有一个常量CQ_STORE_UNIT_SIZE=20,这个常量就表示一个数据块的大小

3、IndexFile文件:

IndexFile文件主要是辅助消息检索的,如果消费者只是按照offset正常进行消息消费,通过ConsumeQueue文件就足够完成消息检索了;

但是如果要按照MeessageId或者MessageKey来检索文件,比如RocketMQ管理控制台的消息轨迹功能,ConsumeQueue文件就不够用了。IndexFile文件就是用来辅助这类消息检索的。他的文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。

但是其实,它也是一个固定大小的文件。

IndexFile文件结构:他的文件结构由 indexHeader(固定40byte)+ slot(固定500W个,每个固定20byte) + index(最多500W*4个,每个固定20byte) 三个部分组成。

4、abort文件:

异常终止标识:如果Broker程序正常停止,那么就会删除掉此abort文件!

5、lock文件:

目录锁定标识:一个Broker程序启动后,会创建此lock文件,正常停止后,会删除此文件;其它Broker在启动时,如果发现此lock文件,就会启动失败,防止多个Broker公用一个目录

6、checkpoint文件:

记录CommitLog文件/ConsumeQueue文件/index索引文件最后一次刷盘时间戳,便于快速判断是否被更新过,如是否有新消息到达等;


三、过期文件删除机制

有两个重要配置:

#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120

1、如何判断过期文件:

RocketMQ中,CommitLog文件和ConsumeQueue文件都是以偏移量命名,对于非当前写的文件,如果超过了一定的保留时间,那么这些文件都会被认为是过期文件,随时可以删除。

这个保留时间就是在broker.conf中配置的fileReservedTime属性。

注意,RocketMQ判断文件是否过期的唯一标准就是非当前写文件的保留时间,而并不关心文件当中的消息是否被消费过。所以,RocketMQ的消息堆积也是有时间限度的,默认48小时。

2、何时删除过期文件:

RocketMQ内部有一个定时任务,对文件进行扫描,并且触发文件删除的操作,用户可以指定文件删除操作的执行时间。在broker.conf中deleteWhen属性指定,默认就是凌晨四点

另外,RocketMQ还会检查服务器的磁盘空间是否足够,如果磁盘空间的使用率达到一定的阈值,也会触发过期文件删除。所以RocketMQ官方就特别建议,broker的磁盘空间不要少于4G


四、高效的文件写效率

1、顺序写(不同于kafka):

  • 随机写:由于磁盘空间不是连续的,会有很多碎片。所以我们去写一个文件时,也就无法把一个文件写在一块连续的磁盘空间中,而需要在磁盘多个扇区之间进行大量的随机写。这个过程中有大量的寻址操作,会严重影响写数据的性能;

  • 顺序写:是在磁盘中提前申请一块连续的磁盘空间,每次写数据时,就可以避免这些寻址操作,直接在之前写入的地址后面接着写就行。

顺序写的性能基本能够达到内存级别。而如果配备固态硬盘,顺序写的性能甚至有可能超过写内存。

而RocketMQ很大程度上借鉴了Kafka的这种思想,但是实现方式却又不相同:

  • kafka:每个topic下有多个partition分区,每个partition又分了很多segment(分区分段),当topic或者partition过多时,存在大量寻址操作,顺序写降级为随机写,效率会大大降低

  • RocketMQ:同一时间,只有一个commitLog文件允许写入,所有的生产者的消息都写入者一个commitLog文件中,效率非常高;

(写入commitLog文件时,会加锁写入,防止冲突,但是写入效率高,很快就能释放锁,会有另外的线程在扫描commitLog文件是否发生过改变,如果改变说明有新消息,此时会分发任务到:ConsumerQueue文件处理 + IndexFile文件处理)

# 分发任务:
dispatcher.dispatch(req)

// ConsumeQueue文件分发的构建器
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}
// IndexFile文件分发的构建器
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}

2、零拷贝技术

3、异步刷盘(默认)

jiguiquan@163.com

文章作者信息...

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐