Spark避坑指南----UnsafeRow对象的持久化

网友投稿 675 2022-05-29

Spark推出Tungsten计划用于提升Spark的性能与资源使用,其中为了消除JVM对象模型和GC代价,提供了UnsafeRow对象类型。它由jvm提供的sun.misc.Unsafe实现,内部存储的是二进制,继承自InternalRow,是SparkSQL中的中间算子的处理和输出数据类型。

Spark避坑指南----UnsafeRow对象的持久化

正是由于UnsafeRow的特殊性,我们发现在某些情况下可能会无法正确序列/持久化该类型,产生数据读取不一致的情况,下面我们通过几个例子说明:

例子1:默认的RDD.saveAsObjectFile无法正确处理UnsafeRow类型

首先我们准备一些用于测试的数据表:

我们希望获取到查询`select * from emp, dept where emp.emp_dept_id = dept.dept_id`的结果,该查询对两张表进行了Join操作,后续我们希望将其RDD的数据保存,再重新读取保存的数据。

为了得到RDD的结果,我们构造一个LogicalRDD的Plan,该Plan会直接Scan RDD的数据。再通过Dataset.ofrows来得到Plan的结果

得到的正确结果如下:

同样的,我们调用rdd.saveAsObjectFile将RDD[InternalRow]持久化,再利用spark.sparkContext.objectFile读取。

得到的结果发生了问题,它在总行数不变的情况下,数据被多次复制了,数据读取不一致:

我们查看saveAsObjectFile方法,发现序列化的方式是Java的默认序列化方式,该方法无法正确序列化UnsafeRow对象。

例子2:RDD的checkpoint方法

Spark提供了checkpoint的方法帮助开发者做中间结果的持久化,开发者可以利用checkpoint将计算查询中复杂的中间结果进行缓存,减少重复计算。

其中,localCheckpoint是将结果存在executor的本地磁盘中,checkpoint是将结果存在hdfs中,checkpoint相比localCheckpoint能获得容错机制,但是性能会相对较差。

在本例中,我们仍采用之前的数据和查询,首先验证一下localCheckpoint():

得到的结果也是错误的:

由于checkpoint是惰性的,并且在实际的调用过程中会将原来的计算重新执行一遍,所以一般推荐在checkpoint之前进行cache操作,这样到了真正执行时,checkpoint会直接读取cache的数据,而不用触发二次计算:

结果和localCheckpoint一样,是错误的数据。

通过以上的例子我们发现在对UnsafeRow的类型持久化时,java的序列化方法不能起到正确的作用。UnsafeRow支持的序列化方式为Externalizable和KryoSerializable,我们再对例子2进行验证,需要做两处修改:

1) 在创建sparkSession的时候设置“spark.serializer” 为 “org.apache.spark.serializer.KryoSerializer”

2)在persist的时候选择带SER的StorageLevel

得到的结果如下:

和实际的结果是一致的。

EI企业智能 智能数据 数据湖探索 DLI

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Ubuntu 软连接vs硬链接 总结
下一篇:【微资讯】细思极恐,你还在被APP授权吗
相关文章