Flink CheckPoint奇技淫巧 | 原理和在生产中的应用

2024-09-06 21:38

本文主要是介绍Flink CheckPoint奇技淫巧 | 原理和在生产中的应用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

Flink本身为了保证其高可用的特性,以及保证作用的Exactly Once的快速恢复,进而提供了一套强大的Checkpoint机制。Checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状态的一致性。Flink的Checkpoint机制原理来自“Chandy-Lamport algorithm”算法 (分布式快照算法)。

Checkpoint的执行流程

每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。

file

  • CheckpointCoordinator周期性的向该流应用的所有source算子发送barrier;
  • 当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状 态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告 自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理;
  • 下游算子收到barrier之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自身 快照情况,同时向自身所有下游算子广播该barrier,恢复数据处理;
  • 每个算子按照步骤3不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。
  • 当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败 ;

Checkpoint常用设置

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);// advanced options:// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

  • 使用StreamExecutionEnvironment.enableCheckpointing方法来设置开启checkpoint;具体可以使用enableCheckpointing(long interval),或者enableCheckpointing(long interval, CheckpointingMode mode);interval用于指定checkpoint的触发间隔(单位milliseconds),而CheckpointingMode默认是CheckpointingMode.EXACTLYONCE,也可以指定为CheckpointingMode.ATLEAST_ONCE
  • 也可以通过StreamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode来设置CheckpointingMode,一般对于超低延迟的应用(大概几毫秒)可以使用CheckpointingMode.ATLEASTONCE,其他大部分应用使用CheckpointingMode.EXACTLY_ONCE就可以
  • checkpointTimeout用于指定checkpoint执行的超时时间(单位milliseconds),超时没完成就会被abort掉
  • minPauseBetweenCheckpoints用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1
  • maxConcurrentCheckpoints用于指定运行中的checkpoint最多可以有多少个,用于包装topology不会花太多的时间在checkpoints上面;如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数就不起作用了(大于1的值不起作用)
  • enableExternalizedCheckpoints用于开启checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state;ExternalizedCheckpointCleanup用于指定当job canceled的时候externalized checkpoint该如何清理,DELETEONCANCELLATION的话,在job canceled的时候会自动删除externalized state,但是如果是FAILED的状态则会保留;RETAINONCANCELLATION则在job canceled的时候会保留externalized checkpoint state
  • failOnCheckpointingErrors用于指定在checkpoint发生异常的时候,是否应该fail该task,默认为true,如果设置为false,则task会拒绝checkpoint然后继续运行

flink-conf.yaml相关配置

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend). 
#
# state.backend.incremental: false

  • state.backend用于指定checkpoint state存储的backend,默认为none
  • state.backend.async用于指定backend是否使用异步snapshot(默认为true),有些不支持async或者只支持async的state backend可能会忽略这个参数
  • state.backend.fs.memory-threshold,默认为1024,用于指定存储于files的state大小阈值,如果小于该值则会存储在root checkpoint metadata file
  • state.backend.incremental,默认为false,用于指定是否采用增量checkpoint,有些不支持增量checkpoint的backend会忽略该配置
  • state.backend.local-recovery,默认为false
  • state.checkpoints.dir,默认为none,用于指定checkpoint的data files和meta data存储的目录,该目录必须对所有参与的TaskManagers及JobManagers可见
  • state.checkpoints.num-retained,默认为1,用于指定保留的已完成的checkpoints个数
  • state.savepoints.dir,默认为none,用于指定savepoints的默认目录
  • taskmanager.state.local.root-dirs,默认为none

增量式的检查点- Checkpoint设置的奇技淫巧

增量式检查点

Flink的检查点是一个全局的、异步的程序快照,它周期性的生成并送到持久化存储(一般使用分布式系统)。当发生故障时,Flink使用最新的检查点进行重启。一些Flink的用户在程序“状态”中保存了GB甚至TB的数据。这些用户反馈在大量 的状态下,创建检查点通常很慢并且耗资源,这也是为什么Flink在 1.3版本开始引入“增量式的检查点”。

在引入“增量式的检查点”之前,每一个Flink的检查点都保存了程序完整的状态。后来我们意识到在大部分情况下这是不必要的,因为上一次和这次的检查点之前 ,状态发生了很大的变化,所以我们创建了“增量式的检查点”。增量式的检查点仅保存过去和现在状态的差异部分。

增量式的检查点可以为拥有大量状态的程序带来很大的提升。在早期的测试中,一个拥有TB级别“状态”程序将生成检查点的耗时从3分钟以上降低 到了30秒左右。因为增量式的检查点不需要每次把完整的状态发送到存储中。

现在只能通过RocksDB state back-end来获取增量式检查点的功能,Flink使用RocksDB内置的备份机制来合并检查点数据。这样Flink增量式检查点的数据不会无限制的增大,它会自动合并老的检查点数据并清理掉。

