kafka源码解析之二:Producer代码分析(scala版本)(下)

网友投稿 849 2022-05-29

这里主要是根据config.queueEnqueueTimeoutMs参数选择不同的入队列方式,该参数的值含义如下:

# Timeout for event enqueue:

# 0: events will be enqueued immediately or dropped if the queue is full

# -ve: enqueue will block indefinitely if the queue is full

# +ve: enqueue will block up to this many milliseconds if the queue is full

#queue.enqueue.timeout.ms=

最后再看看异步发送线程里面到底做了些什么,线程的run函数很简单只是调用processEvents函数,代码如下:

private def processEvents() {

var lastSend = SystemTime.milliseconds

var events = new ArrayBuffer[KeyedMessage[K,V]]

var full: Boolean = false

// drain the queue until you get a shutdown command

Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))

.takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {

currentQueueItem =>

val elapsed = (SystemTime.milliseconds - lastSend)

// check if the queue time is reached. This happens when the poll method above returns after a timeout and

// returns a null object

val expired = currentQueueItem == null

if(currentQueueItem != null) {

trace("Dequeued item for topic %s, partition key: %s, data: %s"

.format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))

events += currentQueueItem

}

// check if the batch size is reached

full = events.size >= batchSize

//有两种情况会导致消息开始发送,一种是events数组个数达到批量发送的个数,另一种是poll超时后也会发送

if(full || expired) {

if(expired)

debug(elapsed + " ms elapsed. Queue time reached. Sending..")

if(full)

debug("Batch full. Sending..")

// if either queue time has reached or batch size has reached, dispatch to event handler

tryToHandle(events)

lastSend = SystemTime.milliseconds

events = new ArrayBuffer[KeyedMessage[K,V]]

}

}

// send the last batch of events

tryToHandle(events)

if(queue.size > 0)

throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"

.format(queue.size))

}

这个函数主要做的事情是从queue.poll 中获取消息,直到收到一个shutdown命令

把每个消息加到events数组中, 如果evnes数组个数得到config.batchNumMessages或者得到超时时间,则批量发送数据。

最后发送时调用handler.handle(events)进行发送,流程跟同步发送一样 。

3.   Server处理发送流程

Kafka server在启动的时候会开启N个线程来处理请求。其中N是由num.io.threads属性指定,默认是8。Kafka推荐你设置该值至少是机器上磁盘数。在KafkaServer的startup方法中,如代码所示:

def startup() {

...

// 创建一个请求处理的线程池,在构造时就会开启多个线程准备接收请求

requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

...

}

class KafkaRequestHandlerPool {

...

for(i <- 0 until numThreads) {

runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)

threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))

threads(i).start() // 启动每个请求处理线程

}

...

}

KafkaRequestHandler实际上是一个Runnable,它的run核心方法中以while (true)的方式调用api.handle(request)不断地接收请求处理,如下面的代码所示:

class KafkaRequestHandler... extends Runnable {

...

def run() {

...

while (true) {

...

apis.handle(request) // 调用apis.handle等待请求处理

}

...

}

...

}

在KafkaApis中handle的主要作用就是接收各种类型的请求。本文只关注ProducerRequest请求:

def handle(request: RequestChannel.Request) {

...

request.requestId match {

case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request) // 如果接收到ProducerRequest交由handleProducerOrOffsetCommitRequest处理

case ...

}

...

}

如此看来,核心的方法就是handleProducerOrOffsetCommitRequest了。这个方法之所以叫这个名字,是因为它同时可以处理ProducerRequest和OffsetCommitRequest两种请求,后者其实也是一种特殊的ProducerRequest。从Kafka 0.8.2之后kafka使用一个特殊的topic来保存提交位移(commit offset)。这个topic名字是__consumer_offsets。本文中我们关注的是真正的ProducerRequest。下面来看看这个方法的逻辑,如下图所示:

整体逻辑看上去非常简单,如下面的代码所示:

def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {

...

val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) // 将消息追加写入本地提交日志

val numPartitionsInError = localProduceResults.count(_.error.isDefined) // 计算是否存在发送失败的分区

if(produceRequest.requiredAcks == 0) { // request.required.acks = 0时的代码路径

if (numPartitionsInError != 0) {

info(("Send the close connection response due to error handling produce request " +

"[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")

.format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))

requestChannel.closeConnection(request.processor, request) // 关闭底层Socket以告知客户端程序有发送失败的情况

} else {

...

}

} else if (produceRequest.requiredAcks == 1 || // request.required.acks = 0时的代码路径,当然还有其他两个条件

produceRequest.numPartitions <= 0 ||

numPartitionsInError == produceRequest.numPartitions) {

val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))

