欢迎光临散文网 会员登陆 & 注册

Spark优化

2023-01-10 11:35 作者:深言不忘心  | 我要投稿

一、spark的优化的目的是什么?

减少内存的使用,减少shuffle,减少节点之间数据传输的数据量。

二、spark优化方式

  • 2.1 资源优化

    在提交任务时可以通过给参数赋值对任务进行优化,比如

    指定当前application一共使用多个executor

    --num-executor

    指定启动一个executor需要用多少个core

    --executor-cores 

    指定启动一个executor需要用多少内存,这里的core就对应了服务器的线程数,一般core的数量和任务的分区数为一个core对应1-3个分区task,避免资源浪费

    --executor-memeory

    ./spark-submit --master spark://xxx  --executor-cores xxx --executor-memory xx .....

  • 2.2 并行度优化

    除了提交任务时指定资源外,还要设置在代码中设置相关的并行度,不然并行度很小,给再多的资源也用不了。

    设置并行度的方式分以下几种情况:

    1)在算子后面直接带上分区数个数


    2)spark.default.parallelism

这个参数表示的意思是代码中RDD默认生成的分区数,在本地环境中跟local后面带上的数是一样的,在集群(standalone和yarn)中是跟executor所在节点核的总数是一致的。

    3)sparkStreaming中direct模式

与读取的topic的分区数保持一致。

  • 2.3 代码优化

    1)对代码中频繁用到的RDD进行持久化,当一个job跑完,其它job再用相同的RDD时就可以直接到内存中读取数据而不用再重新执行。

    持久化的方式有:

    ①cache:默认调用的就是persist()=persist(StorgeLevel.memeory_level)

    ②persist:memory_only,memory_and_disk,memory_only_ser,memory_and_disk_ser

    ③checkpoint:一般在sparkStreaming中存储状态使用


    2)尽量避免使用shuffle算子

    大多数shuffle还是避免不了的主要就是join,有些情况可以用map类算子+广播变量的方式代替join。

    3)多使用高性能算子

    比如在保存数据或者将数据插入数据库时,使用mapPartition代替map,foreachPartition代替foreach;在处理大量小文件或者在对数据进行过滤后,可以使用coalesce算子连减小分区,在数据量大,分区少的时候可以用repartition来增加分区等等


  • 2.4 shuffle优化

     主要可以优化的参数有:每次shuffle过程拉取数据的缓存大小,默认48M,数据量大的时候可以适当增大,较少数据拉取的次数。同时还可以调节拉取任务失败重试次数以及重试等待时间等。

    • 2.5 内存优化

      spark的统一内存分布如下:

    • 1)300M预留

      2)(总-300M)*0.6(-- spark.memory.fraction)

       其中0.5是用于RDD缓存和广播变量(--spark.memory.storageFraction)

        0.5是用于shuffle。

    • 3)(总-300M)*0.4

         这是task运行内存

      所以要提高task运行内存的话,可以将spark.memeory.fraction的比例调低一点。

  • 2.6 堆外内存优化

    reduce端去map端拉取数据的时候,是从map端的堆外内存中获取的,堆外内存的大小默认是executor内存大小的0.1,最小为384M。真正处理大数据的时候,这经常出现问题,当内存不足时executor会挂掉,会报shuffle file cannot find的问题。也就是shuffleReadTask拉取数据的时候文件找不到,所以可以把堆外内存调大点

    --conf  spark.executor.memoryOverhead=2048M

  • 2.7 数据倾斜处理

    spark中早成数据倾斜的原因是某些分区的数据量明显大于其它分区,导致某些task处理时间过长。

    主要分为以下两大类

    1)使用reduceByKey或者groupByKey这类算子造成数据倾斜

    这类情况可以通过双重聚合或调整并行度的方式解决,如果是并行度太低导致多个key落到同一个分区造成的数据倾斜,可以调大并行度解决。如果是由于少数key造成的数据倾斜也可以通过双重聚合的方式解决,也就是通过先为每个key增加随机数前缀先做一次聚合,减少数据量,然后再把随机数前缀去掉聚合。


    2)join过程产生数据倾斜

    join造成数据倾斜也能分为以下几种

    ①数据量大的RDDjoin数据量小的RDD

    这种情况可以将数据量小的RDD进行广播,然后数据量较大的RDD通过map类算子结合广播变量替换join计算,避免shuffle过程,也就避免了数据倾斜。

    ②数据量大的RDDjoin数据量大的RDD

    如果造成数据倾斜是由其中一个RDD少部分key的数据量过大引起的,这个时候可以对数据进行分拆,将造成数据倾斜的RDD的那些key单独形成一个RDD,未造成数据倾斜的形成一个RDD,两部分数据单独计算,其中造成数据倾斜的那一部分可以采取随机数+数膨胀的方式解决数据倾斜问题,最后再将计算结果union即可。

如果造成数据倾斜是由很多key,那分拆就没什么用了,这个时候可以通过随机数+扩容的方式解决,将其中一个RDD的每条数据的key加上一个n以内的随机前缀,将另外一个RDD的每条数据扩容为n条数据,并依次打上0-n的前缀。最后将两个处理的RDDjoin即可。



Spark优化的评论 (共 条)

分享到微博请遵守国家法律