现在是2019版,如何回到老版(无法回到老版本)
929
2022-05-29
众所周知,目前Spark的基于代价的优化策略,能够给SQL执行计划带来很大的优化,比如:调整Join顺序,决定Join类型(BroadcastHashJoin 或者 SortMergeJoin)等等。 但是该优化策略有一个明显的问题是:对于代价的估计是基于表的一些统计信息的,若这些统计信息不存在或者过期,则会对SQL的优化带来负面的影响。因此,本文介绍的Adaptive Query Execution就是针对这种问题,不依赖于统计信息进行优化。
AQE的一个难点就是在何时进行再次优化规则。Spark程序执行时,一般都是并行或者是管道式的,但是了解过Spark内核的人都知道,Spark作业有一个DAG Stage的划分,Stage之间会进行shuffle操作,所以每一个stage要等待其上一个stage作业全部完成才能开始,这就为AQE的执行提供了一个时机,因此此时已经能够知道前一个Stage的中间结果的大小、列数等统计信息,可以为我们的AQE执行提供所需的统计信息。
首先,对于第一层叶子节点的Stage(即不依赖于其他任何Stage的那些Stage)不需要执行AQE;
每一个Stage执行完成后,就标记该stage的状态为完成,同时收集统计信息,并更新对应的逻辑计划;
根据收集到的这些统计信息,重新执行指定的一些优化规则,再转为物理计划;
然后,基于这个新的优化后的Plan, 从之前已经完成的Stage向后继续执行,并重复上述步骤,直到整个sql执行完成
AQE有如下3个特征:
1、动态合并Shuffle分区
Spark执行查询过程中会有很多Shuffle操作,即Stage之间的数据传递,需要通过网络对数据进行传递并合并计算等操作。影响Shuffle的性能有很多因素,其中分区的个数就是一个很重要的因素。 分区的个数目前是用默认的配置项200来决定的,该值的选择对Shuffle影响很大:
若分区个数太少,则每个分区需要处理的数据量很大,每个task处理一个分区的数据,可能会需要将数据溢写到磁盘,从而降低执行效率;
若分区个数太大,则每个分区处理很少的数据,但是task个数很多,导致很多小的网络数据获取和传播,同样会因为IO瓶颈带来性能下降。
在AQE中,首先设置一个较大的分区个数,然后随着Stage任务的执行,在运行时根据metrics统计信息将小的数据量的分区进行合并,从而自动调整分区个数。以 SELECT max(i) FROM tbl GROUP BY j 为例,
原表很小,在group执行之前,只有两个分区;
初始分区个数设置为5,则本地group之后会将数据划分为5个分区;
若没有AQE,则shuffle之后将分为5个task分别执行,其中有3个task的数据量很小,提交这样的task执行会浪费一定的资源;
但是开启AQE之后,会自动将小分区合并,如下图,合并之后剩余3个分区,且每个分区的数据量相近。
2、动态切换join策略
Spark中用的最多的Join方式为BroadcastHashJoin 和 SortMergeJoin,所有的Join类型中BroadcastHashJoin性能最好,因为避免了数据的shuffle。 所以Spark目前通过估计join两端表的大小与广播阈值的关系,来判断是否可以使用BroadcastHashJoin。 但是该值的估计常常是不准确的,比如:有一个过滤效率很高的filter,可能使得过滤后的数据可以广播,但是估计值却偏大; 或者是Join的一端是一个很复杂的操作时,估计的值就更加不准确,常常估计出一个很大的值导致使用SortMergeJoin,而实际执行后会发现该复杂的查询后的结果集很小且适合广播。
AQE在执行过程中,重新进行优化,可以利用前一个Stage执行结果的大小,直接的知道是否适合广播。如下例子:
在该例子中,两个表原始Join时,根据CBO估计的大小是SortMergeJoin,但是当stage2执行完成后,调用了AQE重新执行优化规则发现,实际结果小于广播阈值(默认10M),因此可以使用BroadcastHashJoin, 则会修改Join类型,从而节约Join的时间。
这里要注意,前两个Stage中shuffle写的操作此时已经完成,这部分的时间无法避免;能优化的是Join的Stage中Shuffle读以及Join的执行时间。
3、动态优化join数据倾斜
当每个分区的数据分布不均匀时,容易出现数据倾斜的问题,有些场景下尤其是Join时,若出现数据倾斜,可能会导致个别的task任务特别繁重,其他所有的task都执行完毕,executor处于空闲状态,等待这几个数据倾斜的task执行完成。AQE能够自动检测具有数据倾斜的分区,并将这些数据量很大的分区进行切分。 如下例: A和B表做Join,其中A0分区特别大,在没有AQE时如下图:
开启AQE之后,将A0分区划分为两个差不多大的分区(A0-0和A0-1),这两个分区同时与B0分区进行join,因为A0本来就是需要与B0做Join的。划分后,使用了5个数据量相近的task同时执行该任务,可以获得更好的性能。
参考:https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
数据湖探索 DLI
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。