欢迎光临散文网 会员登陆 & 注册

一文搞懂Spark CheckPoint

2023-06-27 10:20 作者:涤生大数据  | 我要投稿


CheckPoint主要应用

checkpoint在spark中主要有两方面应用:一是在spark core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;另外是应用在spark streaming中,使用checkpoint用来保存DStreamGraph以及相关配置信息,以便在Driver崩溃重启的时候能够接着之前进度继续进行处理(如之前waiting batch的job会在重启后继续处理)。 RDD checkPoint实际上是利用 hdfs 的冗余来实现高可用;文件rdd保持的是该rdd的信息;

如果 rdd1.checkpoint(), 那么后面依赖 rdd1的 rdd2 在计算时加载 rdd1实际上是从 checkpoint产生的eliableCheckpointRDD而来,(而不是从 rdd0->rdd1重新计算);

如果 rdd1.persist()并且checkpoint了,会优先加载 cache 里面的,然后是 checkpoint 里面的。

Checkpoint 到底是什么和需要用 Checkpoint 解决什么问题?

1.Spark 在生产环境下经常会面临 Transformation 的 RDD 非常多(例如一个Job 中包含1万个RDD) 或者是具体的 Transformation 产生的RDD 本身计算特别复杂和耗时(例如计算时常超过1个小时) , 可能业务比较复杂,此时我们必需考虑对计算结果的持久化。

2.Spark 是擅长多步骤迭代,同时擅长基于 Job 的复用。这个时候如果曾经可以对计算的过程进行复用,就可以极大的提升效率。因为有时候有共同的步骤,就可以免却重复计算的时间。

3.如果采用persists 把数据在内存中的话,虽然最快速但是也是最不可靠的;如果放在磁盘上也不是完全可靠的,例如磁盘会损坏,系统管理员可能会清空磁盘。

4.Checkpoint 的产生就是为了相对而言更加可靠的持久化数据,在 Checkpoint 可以指定把数据放在本地并且是多副本的方式,但是在正常生产环境下放在 HDFS 上,这就天然的借助HDFS 高可靠的特征来完成最大化的可靠的持久化数据的方式

5.Checkpoint 是为了最大程度保证绝对可靠的复用 RDD 计算数据的 Spark 的高级功能,通过 Checkpoint 可以把数据持久化到 HDFS 上来保证数据的最大程度的安任性

6.Checkpoint 就是针对整个RDD 计算链条中特别需要数据持久化的环节(后面会反覆使用当前环节的RDD) 开始基于HDFS 等的数据持久化复用策略,通过对 RDD 启动 Checkpoint 机制来实现容错和高可用

注:对于Shuffle Dependency加Checkpoint是一个值得提倡的做法

 Checkpoint运行原理图

Checkpoint源码解析 

1、以rdd中的 iterator 方法为例,它会先在缓存中查看数据 (内部会查看 Checkpoint 有没有相关数据),然后再从 CheckPoint 中查看数据。

Checkpoint 有两种形式,一种是 reliably 和 一种是 locally

2、通过调用 SparkContext.setCheckpointDir 方法来指定进行 Checkpoint 操作的 RDD 把数据放在那里,在生产集群中是放在 HDFS 上的,同时为了提高效率在进行 Checkpoint 的时候可以指定很多目录

3、在进行 RDD 的 checkpoint 的时候,其所依赖的所有 RDD 都会清空掉;官方建议如果要进行 checkpoint 时,必需先缓存在内存中。但实际可以考虑缓存在本地磁盘上或者是第三方组件如 Taychon 上。在进行 checkpoint 之前需要通过 SparkConetxt 设置 checkpoint 的文件夹

4、作为最住实际,一般在进行 checkpoint 方法调用前通过都要进行 persists 来把当前 RDD 的数据持久化到内存,这是因为 checkpoint 是 lazy 级别,必需有 Job 的执行且在Job 执行完成后才会从后往前回溯那个 RDD 进行了Checkpoint 标指,然后对该标记了要进行 Checkpoint 的 RDD 新启动一个Job 执行具体 Checkpoint 的过程

5、Checkpoint 改变了 RDD 的 Lineage6、当我们调用了checkpoint 方法要对RDD 进行Checkpoint 操作的话,此时框架会自动生成 RDDCheckpointData

7、触发时机当 RDD 上运行一个Job 后就会立即触发 RDDCheckpointData 中的 checkpoint 方法,在其内部会调用 doCheckpoint( )方法,实际上在生产环境上会调用 ReliableRDDCheckpointData 的 doCheckpoint( )方法

8、在生产环境下会导致 ReliableRDDCheckpointData 的 writeRDDToCheckpointDirectory 的调用,而在 writeRDDToCheckpointDirectory 方法内部会触发runJob 来执行当前的RDD 中的数据写到Checkpoint 的目录中,同时会产生ReliableCheckpointRDD 实例

在writeRDDToCheckpointDirectory方法中可以看到:将作为一个单独的任务(RunJob)将RDD中每个parition的数据依次写入到checkpoint目录(writePartitionToCheckpointFile),此外如果该RDD中的partitioner如果不为空,则也会将该对象序列化后存储到checkpoint目录。所以,在做checkpoint的时候,写入的hdfs中的数据主要包括:RDD中每个parition的实际数据,以及可能的partitioner对象(writePartitionerToCheckpointDir)。

9、在写完checkpoint数据到hdfs以后,将会调用rdd的markCheckpoined方法,主要斩断该rdd的对上游的依赖,以及将paritions置空等操作。

checkpoint 读流程