要启用这个机制,可以如下设置:RocksDBStateBackend backend =new RocksDBStateBackend(filebackend, true);

增量式检查点如何工作

Flink 增量式的检查点以“RocksDB”为基础,RocksDB是一个基于 LSM树的KV存储,新的数据保存在内存中,称为memtable。如果Key相同,后到的数据将覆盖之前的数据,一旦memtable写满了,RocksDB将数据压缩并写入到磁盘。memtable的数据持久化到磁盘后,他们就变成了不可变的sstable。

RocksDB会在后台执行compaction,合并sstable并删除其中重复的数据。之后RocksDB删除原来的sstable,替换成新合成的ssttable,这个sstable包含了之前的sstable中的信息。

在这个基础之上,Flink跟踪前一个checkpoint创建和删除的RocksDB sstable文件,因为sstable是不可变的,Flink可以因此计算出 状态有哪些改变。为了达到这个目标,Flink在RocksDB上触发了一个刷新操作,强制将memtable刷新到磁盘上。这个操作在Flink中是同步的,其他的操作是异步的,不会阻塞数据处理。

Flink 的checkpoint会将新的sstable发送到持久化存储(例如HDFS,S3)中,同时保留引用。Flink不会发送所有的sstable, 一些数据在之前的checkpoint存在并且写入到持久化存储中了,这样只需要增加引用次数就可以了。因为compaction的作用,一些sstable会合并成一个sstable并删除这些sstable,这也是为什么Flink可以减少checkpoint的历史文件。

为了分析checkpoint的数据变更,而上传整理过的sstable是多余的(这里的意思是之前已经上传过的,不需要再次上传)。Flink处理这种情况,仅带来一点点开销。这个过程很重要,因为在任务需要重启的时候,Flink只需要保留较少的历史文件。

file

假设有一个子任务,拥有一个keyed state的operator,checkpoint最多保留2个。上面的图片描述了每个checkpoint对应的RocksDB 的状态,它引用到的文件,以及在checkpoint完成后共享状态中的count值。

checkpoint ‘CP2’,本地的RocksDB目录有两个sstable文件,这些文件是新生成的,于是Flink将它们传到了checkpoint 对应的存储目录。当checkpoint完成后,Flink在共享状态中创建两个实体,并将count设为1。在这个共享状态中,这个key 由operator、subtask,原始的sstable名字组成,value为sstable实际存储目录。

checkpoint‘CP2’,RocksDB有2个老的sstable文件,又创建了2个新的sstable文件。Flink将这两个新的sstable传到 持久化存储中,然后引用他们。当checkpoint完成后,Flink将所有的引用的相应计数加1。

checkpoint‘CP3’,RocksDB的compaction将sstable-(1), sstable-(2), sstable-(3) 合并成 sstable-(1,2,3),然后删除 原始的sstable。这个合并后的文件包含了和之前源文件一样的信息,并且清理掉了重复的部分。sstable-(4)还保留着,然后有一个 新生成的sstable-(5)。Flink将新的 sstable-(1,2,3)以及 sstable-(5)传到持久化存储中, sstable-(4)仍被‘CP2’引用,所以 将计数增加1。现在有了3个checkpoint,'CP1','CP2','CP3',超过了预设的保留数目2,所以CP1被删除。作为删除的一部分, CP1对应的文件(sstable-(1)、sstable-(2)) 的引用计数减1。

checkpoint‘CP4’,RocksDB将sstable-(4), sstable-(5), 新的 sstable-(6) 合并成 sstable-(4,5,6)。Flink将新合并 的 sstable-(4,5,6)发送到持久化存储中,sstable-(1,2,3)、sstable-(4,5,6) 的引用计数增加1。由于再次到达了checkpoint的 保留数目,‘CP2’将被删除,‘CP2’对应的文件(sstable-(1)、sstable-(2)、sstable(3) )的引用计数减1。由于‘CP2’对应 的文件的引用计数达到0,这些文件将被删除。

需要注意的地方

如果使用增量式的checkpoint,那么在错误恢复的时候,不需要考虑很多的配置项。一旦发生了错误,Flink的JobManager会告诉 task需要从最新的checkpoint中恢复,它可以是全量的或者是增量的。之后TaskManager从分布式系统中下载checkpoint文件, 然后从中恢复状态。

增量式的checkpoint能为拥有大量状态的程序带来较大的提升,但还有一些trade-off需要考虑。总的来说,增量式减少了checkpoint操作的时间,但是相对的,从checkpoint中恢复可能更耗时,具体情况需要根据应用程序包含的状态大小而定。相对的,如果程序只是部分失败,Flink TaskManager需要从多个checkpoint中读取数据,这时候使用全量的checkpoint来恢复数据可能更加耗时。同时,由于新的checkpoint可能引用到老的checkpoint,这样老的checkpoint就不能被删除,这样下去,历史的版本数据会越来越大。需要考虑使用分布式来存储checkpoint,另外还需要考虑读取带来的带宽消耗。

