千个Hive UDF迁移到Spark--Facebook实践经验

网友投稿 1126 2022-05-30

多年来,Facebook已将Hive用作主要的查询引擎,当Facebook将作业从Hive迁移到Spark SQL时,遇到了各种各样的挑战和困难,其中Hive UDF的迁移就是遇到了很多明显的问题。本文将着重介绍Facebook在将上千个Hive UDF迁移到Spark SQL时,遇到的兼容性、功能、性能方面的几个问题,以及相应的解决办法。

一、什么是Hive UDF

Hive 支持的函数分为内置函数,和用户自定义函数(UDF)。当内置函数不足以满足需求时,可以通过自定义函数逻辑注册成UDF在SQL中使用即可。

UDF分为三类:

1. 普通UDF

输入一行数据中的一列或者多列,输出单个值。

例如:

SELECT is_substring(col1, col2) AS substring FROM normal_udf

输入

输出

上述例子中,判断col2列是否是col1的子子字符串,每一行对应一个输出结果。

2. 自定义聚合函数

输入表中的一行或多行数据,输出单个结果。例如:max、min属于内置聚合函数。

例如:

SELECT CONTACT_SET(id) AS all_ids FROM dim_three_row

输入

上千个Hive UDF迁移到Spark--Facebook实践经验

输出

上述例子中,COLLECT_SET方法即将id列多个值进行聚合, 用逗号分隔后输出一个值。

3.自定义表生成函数

输入表中一行数据,输出多行多列数据,即类似于生成一个表

例如:

SELECT a.id, b.col1, b.col2 FROM a LATERAL VIEW split_str(a.name) b AS name, age

输入

输出

上述例子,split_str方法对name列进行切分,并结合literal view一起使用,将UDTF的执行结果转储为view b,再与a表进行合并输出最后结果。

二、Spark中如何执行HiveUDF

由于Hive中部分数据类型Spark并不支持。 在Spark中通过封装类,创建Hive的GenericUDF,SimpleGenericUDAF 和GenericUDTF这三种基本类型,例如下图第6行代码。然后调用Hive中的接口执行相应的方法体,再输出最终结果时,再进行解封装将结果转换为Spark中的数据类型(第10行)。

三、Hive UDF迁移到Spark SQL的困难

Facebook计算引擎主要时Hive,其中包含有一千多的UDF,UDF的执行时间占整个业务CPU时间的70%。所以UDF的迁移是整个迁移工作中很重要的一部分。经过最初的几线测试发现,其中只有58%的UDF能够被很好的兼容和支持,对于失败的测试用例所遇到的问题可以分为如下几类:

1.  Spark不支持部分Hive接口

众所周知,目前Spark不支持Hive的部分接口。如下:

以上这些方法都存在兼容性问题,虽然通过业务调整可以避免一些不兼容的接口,但是getRequiredJars,getRequiredFiles这两个方法的使用特别的广泛,它们是用于自动加载需要的文件和jar包的,比如在executor中读取文件时,则需要保证该文件已经在executor的工作目录中,是否则就会出错。需要解决该兼容性问题。

解决方法:

在Spark driver端进行UDF初始化时,识别出需要用到的文件和jar包,通过SparkContext.addJar 和 addFile方法注册这些资源并分布到各个executor节点。

在executor中进行二次确认,先查看需要用到的资源是否已经在工作目录中,若已经包含则不需要处理;否则,尝试创建一个软链接到该文件的绝对路径中。这里没有直接复制文件到executor节点,是为了防止资源文件或jar包过大,带来太多额外的工作负担。

2. 线程安全

Hive中每个任务都在单独的JVM进程中运行,因此绝大多数Hive UDF的编写都没有考虑并发性问题,但是在Spark中每个executor运行在一个单独的JVM进程中,一个executor中可以同时运行多个任务。因此Hive UDF直接运行在Spark中可能会存在线程不安全的问题。

如下例子中,定义了一个static的全局变量mapping,当在Spark中运行时,可能会有多个实例同时执行该UDF,当两个实例都运行到17~19行时,实例1判断mapping为空,执行初始化,此时实例2也判断发现mapping为空,也进行初始化。实例1执行略快,当实例1初始化完成向mapping中写数据后,实例2初始化完成,覆盖了实例1中的数据,从而导致数据丢失。

解决方法1:

通过Synchronize方法加锁,对mapping的初始化进行加锁,从而避免多个进程同时初始化。

这种方法有两个弊端:1、频繁的加锁解锁导致性能下降,因为每一行数据进来都需要进行加锁操作,降低执行效率;2、代码改动量大,上千个UDF都进行这样的修改,代码工作量大。

解决方法2:

