Spark优化
一、spark的优化的目的是什么?
减少内存的使用,减少shuffle,减少节点之间数据传输的数据量。
二、spark优化方式
2.1 资源优化
在提交任务时可以通过给参数赋值对任务进行优化,比如
指定当前application一共使用多个executor
--num-executor
指定启动一个executor需要用多少个core
--executor-cores
指定启动一个executor需要用多少内存,这里的core就对应了服务器的线程数,一般core的数量和任务的分区数为一个core对应1-3个分区task,避免资源浪费
--executor-memeory
./spark-submit --master spark://xxx --executor-cores xxx --executor-memory xx .....
2.2 并行度优化
除了提交任务时指定资源外,还要设置在代码中设置相关的并行度,不然并行度很小,给再多的资源也用不了。
设置并行度的方式分以下几种情况:
1)在算子后面直接带上分区数个数
2)spark.default.parallelism
这个参数表示的意思是代码中RDD默认生成的分区数,在本地环境中跟local后面带上的数是一样的,在集群(standalone和yarn)中是跟executor所在节点核的总数是一致的。
3)sparkStreaming中direct模式
与读取的topic的分区数保持一致。
2.3 代码优化
1)对代码中频繁用到的RDD进行持久化,当一个job跑完,其它job再用相同的RDD时就可以直接到内存中读取数据而不用再重新执行。
持久化的方式有:
①cache:默认调用的就是persist()=persist(StorgeLevel.memeory_level)
②persist:memory_only,memory_and_disk,memory_only_ser,memory_and_disk_ser
③checkpoint:一般在sparkStreaming中存储状态使用
2)尽量避免使用shuffle算子
大多数shuffle还是避免不了的主要就是join,有些情况可以用map类算子+广播变量的方式代替join。
3)多使用高性能算子
比如在保存数据或者将数据插入数据库时,使用mapPartition代替map,foreachPartition代替foreach;在处理大量小文件或者在对数据进行过滤后,可以使用coalesce算子连减小分区,在数据量大,分区少的时候可以用repartition来增加分区等等
2.4 shuffle优化
主要可以优化的参数有:每次shuffle过程拉取数据的缓存大小,默认48M,数据量大的时候可以适当增大,较少数据拉取的次数。同时还可以调节拉取任务失败重试次数以及重试等待时间等。
2.5 内存优化
spark的统一内存分布如下:
1)300M预留
2)(总-300M)*0.6(-- spark.memory.fraction)
其中0.5是用于RDD缓存和广播变量(--spark.memory.storageFraction)
0.5是用于shuffle。
3)(总-300M)*0.4
这是task运行内存
所以要提高task运行内存的话,可以将spark.memeory.fraction的比例调低一点。
2.6 堆外内存优化
reduce端去map端拉取数据的时候,是从map端的堆外内存中获取的,堆外内存的大小默认是executor内存大小的0.1,最小为384M。真正处理大数据的时候,这经常出现问题,当内存不足时executor会挂掉,会报shuffle file cannot find的问题。也就是shuffleReadTask拉取数据的时候文件找不到,所以可以把堆外内存调大点
--conf spark.executor.memoryOverhead=2048M
2.7 数据倾斜处理
spark中早成数据倾斜的原因是某些分区的数据量明显大于其它分区,导致某些task处理时间过长。
主要分为以下两大类
1)使用reduceByKey或者groupByKey这类算子造成数据倾斜
这类情况可以通过双重聚合或调整并行度的方式解决,如果是并行度太低导致多个key落到同一个分区造成的数据倾斜,可以调大并行度解决。如果是由于少数key造成的数据倾斜也可以通过双重聚合的方式解决,也就是通过先为每个key增加随机数前缀先做一次聚合,减少数据量,然后再把随机数前缀去掉聚合。
2)join过程产生数据倾斜
join造成数据倾斜也能分为以下几种
①数据量大的RDDjoin数据量小的RDD
这种情况可以将数据量小的RDD进行广播,然后数据量较大的RDD通过map类算子结合广播变量替换join计算,避免shuffle过程,也就避免了数据倾斜。
②数据量大的RDDjoin数据量大的RDD
如果造成数据倾斜是由其中一个RDD少部分key的数据量过大引起的,这个时候可以对数据进行分拆,将造成数据倾斜的RDD的那些key单独形成一个RDD,未造成数据倾斜的形成一个RDD,两部分数据单独计算,其中造成数据倾斜的那一部分可以采取随机数+数膨胀的方式解决数据倾斜问题,最后再将计算结果union即可。

如果造成数据倾斜是由很多key,那分拆就没什么用了,这个时候可以通过随机数+扩容的方式解决,将其中一个RDD的每条数据的key加上一个n以内的随机前缀,将另外一个RDD的每条数据扩容为n条数据,并依次打上0-n的前缀。最后将两个处理的RDDjoin即可。