声明:本号所有文章除特殊注明,都为原创,公众号读者拥有优先阅读权,未经作者本人允许不得转载,否则追究侵权责任。

关注我的公众号,后台回复【JAVAPDF】获取200页面试题!5万人关注的大数据成神之路,不来了解一下吗?5万人关注的大数据成神之路,真的不来了解一下吗?5万人关注的大数据成神之路,确定真的不来了解一下吗?

欢迎您关注《大数据成神之路》

大数据技术与架构

备注:所有内容首发公众号,这里不保证实时性和完整性,大家扫描文末二维码关注哦~

这篇关于Flink CheckPoint奇技淫巧 | 原理和在生产中的应用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143191

相关文章

mysql_mcp_server部署及应用实践案例

《mysql_mcp_server部署及应用实践案例》文章介绍了在CentOS7.5环境下部署MySQL_mcp_server的步骤,包括服务安装、配置和启动,还提供了一个基于Dify工作流的应用案例... 目录mysql_mcp_server部署及应用案例1. 服务安装1.1. 下载源码1.2. 创建独立

Java线程池核心参数原理及使用指南

《Java线程池核心参数原理及使用指南》本文详细介绍了Java线程池的基本概念、核心类、核心参数、工作原理、常见类型以及最佳实践,通过理解每个参数的含义和工作原理,可以更好地配置线程池,提高系统性能,... 目录一、线程池概述1.1 什么是线程池1.2 线程池的优势二、线程池核心类三、ThreadPoolE

Spring Boot Interceptor的原理、配置、顺序控制及与Filter的关键区别对比分析

《SpringBootInterceptor的原理、配置、顺序控制及与Filter的关键区别对比分析》本文主要介绍了SpringBoot中的拦截器(Interceptor)及其与过滤器(Filt... 目录前言一、核心功能二、拦截器的实现2.1 定义自定义拦截器2.2 注册拦截器三、多拦截器的执行顺序四、过

Java 队列Queue从原理到实战指南

《Java队列Queue从原理到实战指南》本文介绍了Java中队列(Queue)的底层实现、常见方法及其区别,通过LinkedList和ArrayDeque的实现,以及循环队列的概念,展示了如何高效... 目录一、队列的认识队列的底层与集合框架常见的队列方法插入元素方法对比(add和offer)移除元素方法

SQL 注入攻击(SQL Injection)原理、利用方式与防御策略深度解析

《SQL注入攻击(SQLInjection)原理、利用方式与防御策略深度解析》本文将从SQL注入的基本原理、攻击方式、常见利用手法,到企业级防御方案进行全面讲解,以帮助开发者和安全人员更系统地理解... 目录一、前言二、SQL 注入攻击的基本概念三、SQL 注入常见类型分析1. 基于错误回显的注入(Erro

Spring IOC核心原理详解与运用实战教程

《SpringIOC核心原理详解与运用实战教程》本文详细解析了SpringIOC容器的核心原理,包括BeanFactory体系、依赖注入机制、循环依赖解决和三级缓存机制,同时,介绍了SpringBo... 目录1. Spring IOC核心原理深度解析1.1 BeanFactory体系与内部结构1.1.1

Nginx内置变量应用场景分析

《Nginx内置变量应用场景分析》Nginx内置变量速查表,涵盖请求URI、客户端信息、服务器信息、文件路径、响应与性能等类别,这篇文章给大家介绍Nginx内置变量应用场景分析,感兴趣的朋友跟随小编一... 目录1. Nginx 内置变量速查表2. 核心变量详解与应用场景3. 实际应用举例4. 注意事项Ng

MySQL 批量插入的原理和实战方法(快速提升大数据导入效率)

《MySQL批量插入的原理和实战方法(快速提升大数据导入效率)》在日常开发中,我们经常需要将大量数据批量插入到MySQL数据库中,本文将介绍批量插入的原理、实现方法,并结合Python和PyMySQ... 目录一、批量插入的优势二、mysql 表的创建示例三、python 实现批量插入1. 安装 PyMyS

Java中的随机数生成案例从范围字符串到动态区间应用

《Java中的随机数生成案例从范围字符串到动态区间应用》本文介绍了在Java中生成随机数的多种方法,并通过两个案例解析如何根据业务需求生成特定范围的随机数,本文通过两个实际案例详细介绍如何在java中... 目录Java中的随机数生成:从范围字符串到动态区间应用引言目录1. Java中的随机数生成基础基本随

深入理解Redis线程模型的原理及使用

《深入理解Redis线程模型的原理及使用》Redis的线程模型整体还是多线程的,只是后台执行指令的核心线程是单线程的,整个线程模型可以理解为还是以单线程为主,基于这种单线程为主的线程模型,不同客户端的... 目录1 Redis是单线程www.chinasem.cn还是多线程2 Redis如何保证指令原子性2.