kafka源码解析之三:Log模块读写源码分析——(三)

网友投稿 786 2022-05-28

即MessageSet是MessageAndOffset类的集合。

case class MessageAndOffset(message: Message, offset: Long)

MessageAndOffset是一个case类,带有Message和offset这两个成员。

从名字就知道是带ByteBuffer的MessageSet类,其构造函数类会调用create函数,里面就会创建一个ByteBuffer:

val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))

for(message <- messages)

writeMessage(buffer, message, offsetCounter.getAndIncrement)

buffer.rewind()

buffer

上面的writeMessage代码如下:

private[Kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {

buffer.putLong(offset)

buffer.putInt(message.size)

buffer.put(message.buffer)

message.buffer.rewind()

}

从上面的函数我们看出buffer里是先写offset和message.size后面再写消息,这样我们就可以看出不压缩时消息的存储格式为:

ByteBufferMessageSet的消息格式:

MessageSet => [Offset MessageSize Message]

Offset => int64

MessageSize => int32

Message 的消息格式:

Message => Crc MagicByte Attributes Key Value

Crc => int32

MagicByte => int8

Attributes => int8

Key => bytes

Value => bytes

4.    Log.append 日志流程

流程图如下:

这个函数把消息集合写入到真正的日志文件中,并返还LogAppendInfo:

def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {

val appendInfo = analyzeAndValidateMessageSet(messages)

// if we have any valid messages, append them to the log

if(appendInfo.shallowCount == 0)

return appendInfo

//去掉一些不合法的字节,这些不合法的字节是通过检查CRC值来的

// trim any invalid bytes or partial messages before appending it to the on-dis log

var validMessages = trimInvalidBytes(messages, appendInfo)

try {

// they are valid, insert them in the log

lock synchronized {

// nextOffsetMetadata是一个LogOffsetMetadata,通过updateLogEndOffset函数每次更新messageOffset字段,就能得到当前日志的lastOffset。下一次写从这个offset查找就能知道下一次写入的offset是什么

appendInfo.firstOffset = nextOffsetMetadata.messageOffset

if(assignOffsets) {

// assign offsets to the message set

val offset = new AtomicLong(nextOffsetMetadata.messageOffset)

try {

//对validMessages(ByteBufferMessageSet)消息组里面的每个消息的第一个字段offset进行赋值,这样每条写到日志里面的消息头就有offset了

validMessages = validMessages.assignOffsets(offset, appendInfo.codec)

} catch {

case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)

}

// offset在validMessages.assignOffsets中每遇到一个消息会自增,所以lastoffset 就是offset值减一

appendInfo.lastOffset = offset.get - 1

} else {

// we are taking the offsets we are given

if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)

throw new IllegalArgumentException("Out of order offsets found in " + messages)

}

//在assignOffsets里会重新压缩,需要检查消息长度是否过长

// re-validate message sizes since after re-compression some may exceed the limit

for(messageAndOffset <- validMessages.shallowIterator) {

if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {

// we record the original message set size instead of trimmed size

// to be consistent with pre-compression bytesRejectedRate recording

BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)

BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)

throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."

.format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))

}

}

// check messages set size may be exceed config.segmentSize

if(validMessages.sizeInBytes > config.segmentSize) {

throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."

.format(validMessages.sizeInBytes, config.segmentSize))

}

//如果当前的消息添加后超过active segments的文件长度,则创建一个新的日志文件再添加。

// maybe roll the log if this segment is full

val segment = maybeRoll(validMessages.sizeInBytes)

//把消息追加到active segment中,如果字节数足够,就调用OffsetIndex.append添加索引

// now append to the log

segment.append(appendInfo.firstOffset, validMessages)

//更新nextOffsetMetadata变量到最新的offset

// increment the log end offset

updateLogEndOffset(appendInfo.lastOffset + 1)

trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"

.format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))

if(unflushedMessages >= config.flushInterval)

flush()

appendInfo

}

} catch {

case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)

}

}

4.1     maybeRoll

