Kafka Source Code Analysis, Part 3: Reading and Writing in the Log Module (4)

Reader submission 959 2022-05-30

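The code of FileMessageSet.append is simple: it writes the messages straight to its FileChannel and then adds the number of bytes written to the size counter.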

/**

* Append these messages to the message set

*/

def append(messages: ByteBufferMessageSet) {

val written = messages.writeTo(channel, 0, messages.sizeInBytes)

_size.getAndAdd(written)

}
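
The same write-then-count pattern can be sketched outside Kafka. This is a minimal illustration, assuming a hypothetical SimpleFileLog class standing in for FileMessageSet and a temporary file created only for the demo; it is not Kafka's implementation.

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardOpenOption}
import java.util.concurrent.atomic.AtomicInteger

object AppendSketch {
  // Hypothetical stand-in for FileMessageSet: a file channel plus a size counter.
  final class SimpleFileLog(channel: FileChannel) {
    private val size = new AtomicInteger(channel.size().toInt)

    // Write the whole buffer at the current end of the file and return the bytes written.
    def append(bytes: ByteBuffer): Int = {
      var written = 0
      while (bytes.hasRemaining)
        written += channel.write(bytes, size.get + written)
      size.getAndAdd(written)
      written
    }

    def sizeInBytes: Int = size.get
  }

  // Append five bytes to an empty temporary file; returns (bytes written, tracked size).
  def demo(): (Int, Int) = {
    val path = Files.createTempFile("append-sketch", ".log")
    val channel = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE)
    try {
      val log = new SimpleFileLog(channel)
      val written = log.append(ByteBuffer.wrap("hello".getBytes("UTF-8")))
      (written, log.sizeInBytes)
    } finally {
      channel.close()
      Files.delete(path)
    }
  }
}
```

Keeping the size in an atomic counter lets readers learn the current end of the file without querying the channel on every call.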

The code of OffsetIndex.append is as follows:

/**

* Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previous entries.

*/

def append(offset: Long, position: Int) {

inLock(lock) {

require(!isFull, "Attempt to append to a full index (size = " + size + ").")

if (size.get == 0 || offset > lastOffset) {

debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))

this.mmap.putInt((offset - baseOffset).toInt)

this.mmap.putInt(position)

this.size.incrementAndGet()

this.lastOffset = offset

require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")

} else {

throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."

.format(offset, entries, lastOffset, file.getAbsolutePath))

}

}

}

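The code makes this clear. As described for OffsetIndex in Section 2, each index entry is 8 bytes: first the 4-byte relative offset (offset - baseOffset), then the 4-byte physical position of the message in the log file.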
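
The 8-byte entry layout can be tried out on a plain ByteBuffer. The sketch below is only an illustration: the object name IndexEntrySketch and its two helpers are hypothetical, and a heap buffer stands in for the memory-mapped index file.

```scala
import java.nio.ByteBuffer

object IndexEntrySketch {
  // Append one 8-byte entry: 4-byte relative offset, then 4-byte file position.
  def appendEntry(buf: ByteBuffer, baseOffset: Long, offset: Long, position: Int): Unit = {
    buf.putInt((offset - baseOffset).toInt)
    buf.putInt(position)
  }

  // Read the n-th entry back as (absolute offset, file position) using absolute gets.
  def entryAt(buf: ByteBuffer, baseOffset: Long, n: Int): (Long, Int) =
    (baseOffset + buf.getInt(n * 8), buf.getInt(n * 8 + 4))
}
```

Storing the offset relative to baseOffset is what allows a 64-bit logical offset to fit into 4 bytes per entry.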

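5.    Log.read: the log read path

[Flow chart of the Log.read path; figure not included]

This function takes the startOffset to begin reading at, a maximum read length and an optional maximum offset, and returns a FetchDataInfo. The idea is simple: in the segments map, find the segment with the largest baseOffset that is less than or equal to startOffset, then search and read from there.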
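
The segment lookup relies on the floorEntry method of a sorted map. A minimal sketch, assuming the segments map is a java.util.concurrent.ConcurrentSkipListMap keyed by base offset; the String values are hypothetical placeholders for LogSegment objects.

```scala
import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookupSketch {
  // Three segments with base offsets 100, 200 and 300 (labels stand in for LogSegment).
  def segments(): ConcurrentSkipListMap[java.lang.Long, String] = {
    val m = new ConcurrentSkipListMap[java.lang.Long, String]()
    m.put(100L, "segment-100")
    m.put(200L, "segment-200")
    m.put(300L, "segment-300")
    m
  }

  // floorEntry returns the entry with the greatest key <= startOffset, or null if there is none.
  def segmentFor(m: ConcurrentSkipListMap[java.lang.Long, String], startOffset: Long): Option[String] =
    Option(m.floorEntry(startOffset)).map(_.getValue)
}
```

An offset below the first base offset yields null from floorEntry, which is the case Log.read reports as an OffsetOutOfRangeException.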

/**

* Read messages from the log

*

* @param startOffset The offset to begin reading at

* @param maxLength The maximum number of bytes to read

* @param maxOffset The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).

*

* @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.

* @return The fetch data information including fetch starting offset metadata and messages read

*/

def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {

trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))

// check if the offset is valid and in range

// if startOffset equals the next offset (the log end offset), no message exists at that offset yet, so return an empty message set

val next = nextOffsetMetadata.messageOffset

if(startOffset == next)

return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)

// floorEntry returns the Map.Entry(baseOffset, LogSegment) with the largest baseOffset that is less than or equal to startOffset

var entry = segments.floorEntry(startOffset)

// attempt to read beyond the log end offset is an error

if(startOffset > next || entry == null)

throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))

// do the read on the segment with a base offset less than the target offset

// but if that segment doesn't contain any messages with an offset greater than that

// continue to read from successive segments until we get some messages or we reach the end of the log

while(entry != null) {

// call LogSegment.read to obtain the fetchInfo

val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)

// if it is null, move on to the next LogSegment and read from there

if(fetchInfo == null) {

entry = segments.higherEntry(entry.getKey)

} else {

return fetchInfo

}

}

// okay we are beyond the end of the last segment with no data fetched although the start offset is in range,

// this can happen when all messages with offset larger than start offsets have been deleted.

// In this case, we will return the empty set with log end offset metadata

FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)

}
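
The fallback loop over successive segments can be sketched in isolation. In this simplified model, which is an assumption made for illustration, each segment is just a Vector of the offsets it still contains, and a segment may be empty or lack any offset at or after the requested one.

```scala
import java.util.concurrent.ConcurrentSkipListMap

object SegmentWalkSketch {
  type Segments = ConcurrentSkipListMap[java.lang.Long, Vector[Long]]

  // Return the first stored offset >= startOffset, moving to later segments when one has none.
  def firstOffsetAtOrAfter(segments: Segments, startOffset: Long): Option[Long] = {
    var entry = segments.floorEntry(startOffset)
    var result: Option[Long] = None
    while (entry != null && result.isEmpty) {
      result = entry.getValue.find(_ >= startOffset)
      // Nothing usable in this segment: continue with the next higher base offset.
      if (result.isEmpty) entry = segments.higherEntry(entry.getKey)
    }
    result
  }
}
```

When the walk runs off the end of the map, the result stays empty, which corresponds to Log.read returning an empty message set with the log end offset metadata.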

5.1   LogSegment.read

/**

* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include

* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.

*

* @param startOffset A lower bound on the first offset to include in the message set we read

* @param maxSize The maximum number of bytes to include in the message set we read

* @param maxOffset An optional maximum offset for the message set we read

*

* @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,

*         or null if the startOffset is larger than the largest offset in this log

*/

@threadsafe

def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {

if(maxSize < 0)

throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

// get the size in bytes of this LogSegment's log file

val logSize = log.sizeInBytes // this may change, need to save a consistent copy

// translate the logical offset into a physical file position; the returned OffsetPosition has an offset >= startOffset. This function is covered in detail below.

val startPosition = translateOffset(startOffset)

// if the start position is already off the end of the log, return null

if(startPosition == null)

return null

// build a LogOffsetMetadata object holding the logical offset, the segment base offset and the physical position

val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

// if the size is zero, still return a log segment but with zero size

if(maxSize == 0)

return FetchDataInfo(offsetMetadata, MessageSet.Empty)

// the caller may cap the read with maxOffset, so compute the number of bytes that can actually be read

// calculate the length of the message set to read based on whether or not they gave us a maxOffset

val length =

maxOffset match {

case None =>

// no max offset, just use the max size they gave unmolested

maxSize

case Some(offset) => {

// there is a max offset, translate it to a file position and use that to calculate the max read size

if(offset < startOffset)

throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))

// translate maxOffset into a physical position as well, then read the smaller of maxSize and the number of bytes between the start position and that position

val mapping = translateOffset(offset, startPosition.position)

val endPosition =

if(mapping == null)

logSize // the max offset is off the end of the log, use the end of the file

else

mapping.position

min(endPosition - startPosition.position, maxSize)

}

}

// return the FetchDataInfo object

FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))

}
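
The read-length calculation is easy to check with concrete numbers. The sketch below models the three cases with a small hypothetical Bound type: no maxOffset given, maxOffset past the end of the segment (translateOffset returned null), and maxOffset mapped to a file position.

```scala
object ReadLengthSketch {
  sealed trait Bound
  case object NoMaxOffset extends Bound                 // caller passed no maxOffset
  case object PastEndOfSegment extends Bound            // maxOffset lies beyond this segment
  final case class AtPosition(position: Int) extends Bound // file position of maxOffset

  // Number of bytes to read starting at startPosition.
  def readLength(startPosition: Int, maxSize: Int, logSize: Int, bound: Bound): Int =
    bound match {
      case NoMaxOffset      => maxSize
      case PastEndOfSegment => math.min(logSize - startPosition, maxSize)
      case AtPosition(p)    => math.min(p - startPosition, maxSize)
    }
}
```

For example, with startPosition 100, maxSize 4096 and a maxOffset that maps to position 400, only 300 bytes are read, so no message at or beyond maxOffset is returned.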

A key function in this code is translateOffset, which converts a logical offset into a physical file position:

/**

* Find the physical file position for the first message with offset >= the requested offset.

*

* The lowerBound argument is an optimization that can be used if we already know a valid starting position

* in the file higher than the greatest-lower-bound from the index.

*

* @param offset The offset we want to translate

* @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and

* when omitted, the search will begin at the position in the offset index.

*

* @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.

*/

@threadsafe

private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {

// look up in the index buffer the OffsetPosition entry with the largest offset that is less than or equal to offset

val mapping = index.lookup(offset)

// starting from that physical position, scan the log file for the exact OffsetPosition (the first message with offset >= the target offset)

log.searchFor(offset, max(mapping.position, startingFilePosition))

}

The code of OffsetIndex.lookup:

def lookup(targetOffset: Long): OffsetPosition = {

maybeLock(lock) {

// duplicate the ByteBuffer: the copy shares its content with mmap, and its position, limit, mark and capacity start out equal to the original's, but from then on they are independent

val idx = mmap.duplicate

// binary-search the index buffer for the slot of the largest offset that is less than or equal to targetOffset

val slot = indexSlotFor(idx, targetOffset)

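// once the slot is known, return the corresponding OffsetPosition; slot -1 means no entry qualifies, so fall back to the start of the segment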

if(slot == -1)

OffsetPosition(baseOffset, 0)

else

OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))

}

}
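
The article does not show indexSlotFor, so the following is only a sketch of what such a binary search could look like, under the assumption that entries are 8 bytes each and sorted by relative offset. The name slotFor and its parameters are hypothetical, not Kafka's signature.

```scala
import java.nio.ByteBuffer

object IndexSearchSketch {
  // Slot of the largest relative offset <= relativeTarget,
  // or -1 if the index is empty or every entry is larger.
  def slotFor(idx: ByteBuffer, entries: Int, relativeTarget: Int): Int =
    if (entries == 0 || idx.getInt(0) > relativeTarget) -1
    else {
      var lo = 0
      var hi = entries - 1
      while (lo < hi) {
        val mid = (lo + hi + 1) >>> 1 // round up so the loop always makes progress
        if (idx.getInt(mid * 8) <= relativeTarget) lo = mid
        else hi = mid - 1
      }
      lo
    }
}
```

Because it uses absolute getInt calls only, the search never moves the buffer's position, which is the same reason lookup can work safely on a duplicate of the mmap.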

The relativeOffset and physical functions return the relative logical offset and the physical position stored in slot n of the ByteBuffer:

/* return the nth offset relative to the base offset */

private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)

/* return the nth physical position */

private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)

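Next, FileMessageSet.searchFor: starting at startingPosition, it scans the file sequentially for the first message whose offset is greater than or equal to the target offset and returns its location as an OffsetPosition.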

/**

* Search forward for the file position of the first offset that is greater than or equal to the target offset

* and return its physical position. If no such offsets are found, return null.

* @param targetOffset The offset to search for.

* @param startingPosition The starting position in the file to begin searching from.

*/

def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {

var position = startingPosition

// allocate a buffer of MessageSet.LogOverhead bytes for reading each entry header; LogOverhead is 12 bytes (8-byte offset followed by 4-byte message size)

val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)

val size = sizeInBytes()

while(position + MessageSet.LogOverhead < size) {

buffer.rewind()

channel.read(buffer, position)

// if even the entry header cannot be read in full, throw an exception

if(buffer.hasRemaining)

throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"

.format(targetOffset, startingPosition, file.getAbsolutePath))

buffer.rewind()

val offset = buffer.getLong()

// return the OffsetPosition of the first message whose offset is >= targetOffset

if(offset >= targetOffset)

return OffsetPosition(offset, position)

val messageSize = buffer.getInt()

if(messageSize < Message.MessageOverhead)

throw new IllegalStateException("Invalid message size: " + messageSize)

// advance position to the next message; see the earlier sections for the message layout

position += MessageSet.LogOverhead + messageSize

}

null

}
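
The forward scan can be reproduced on an in-memory buffer. This is a simplified sketch, assuming the 12-byte header layout read by the code above (8-byte offset, 4-byte size) followed by the payload; the helper names are hypothetical, and the sanity check on the message size is omitted.

```scala
import java.nio.ByteBuffer

object SearchForSketch {
  val LogOverhead = 12 // 8-byte offset + 4-byte message size

  // Append one entry: offset, payload size, payload bytes.
  def put(log: ByteBuffer, offset: Long, payload: Array[Byte]): Unit = {
    log.putLong(offset)
    log.putInt(payload.length)
    log.put(payload)
  }

  // Scan forward from startingPosition and return (offset, position) of the
  // first entry whose offset is >= targetOffset, or None if there is none.
  def searchFor(log: ByteBuffer, size: Int, targetOffset: Long, startingPosition: Int): Option[(Long, Int)] = {
    var position = startingPosition
    var found: Option[(Long, Int)] = None
    while (found.isEmpty && position + LogOverhead < size) {
      val offset = log.getLong(position)
      if (offset >= targetOffset) found = Some((offset, position))
      else position += LogOverhead + log.getInt(position + 8) // skip header and payload
    }
    found
  }
}
```

Only the headers are inspected during the scan. Since the index is sparse, this linear pass covers at most the messages between two index entries, which keeps the cost bounded.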

This completes the analysis of the basic read and write functions of the Log module.
