本文主要是介绍Flink重点难点:状态(Checkpoint和Savepoint)容错与两阶段提交,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
点击上方蓝色字体,选择“设为星标”
回复”面试“获取更多惊喜
在阅读本文之前,你应该阅读过的系列:
《Flink重点难点:时间、窗口和流Join》
《Flink重点难点:网络流控和反压》
《Flink重点难点:维表关联理论和Join实战》
《Flink重点难点:内存模型与内存结构》
《Flink重点难点:Flink Table&SQL必知必会(一)》
Flink重点难点:Flink Table&SQL必知必会(二)
状态与容错
在 Flink 的框架中,进行有状态的计算是 Flink 最重要的特性之一。所谓的状态,其实指的是 Flink 程序的中间计算结果。Flink 支持了不同类型的状态,并且针对状态的持久化还提供了专门的机制和状态管理器。
本文首发公众号:import_bigdata
状态
我们在 Flink 的官方博客中找到这样一段话,可以认为这是对状态的定义:
When working with state, it might also be useful to read about Flink’s state backends. Flink provides different state backends that specify how and where state is stored. State can be located on Java’s heap or off-heap. Depending on your state backend, Flink can also manage the state for the application, meaning Flink deals with the memory management (possibly spilling to disk if necessary) to allow applications to hold very large state. State backends can be configured without changing your application logic.
这段话告诉我们,所谓的状态指的是,在流处理过程中那些需要记住的数据,而这些数据既可以包括业务数据,也可以包括元数据。Flink 本身提供了不同的状态管理器来管理状态,并且这个状态可以非常大。
Flink 的状态数据可以存在 JVM 的堆内存或者堆外内存中,当然也可以借助第三方存储,例如 Flink 已经实现的对 RocksDB 支持。Flink 的官网同样给出了适用于状态计算的几种情况:
When an application searches for certain event patterns, the state will store the sequence of events encountered so far
When aggregating events per minute/hour/day, the state holds the pending aggregates
When training a machine learning model over a stream of data points, the state holds the current version of the model parameters
When historic data needs to be managed, the state allows efficient access to events that occurred in the past
以上四种情况分别是:复杂事件处理获取符合某一特定时间规则的事件、聚合计算、机器学习的模型训练、使用历史的数据进行计算。
Flink 状态分类和使用
我们在之前的课时中提到过 KeyedStream 的概念,并且介绍过 KeyBy 这个算子的使用。在 Flink 中,根据数据集是否按照某一个 Key 进行分区,将状态分为 Keyed State 和 Operator State(Non-Keyed State)两种类型。
如上图所示,Keyed State 是经过分区后的流上状态,每个 Key 都有自己的状态,图中的八边形、圆形和三角形分别管理各自的状态,并且只有指定的 key 才能访问和更新自己对应的状态。
与 Keyed State 不同的是,Operator State 可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。每个算子子任务上的数据共享自己的状态。
但是有一点需要说明的是,无论是 Keyed State 还是 Operator State,Flink 的状态都是基于本地的,即每个算子子任务维护着这个算子子任务对应的状态存储,算子子任务之间的状态不能相互访问。
我们可以看一下 State 的类图,对于 Keyed State,Flink 提供了几种现成的数据结构供我们使用,State 主要有四种实现,分别为 ValueState、MapState、AppendingState 和 ReadOnlyBrodcastState ,其中 AppendingState 又可以细分为 ReducingState、AggregatingState 和 ListState。
那么我们怎么访问这些状态呢?Flink 提供了 StateDesciptor 方法专门用来访问不同的 state,类图如下:
下面演示一下如何使用 StateDesciptor 和 ValueState,代码如下:
public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 5L), Tuple2.of(1L, 2L)).keyBy(0).flatMap(new CountWindowAverage()).printToErr();env.execute("submit job");}public static class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {private transient ValueState<Tuple2<Long, Long>> sum;public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {Tuple2<Long, Long> currentSum;// 访问ValueStateif(sum.value()==null){currentSum = Tuple2.of(0L, 0L);}else {currentSum = sum.value();}// 更新currentSum.f0 += 1;// 第二个元素加1currentSum.f1 += input.f1;// 更新statesum.update(currentSum);// 如果count的值大于等于2,求知道并清空stateif (currentSum.f0 >= 2) {out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));sum.clear();}}public void open(Configuration config) {ValueStateDescriptor<Tuple2<Long, Long>> descriptor =new ValueStateDescriptor<>("average", // state的名字TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // 设置默认值StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();descriptor.enableTimeToLive(ttlConfig);sum = getRuntimeContext().getState(descriptor);}
}
在上述例子中,我们通过继承 RichFlatMapFunction 来访问 State,通过 getRuntimeContext().getState(descriptor) 来获取状态的句柄。而真正的访问和更新状态则在 Map 函数中实现。
我们这里的输出条件为,每当第一个元素的和达到二,就把第二个元素的和与第一个元素的和相除,最后输出。我们直接运行,在控制台可以看到结果:
Operator State 的实际应用场景不如 Keyed State 多,一般来说它会被用在 Source 或 Sink 等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证 Flink 应用的 Exactly-Once 语义。
同样,我们对于任何状态数据还可以设置它们的过期时间。如果一个状态设置了 TTL,并且已经过期,那么我们之前保存的值就会被清理。
想要使用 TTL,我们需要首先构建一个 StateTtlConfig 配置对象;然后,可以通过传递配置在任何状态描述符中启用 TTL 功能。
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();descriptor.enableTimeToLive(ttlConfig);
StateTtlConfig 这个类中有一些配置需要我们注意:
UpdateType 表明了过期时间什么时候更新,而对于那些过期的状态,是否还能被访问则取决于 StateVisibility 的配置。
状态后端种类和配置
我们在上面的内容中讲到了 Flink 的状态数据可以存在 JVM 的堆内存或者堆外内存中,当然也可以借助第三方存储。默认情况下,Flink 的状态会保存在 taskmanager 的内存中,Flink 提供了三种可用的状态后端用于在不同情况下进行状态后端的保存。
MemoryStateBackend
FsStateBackend
RocksDBStateBackend
MemoryStateBackend
顾名思义,MemoryStateBackend 将 state 数据存储在内存中,一般用来进行本地调试用,我们在使用 MemoryStateBackend 时需要注意的一些点包括:
每个独立的状态(state)默认限制大小为 5MB,可以通过构造函数增加容量,状态的大小不能超过akka的Framesize大小,聚合后的状态必须能够放进JobManager的内存中.
MemoryStateBackend 可以通过在代码中显示指定:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend(DEFAULT_MAX_STATE_SIZE,false));
其中,new MemoryStateBackend(DEFAULT_MAX_STATE_SIZE,false) 中的 false 代表关闭异步快照机制。关于快照,我们在后面的课时中有单独介绍。
很明显 MemoryStateBackend 适用于我们本地调试使用,来记录一些状态很小的 Job 状态信息。
FsStateBackend
FsStateBackend 会把状态数据保存在 TaskManager 的内存中。CheckPoint 时,将状态快照写入到配置的文件系统目录中,少量的元数据信息存储到 JobManager 的内存中。
使用 FsStateBackend 需要我们指定一个文件路径,一般来说是 HDFS 的路径,例如,hdfs://namenode:40010/flink/checkpoints
。
我们同样可以在代码中显示指定:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints", false));
FsStateBackend 因为将状态存储在了外部系统如 HDFS 中,所以它适用于大作业、状态较大、全局高可用的那些任务。
RocksDBStateBackend
RocksDBStateBackend 和 FsStateBackend 有一些类似,首先它们都需要一个外部文件存储路径,比如 HDFS 的 hdfs://namenode:40010/flink/checkpoints,此外也适用于大作业、状态较大、全局高可用的那些任务。
但是与 FsStateBackend 不同的是,RocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 运行节点的数据目录下。
这意味着,RocksDBStateBackend 可以存储远超过 FsStateBackend 的状态,可以避免向 FsStateBackend 那样一旦出现状态暴增会导致 OOM,但是因为将状态数据保存在 RocksDB 数据库中,吞吐量会有所下降。
此外,需要注意的是,RocksDBStateBackend 是唯一支持增量快照的状态后端。
Checkpoints(检查点)
本文首发公众号:import_bigdata
Flink中基于异步轻量级的分布式快照技术提供了Checkpoints容错机制,Checkpoints可以将同一时间点作业/算子的状态数据全局统一快照处理,包括前面提到的算子状态和键值分区状态。当发生了故障后,Flink会将所有任务的状态恢复至最后一次Checkpoint中的状态,并从那里重新开始执行。
那么Checkpoints的生成策略是什么样的呢?它会在什么时候进行快照的生成呢?
其实就是在所有任务都处理完同一个输入数据流的时候,这时就会对当前全部任务的状态进行一个拷贝,生成Checkpoints。
为了方便理解,这里先简单的用一个朴素算法来解释这一生成过程(Flink的Checkpoints算法实际要更加复杂,在下面会详细讲解)
暂停接受所有输入流。
等待已经流入系统的数据被完全处理,即所有任务已经处理完所有的输入数据。
将所有任务的状态拷贝到远程持久化,生成Checkpoints。在所有任务完成自己的拷贝工作后,Checkpoints生成完毕。
恢复所有数据流的接收。
恢复流程
为了方便进行实例的讲解,假设当前有一个Source任务,负责从一个递增的数字流(1、2、3、4……)中读取数据,读取到的数据会分为奇数流和偶数流,求和算子的两个任务会分别对它们进行求和。在当前任务中,数据源算子的任务会将输入流的当前偏移量存为状态,求和算子的任务会将当前和存为状态。
如上图,在当前生成的Checkpoints中保存的输入偏移为5,偶数求和为6,奇数求和为9。
假设在下一轮计算中,任务sum_odd计算出现了问题,任务sum_odd的时候产生了问题,导致结果出现错误。由于出现问题,为了防止从头开始重复计算,此时会通过Checkpoints来进行快照的恢复。
Checkpoints恢复应用需要以下三个步骤:
重启整个应用
利用最新的检查点重置任务状态
恢复所有任务的处理
第一步我们需要先重启整个应用,恢复到最原始的状态。
紧接着从检查点的快照信息中读取出输入源的偏移量以及算子计算的结果,进行状态的恢复。
状态恢复完成后,继续Checkpoints恢复的位置开始继续处理。
从检查点恢复后,它的内部状态会和生成检查点的时候完全一致,并且会紧接着重新处理那些从之前检查点完成开始,到发生系统故障之间已经处理过的数据。虽然这意味着Flink会重复处理部分消息,但上述机制仍然可以实现精确一次的状态一致性,因为所有的算子都会恢复到那些数据处理之前的时间点。
但这个机制仍然面临一些问题,因为Checkpoints和恢复机制仅能重置应用内部的状态,而应用所使用的Sink可能在恢复期间将结果向下游系统(如事件日志系统、文件系统或数据库)重复发送多次。为了解决这个问题,对于某些存储系统,Flink提供的Sink函数支持精确一次输出(在检查点完成后才会把写出的记录正式提交)。另一种方法则是适用于大多数存储系统的幂等更新。
生成策略
Flink中的Checkpoints是基于Chandy-Lamport分布式快照算法实现的,该算法不会暂停整个应用,而是会将生成Checkpoints的过程和处理过程分离,这样在部分任务持久化状态的过程中,其他任务还可以继续执行。
在介绍生成策略之前,首先需要介绍一下Checkpoints barrier(屏障)
这一种特殊记录。
如上图,与水位线相同,Flink会在Source中间隔性地生成barrier,通过barrier把一条流上的数据划分到不同的Checkpoints中,在barrier之前到来的数据导致的状态更改,都会被包含在当前所属的Checkpoints中;而基于barrier之后的数据导致的所有更改,就会被包含在之后的Checkpoints中。
假设当前有两个Source任务,各自消费一个递增的数字流(1、2、3、4……),读取到的数据会分为奇数流和偶数流,求和算子的两个任务会分别对它们进行求和,并将结果值更新至下游Sink。
此时JobManager向每一个Source任务发送一个新的Checkpoints编号,以此启动Checkpoints生成流程。
在Source任务收到消息后,会暂停发出记录,紧接着利用状态后端生成本地状态的Checkpoints,并把barrier连同编号广播给所有传出的数据流分区。
状态后端在状态存入Checkpoints后通知Source任务,并向JobManager发送确认消息。
在所有barrier发出后,Source将恢复正常工作。
Source任务会广播barrier至所有与之相连的任务,确保这些任务能从它们的每个输入都收到一个barrier
在等待过程中,对于barrier未到达的分区,数据会继续正常处理。而barrier已经到达的分区,它们新到来的记录会被缓冲起来,不能处理。这个等待所有barrier到来的过程被称为barrier对齐
任务中收齐全部输入分区发送的barrier后,就会通知状态后端开始生成Checkpoints,同时继续把Checkpoints barrier广播转发到下游相连的任务。
任务在发出所有的Checkpoints barrier后就会开始处理缓冲的记录。等到所有缓冲记录处理完后,任务就会继续处理Source。
Sink任务向JobManager确认收到Checkpoints barrier,在所有任务成功将自身状态存入Checkpoints后整个应用的Checkpoints才算完成。
Sink任务在收到分隔符后会依次进行barrier对齐,然后将自身状态写入Checkpoints,最终向JobManager发送确认信息。
JobManager在接收到所有任务返回的Checkpoints确认信息后,就说明此次Checkpoints生成结束。
本文首发公众号:import_bigdata
Savepoints(保存点)
由于Cheakpoints是周期性自动生成的,但有些时候我们需要手动的去进行镜像保存功能,于是Flink同时还为我们提供了Savepoints来完成这个功能,Savepoints不仅可以做到故障恢复,还可以用于手动备份、版本迁移、暂停或重启应用等。
Savepoints是Checkpoints的一种特殊实现,底层也是使用Checkpoint机制,因此Savepoints可以认为是具有一些额外元数据的Checkpoints。
Savepoints的生成和清理都无法由Flink自动进行,因此都需要用户自己来显式触发。
总结一下Checkpoint
和Savepoint
的区别和联系:
checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。
savepoint是“通过checkpoint机制”创建的,所以savepoint本质上是特殊的checkpoint。
checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。
checkpoint的频率往往比较高(因为需要尽可能保证作业恢复的准确度),所以checkpoint的存储格式非常轻量级,但作为trade-off牺牲了一切可移植(portable)的东西,比如不保证改变并行度和升级的兼容性。savepoint则以二进制形式存储所有状态数据和元数据,执行起来比较慢而且“贵”,但是能够保证portability,如并行度改变或代码升级之后,仍然能正常恢复。
checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。
两阶段提交
本文首发公众号:import_bigdata
假设一种场景,从Kafka Source拉取数据,经过一次窗口聚合,最后将数据发送到Kafka Sink,如下图:
JobManager向Source发送Barrier,开始进入pre-Commit阶段,当只有内部状态时,pre-commit阶段无需执行额外的操作,仅仅是写入一些已定义的状态变量即可。当chckpoint成功时Flink负责提交这些写入,否则就终止取消掉它们。
当Source收到Barrier后,将自身的状态进行保存,后端可以根据配置进行选择,这里的状态是指消费的每个分区对应的offset。然后将Barrier发送给下一个Operator。
当Window这个Operator收到Barrier之后,对自己的状态进行保存,这里的状态是指聚合的结果(sum或count的结果),然后将Barrier发送给Sink。Sink收到后也对自己的状态进行保存,之后会进行一次预提交。
预提交成功后,JobManager通知每个Operator,这一轮检查点已经完成,这个时候,会进行第二次Commit。
以上便是两阶段的完整流程,提交过程中如果失败有以下几种情况:
pre-commit
失败,将恢复到最近一次CheckPoint位置一旦
pre-commit
完成,必须要确保commit也要成功
因此,所有opeartor必须对checkpoint最终结果达成共识:即所有operator都必须认定数据提交要么成功执行,要么被终止然后回滚。
八千里路云和月 | 从零到大数据专家学习路径指南
我们在学习Flink的时候,到底在学习什么?
193篇文章暴揍Flink,这个合集你需要关注一下
Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS
Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点
我们在学习Spark的时候,到底在学习什么?
在所有Spark模块中,我愿称SparkSQL为最强!
硬刚Hive | 4万字基础调优面试小总结
数据治理方法论和实践小百科全书
标签体系下的用户画像建设小指南
4万字长文 | ClickHouse基础&实践&调优全视角解析
【面试&个人成长】2021年过半,社招和校招的经验之谈
大数据方向另一个十年开启 |《硬刚系列》第一版完结
我写过的关于成长/面试/职场进阶的文章
当我们在学习Hive的时候在学习什么?「硬刚Hive续集」
你好,我是王知无,一个大数据领域的硬核原创作者。
做过后端架构、数据中间件、数据平台&架构、算法工程化。
专注大数据领域实时动态&技术提升&个人成长&职场进阶,欢迎关注。
这篇关于Flink重点难点:状态(Checkpoint和Savepoint)容错与两阶段提交的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!