如何处理Flink反压问题

网友投稿 979 2022-05-30

一、什么是反压

反压是流处理系统中用来保障应用可靠性的一个重要机制。由于流应用是7*24小时运行,数据输入速率也不是一成不变,可能随时间产生波峰波谷,当某个处理单元由于到来的数据忽然增加,暂时性超出其处理能力时,就会出现数据在接收队列上累积,当数据的累积量超出处理单元的容量时,会出现数据丢失现象甚至因为系统资源耗尽而导致应用崩溃。为此,需要一种反压机制来告知上游处理单元降低数据发送的速率,以缓解下游处理单元的压力。

二、Flink中如何实现反压机制

Flink由于是天然的流计算架构,算子之间的数据传输采取类似阻塞队列的方式,当接收者队列满了后,发送者就会被阻塞,从而产生反压。先来看下flink的网络栈。

逻辑视图:

上游数据输出到ResultPartition,下游任务从InputGate收取数据。

物理视图:

不同任务之间的每个(远程)网络连接将在 Flink 的网络栈中获得自己的 TCP 通道。但是,如果同一任务的不同子任务被安排到了同一个 TaskManager,则它们与同一个 TaskManager 的网络连接将被多路复用,并共享一个 TCP 信道以减少资源占用。如上示例中是 A.1→B.3、A.1→B.4 以及 A.2→B.3 和 A.2→B.4就是复用同一个通道。

每个子任务的结果称为结果分区,每个结果拆分到单独的子结果分区( ResultSubpartitions )中——每个逻辑通道有一个。在堆栈的这一部分中,Flink 不再处理单个记录,而是将一组序列化记录组装到网络缓冲区中。每当子任务的发送缓冲池耗尽时——也就是缓存驻留在结果子分区的缓存队列中或更底层的基于 Netty 的网络栈中时——生产者就被阻塞了,无法继续工作,并承受背压。接收器也是类似:较底层网络栈中传入的 Netty 缓存需要通过网络缓冲区提供给 Flink。如果相应子任务的缓冲池中没有可用的网络缓存,Flink 将在缓存可用前停止从该通道读取。

在1.5之前,这种机制会导致单个子任务的背压会影响同一个通道的所有子任务,因此1.5之后Flink引入了基于信用的流量控制。

基于信用的流量控制,是在Flink原有反压机制上,在ResultPartition和InputGate中间又加了一层信用反馈。每一次 ResultPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息。当Credit未0时,上游就知道下游无法在接收数据,就会停止发送,这样就不会把底层链路堵住。

三、如何定位产生反压的节点

一看反压:

FlinkUI上可以直观的看到每个算子的背压状态。背压状态显示的是请求的block率。

二看监控:

通过反压状态可以大致锁定反压可能存在的算子,但具体反压是由于自身处理速度慢还是由于下游处理慢导致的,需要通过metric监控进一步监控。

如何处理Flink的反压问题

Flink 1.9版本以上可以通过outPoolUsage,inPoolUsage,floatingBuffersUsage,exclusiveBuffersUsage来确认。其中inPoolUsage是floatingBUffersUsage和exclusiveBuffersUsagee的总和。

三看检查点

通过查看检查点历史情况,可以看到检查点在哪个task耗时最长,以及每个subtask的耗时时间,时间长的一般有两种可能,状态较大或者barrier被阻塞。

反压可能产生的原因包括:

1)资源不足:如果CPU/内存或者IO使用率较高,可通过增加并发、资源或优化代码等方式调整。

2)GC频繁:分析GC日志或Heap Dump,确认是否有内存泄露,可适当提高内存缓解。

3)数据倾斜:观察subtask的数据处理是否分布均匀,可通过对热点key进行二次分发或者使用local/global aggregation解决。

注:部分截图来自https://flink.apache.org/2019/06/05/flink-network-stack.html

DLI Apache

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:华为云与北大BIOPIC联合发布蛋白质多序列比对开源数据集
下一篇:Tomcat被曝重大漏洞,影响过去 13 年的所有版本
相关文章