我们再来看看maybeRoll:

private def maybeRoll(messagesSize: Int): LogSegment = {

val segment = activeSegment

if (segment.size > config.segmentSize - messagesSize ||

segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs ||

segment.index.isFull) {

debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."

.format(name,

segment.size,

config.segmentSize,

segment.index.entries,

segment.index.maxEntries,

time.milliseconds - segment.created,

config.segmentMs - segment.rollJitterMs))

roll()

} else {

segment

}

}

如果当前的消息添加后超过active segments的文件长度或者segment创建时间太久就会切文件,否则直接返回active segment。

Roll代码和注释如下:

/**

kafka源码解析之三:Log模块读写源码分析——(三)

* Roll the log over to a new active segment starting with the current logEndOffset.

* This will trim the index to the exact size of the number of entries it currently contains.

* @return The newly rolled segment

*/

def roll(): LogSegment = {

val start = time.nanoseconds

lock synchronized {

val newOffset = logEndOffset

val logFile = logFilename(dir, newOffset)

val indexFile = indexFilename(dir, newOffset)

//如果文件存在,则先删除

for(file <- List(logFile, indexFile); if file.exists) {

warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")

file.delete()

}

segments.lastEntry() match {

case null =>

case entry => entry.getValue.index.trimToValidSize()

}

//生成一个新的LogSegment

val segment = new LogSegment(dir,

startOffset = newOffset,

indexIntervalBytes = config.indexInterval,

maxIndexSize = config.maxIndexSize,

rollJitterMs = config.randomSegmentJitter,

time = time)

//添加到segments列表中

val prev = addSegment(segment)

if(prev != null)

throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))

//调度一个异步刷盘操作

// schedule an asynchronous flush of the old segment

scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)

info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))

segment

}

}

我们再来看异步刷屏flush到底做了啥:

/**

* Flush log segments for all offsets up to offset-1

* @param offset The offset to flush up to (non-inclusive); the new recovery point

*/

def flush(offset: Long) : Unit = {

if (offset <= this.recoveryPoint)

return

debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +

time.milliseconds + " unflushed = " + unflushedMessages)

for(segment <- logSegments(this.recoveryPoint, offset))

segment.flush()

lock synchronized {

if(offset > this.recoveryPoint) {

this.recoveryPoint = offset

lastflushedTime.set(time.milliseconds)

}

}

}

这个函数主要做的事情就是读取从recveryPoint到offset之间日志段的刷屏,而Segment.flush 最后会分别调用FileMessageSet和OffsetIndex的flush函数刷盘。 最后刷完盘后会更新recoveryPoint到offset。

4.2     Segment.append

这个函数是LogSegment提供的append,作用是将一组消息追加写入到以给定offset开始的日志段中。如果写入超过了4KB(默认的log.index.interval.bytes属性值)则额外写入一条新的索引项记录到索引文件中。这个方法不是线程安全的,所以后面调用的时候需要有锁同步机制的保护

/**

* Append the given messages starting with the given offset. Add

* an entry to the index if needed.

*

* It is assumed this method is being called from within a lock.

*

* @param offset The first offset in the message set.

* @param messages The messages to append.

*/

@nonthreadsafe

def append(offset: Long, messages: ByteBufferMessageSet) {

if (messages.sizeInBytes > 0) {

trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))

// append an entry to the index (if needed)

//如果自上次写入index到现在之间写入log日志的字节大于配置的indexIntervalBytes,则往索引文件总写入当前offset。

if(bytesSinceLastIndexEntry > indexIntervalBytes) {

index.append(offset, log.sizeInBytes())

this.bytesSinceLastIndexEntry = 0

}

//调用FileMessageSet.append,把消息写入到channel里

// append the messages

log.append(messages)

this.bytesSinceLastIndexEntry += messages.sizeInBytes

}

}

Kafka

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:华为徐直军:以持续创新加快数字化发展
下一篇:[译转]Java CyclicBarrier与CountDownLatch
相关文章