Kafka Server-Side Performance Tuning Guide

Reader submission, 2022-05-28

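The Kafka core records a timestamp at every stage of request processing. These checkpoints show exactly how long the broker spends in each stage of handling a request, which in turn guides server-side performance tuning.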

The Request class shows in detail how the broker computes the time spent in each stage of request processing:

```scala
// RequestChannel#Request
class Request(val processor: Int,
              val context: RequestContext,
              val startTimeNanos: Long,
              memoryPool: MemoryPool,
              @volatile private var buffer: ByteBuffer,
              metrics: RequestChannel.Metrics) extends BaseRequest {
  // These need to be volatile because the readers are in the network thread and the writers are in the request
  // handler threads or the purgatory threads
  @volatile var requestDequeueTimeNanos = -1L    // when an I/O thread takes the request off the RequestQueue
  @volatile var apiLocalCompleteTimeNanos = -1L  // when this broker finishes its local processing of the request
  @volatile var responseCompleteTimeNanos = -1L  // when processing finishes and RequestChannel#sendResponse is called
                                                 // (the Response is built; by default this is also when it enters
                                                 // the processor's response queue)
  @volatile var responseDequeueTimeNanos = -1L   // when the processor takes the response off its queue to send it
  @volatile var apiRemoteCompleteTimeNanos = -1L // when remote brokers finish their part of the request
                                                 // (only some requests involve this, e.g. produce requests)
  @volatile var messageConversionsTimeNanos = 0L // time spent converting message formats

  // ...

  def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = {
    val endTimeNanos = Time.SYSTEM.nanoseconds
    if (apiLocalCompleteTimeNanos < 0)
      apiLocalCompleteTimeNanos = responseCompleteTimeNanos
    if (apiRemoteCompleteTimeNanos < 0)
      apiRemoteCompleteTimeNanos = responseCompleteTimeNanos
    // ...
    // Time from entering the RequestQueue until an I/O thread picks the request up. If this is too high, likely causes:
    // 1. The RequestQueue is too small to absorb a large volume of requests; mitigate by raising queued.max.requests
    // 2. Too few I/O threads to drain the RequestQueue promptly; mitigate by raising num.io.threads
    val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
    // Time spent processing the request on this broker. If this is too high:
    // 1. Check the node's CPU and disk I/O for bottlenecks
    // 2. Check the I/O threads on the node
    val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
    // Time spent waiting on other brokers. If this is too high, check the inter-broker network and the
    // peer brokers' disk I/O and CPU usage
    val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
    // Time spent throttled; useful for pinpointing slow data production or slow replication
    val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
    // Time the response waits in the processor's response queue. If this is too high, a possible cause is:
    // 1. Too few processors to keep up; mitigate by raising num.network.threads
    val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
    // Time to send the response. If this is large, the network between the broker and the peer has high
    // latency; check the network
    val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
    // Time spent on message format conversion
    val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
    // Total time from receiving the request on the SocketChannel to sending the response completely
    val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
    // ...
  }
  // ...
}
```
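To make the arithmetic concrete, here is a minimal standalone Scala sketch (not Kafka's code) that recomputes the six phases from the checkpoints above and points at the phase that dominates. `RequestTimestamps`, `breakdown`, and `hintFor` are hypothetical names; the fallback for unset checkpoints mirrors `updateRequestMetrics`, and the hints paraphrase the comments above. Message-conversion time is left out because it is tracked separately rather than as a step between two checkpoints. As long as the checkpoints are in order, each phase starts where the previous one ends, so the six phases add up exactly to the total.

```scala
// Hypothetical model of the checkpoints recorded by RequestChannel.Request.
final case class RequestTimestamps(
    startNanos: Long,             // request received by the network thread
    dequeueNanos: Long,           // taken off the RequestQueue by an I/O thread
    apiLocalCompleteNanos: Long,  // -1 if never set
    apiRemoteCompleteNanos: Long, // -1 if never set
    responseCompleteNanos: Long,  // sendResponse called
    responseDequeueNanos: Long,   // taken off the processor's response queue
    endNanos: Long                // response fully written to the socket
)

// Clamp negative intervals to 0, then convert ns -> ms.
def nanosToMs(nanos: Long): Double = math.max(nanos, 0L).toDouble / 1e6

// Returns the six phases in pipeline order, plus the total.
def breakdown(t: RequestTimestamps): (Seq[(String, Double)], Double) = {
  // Same fallback as updateRequestMetrics: an unset checkpoint
  // falls back to responseCompleteNanos.
  val local  = if (t.apiLocalCompleteNanos < 0) t.responseCompleteNanos else t.apiLocalCompleteNanos
  val remote = if (t.apiRemoteCompleteNanos < 0) t.responseCompleteNanos else t.apiRemoteCompleteNanos
  val phases = Seq(
    "requestQueue"  -> nanosToMs(t.dequeueNanos - t.startNanos),
    "local"         -> nanosToMs(local - t.dequeueNanos),
    "remote"        -> nanosToMs(remote - local),
    "throttle"      -> nanosToMs(t.responseCompleteNanos - remote),
    "responseQueue" -> nanosToMs(t.responseDequeueNanos - t.responseCompleteNanos),
    "send"          -> nanosToMs(t.endNanos - t.responseDequeueNanos)
  )
  (phases, nanosToMs(t.endNanos - t.startNanos))
}

// Tuning hints paraphrased from the comments in updateRequestMetrics.
def hintFor(phase: String): String = phase match {
  case "requestQueue"  => "raise queued.max.requests and/or num.io.threads"
  case "local"         => "check broker CPU, disk I/O, and I/O threads"
  case "remote"        => "check the inter-broker network and peer CPU/disk I/O"
  case "throttle"      => "the request is being throttled"
  case "responseQueue" => "raise num.network.threads"
  case "send"          => "check network latency to the peer"
  case _               => "no hint"
}

// Made-up timestamps (nanoseconds) for one request.
val sample = RequestTimestamps(
  startNanos = 0L,
  dequeueNanos = 200000L,            // 0.2 ms queued
  apiLocalCompleteNanos = 1200000L,  // 1.0 ms local
  apiRemoteCompleteNanos = 6200000L, // 5.0 ms remote
  responseCompleteNanos = 6200000L,  // 0 ms throttled
  responseDequeueNanos = 6500000L,   // 0.3 ms in the response queue
  endNanos = 6600000L                // 0.1 ms sending
)
val (phases, totalMs) = breakdown(sample)
val (slowest, slowestMs) = phases.maxBy(_._2)
println(s"total=$totalMs ms, slowest=$slowest ($slowestMs ms): ${hintFor(slowest)}")
```

The phase names are kept as plain strings so the same lookup can be reused when reading the per-phase fields from a request log.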


The timing phases defined in the code above are illustrated in the following figure:

Finally, here is an excerpt of the broker's request DEBUG log:

```
2020-11-25 14:09:55,004 | DEBUG | [data-plane-kafka-network-thread-1-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-1] | Completed request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-4-fetcher-0, correlationId=166259) -- {replica_id=4,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=1414976616,session_epoch=166259,topics=[],forgotten_topics_data=[]},response:{throttle_time_ms=0,error_code=0,session_id=1414976616,responses=[]} from connection 10.244.228.252:21007-10.244.228.89:56264-1;totalTime:500.256,requestQueueTime:0.112,localTime:0.214,remoteTime:499.676,throttleTime:0.075,responseQueueTime:0.1,sendTime:0.077,securityProtocol:SASL_PLAINTEXT,principal:User:kafka,listener:SASL_PLAINTEXT | kafka.request.logger (RequestChannel.scala:256)
2020-11-25 14:09:55,094 | DEBUG | [data-plane-kafka-network-thread-1-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-2] | Completed request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-5-fetcher-0, correlationId=161103) -- {replica_id=5,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=21892705,session_epoch=161103,topics=[],forgotten_topics_data=[]},response:{throttle_time_ms=0,error_code=0,session_id=21892705,responses=[]} from connection 10.244.228.252:21007-10.244.229.85:45824-1;totalTime:501.224,requestQueueTime:0.085,localTime:0.31,remoteTime:500.463,throttleTime:0.105,responseQueueTime:0.111,sendTime:0.148,securityProtocol:SASL_PLAINTEXT,principal:User:kafka,listener:SASL_PLAINTEXT | kafka.request.logger (RequestChannel.scala:256)
```
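In both entries nearly all of totalTime is remoteTime, and it is almost exactly the request's max_wait_time=500. Together with min_bytes=1 and the empty responses=[], this is consistent with an idle follower fetch (clientId broker-N-fetcher-0) waiting in the fetch purgatory until its max wait expired, i.e. expected long-poll behavior rather than a slow broker. The sketch below shows one way to pull these fields out of a log line and report the dominant phase. The helpers `parseTimings`, `dominantPhase`, and `maxWaitMs` are hypothetical, and the regexes assume the `name:value` layout seen in the log above.

```scala
// Hypothetical helpers, not part of Kafka: read the timing fields that
// kafka.request.logger appends to each "Completed request" DEBUG line.
val timingFields = Seq("totalTime", "requestQueueTime", "localTime", "remoteTime",
  "throttleTime", "responseQueueTime", "sendTime")

// Extract "name:value" pairs, e.g. "remoteTime:499.676" -> ("remoteTime", 499.676).
def parseTimings(line: String): Map[String, Double] =
  timingFields.flatMap { name =>
    ("\\b" + name + ":([0-9.]+)").r.findFirstMatchIn(line).map(m => name -> m.group(1).toDouble)
  }.toMap

// The phase (excluding the total) that accounts for the most time.
def dominantPhase(timings: Map[String, Double]): (String, Double) =
  (timings - "totalTime").maxBy(_._2)

// The max_wait_time field of a FETCH request body, if present.
def maxWaitMs(line: String): Option[Double] =
  "max_wait_time=([0-9]+)".r.findFirstMatchIn(line).map(_.group(1).toDouble)

// Excerpt of the first log entry above (middle portions elided with "...").
val line = "Completed request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-4-fetcher-0, " +
  "correlationId=166259) -- {replica_id=4,max_wait_time=500,min_bytes=1,...},response:{...,responses=[]} ..." +
  ";totalTime:500.256,requestQueueTime:0.112,localTime:0.214,remoteTime:499.676,throttleTime:0.075," +
  "responseQueueTime:0.1,sendTime:0.077,securityProtocol:SASL_PLAINTEXT"

val timings = parseTimings(line)
val (phase, phaseMs) = dominantPhase(timings)
println(s"$phase=$phaseMs ms of total ${timings("totalTime")} ms; max_wait_time=${maxWaitMs(line).getOrElse(0.0)} ms")
```

When remoteTime on a FETCH tracks max_wait_time like this, the broker is simply honoring the long poll; remoteTime on a produce request, or localTime and requestQueueTime in general, is the more useful signal for tuning.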

EI Enterprise Intelligence  FusionInsight  Kafka

