文档出现乱码怎么回事?(文档为什么突然乱码了)
767
2022-05-28
即MessageSet是MessageAndOffset类的集合。
case class MessageAndOffset(message: Message, offset: Long)
MessageAndOffset是一个case类,带有Message和offset这两个成员。
从名字就知道是带ByteBuffer的MessageSet类,其构造函数类会调用create函数,里面就会创建一个ByteBuffer:
val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for(message <- messages)
writeMessage(buffer, message, offsetCounter.getAndIncrement)
buffer.rewind()
buffer
上面的writeMessage代码如下:
private[Kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
buffer.putLong(offset)
buffer.putInt(message.size)
buffer.put(message.buffer)
message.buffer.rewind()
}
从上面的函数我们看出buffer里是先写offset和message.size后面再写消息,这样我们就可以看出不压缩时消息的存储格式为:
ByteBufferMessageSet的消息格式:
MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32
Message 的消息格式:
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
4. Log.append 日志流程
流程图如下:
这个函数把消息集合写入到真正的日志文件中,并返还LogAppendInfo:
def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
val appendInfo = analyzeAndValidateMessageSet(messages)
// if we have any valid messages, append them to the log
if(appendInfo.shallowCount == 0)
return appendInfo
//去掉一些不合法的字节,这些不合法的字节是通过检查CRC值来的
// trim any invalid bytes or partial messages before appending it to the on-dis log
var validMessages = trimInvalidBytes(messages, appendInfo)
try {
// they are valid, insert them in the log
lock synchronized {
// nextOffsetMetadata是一个LogOffsetMetadata,通过updateLogEndOffset函数每次更新messageOffset字段,就能得到当前日志的lastOffset。下一次写从这个offset查找就能知道下一次写入的offset是什么
appendInfo.firstOffset = nextOffsetMetadata.messageOffset
if(assignOffsets) {
// assign offsets to the message set
val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
try {
//对validMessages(ByteBufferMessageSet)消息组里面的每个消息的第一个字段offset进行赋值,这样每条写到日志里面的消息头就有offset了
validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
} catch {
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
// offset在validMessages.assignOffsets中每遇到一个消息会自增,所以lastoffset 就是offset值减一
appendInfo.lastOffset = offset.get - 1
} else {
// we are taking the offsets we are given
if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
throw new IllegalArgumentException("Out of order offsets found in " + messages)
}
//在assignOffsets里会重新压缩,需要检查消息长度是否过长
// re-validate message sizes since after re-compression some may exceed the limit
for(messageAndOffset <- validMessages.shallowIterator) {
if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
// we record the original message set size instead of trimmed size
// to be consistent with pre-compression bytesRejectedRate recording
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
.format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
}
}
// check messages set size may be exceed config.segmentSize
if(validMessages.sizeInBytes > config.segmentSize) {
throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
.format(validMessages.sizeInBytes, config.segmentSize))
}
//如果当前的消息添加后超过active segments的文件长度,则创建一个新的日志文件再添加。
// maybe roll the log if this segment is full
val segment = maybeRoll(validMessages.sizeInBytes)
//把消息追加到active segment中,如果字节数足够,就调用OffsetIndex.append添加索引
// now append to the log
segment.append(appendInfo.firstOffset, validMessages)
//更新nextOffsetMetadata变量到最新的offset
// increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)
trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
.format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
if(unflushedMessages >= config.flushInterval)
flush()
appendInfo
}
} catch {
case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
}
}
4.1 maybeRoll
我们再来看看maybeRoll:
private def maybeRoll(messagesSize: Int): LogSegment = {
val segment = activeSegment
if (segment.size > config.segmentSize - messagesSize ||
segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs ||
segment.index.isFull) {
debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
.format(name,
segment.size,
config.segmentSize,
segment.index.entries,
segment.index.maxEntries,
time.milliseconds - segment.created,
config.segmentMs - segment.rollJitterMs))
roll()
} else {
segment
}
}
如果当前的消息添加后超过active segments的文件长度或者segment创建时间太久就会切文件,否则直接返回active segment。
Roll代码和注释如下:
/**
* Roll the log over to a new active segment starting with the current logEndOffset.
* This will trim the index to the exact size of the number of entries it currently contains.
* @return The newly rolled segment
*/
def roll(): LogSegment = {
val start = time.nanoseconds
lock synchronized {
val newOffset = logEndOffset
val logFile = logFilename(dir, newOffset)
val indexFile = indexFilename(dir, newOffset)
//如果文件存在,则先删除
for(file <- List(logFile, indexFile); if file.exists) {
warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
file.delete()
}
segments.lastEntry() match {
case null =>
case entry => entry.getValue.index.trimToValidSize()
}
//生成一个新的LogSegment
val segment = new LogSegment(dir,
startOffset = newOffset,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time)
//添加到segments列表中
val prev = addSegment(segment)
if(prev != null)
throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
//调度一个异步刷盘操作
// schedule an asynchronous flush of the old segment
scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
segment
}
}
我们再来看异步刷屏flush到底做了啥:
/**
* Flush log segments for all offsets up to offset-1
* @param offset The offset to flush up to (non-inclusive); the new recovery point
*/
def flush(offset: Long) : Unit = {
if (offset <= this.recoveryPoint)
return
debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
time.milliseconds + " unflushed = " + unflushedMessages)
for(segment <- logSegments(this.recoveryPoint, offset))
segment.flush()
lock synchronized {
if(offset > this.recoveryPoint) {
this.recoveryPoint = offset
lastflushedTime.set(time.milliseconds)
}
}
}
这个函数主要做的事情就是读取从recveryPoint到offset之间日志段的刷屏,而Segment.flush 最后会分别调用FileMessageSet和OffsetIndex的flush函数刷盘。 最后刷完盘后会更新recoveryPoint到offset。
4.2 Segment.append
这个函数是LogSegment提供的append,作用是将一组消息追加写入到以给定offset开始的日志段中。如果写入超过了4KB(默认的log.index.interval.bytes属性值)则额外写入一条新的索引项记录到索引文件中。这个方法不是线程安全的,所以后面调用的时候需要有锁同步机制的保护
/**
* Append the given messages starting with the given offset. Add
* an entry to the index if needed.
*
* It is assumed this method is being called from within a lock.
*
* @param offset The first offset in the message set.
* @param messages The messages to append.
*/
@nonthreadsafe
def append(offset: Long, messages: ByteBufferMessageSet) {
if (messages.sizeInBytes > 0) {
trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
// append an entry to the index (if needed)
//如果自上次写入index到现在之间写入log日志的字节大于配置的indexIntervalBytes,则往索引文件总写入当前offset。
if(bytesSinceLastIndexEntry > indexIntervalBytes) {
index.append(offset, log.sizeInBytes())
this.bytesSinceLastIndexEntry = 0
}
//调用FileMessageSet.append,把消息写入到channel里
// append the messages
log.append(messages)
this.bytesSinceLastIndexEntry += messages.sizeInBytes
}
}
Kafka
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。