数据结构的定义是什么(数据结构指的是什么)
940
2022-05-29
27. read: 先说说这个方法的返回对象: FetchDataInfo——这是一个case类,包含了日志位移元数据信息以及一个消息集合。这个方法也很简单,就是从日志中读取消息,将起始位移和读取到的消息集合封装进一个FetchDataInfo中。此方法接收3个参数: startOffset表示读取操作执行的开始位移点; maxLength表示最多读取的字节数; maxOffset表示读取操作不能超过的位移点,即返回的消息集合中不能包含该位移。具体逻辑如下: 首先检查下一条消息的位移与给定的起始位移,如果两者相等直接返回,只不过是空的消息集合。否则的话,找到小于等于startOffset的最大位移所在的日志段。如果startOffset比当前最大位移还大或者压根就没有找到刚才的日志段,那么说明要读取的内容已经超出了日至当前的结束offset,直接报错退出。okay,如果到这里很运行正常的话,那么下面就开始循环读取消息: 如果读取的消息集合不为空直接返回,否则跳到下一个日志段继续读取直到下一个日志段为空退出循环。此时的情况是我们已经跨过了最后一个日志段但给定的startOffset确实合法的值——这是有可能的,比如所有比startOffset大的消息都已经被删除了。如果是这样的话,程序简单地返回一个包含空消息集合的FetchDataInfo对象。(后面有更详细的分析)
26. trimInvalidBytes: 消除消息集合尾部的无效字节。在学习这个方法之前,我们要了解Kafka在这个scala文件中定义一个case class类: LogAppendInfo类——这个类保存了每个消息集合的各种信息: 包括这个集合的起始位移、结束位移、未压缩消息(shallow message)数,合法字节数、用到的压缩算法以及表明该消息集合位移是否是单调增加的布尔值。现在再来看trimInvalidBytes方法,这个方法接收2个参数: 一个要是做trim的消息集合,另一个是用LogAppendInfo对象标识的消息集合的通用信息,结果自然是被trim过的消息集合——可能与原消息集合相同。具体做法是: 首先计算出这个消息集合的合法字节数(这要通过analyzeAndValidateMessageSet方法给出,后面会说这个方法)——如果合法字节数小于0,直接报退出。如果该字节数就是消息集合的字节数,那么说明不用做trim直接返回传入的messages即可。否则即说明有非法的字节,那么就新建一个ByteBufferMessageSet消息集合,将limit设置为刚才计算的合法字节数,然后返回。
27. analyzeAndvalidateMessageSet: 上面的方法提到了一个消息集合的合法字节与非法字节,那么如何定义合法性呢?答案就有这个方法给出。从字面上来说,这个方法做的工作就是分析验证消息集合,主要的工作有: a. 验证消息CRC码; b. 验证消息长度合法性; c. 计算消息集合的起始位移; d. 计算结束位移; e. 消息集合中的消息数; f. 计算合法字节数(就是合法消息字节数的累加和); g. 验证位移是否单调增加; h. 验证是否使用了压缩,如果指定了多个,只以最后一条消息的压缩算法为准
28. loadSegments: 该方法就是加载磁盘上的日志文件。具体逻辑如下: a. 如果给定的路径不存在则创建出来; b. 遍历该目录路径下的所有文件删除掉那些临时文件(包括后缀名是.deleted和.cleaned); c. 如果发现是以.swap结尾的文件,说明在上一次的swap过程中Kafka失败了,需要执行恢复操作。针对上面的情况,先去掉结尾的.swap然后判断是.log还是.index结尾。如果是索引文件(.index结尾)则直接删除,反正后面可以重建; 如果是日志数据文件(.log结尾),那么先删除对应的索引文件,然后将.swap去掉表示修复成功; d. 第一遍遍历之后再次进行第二遍遍历。对目录下的每个文件,如果它是索引文件,则寻找对应的.log文件,如果不存在抛出告警信息并直接该索引文件; 如果存在的话不做任何处理; 但如果该文件本身就是日志数据文件,则必然是000000...0000【offset】.log这样的形式; e. 提取基础offset,并判断是否存在对应的索引文件,然后就创建新的日志段对象。f. 创建日志段之后判断是否存在索引文件,如果没有的话重建索引; g. 最后将新创建的日志段加入到日志段map中,至此第二遍遍历完成; h. 此时判断日志段map中是否存在任何日志段,如果没有的话则创建一个offset为0的空日志段——因为每个日志都至少要有一个日志段。如果map中的确有日志段,先调用recoverLog方法(稍后会说)恢复日志段然后重设activetSegment的索引长度(否则容易引发日志段切分);j. 最后为每个日志段检查对应的索引文件(确保索引文件为空以及索引长度一定要是8的倍数,因为索引项长度总是位移的整数倍)
29. recoverLog: 主要为日志段map中自恢复点起的每个日志段重建索引文件并且砍掉那些位于日志和索引尾部的无效字节。如果发现确实存在无效字节,那么就把那些日志段全部删除掉
30. append: 添加给定的消息集合到当前激活的日志段中,如果满足条件的话做切分。
2.3 LogSegment
Segment是个逻辑概念,为了防止log文件过大, 将log分成许多的LogSegments
Segment又分为两部分,MessageSet文件和Index文件,分别命名为[base_offset].log和[base_offset].index,base_offset就是该Segment的起始offset,比前一个segment里面的offset都要大。MessageSet文件是用FileMessageSet类实现的,Index文件是由类OffsetIndex实现,这两个类后面有介绍。同时index文件是可以根据MessageSet文件重新rebuild的
Segment提供对于MessageSet的读写接口
写,需要间隔的更新index文件,应该为了尽量减小index的size,所以只是当写入数据大于indexIntervalBytes时,才增加一条索引
读,由于user传入的是逻辑offest,需要先转化为物理地址才能从文件中读到数据,如何转化参考下面的介绍。
其对应的代码文件是LogSegment.scala,该scala文件有一个非线程安全的类:LogSegment,用于表示日志段。该类的构造函数有6个参数,分别是:
1. log: FileMessageSet定义的消息集合
2. index: OffsetIndex定义的位移索引,包含了位移到物理文件位置的映射
3. baseOffset: 日志文件的基础位移,也就是这个日志段中最低的位移
4. indexIntervalBytes: 索引文件中索引项的间隔,即Kafka查找下一个物理位置时进行线性查找的最大字节数。
5. rollJitterMs: 指定日志段切分时的jitter time,避免日志切分时出现惊群
6. time: 一个时间变量,主要提供时间方面的服务
下面对LogSegment的一些关键代码进行分析:
1. created变量: 创建一个日志段的时间信息是很有用的,所以需要有个变量保存这个信息
2. size方法: 既然是保存消息的日志段,也必然有个方法保存当前日志段占用的字节数,具体实现方法就是调用LogSegment包含的日志对象的size方法
3. bytesSinceLastIndexEntry变量: 这个变量主要的作用就是用于判断在追加写日志的同时是否需要增加一条索引项。由于log.index.interval.bytes默认是4KB,因此每写4KB就会在索引文件中增加一条索引记录。增加索引项之后需要将该变量置为0重新计算
4. lastModified以及lastModified_方法: Kafka在清理日志段的时候根据当前时间与该方法返回值比较清理那些陈旧的日志段并且根据UAP原则提供了同名的setter方法用于更新日志段对象中日志文件和索引文件的最近修改时间。
5. delete方法: 逻辑很简单的方法,就是删除日志文件和索引文件
6. close方法: 关闭日志段的方法,具体就是关闭底层的日志文件和索引文件
7. changeFileSuffixes方法: 同时更改日志文件和索引文件的后缀名。例如在删除日志段的时候把a.log和a.index更名为a.log.delete和a.index.delete
8. flush方法: 将buffer中的消息和索引项写入磁盘
9. nextOffset方法: 计算这个日志段中下一条消息的位移。这个方法运行起来是有很高的代价的,因为它需要从索引文件中最后一项标识的位移处开始读出一个消息集合。特别注意的是如果索引文件为空的话,它就需要将整个日志段的数据都读出来并返回一个FetchDataInfo对象。这个对象由一个位移元数据加上一个消息集合组成。如果这个FetchDataInfo为空,或者它包含的消息集合为空就只返回baseOffset——即这个日志段开始offset,否则返回offset+1 (主要是因为消息集合本身也就是一组MessageAndOffset对象)
10. truncateTo方法: 给定一个位移,将位于该位移之后的所有索引项和日志项全部清除,如果给定的位移大于日志段本身的最大位移就什么都不做。最后函数返回日志数据总共截断的字节数。值得注意的是,如果把所有日志数据都截断了,那么需要更新这个日志段的创建日期。同时还会将检查是否增加索引项的指针清零。
11. append方法: 将一组消息追加写入到以给定offset开始的日志段中。如果写入超过了4KB(默认的log.index.interval.bytes属性值)则额外写入一条新的索引项记录到索引文件中。这个方法不是线程安全的,所以后面调用的时候需要有锁同步机制的保护
12. translateOffset方法: 给定一个offset,找出该日志段中不小于该offset的第一条消息对应的物理文件位置。这个方法还有一个参数可以用来调优,不必从查询到的索引项中包含的位置开始,可以直接从给定的文件位置开始查找。当然这样做的前提是你必须已经知道这是文件中的一个合法的开始位置并且比最靠近的索引项中包含的位置要大。
13. read方法: 给定一个offset,从不小于这个offset处的第一条开始读消息,不能超过maxSize个字节,也必须在maxOffset(如果提供了maxOffset)处结束——读到的这些消息封装到一个FetchDataInfo对象返回。FetchDataInfo由一个日志位移元数据LogOffsetMetadata对象和一个消息集合组成,所谓的LogOffsetMetadata就是由消息offset加上该日志段的基础位移再加上日志段内的相对物理位置组成。这个方法有一个关键的问题是,要读取消息集合到底多少字节?如果给定的maxSize是0,那么就返回一个空的消息集合。如果maxSize大于0且没有指定maxOffset,那么就表示我们能够读取最多maxSize字节的消息;而如果maxSize>0且指定了maxOffset,程序就需要计算一下maxOffset所表示的物理文件位置与起始位置的差距和maxSize谁大谁小——同时也只能选取小的作为最终的可读取字节数
14. recover方法: 恢复一个日志段——即根据日志文件重建索引并砍掉那些无效的字节,所谓的无效字节就是由参数限定的,任何在maxMessageSize之外的字节都是为无效状态。该方法实现也很简单,就是先将索引项全部截断并将索引文件重置为原来的大小,然后遍历该消息集合,超过indexIntervalBytes之后就追加一条索引记录从而达到重建索引的目的
2.4 FileMessageSet
FileMessageSet继承MessageSet类,通过FileChannel可以读写文件,是Segment中实际存放ByteMessageSet消息的文件对象。 FileChannel是Java NIO提供的类,实际上写日志时用的是RandomAccessFile来打开文件,允许来回读写文件,指定位置读写。FileMessageSet类有一个起始和结束的指针标识消息集合的起始位置和结束位置——这样就能实现从整个消息集合中切片的功能。该类有5个构造函数参数:
1. file: 日志文件
2. channel: 底层使用到的文件通道(file channel)
3. start/end: 消息集合在文件中的绝对起始位置/绝对结束位置
4. isSlice: 是否从整个消息集合中切分处一个切片
除了主构造函数之外还提供了很多便利的辅助构造函数。另外FileMessageSet类定义了一个_size变量,用于保存消息集合的字节数(同时考虑了是否支持切片)。如果不是一个切片,则将底层文件通道的指针移到最后一个字节。该类定义的方法如下:
1. read: 从日志文件中的指定位置读取指定大小的buffer并封装到一个FileMessageSet对象返回。
2. sizeInBytes: 该文件消息集合字节数
3. searchFor: 从给定位置处开始向后寻找不小于targetOffset的位移,并返回实际的物理文件位置。如果没有找到的话,直接返回null
4. writeTo: 写入这个消息集合到指定的channel,允许从指定的位置写入指定大小的字节数,并返回真实写入的字节数
5. iterator: 获取遍历该消息集合的迭代器,只做一层迭代
6. append: 将保存在一个ByteBuffer中的一组消息追加到指定的该消息集合所在的channel尾部并增加总的消息集合字节数
7. flush: 提交所有已写数据到物理磁盘
8. close: 先调用flush存磁盘,然后关闭channel
9. delete: 从文件系统中删除消息集合
10. truncateTo: 将文件消息集合截断成指定的字节大小
11. readInto: 将底层的文件从给定的位置开始读取内容到一个ByteBuffer中
12. renameTo: 更名消息集合底层的文件名
除了FileMessageSet类,该scala还定义了一个object: LogFlushStats——里面只定义了一个定时器,用于统计写入日志段到文件的时间
2.5 OffsetIndex
Segment的index文件, 这是0.8后加上的,之前message直接使用物理offset标识。新版本中还是改成了使用逻辑offset,让物理地址对用户透明, 这样就需要一个index来匹配逻辑offset和物理地址。index考虑到效率,最好放在内存中,但是考虑到size问题, 所以使用MappedByteBuffer(参考,Java RandomAccessFile用法 )
注释里面说, Index是sparse的,不保证每个message在index都有索引的entry。Index由entry组成,每个entry为8-byte,逻辑offset4-byte,物理地址4-byte。
逻辑offset是基于base offset的相对offset,否则无法保证只使用4-byte,例如baseoffset为50,那么offset 55的逻辑offset为5。
物理地址是该offset所对应的message所在日志文件的位置。因为只有4个字节,这么看日志文件每个最大只能为4G?
下面开始分析代码,首先从构造函数开始,该构造函数接收三个变量:一个表示索引文件的文件变量;一个基础offset和一个变量表示最大的索引文件字节数。该类还定义了一些类成员变量和很多方法,我们一个一个分析:
1. lock: 私有字段,使用ReentrantLock实现,用于同步访问MappedByteBuffer。ReentrantLock提供与synchronized相同的内存和并发性语义,另外性能也更好。
2. roundToExactMultiple: 私有方法,就是计算小于第一个参数的第二个参数的最大整数倍,比如roundToExactMultiple(67, 8)返回64
3. mmap: 私有字段,负责初始化包含该索引的内存映射对象。首先检查给定的File对象,如果不存在的话预先创建出来并设定长度为maxIndexSize,并设定好开始的位置之后返回。
4. size: 私有字段,索引文件中当前保存的索引项(每项都是8字节)
5. maxEntries: 成员变量,索引文件能包含的最大索引项个数
6. relativeOffset: 返回根据base offset的第n个位移。假设n是5,每项是8个字节,那么相对位移的值(使用4个字节)必然是保存在buffer的第40个字节到第43个字节。
7. physical: 获取第n个位移对应的物理文件位置(依然是4个字节)——还是假设n=5,那么返回的值就是从44字节~47字节处保存的值。
8. readLastEntry: 读取索引文件中最后一项对应的OffsetPosition
9. lastOffset: 返回索引文件中最后一个索引项的位移
10. maybeLock: 在一个锁保护的情况下执行给定的方法
11. indexSlotFor: 以二分查找的方式寻找比给定offset小的最大offset。当然了,如果最小的位移都比给定的offset大或者索引文件干脆就是空的话直接返回-1
12. lookup: 计算比给定offset小的最大位移,找到后返回offset-对应物理文件位置的映射对
13. entry: 返回索引文件中的第n个位移映射对
14. append: 从给定的offset-position处插入一索引项。既然叫append,该项给定的offset必须比现有的所有索引项都要大
15. isFull: 判断该索引文件是否已满
16. truncate: 删除所有索引项
17. truncateToEntries: 删除索引项到给定的数目
18. truncateTo: 删除那些位移不小于给定offset的所有索引项
19. resize: 重设索引文件的大小——主要用于新的日志segment切分时候调用。需要注意的是代码中区分了操作系统,因为Windows平台不允许调整内存映射文件的长度
20. forceUnmap: 主要为Windows平台上使用。因为在Windows平台上修改文件长度时需要先释放内存映射对象
21. trimToValidSize: 调整为当前索引文件的真实占用字节大小
22. flush: 调用MappedByteBuffer的force方法将对buffer的修改写入底层的文件
23. delete: 删除该索引文件
24. entries: 返回索引文件中的索引项数
25. sizeInBytes: 索引文件当前使用的索引项字节总数
26. close: 调用trimToValidSize方法关闭索引
27. renameTo: 重命名索引文件名称
28. sanityCheck: 对索引文件进行完整性检查,包括索引文件字节数是否为8的整数倍、当前最大位移是否小于基础位移等
2.6 OffsetPosition
是一个case classe类,是逻辑offset与物理地址直接的映射关系类。
/**
* The mapping between a logical log offset and the physical position
* in some log file of the beginning of the message set entry with the
* given offset.
*/
case class OffsetPosition(val offset: Long, val position: Int)
3. 其他包相关组件
3.1 LogOffsetMetadata
/*
* A log offset structure, including:
* 1. the message offset
* 2. the base message offset of the located segment
* 3. the physical position on the located segment
*/
case class LogOffsetMetadata(messageOffset: Long,
segmentBaseOffset: Long = LogOffsetMetadata.UnknownSegBaseOffset,
relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition)
该scala文件时一组伴生对象,定义了日志位移的元数据信息。该类定义了kafka的位移元数据结构,它包括:
1. 消息位移
2. 位移所在日志段的基础位移(起始位移)
3. 所在段的物理位置
该类定义了一些方法用于获取这些信息以及使用这些信息执行一些判断操作:
1. messageOffsetOnly:判断位移元数据信息是否只包括消息位移部分的数据,而其他两部分为空
2. offsetOnOlderSegment:与给定的位移元数据实例相比较判断这个位移是否是在一个比较旧的日志段上。
3. offsetOnSameSegment:与上个方法类似,只是这次比较两个位移元数据信息是否在同一个日志段上
4. precedes:比较这个位移是否在给定位移之前
5. offsetDiff:计算此位移与给定位移之间所含的消息数
6. positionDiff:计算此位移与给定位移之间所差的字节数——前提是两个位移位于同一日志段且此位移在给定位移之前出现。实现方法就是元数据信息中的段内相对物理位置相减。
再说说LogOffsetMetadata object。它定义了三个常量分别代表未知位移的元数据、未知的段起始位移和位置的段内物理文件位置。最后该object还定义了一个OffsetOrdering嵌套类实现了scala的Ordering接口因而支持两个位移元数据实例的比较。compare方法就是调用两个元数据的offsetDiff方法获取两个元数据之间的消息差值。
3.2 FetchDataInfo
就是一个简单的case类,由一个LogOffsetMetadata和一个MessageSet组成
case class FetchDataInfo(fetchOffset: LogOffsetMetadata, messageSet: MessageSet)
3.3 ByteBufferMessageSet
ByteBufferMessageSet之前在Producer分析中也有提到,继承了MessageSet类,是实际写入log文件的结构。我们先来看看这几个带MessageSet后序的类的关系:
ByteBufferMessageSet直接继承MessageSet,而MessageSet继承scala.collection.Iterable:
MessageSet extends Iterable[MessageAndOffset]
即MessageSet是MessageAndOffset类的集合。
Kafka
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。