在做完checkpoint后,获取原来RDD的依赖以及partitions数据都将从CheckpointRDD中获取。也就是说获取原来rdd中每个partition数据以及partitioner等对象,都将转移到CheckPointRDD中。在CheckPointRDD的一个具体实现ReliableRDDCheckpointRDD中的compute方法中可以看到,将会从hdfs的checkpoint目录中恢复之前写入的partition数据。而partitioner对象(如果有)也会从之前写入hdfs的paritioner对象恢复。

总的来说,checkpoint读取过程是比较简单的。

持久化和checkpoint的区别:

1.cache/persist 可以说一方面是为了提速,另一方面是为了当某一重要步骤过长,后面的依赖出错(可能是逻辑错误)情况下,可以无需从头算起。e/p。.ersist 可以说一方面是为

2.checkpoint:则更多的是为了高可用。其核心是 hdfs 的 replicaton。其情形是集群总某个点的硬件设备坏掉,例如 persist 中某个盘坏了,整个应用仍然是可用的。Checkpoint的产生就是为了相对而言更加可靠的持久化数据,在Checkpoint的时候可以指定把数据放在本地,并且是多副本的方式,但是在生产环境下是放在HDFS上,这就天然的借助了HDFS高容错、高可靠的特征来完成了最大化的可靠的持久化数据的方式。e/p。.ersist 可以说一方面是为

3.checkpoint是为了最大程度保证绝对可靠的复用RDD计算数据的Spark高级功能,通过checkpoint通常把数据持久化到HDFS来保证数据最大程度的安全性。e/p。.ersist 可以说一方面是为

4.checkpoint就是针对整个RDD计算链条中特别需要数据持久化的环节(后面会反复使用当前环节的RDD)开始基于HDFS等的数据持久化复用策略,通过对RDD启动checkpoint机制来实现容错和高可用。e/p。.ersist 可以说一方面是为

5.持久化只是将数据保存在BlockManager中;但是RDD的lineage是不会变化的。Checkpoint完毕之后,RDD已经没有之前的lineage(血缘关系),而只有一个强行为其设置的CheckpointRDD, 也就是说checkpoint之后,lineage发生了改变。e/p。.ersist 可以说一方面是为

6.rdd.persist(StorageLevel.DISK_ONLY)虽然可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。一旦 driver program 执行结束,也就是 executor 所在进程 CoarseGrainedExecutorBackend stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的,也就是说可以被下一个 driver program 使用,而 cached RDD 不能被其他 dirver program 使用。e/p。.ersist 可以说一方面是为

假如进行一个1万个步骤,在9000个步骤的时候persist,数据还是有可能丢失的,但是如果checkpoint,数据丢失的概率几乎为0。

注:Spark相比Hadoop的优势在于尽量不去持久化,所以使用 pipeline,cache 等机制。用户如果感觉 job 可能会出错可以手动去 checkpoint 一些重要的RDD,job如果出错,下次运行时直接从 checkpoint 中读取数据。唯一不足的是,checkpoint 需要两次运行 job。

Spark的容错的机制

Spark 采用Lineage(血统)CheckPoint(检查点)两种方式来解决分布式数据集中的容错问题。Lineage本质上类似于数据库的重做日志(redo log),只不过这个日志粒度很大,是对整个RDD分区做重做进而恢复数据的。

在容错机制中,如果集群中一个节点死机了,而且运算窄依赖,则只需要把丢失的父RDD分区重算即可,不依赖于其他节点。但对宽依赖,则需要父RDD的所有分区都重算,这个代价就很昂贵了。因此,Spark 提供设置检查点的方式来保存Shuffle前的祖先RDD数据,将依赖关系删除。当数据丢失时,直接从检查点中恢复数据。为了确保检查点不会因为节点死机而丢失,检查点数据保存在磁盘中,通常是hdfs文件。

Lineage

RDD Lineage被称为RDD运算图或RDD依赖关系图,是RDD所有父RDD的图。它是在RDD上执行transformations函数并创建逻辑执行计划(logical execution plan)的结果,是RDD的逻辑执行计划。相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据Transformation和Action操作。当这个RDD的部分分区数据丢失时,它可以通过Lineage找到丢失的父RDD的分区进行局部计算来恢复丢失的数据,这样可以节省资源提高运行效率。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。

依赖关系决定Lineage的复杂程度,同时也是的RDD具有了容错性。因为当某一个分区里的数据丢失了,Spark程序会根据依赖关系进行局部计算来恢复丢失的数据。

容错原理

在Spark的容错机制中,当一个节点宕机了,进行容错恢复时,对于窄依赖来讲,进行重计算时只要把丢失的父RDD分区重算即可,不依赖于其他节点。而对于Shuffle Dependency来说,进行重计算时需要父RDD的分区都存在,这样计算量就太大了比较耗费性能。

如下图所示,如果 RDD_1 中的 Partition3 出错丢失,对于OneToOne的窄依赖,Spark 会回溯到 Partition3 的父分区 RDD_0 的 Partition3

对于部分RangeToOne的窄依赖则会回溯到父分区RDD_0的Partition0和Partition3,对 RDD_0 的 Partition0和Partition3 重算算子,得到 RDD_1 的 Partition3。其他分区丢失也是同理重算进行容错恢复。

对于全RangeToOne的窄依赖,由于其父分区是 RDD_0 的所有分区,所以需要回溯到 RDD_0,重算 RDD_0 的所有分区,然后将 RDD_1 的 Partition3 需要的数据聚集合并为 RDD_1 的 Partition3。在这个过程中,由于 RDD_0 中不是 RDD_1 中 Partition3 需要的数据也全部进行了重算,所以产生了大量冗余数据重算的开销。

一文搞懂Spark CheckPoint的评论 (共 条)

分享到微博请遵守国家法律