Spark常规性能调优

Posted by BY KiloMeter on April 8, 2019

资源分配调优

性能调优最重要的途径是增加更加的资源,在一定范围之内,增加资源与性能的提升是成正比的。

问:那么应该多分配什么资源?

答:executor、每个executor可以使用的CPU和内存、driver内存

问:在哪里分配这些资源?

答:在生产环境中,提交spark作业时,用的spark-submit shell脚本,里面调整对应的参数

/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \  配置executor的数量
--driver-memory 100m \  配置driver的内存(影响不大)
--executor-memory 100m \  配置每个executor的内存大小
--executor-cores 3 \  配置每个executor的cpu core数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

SparkContext,DAGScheduler,TaskScheduler会将算子切割成大量的task,提交到Application的executor上执行。

通过增加executor和executor和CPU核数,都能够增加Spark任务的并行度。

增加executor的内存对于性能的提升主要来自三个方面:

1、如果对RDD进行cache,那么更大的内存将能缓存更多的数据,将更少的数据写入磁盘,甚至不用写入磁盘

2、对于shuffle操作,reduce端需要内存来存放拉取的数据,如果内存不够也会写入磁盘,增大内存可以减少磁盘IO。

3、task的执行过程会创建大量对象,如果内存比较小,可能会导致JVM堆内存满了,然后频繁发生GC

什么是Spark执行的并行度?

Spark中,每个stage中task的数量,就是Spark每阶段的并行度

并行度如果少了的话,程序无法利用全部的资源,将会产生资源的浪费,增加spark的并行度,可以充分利用集群资源,减少每个task需要处理的数据量,最终提升任务的性能和运行速度。

设置推荐:如果task的数量和CPU core的数量一致,可能任务能同时完成,但是这是最理想的情况,但是一般情况下,有些task处理的数据量比较少,很快就能结束掉任务了,首先结束的任务也会空出CPU core,因此最好task的数量是CPU core的2~3倍,一个task执行完后,其他的task可以补充上来。

如何设置并行度

SparkConf conf = new SparkConf() .set(“spark.default.parallelism”, “500”)

RDD调优

1、RDD架构重构与优化

尽量去复用RDD,差不多的RDD,可以抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。

2、公共RDD一定要实现持久化,即将RDD缓存到内存中/磁盘中,之后对该RDD进行计算都是直接取这份持久化的数据

3、如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许,会导致OOM内存溢出。当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减少内存的空间占用。 序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。 如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。同样能能使用内存+磁盘,序列化

4、为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化

持久化的双副本机制,持久化后的一个副本,因为机器宕机了,副本丢了,就还是得重新计算一次;持久化的每个数据单元,存储一份副本,放在其他节点上面;从而进行容错;一个副本丢了,不用重新计算,还可以使用另外一份副本。该方法仅针对在内存资源十分充足的条件下使用。

持久化代码如下

RDD.persist(StorageLevel.MEMORY_ONLY());//纯内存
RDD.persist(StorageLevel.MEMORY_ONLY_SER());//纯内存+序列化
RDD.persist(StorageLevel.MEMORY_AND_DISK());//纯内存+磁盘
RDD.persist(StorageLevel.MEMORY_AND_DISK_SER());//纯内存+磁盘+序列化
RDD.persist(StorageLevel.DISK_ONLY());//纯磁盘
//如果要使用双副本的,在上面的每个级别后面加上_2
RDD.persist(StorageLevel.MEMORY_ONLY_2());//纯内存*2

广播变量

在一些方法中,如果使用到了外部的变量,默认情况下每个task是会获取该变量的副本,如果使用的外部变量比较多,会产生不小的网络流量,还将会对内存造成不小的压力,假设有1000个task,如果有一个10M的大变量(比如List,Map等),会消耗10G的内存。因为会增加内存压力,所以进而会影响到前面谈到的,比如GC,数据缓存到磁盘等等一系列影响Spark任务性能的情况。

