源码分析 RocketMQ DLedger 多副本存储实现

网友投稿 695 2022-05-28

本节目录

1、DLedger 存储相关类图

1.1 DLedgerStore

1.2 DLedgerMemoryStore

1.3 DLedgerMmapFileStore

2、DLedger 存储 对标 RocketMQ 存储

3、DLedger 数据存储格式

4、DLedger 索引存储格式

5、思考

其文件组成形式如下:

正如上图所示,多个 commitlog 文件组成一个逻辑上的连续文件,使用 MappedFileQueue 表示,单个 commitlog 文件使用 MappedFile 表示。

温馨提示:如果想详细了解 RocketMQ 关于存储部分的讲解,可以关注笔者的《RocketMQ 技术内幕》一书。

1、DLedger 存储相关类图

1.1 DLedgerStore

存储抽象类,定义如下核心方法:

public abstract DLedgerEntry appendAsLeader(DLedgerEntry entry)

向主节点追加日志(数据)。

public abstract DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String leaderId)

向从节点同步日志。

public abstract DLedgerEntry get(Long index)

根据日志下标查找日志。

public abstract long getCommittedIndex()

获取已提交的下标。

public abstract long getLedgerEndTerm()

获取 Leader 当前最大的投票轮次。

public abstract long getLedgerEndIndex()

获取 Leader 下一条日志写入的下标(最新日志的下标)。

public abstract long getLedgerBeginIndex()

获取 Leader 第一条消息的下标。

public void updateCommittedIndex(long term, long committedIndex)

更新commitedIndex的值,为空实现,由具体的存储子类实现。

protected void updateLedgerEndIndexAndTerm()

更新 Leader 维护的 ledgerEndIndex 和 ledgerEndTerm 。

public void flush()

刷写,空方法,由具体子类实现。

public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId)

删除日志,空方法,由具体子类实现。

public void startup()

启动存储管理器,空方法,由具体子类实现。

public void shutdown()

关闭存储管理器,空方法,由具体子类实现。

1.2 DLedgerMemoryStore

Dledger 基于内存实现的日志存储。

1.3 DLedgerMmapFileStore

基于文件内存映射机制的存储实现。其核心属性如下:

long ledgerBeginIndex = -1

日志的起始索引,默认为 -1。

l- ong ledgerEndIndex = -1

下一条日志下标,默认为 -1。

long committedIndex = -1

已提交的日志索引。

long ledgerEndTerm

当前最大的投票轮次。

DLedgerConfig dLedgerConfig

DLedger 的配置信息。

MemberState memberState

状态机。

MmapFileList dataFileList

日志文件(数据文件)的内存映射Queue。

MmapFileList indexFileList

源码分析 RocketMQ DLedger 多副本存储实现

索引文件的内存映射文件集合。(可对标 RocketMQ MappedFIleQueue )。

ThreadLocal< ByteBuffer> localIndexBuffer

本地线程变量,用来缓存索引ByteBuffer。

ThreadLocal< ByteBuffer> localEntryBuffer

本地线程变量,用来缓存数据索引ByteBuffer。

FlushDataService flushDataService

数据文件刷盘线程。

CleanSpaceService cleanSpaceService

清除过期日志文件线程。

boolean isDiskFull = false

磁盘是否已满。

long lastCheckPointTimeMs

上一次检测点(时间戳)。

AtomicBoolean hasLoaded

是否已经加载,主要用来避免重复加载(初始化)日志文件。

AtomicBoolean hasRecovered

是否已恢复。

2、DLedger 存储 对标 RocketMQ 存储

存储部分主要包含存储映射文件、消息存储格式、刷盘、文件加载与文件恢复、过期文件删除等,由于这些内容在 RocketMQ 存储部分都已详细介绍,故本文点到为止,其对应的参考映射如下:

在 RocketMQ 中使用 MappedFile 来表示一个物理文件,而在 DLedger 中使用 DefaultMmapFIle 来表示一个物理文件。

在 RocketMQ 中使用 MappedFile 来表示多个物理文件(逻辑上连续),而在 DLedger 中则使用MmapFileList。

在 RocketMQ 中使用 DefaultMessageStore 来封装存储逻辑,而在 DLedger 中则使用DLedgerMmapFileStore来封装存储逻辑。

在 RocketMQ 中使用 Commitlog F l u s h C o m m i t L o g S e r v i c e 来 实 现 c o m m i t l o g 文 件 的 刷 盘 , 而 在 D L e d g e r 中 则 使 用 D L e d g e r M m a p F i l e S t o r e FlushCommitLogService 来实现 commitlog 文件的刷盘,而在 DLedger 中则使用DLedgerMmapFileStore FlushCommitLogService来实现commitlog文件的刷盘,而在DLedger中则使用DLedgerMmapFileStoreFlushDataService来实现文件刷盘。

在 RocketMQ 中使用 DefaultMessageStore C l e a n C o m m i t l o g S e r v i c e 来 实 现 c o m m i t l o g 过 期 文 件 的 删 除 , 而 D L e d g e r 中 则 使 用 D L e d g e r M m a p F i l e S t o r e CleanCommitlogService 来实现 commitlog 过期文件的删除,而 DLedger 中则使用 DLedgerMmapFileStore CleanCommitlogService来实现commitlog过期文件的删除,而DLedger中则使用DLedgerMmapFileStoreCleanSpaceService来实现。

由于其实现原理相同,上述部分已经在《RocketMQ 技术内幕》第4章中详细剖析,故这里就不重复分析了。

3、DLedger 数据存储格式

存储格式字段的含义如下:

magic

魔数,4字节。

size

条目总长度,包含 Header(协议头) + 消息体,占4字节。

entryIndex

当前条目的 index,占8字节。

entryTerm

当前条目所属的 投票轮次,占8字节。

pos

该条目的物理偏移量,类似于 commitlog 文件的物理偏移量,占8字节。

channel

保留字段,当前版本未使用,占4字节。

chain crc

当前版本未使用,占4字节。

body crc

body 的 CRC 校验和,用来区分数据是否损坏,占4字节。

body size

用来存储 body 的长度,占4个字节。

body

具体消息的内容。

源码参考点:DLedgerMmapFileStore#recover、DLedgerEntry、DLedgerEntryCoder。

4、DLedger 索引存储格式

即一个索引条目占32个字节。

5、思考

1、DLedger 如果整合 RocketMQ 中的 commitlog 文件,使之支持多副本?

2、从老版本如何升级到新版本,需要考虑哪些因素呢?

尊敬的读者朋友们,都阅读到这里了,麻烦帮忙点个赞鼓励一下我,谢谢。

见文如面,我是威哥,热衷于成体系剖析JAVA主流中间件,关注公众号『中间件兴趣圈』,回复专栏可获取成体系专栏导航,回复资料可以获取笔者的学习思维导图。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:SQLite更换为MySQL数据库,提升数据库速度的2个技巧
下一篇:白话Elasticsearch50-深入聚合数据分析之doc values机制
相关文章