将mapping对象从static改为普通对象,每个实例使用自己内部的mapping对象。

缺点:实例之间数据和状态无法共享,消耗更多的内存,因为每个实例都需要保存一份该数据;

优点:代码改动小,易于操作。

3. 序列化和反序列化

在Spark中,UDF的对象在driver端进行初始化、序列化,然后分发给executor节点,在executor中再进行反序列化读取对象。这样的序列化和反序列化中有一些问题,一些类型可以用Kryo进行序列化但是却不能正确的反序列化,例如:guava的ImmutableSet类型。

解决方法:

1、  对于公共的常用类型,可以自定义序列化方法,或者引入已有的序列化和反序列化方法;

2、  对于无法正确序列化和反序列化的对象,加上transient修饰词。即:该对象再driver端只做初始化,不进行序列化,在executor中需要使用该对象是,重新进行初始化。

4.  性能问题

由于Spark和Hive都不支持对方的一些数据类型,在Spark中执行Hive UDF时,先将Spark的数据类型转换为Hive的inspectors或Java类型,在Hive中进行计算,执行完成后再将结果从Hive类型转换为Spark类型,如下图红框中的部分代码。

这种封装和解封的过程,比Spark原生的UDF需要多花2倍的CPU,对于复杂的数据类型,如:map, array, structure等则需要花费更多的时间。Facebook的UDF在Spark计算中占用15%的CPU时间,因此,对于耗时最长的那些Hive UDF,可以将其转换为Spark原生的UDF。

四、Partial Aggregate

Hive中max函数即为聚合函数,其执行流程如下图, Mapper端将所有数据进行shuffle,在Reducer端将相同key的数据读取到同一个reducer中进行max的计算。

SELECT id, max(value) FROM t1 GROUP BY id

这种计算方法有两个问题:(1)每一条数据都需要通过网络shuffle传播,带来的网络压力很大;(2)当存在数据倾斜时,个别的reducer处理时长将明显比其他reducer长,出现大部分reducer空闲,等待某一个reducer执行的情况,从而导致整个作业的执行时间很长。

同样的sql和UDF,Partial Aggregation计算过程如下,每一个mapper先将自己本地的数据执行一遍部分聚合,再将聚合后的结果进行shuffle,reducer再对所有的部分聚合结果进行全局的聚合。

优点:不必要所有的数据都通过网络进行shuffle传输,减轻网络压力;部分计算压力移到mapper端,减轻reducer压力,并且在一定程度上可以减轻数据倾斜带来的弊端。

Spark已经支持了Partial Aggregation,通过一定的适配Hive UDF使其可以支持Partial Aggregation后,整体的工作性能提升,CPU提升20%,shuffle的数据量减少17%。但是却存在部分UDF性能变差,最差的变慢了300%。

经过分析发现有两种情况会导致性能变慢:

(1) 查询规模:列扩展的情况下,可能会导致shuffle的数据量变多。如下例子,查询的列包含了min, max,count,avg,经过partial aggregation mapper端输出列从原来的一列,变为5列,但是mapper端合并的数据量却很少,从而导致整体的shuffle数据量增多。

(2) 数据分布:在一些分布情况下,mapper端数据不能进行合并或者合并的极少,导致数据量没有变少,但是却增加了额外的CPU时间。

解决方法:

我们可以看出,影响Partial aggregation性能的有这三个因素: UDAF的性能、列扩张、行缩减。Facebook提出一种基于代价的Optimizer,利用输入列数、输出的列数、UDAF的计算代价等等属性,计算出计算代价,从而决定是否使用Partial aggregation。

经过该方法,整体的性能有所提升,但是并不能解决所有的UDAF的执行性能变慢的问题,因为“行缩减”这个因素并没有一个好的衡量标准。因为,数据集每天的分布不一定相同、即使是相同的数据集在不同聚合函数下分布情况是不一样的,所以“行缩减”的程度较难量化。

五、未来工作

根据以上分析和介绍,其中性能的提升还有待进一步优化。将考虑一种基于历史作业的开关,决定是否使用Partial aggregation。基于历史作业的eventLog,可以得到很多的统计信息,包括各个节点的执行时间、输出的数据行数等。基于这些信息可以做出更好的预测是否使用Partial aggregation。

近年来,越来越多的论文中提到,利用历史作业的执行信息进行作业优化,比如:提取高价值SubExpression等,进行物化提升具有相同子结构作业的执行时间,相信未来eventLog的利用将越来越多样,越成熟。

引用

https://databricks.com/session/supporting-over-a-thousand-custom-hive-user-defined-functions

软件开发 云计算

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Python 和 MySQL 数据库:实用介绍
下一篇:Python中的集合是什么以及如何使用它们?
相关文章