广播变量的使用可以解决这种情况。广播变量的工作原理如下:首先广播变量仅会在driver进程中有一份初始副本,在每个Worker节点上,由BlockManager负责管理每个executor的内存和磁盘上的数据,executore在执行task的过程中,遇到广播变量会首先尝试在本地的BlockManager获取值,第一次获取不到,BlockManager会远程从driver上拉取一份副本到本地,下一次如果本地的executor还要获取该数据,可以直接从BlockManager中获取。此外,BlockManager获取数据时,可能不是通过远程driver进行拉取,也可能从附近较近节点中进行获取。

因此,广播变量使用之后,外部变量的副本数变成了worker的数量,网络传输速度加快(因为数据可能从附近节点获取),内存消耗大大减少。

序列化

前面一直说到序列化,如果开启了序列化,那么在以下场景将会发挥作用:

1、算子函数中使用到了外部变量(外部变量从driver拷贝到worker的executor会有网络的传输)

2、持久化RDD时使用序列化

3、shuffle(shuffle过程也会产生大量的网络传输)

在RDD使用了序列化级别的持久化时,每个executor上处理的RDD partition将会序列化成一个大的字节数组缓存在每个executor的block manager上

spark中推荐使用kyro序列化机制,该机制相比于Java默认的序列化机制,序列化的速度更快,数据的压缩效率更高

如何开启Spark的kyro序列化机制?

SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

前面说到kyro序列化这么厉害,那为什么Spark不把它设置为默认的序列化器呢?原因在于,如果要开启kyro序列化器,需要注册序列化对象所属的类,如果没有注册,kyro的序列化效果无法达到最佳。

SparkConf conf = new SparkConf()
                .setAppName(Constants.SPARK_APP_NAME_SESSION)
                .setMaster("local")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class[]{CategorySortKey.class});

fastutil工具类

fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue; fastutil能够提供更小的内存占用,更快的存取速度;我们使用fastutil提供的集合类,来替代自己平时使用的JDK的原生的Map、List、Set,好处在于,fastutil集合类,可以减小内存的占用,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度; fastutil也提供了64位的array、set和list,以及高性能快速的,以及实用的IO类,来处理二进制和文本类型的文件; fastutil最新版本要求Java 7以及以上版本;

fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的map,实现了Java的Map接口),因此可以直接放入已有系统的任何代码中。 fastutil还提供了一些JDK标准类库中没有的额外功能(比如双向迭代器)。

本地化等待时长

Spark的Driver进程,对Application中每个stage的task,进行分配之前,会计算出每个task所需要处理的数据分片,Spark会优先把task任务分配到数据分片所在的机器上,但是如果数据所在机器的资源已经被其他task占据了,此时的task会选择等待一段时间,到最后等不到就选择一个比较差的本地化级别,比如把task分配到离目标数据比较近的节点上,拉取数据后进行运算,由于要拉取数据,就会产生网络IO,这会对性能产生不小的影响。

Spark中本地化级别有以下几种:

PROCESS_LOCAL:进程本地化,代码和数据在同一个进程中,也就是在同一个executor中;计算数据的task由executor执行,数据在executor的BlockManager中;性能最好 NODE_LOCAL:节点本地化,代码和数据在同一个节点中;比如说,数据作为一个HDFS block块,就在节点上,而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不同executor中;数据需要在进程间进行传输 NO_PREF:对于task来说,数据从哪里获取都一样,没有好坏之分 RACK_LOCAL:机架本地化,数据和task在一个机架的两个节点上;数据需要通过网络在节点之间进行传输 ANY:数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差

是否要调节该参数可以通过查看Spark作业日志,如果大部分都是PROCESS_LOCAL级别,那就不用调节该参数了,如果大部分都是NODE_LOCAL,ANY,那就需要调节下这几个参数了,反复调整运行程序,查看级别是否有所改善,作业运行时间是否缩短。

spark.locality.wait
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
//默认等待时间都是3s
//设置方法如下
new SparkConf()
  .set("spark.locality.wait", "10")