spark常见面试点
一、spark是什么?spark和mapreduce的对比及spark架构
spark是什么
spark是一种基于内存的大数据分析计算引擎。
spark和mapreduce的对比
相同点:spark和mapreduce都是用于大数据分析的计算引擎。
不同点(spark比mapreduce快的原因):
1)mapreduce只要简单的map阶段和reduce阶段,每一个maptask所产生的的数据都会落盘,而且需要经过sort combine阶段,产生大量磁盘IO,而spark是尽可能高效实用内存。
2)对于复杂计算来说,mapreduce不能进行迭代式计算需要开启多个应用进行处理,也就是需要开启多个进程处理,而spark可以依赖其dag依赖关系只需要开启一个应用即可,可以避免重复启动应用带来的性能损耗,并且spark的task是线程级别的,基于线程调度是比进程要快的。
3)从编程模型角度看,spark内部根据不同场景提供了不同类型的高性能算子,并可以将数据进行持久化。
综合以上几点,所以spark比mapreduce快。
spark架构
driver:主要负责创建SparkContext上下文,提交spark作业,并把 job转换为task,在各个executor进程之间协调任务的调度。
executor:executor负责task的执行,并把结果返回给driver。
二、spark编程模型RDD
什么是RDD
RDD是Resilient Distributed Dataset的缩写。
弹性:存储弹性,RDD处理的数据可以存储在磁盘也可以存储在内存中 ;计算弹性,计算过程中第一可以根据需求对数据重新分片,第二计算过程中出错了可以借助缓存自动恢复,第三计算出错了有重试机制。
RDD的五大特性
getpartitons:Array[Partition] 数据的分区数(可以设置分区数)。
partitioner: 自定义分区器,控制数据进入哪个分区。
compute:定义分区数据的计算逻辑。
getDenpendencies:RDD的血缘依赖关系,用于容错恢复。
getPreferedLocations:数据本地性,移动数据不如移动计算,存储每个partition的计算位置。
三、spark常用算子
转换算子
1)单value
map, filter ,mapPartitions,flatMap,glom,coalecse,repartition,groupBy等
2)key value
partitionBy ,groupByKey ,reduceByKey,aggregateBykey,mapValues,join,coGroup等
3)双 value
交集intersection,并集union,差集subtract:拉链zip
行动算子
collect,foreach,reduce,take,first,count,save相关算子等
其中collect是把收集executor执行结果到driver打印,而foreach则是每个executor分布式调用foreach打印
四、spark内存模型
统一内存分布
1)堆内内存
300M预留:用于存储spark的内部对象
存储内存:(总spark.executor.memory-300M)*0.6*0.5,用于rdd缓存以及广播变量
执行内存:(总spark.executor.memory-300M)*0.6*0.5,用于存储shuffle过程中的中间数据
用户内存:(总spark.executor.memeory-300M)*0.4,用于t存储ask任务的执行时所需要的数据
存储内存+执行内存((总-300M)*0.6)是有spark.memory.fraction这个参数控制的,所以如果rdd缓存比较频繁或者shuffle过程比较多,可以适当调大spark.memory.fraction,并且可以通过调节spark.storage.storagefraction这个参数来控制存储内存和执行内存的比例。
2)堆外内存
堆外内存的调节通过spark.executor.memeory.overhead来控制,常见的一个报错是shuffle file cannot find。这是因为shuffle过程中reduce端从map端拉取数据时是从map端的堆外内存中拉取数据的,大小默认是executor内存大小的0.1,最小是384M。内存不足时,executor就会挂掉,executor对应的BlockManager就挂掉了,reduce拉不到数据,所以报shuffle file cannot find的问题。一般在提交任务的时候需要通过spark.executor.memory.overhead这个参数适当把堆外内存调大(比如2048M)。
并且可以适当调大spark网络连接的超时时长(spark.core.connection.ack.wait.timeout默认120s),因为也有可能是reduce去拉取数据的时候,executor上正在执行垃圾回收,导致工作线程停止了,所以调大堆外内存的同时适当调大超时时长也有必要(比如300s)
五、spark调优(资源、shuffle、数据倾斜)
1.内存
2.shuffle
3.数据倾斜
spark中的数据倾斜主要分为两大类
1)reduceByKey和groupByKey这类算子导致的数据倾斜问题。一般采用调整并行度和双重聚合的方式解决。如果是由于并行度太低导致多个key落到同一个分区造成的数据倾斜问题,可以适当调大并行度解决。如果是少数几个key造成的数据倾斜,二次聚合的方式处理,先为每个key增加一个随机数前缀进行一次预聚合,减少数据量,然后再把随机数前缀去掉,进行二次聚合即可。
2)join过程中产生的数据倾斜问题
①数据量大的RDDjoin数据量小的RDD
这种情况可以将数据量小的RDD进行广播,数据量大的RDD通过map类算子结果广播变量替换join,避免shuffle过程,也就避免了数据倾斜。
②数据量大的RDDjoin数据量大的RDD
如果造成数据倾斜一个RDD中的其中某几个key造成的
通过分支的思维,通过countByKey算子将产生数据倾斜的数据和未产生数据倾斜的数据分别单独计算,产生数据倾斜的那部分数据可以采用随机数+数据膨胀的方式进行处理,最后将结果union起来即可。
如果造成数据倾斜一个RDD中的其中多个key造成的
如果造成数据倾斜的可以有很大,那分拆就没什么效果了。这个时候可以采用随机数+扩容的方式进行解决,将其中一个RDD的每个key都打上n以内的随机前缀,另外一个RDD将每条数据扩容为n倍,每条数据分别依次打上0-n的前缀,这样也能解决数据倾斜问题。
六、spark提交流程原理
以yarn-cluster为例,通过spark-submit提交任务,首先创建yarn-cluster客户端向ResourceManager提交任务,resourceManger随之会通知NodeManager启动该任务对应的ApplicationMaster,ApplicationMaster会启动Driver线程,Driver向ResourceManager申请相应的资源,然后applicationMaster会在合适NodeManager上启动executor进程,executor进程启动完毕后向Driver反向注册。所有executor全部注册完之后,Driver端开始执行main函数,每遇到一个行动算子就会生成一个job,并根据宽依赖划分stage,每个stage生成对应的TaskSet,之后再将task分发到executor上执行,结果在反馈给Driver端,任务结束后释放资源。