Spark_常规性能调优(二)

网友投稿 717 2022-05-29

Spark_常规性能调优(二)

1、优先使用数组以及字符串,而不是集合类。也就是说,优先使用array,而不是ArrayList、LinkedList、HashMap等集合。

比如:企业应用中的做法是,对于对于HashMap、List这种数据结构,统一用String拼接成特殊格式的字符串,比如Map persons = new HashMap()。可以优化为,特殊的字符串格式:id:name,address| id:name,address….这样的数据格式。

2、避免使用多层嵌套的对象结构。

比如:public class Teacher{private List student = new ArrayList()} , 这就是非常不好的例子。因为Teacher类的内部又嵌套了大量的小Student对象,

优化:我们使用特殊的字符串来进行数据的存储。比如,用json字符串来存储数据,就是一个非常好的选择。

{“teacher”:1,”tracherName”:”leo”,students:[{“studentid”:1,”studentName”:”tome”}]}。

3、对于有些能够避免的场景,尽量使用int替代String,虽然String比ArrayList、HashMap等数据结构高效多,占用内存量少,但是,还有会有额外的信息消耗,就想对于id这样的字段,我们完全可以使用int代替。

由于RDD的数据是持久化到内存,或者是磁盘中的,此时,如果内存大小不是特别充足,完全可以使用序列化的持久化级别,比如:MEMORY_ONLY_SER、MEMORY_AND_DISK_SER等这种带ser的就代表序列化持久化级别,使用方式为:

RDD.persist(StorageLevel.MEMORY_ONLY_SER)这样的语法即可。 这样是将数据序列化然后在持久化,大大的减小对内存的消耗。

答:当我们有两个Executor,每个Executor都有五个cpu core,

当我们设置参数new SparkConf().set(“spark.default.parallelism”,”5”)

这个参数的意思就是,把所有的RDD的partition都设置成5,也就是说明每个RDD的数据都会被分成五份。那么针对RDD的partition,一个partition就会启动一个线程task来进行计算。但是在集群中,我们分明分配了十个cpu core,但是只会使用五个task,使用五个cpu core,那么剩下的五个就会被浪费掉。

优化:我们完全可以设置相同的task,让十个cpu core都在运行,甚至设置20-30个,因为每个task运行结束的时候不一样,可能某个task很快就完成了,那么cpu又空闲了。

官网建议,设置集群总cpu数量的两倍~~三倍的并行度,这样的话,每个cpu core可能分配到并发运行 2 ~ 3 个task线程,那么这样就会连续、运转、最大的发挥他的功效来。

举例:当我们的spark-submit设置了executor的数量是10个,每个executor要求分配2个core,那么application总共会有20个core,此时我们可以设置:

new SparkConf().set(“spark.default.parallelism”,”60”)合理设置并行度。

答:数据本地化:指的是,数据离计算它的代码有多近,也就是我们要计算的数据和执行代码节点之间的距离。

(1)、PROCESS_LOCAL:数据和计算它的代码在同一个JVM进程中。

(2)、NODE_LOCAL:数据和计算它的代码在一个节点上,但是不在一个进程中,比如在不同的executor进程中,或者是数据在HDFS文件的block中。

(3)、NO_PREF:数据从哪里过来,性能都是一样的。

(4)、RACK_LOCAL:数据和计算它的代码在一个机架上。

(5)、ANY:数据可能在任意地方,比如其他网络环境内,或者其他机架上。

1、 当我们task要处理partition的数据,在某一个Executor中,然后,有TaskSchedulerImpl首先会用最好的本地化级别去启动task,尽量的在那个包含了要处理的partition的executor中,去启动task ,就是PROCESS_LOCAL级别。

2、 但是,当那个包含了partition的executor中已经运行了足够的task 的时候,这个时候,这个计算的task默认的情况下,会等待一会,等待该executor什么时候空闲出来了一个cpu core ,然后这个task来启动。这个等待是有时间限制的(这个时间限制可以调优),当发现始终没有空闲的cpu core 的时候,那么就会放大一个级别去启动这个task。

3、 当然这个task去别的地方计算的时候,会调用RDD的itrator()方法,然后通过executor关联的blockManager,来尝试获取数据。BlocakManager底层,首先尝试从getLocal()在本地找数据,如果没有找到,那么就调用getRemote()方法,通过BlockTransferService,链接到有数据的BlockManager,来获取数据。

1、每个Java对象,都有一个对象头,会占用16个字节,主要是包括了一些对象的元信息,比如指向它的类的指针。如果一个对象本身很小,比如就包括了一个int类型的field,那么它的对象头实际上比对象自己还要大。

2、Java的String对象,会比它内部的原始数据,要多出40个字节。因为它内部使用char数组来保存内部的字符序列的,并且还得保存诸如数组长度之类的信息。而且因为String使用的是UTF-16编码,所以每个字符会占用2个字节。比如,包含10个字符的String,会占用60个字节。【结论:一个字符相当于占用6个字节】

3、Java中的集合类型,比如HashMap和LinkedList,内部使用的是链表数据结构,所以对链表中的每一个数据,都使用了Entry对象来包装。Entry对象不光有对象头,还有指向下一个Entry的指针,通常占用8个字节。

4、 元素类型为原始数据类型(比如int)的集合,内部通常会使用原始数据类型的包装类型,比如Integer,来存储元素。

1、首先,自己设置RDD的并行度, 有两种方式,

(1)、在parallelize()、textFile()等方法中,传入第二个参数,设置RDD的task/partition的数量。

(2)、用SparkConf.set()方法,设置一个参数,spark.default.parallelize,可以统一设置这个application所有的RDD的partition数量。

2、其实,在程序中将RDD cache到内存中,调用RDD.cache()方法即可。

3、最后,观察Driver的log,就会发现类似于:“INFO BlockManagerMasterActor.Added rdd_0_1 in memory on mbk.local:50311(size:717.5 KB , free: 333.3 MB)”的日志信息,这就显示了每个partition占用了多少内存。

4、将这个内存信息乘以partition数量,即可得出RDD的内存占用量。

spark 应用性能调优

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:proc中的重要信息
下一篇:基于TI AM5728(浮点双DSP C66x +双ARM Cortex-A15)的开发板
相关文章