.getOrElse(ProducerResponse(produceRequest.correlationId, statuses))

requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) // 发送response给客户端

} else { //  request.required.acks = -1时的代码路径

// create a list of (topic, partition) pairs to use as keys for this delayed request

val producerRequestKeys = produceRequest.data.keys.toSeq

val statuses = localProduceResults.map(r =>

r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap

val delayedRequest =  new DelayedProduce(...) // 此时需要构造延时请求进行处理,此段逻辑比较复杂,需要理解Purgatory的概念,本文暂不考虑,后续再分析

...

}

由上面代码可见,无论request.required.acks是何值,都需要首先将待发送的消息集合追加写入本地的提交日志中。此时如何按照默认值是是0的情况,那么这写入日志后需要判断下所有消息是否都已经发送成功了。如果出现了发送错误,那么就将关闭连入broker的Socket Server以通知客户端程序错误的发生。

最后的这个方法就是Partition的appendMessagesToLeader,其主要代码如下:

def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = {

inReadLock(leaderIsrUpdateLock) {

val leaderReplicaOpt = leaderReplicaIfLocal() // 判断目标分区的leader副本是否在该broker上

leaderReplicaOpt match {

case Some(leaderReplica) => // 如果leader副本在该broker上

val log = leaderReplica.log.get // 获取本地提交日志文件句柄

val minIsr = log.config.minInSyncReplicas

val inSyncSize = inSyncReplicas.size

// Avoid writing to leader if there are not enough insync replicas to make it safe

if (inSyncSize < minIsr && requiredAcks == -1) { //只有request.required.acks等于-1时才会判断ISR数是否不足

throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"

.format(topic,partitionId,minIsr,inSyncSize))

}

val info = log.append(messages, assignOffsets = true) // 真正的写日志操作,由于涉及Kafka底层写日志的,以后有机会写篇文章专门探讨这部分功能

// probably unblock some follower fetch requests since log end offset has been updated

replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))

// we may need to increment high watermark since ISR could be down to 1

maybeIncrementLeaderHW(leaderReplica)

info

kafka源码解析之二:Producer代码分析(scala版本)(下)

case None => // 如果不在,直接抛出异常表明leader不在该broker上

throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"

.format(topic, partitionId, localBrokerId))

}

}

至此,一个最简单的scala版同步producer的代码走读就算正式完成了,可以发现Kafka设计的思路就是在每个broker上启动一个server不断地处理从客户端发来的各种请求,完成对应的功能并按需返回对应的response

4.   问题 解答

1. producer客户端是否会跟zk交互?

实际上不会跟zookeeper交互,所有的获取metedata的信息都是通过和broker来获取。只有Consumer需要和zk交互。

2. compression.codec 这个参数是如何使用的

这个是Producer端的一个参数,用来设置消息打包是否要加压。从官网看目前支持如下选项:This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip", "snappy" and “lz4”.

例如设置为:

compression.codec=none

3. 问题 :  blockingChannel.send(request) 到底是阻塞的还是非阻塞的。

这个是阻塞的发送。因为在BlockingChannel中的Connect中,设置的socket属性是blocking:

class BlockingChannel( val host: String,

val port: Int,

val readBufferSize: Int,

val writeBufferSize: Int,

val readTimeoutMs: Int ) extends Logging { def connect() = lock synchro n

ized  {

def connect() = lock syn

chro n ized  {

if(!connected) {

try {

channel = SocketChannel.open()

if(readBufferSize > 0)

channel.socket.setReceiveBufferSize(readBufferSize)

if(writeBufferSize > 0)

channel.socket.setSendBufferSize(writeBufferSize)

channel.configureBlocking(true) }

5.

附录1.

Case

http://nerd-is.in/2013-09/scala-learning-pattern-matching-and-case-classes/

6.

参考资料1.       Kafka Producer同步模式发送message

源码分析

http://blog.csdn.net/itleochen/article/details/19926785

2.       Kafka producer原理 (Scala版同步

producer)

Scala Kafka

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:IT运维做什么?
下一篇:ThoughtWorks读书雷达(2016)
相关文章