欢迎光临散文网 会员登陆 & 注册

锁屏面试题百日百刷-Spark篇(六)

2023-03-15 21:18 作者:zjlala96  | 我要投稿

锁屏面试题百日百刷,每个工作日坚持更新面试题。锁屏面试题app、小程序现已上线,官网地址:https://www.demosoftware.cn。已收录了每日更新的面试题的所有内容,还包含特色的解锁屏幕复习面试题、每日编程题目邮件推送等功能。让你在面试中先人一步!接下来的是今日的面试题:


1.什么时候需要启用checkpoint?

(1)使用了stateful转换-如果 application 中使用了updateStateByKey或reduceByKeyAndWindow等 stateful 操作,必须提供 checkpoint 目录来允许定时的RDD checkpoint

(2)希望能从意外中恢复Driver。

2.导出checkpoint数据

<1> checkpoint 的时机

在 Spark Streaming 中,JobGenerator 用于生成每个 batch 对应的 jobs,它有一个定时器,定时器的周期即初始化 StreamingContext 时设置的 batchDuration。这个周期一到,JobGenerator 将调用generateJobs方法来生成并提交 jobs,这之后调用 doCheckpoint 方法来进行 checkpoint。doCheckpoint 方法中,会判断当前时间与streaming application start 的时间之差是否是 checkpoint duration 的倍数,只有在是的情况下才进行checkpoint。

<2> checkpoint 的形式

最终 checkpoint 的形式是将类 Checkpoint的实例序列化后写入外部存储,值得一提的是,有专门的一条线程来做将序列化后的 checkpoint 写入外部存储。

3.Spark的内存模型

Spark的Executor的内存管理是基于JVM的内存管理之上,Spark对JVM堆内(On-Heap)空间进行了更为详细的分配,以便充分利用内存,同时Spark引入堆外内存(OffHeap)内存,可以直接在Worker节点的系统内存中开辟空间,进一步优化内存使用。

Spark的堆内(On-Heap)空间是由--executor-memory或spark.executor.memory参数配置,Executor内运行的并发任务共享JVM堆内内存。而且该堆内内存是一种逻辑上的管理,因为对象的释放都是由JVM完成。

Spark引入堆外内存(OffHeap)内存主要是为了提高Shuffle排序的效率,存储优化过的二进制数据。从2.0之后Spark可以直接操作系统的堆外内存,减少不必要的开销。改参数默认不开启,通过spark.memory.offHeap.ennable参数启用,并由spark.memory.offHeap.size参数设定堆外空间大小。

默认情况下,Spark 仅仅使用了堆内内存。Executor 端的堆内内存区域大致可以分为以下四大块:

Execution 内存:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据

Storage 内存:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据;

用户内存(User Memory):主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息。

预留内存(Reserved Memory):系统预留内存,会用来存储Spark内部对象。

 

Execution 内存和 Storage 内存动态调整

上面两张图中的 Execution 内存和 Storage 内存之间存在一条虚线,这是为什么呢?

在 Spark 1.5 之前,Execution 内存和 Storage 内存分配是静态的,换句话说就是如果 Execution 内存不足,即使 Storage 内存有很大空闲程序也是无法利用到的;反之亦然。这就导致我们很难进行内存的调优工作,我们必须非常清楚地了解 Execution 和 Storage 两块区域的内存分布。而目前 Execution 内存和 Storage 内存可以互相共享的。也就是说,如果 Execution 内存不足,而 Storage 内存有空闲,那么 Execution 可以从 Storage 中申请空间;反之亦然。所以上图中的虚线代表 Execution 内存和 Storage 内存是可以随着运作动态调整的,这样可以有效地利用内存资源。Execution 内存和 Storage 内存之间的动态调整可以概括如下

具体的实现逻辑如下:

1 程序提交的时候我们都会设定基本的 Execution 内存和 Storage 内存区域(通过

spark.memory.storageFraction 参数设置);

2 在程序运行时,如果双方的空间都不足时,则存储到硬盘;将内存中的块存储到磁盘的策略是按照 LRU规则进行的。若己方空间不足而对方空余时,可借用对方的空间(; 存储空间不足是指不足以放下一个完整的 Block);

3 Execution 内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后"归还"借用的空间

4 Storage 内存的空间被对方占用后,目前的实现是无法让对方"归还",因为需要考虑 Shuffle过程中的很多因素,实现起来较为复杂;而且 Shuffle 过程产生的文件在后面一定会被使用到,而 Cache在内存的数据不一定在后面使用。

注意:上面说的借用对方的内存需要借用方和被借用方的内存类型都一样,都是堆内内存或者都是堆外内存,不存在堆内内存不够去借用堆外内存的空间。

Task 之间内存分布

为了更好地使用使用内存,Executor 内运行的 Task 之间共享着 Execution 内存。具体的,Spark 内部维护了一个 HashMap 用于记录每个 Task 占用的内存。当 Task 需要在 Execution 内存区域申请 numBytes 内存,其先判断 HashMap 里面是否维护着这个 Task 的内存使用情况,如果没有,则将这个 Task 内存使用置为0,并且以 TaskId 为 key,内存使用为 value 加入到 HashMap 里面。之后为这个 Task 申请 numBytes 内存,如果Execution 内存区域正好有大于 numBytes 的空闲内存,则在 HashMap 里面将当前 Task 使用的内存加上numBytes,然后返回;如果当前 Execution 内存区域无法申请到每个 Task 最小可申请的内存,则当前 Task 被阻塞,直到有其他任务释放了足够的执行内存,该任务才可以被唤醒。每个 Task 可以使用 Execution 内存大小范围为 1/2N ~ 1/N,其中 N 为当前 Executor 内正在运行的 Task 个数。一个 Task 能够运行必须申请到最小内存为 (1/2N * Execution 内存);当 N = 1 的时候,Task 可以使用全部的 Execution 内存。

比如如果 Execution 内存大小为 10GB,当前 Executor 内正在运行的 Task 个数为5,则该 Task 可以申请的内存范围为 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB的范围。

锁屏面试题百日百刷-Spark篇(六)的评论 (共 条)

分享到微博请遵守国家法律