本文主要是介绍大数据面试-20210308:hdfs ,Spark streaming, Flink三者中的checkpoint原理 hdfs checkpoint原理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
每达到触发条件,会由secondary namenode将namenode上积累的所有edits和一个最新的fsimage下载到本地,并加载到内存进行merge (这个过程称为checkpoint),如下图所示:
Checkpoint详细步骤
- NameNode管理着元数据信息,其中有两类持久化元数据文件:edits操作日志文件和fsimage元数据镜像文件。新的操作日志不会立即与fsimage 进行合并,也不会刷到NameNode的内存中,而是会先写到edits中(因为合并需要消耗大量的资源),操作成功之后更新至内存。
- 有dfs.namenode.checkpoint.period和dfs.namenode.checkpoint.txns 两个配置,只要达到这两个条件任何一个,secondarynamenode就会 执行checkpoint的操作。
- 当触发checkpoint操作时,NameNode会生成一个新的edits即上图中的edits.new文件,同时SecondaryNameNode会将edits文件和fsimage 复制到本地(HTTP GET方式)。
- secondarynamenode将下载下来的fsimage载入到内存,然后一条一条地执行edits文件中的各项更新操作,使得内存中的fsimage保存最新, 这个过程就是edits和fsimage文件合并,生成一个新的fsimage文件即上图中的Fsimage.ckpt文件。
- secondarynamenode将新生成的Fsimage.ckpt文件复制到NameNode节点。
- 在NameNode节点的edits.new文件和Fsimage.ckpt文件会替换掉原来的edits文件和fsimage文件,至此刚好是一个轮回,即在NameNode中又是 edits和fsimage文件。
- 等待下一次checkpoint触发SecondaryNameNode进行工作,一直这样循环操作。
Checkpoint触发条件
Checkpoint操作受两个参数控制,可以通过hdfs-site.xml进行配置:
从上面的描述我们可以看出,SecondaryNamenode根本就不是Namenode的一个热备,其只是将fsimage和edits合并。其拥有的fsimage不是最新的, 因为在他从NameNode下载fsimage和edits文件时候,新的更新操作已经写到edit.new文件中去了。而这些更新在SecondaryNamenode是没有同步到的!当 然,如果NameNode中的fsimage真的出问题了,还是可以用SecondaryNamenode中的fsimage替换一下NameNode上的fsimage, 虽然已经不是最新的fsimage,但是我们可以将损失减小到最少!
spark streaming checkpoint原理
- spark streaming 中对于一些 有状态的操作, 这在某些 stateful 转换中是需要的,在这种转换中,生成 RDD 需要依赖前面的 batches,会导致依赖链随着时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs 保存到可靠存储来切断依赖链, 必须隔一段时间进行一次进行一次 checkpoint。
cache 和 checkpoint 的区别
cache 和 checkpoint 是有显著区别的, 缓存把 RDD 计算出来然后放在内存中, 但是RDD 的依赖链(相当于数据库中的redo 日志), 也不能丢掉, 当某个点某个 executor 宕了, 上面cache 的RDD就会丢掉, 需要通过 依赖链重放计算出来, 不同的是, checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存储,所以依赖链就可以丢掉了,就斩断了依赖链, 是通过复制实现的高容错。
注意:
因为checkpoint是需要把 job 重新从头算一遍, 最好先cache一下, checkpoint就可以直接保存缓存中的 RDD 了, 就不需要重头计算一遍了, 对性能有极大的提升。
checkpoint 的正确使用姿势
使用很简单, 就是设置一下 checkpoint 目录,然后再rdd上调用 checkpoint 方法, action 的时候就对数据进行了 checkpoint。
checkpoint 写流程
RDD checkpoint 过程中会经过以下几个状态,
[ Initialized –> marked for checkpointing –> checkpointing in progress –> checkpointed ]
我们看下状态转换流程图
- data.checkpoint 这个函数调用中, 设置的目录中, 所有依赖的 RDD 都会被删除, 函数必须在 job 运行之前调用执行, 强烈建议 RDD 缓存 在内存中(又提到一次,千万要注意哟), 否则保存到文件的时候需要从头计算。初始化RDD的 checkpointData 变量为 ReliableRDDCheckpointData。 这时候标记为 Initialized 状态,
- 在所有 job action 的时候, runJob 方法中都会调用 rdd.doCheckpoint , 这个会向前递归调用所有的依赖的RDD, 看看需不需要 checkpoint 。 需要需要 checkpoint, 然后调用 checkpointData.get.checkpoint(), 里面标记 状态为 CheckpointingInProgress, 里面调用具体实现类的 ReliableRDDCheckpointData 的 doCheckpoint 方法,
- doCheckpoint -> writeRDDToCheckpointDirectory, 注意这里会把 job 再运行一次, 如果已经cache 了,就可以直接使用缓存中的 RDD 了, 就不需要重头计算一遍了(怎么又说了一遍), 这时候直接把RDD, 输出到 hdfs, 每个分区一个文件, 会先写到一个临时文件, 如果全部输出完,进行 rename , 如果输出失败,就回滚delete。
- 标记 状态为 Checkpointed, markCheckpointed 方法中清除所有的依赖, 怎么清除依赖的呢, 就是 吧RDD 变量的强引用 设置为 null, 垃圾回收了, 这个后面我们也知道,会触发 ContextCleaner 里面监听清除实际 BlockManager 缓存中的数据。
checkpoint 读流程
- 如果一个RDD 我们已经 checkpoint了,那么是什么时候用呢? checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的,也就是说可以被下一个 driver program 使用。 比如 spark streaming 挂掉了, 重启后就可以使用之前 checkpoint 的数据进行 recover ,当然在同一个 driver program 也可以使用。
- 如果 一个 RDD 被checkpoint了, 如果这个 RDD 上有 action 操作时候,或者回溯的这个 RDD 的时候,这个 RDD 进行计算的时候,里面判断如果已经 checkpoint 过, 对分区和依赖的处理都是使用的 RDD 内部的 checkpointRDD 变量。
具体细节如下:
如果 一个 RDD 被checkpoint了, 那么这个 RDD 中对分区和依赖的处理都是使用的 RDD 内部的 checkpointRDD 变量, 具体实现是 ReliableCheckpointRDD 类型。 这个是在 checkpoint 写流程中创建的。依赖和获取分区方法中先判断是否已经checkpoint, 如果已经checkpoint了, 就斩断依赖, 使用ReliableCheckpointRDD, 来处理依赖和获取分区。
如果没有,才往前回溯依赖。 依赖就是没有依赖, 因为已经斩断了依赖, 获取分区数据就是读取 checkpoint 到 hdfs目录中不同分区保存下来的文件。
flink checkpoint 原理
一次 Flink Checkpoint 的流程是从 CheckpointCoordinator 的 triggerCheckpoint 方法开始,下面来看看一次 Flink Checkpoint 涉及到的主要内容:
- Checkpoint 开始之前先进行预检查,比如检查最大并发的 Checkpoint 数,最小的 Checkpoint 之间的时间间隔。默认情况下,最大并发的 Checkpoint 数为 1,最小的 Checkpoint 之间的时间间隔为 0.
- 判断所有 Source 算子的 Subtask (Execution) 是否都处于运行状态,有则直接报错。同时检查所有待确认的算子的 SubTask(Execution)是否是运行状态,有则直接报错。
- 创建 PendingCheckpoint,同时为该次 Checkpoint 创建一个 Runnable,即超时取消线程,默认 Checkpoint 十分钟超时。
- 循环遍历所有 Source 算子的 Subtask(Execution),最底层调用 Task 的triggerCheckpointBarrier, 广播 CheckBarrier 到下游 ,同时 Checkpoint 其状态。
- 下游的输入中有 CheckpointBarrierHandler 类来处理 CheckpoinBarrier,然后会调用 notifyCheckpoint 方法,通知 Operator SubTask 进行 Checkpoint。
- 每当 Operator SubTask 完成 Checkpoint 时,都会向 CheckpointCoordoritor 发送确认消息。CheckpointCoordinator 的 receiveAcknowledgeMessage 方法会进行处理。
- 在一次 Checkpoint 过程中,当所有从 Source 端到 Sink 端的算子 SubTask 都完成之后,CheckpointCoordoritor 会通知算子进行 notifyCheckpointCompleted 方法,前提是算子的函数实现 CheckpointListener 接口。
Flink 会定时在任务的 Source 算子的 SubTask 触发 CheckpointBarrier,CheckpointBarrier 是一种特殊的消息事件,会随着消息通道流入到下游的算子中。只有当最后 Sink 端的算子接收到 CheckpointBarrier 并确认该次 Checkpoint 完成时,该次 Checkpoint 才算完成。所以在某些算子的 Task 有多个输入时,会存在 Barrier 对齐时间,我们可以在 Flink Web UI上面看到各个 Task 的 CheckpointBarrier 对齐时间。
下图是一次 Flink Checkpoint 实例流程示意图:
这篇关于大数据面试-20210308:hdfs ,Spark streaming, Flink三者中的checkpoint原理 hdfs checkpoint原理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!