Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试

本文主要是介绍Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,如有任何问题,可随时联系。

1 何为状态

  • 在批处理过程中,数据是划分为块分片去完成的,然后每一个Task去处理一个分片。当分片执行完成后,把输出聚合起来就是最终的结果。在这个过程当中,对于state的需求还是比较小的。

  • 在流计算过程中,对State有非常高的要求,因为在流系统中输入是一个无限制的流,会持续运行从不间断。在这个过程当中,就需要将状态数据很好的管理起来。

  • Flink的失败恢复依赖于“检查点机制+可部分重发的数据源”。

  • 检查点机制:检查点定期触发,产生快照,快照中记录了(1)当前检查点开始时数据源(例如Kafka)中消息的offset,(2)记录了所有有状态的operator当前的状态信息(例如sum中的数值)。

  • 可部分重发的数据源:Flink选择最近完成的检查点K。然后系统重放整个分布式的数据流,然后给予每个operator他们在检查点k快照中的状态。数据源被设置为从位置Sk开始重新读取流。例如在Apache Kafka中,那意味着告诉消费者从偏移量Sk开始重新消费。

  • Flink中有两种基本类型的State,即Keyed State和Operator State。

  • State可以被记录,在失败的情况下数据还可以恢复

一句话的事儿:state一般指一个具体的task/operator的状态【state数据默认保存在java的堆内存中】

2 检查点Checkpoint 与Barrier

一句话的事儿: checkpoint【可以理解为checkpoint是把state数据持久化存储了】,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态

为了保证state的容错性,Flink需要对state进行checkpoint。

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常

Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提是:
持久化的source(如kafka),它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等)
用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)

Flink的检查点机制实现了标准的Chandy-Lamport算法,并用来实现分布式快照。在分布式快照当中,有一个核心的元素:Barrier。

  • 单流的barrier:

    1: 屏障作为数据流的一部分随着记录被注入到数据流中。屏障永远不会赶超通常的流记录,它会严格遵循顺序。

    2: 屏障将数据流中的记录隔离成一系列的记录集合,并将一些集合中的数据加入到当前的快照中,而另一些数据加入到下一个快照中。

    3: 每一个屏障携带着快照的ID,快照记录着ID并且将其放在快照数据的前面。

    4: 屏障不会中断流处理,因此非常轻量级。

  • 并行barrier

    1:不止一个输入流的时的operator,需要在快照屏障上对齐(align)输入流,才会发射出去。
    2:可以看到1,2,3会一直放在Input buffer,直到另一个输入流的快照到达Operator。

3 有状态的Operator工作一览图

Stateful Flink applications are optimized for local state access. Task state 
is always maintained in memory or, if the state size exceeds the available memory,
in access-efficient on-disk data structures. Hence, tasks perform all computations 
by accessing local, often in-memory, state yielding very low processing latencies.
Flink guarantees exactly-once state consistency in case of failures by periodically 
and asynchronously checkpointing the local state to durable storage.

4 状态管理

4.1 原始状态与托管状态

Keyed State和Operator State,可以以两种形式存在:

  • 原始状态(raw state)

  • 托管状态(managed state)

  • 托管状态是由Flink框架管理的状态

  • 原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。

  • 通常在DataStream上的状态推荐使用托管的状态。

  • 当实现一个用户自定义的operator时,会使用到原始状态

4.2 State-Keyed State 是什么?直接上干货。(兄弟 State-Operator State 与key无关)

  • 顾名思义,就是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state。
    stream.keyBy(…)

  • state的数据结构;

    (1) ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值

    (2) ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值

    (3) ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值

    (4) MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素

  • 需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄。实际上:这些状态有三种存储方式:

      MemoryStateBackend:FsStateBackendRockDBStateBackend
    

4.3 State-Keyed State 存储方式?直接上干货

  • MemoryStateBackend

    state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中
    基于内存的state backend在生产环境下不建议使用。

  • FsStateBackend

    state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中,可以使用hdfs等分布式文件系统。

  • RocksDBStateBackend

    RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时RocksDB需要配置一个远端的filesystem。

    uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中。fail over的时候从filesystem中恢复到本地。

    RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用

4.4 State 生成快照

4.5 State 快照恢复

5 与Key相关的状态管理案例实战(以Key分组进行状态管理)

5.1 RichFlatMapFunction 核心代码奉上

package xuwei.tech.streaming;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;/*** qinkaixin 2018 11 24 */
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {/*** The ValueState handle. The first field is the count, the second field a running sum.*/private transient ValueState<Tuple2<Long, Long>> sum;@Overridepublic void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {// access the state valueTuple2<Long, Long> currentSum = sum.value();// update the countcurrentSum.f0 += 1;// add the second field of the input valuecurrentSum.f1 += input.f1;// update the statesum.update(currentSum);// if the count reaches 2, emit the average and clear the stateif (currentSum.f0 >= 2) {out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));sum.clear();}}@Overridepublic void open(Configuration config) {ValueStateDescriptor<Tuple2<Long, Long>> descriptor =new ValueStateDescriptor<>("average", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), Tuple2.of(0L, 0L)); // default value of the state, if nothing was setsum = getRuntimeContext().getState(descriptor);}
}

5.2 RichFlatMapFunction 执行操作

public static void main(String[] args) throws Exception{//获取Flink的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)).keyBy(0).flatMap(new CountWindowAverage()).print();env.execute("StafulOperator");System.out.println("***********");
}

5.3 最终结果为什么是这样的?

  • if the count reaches 2, emit the average and clear the state
  • 所以Tuple2.of(1L, 3L), Tuple2.of(1L, 5L) 一组
  • 所以Tuple2.of(1L, 7L),Tuple2.of(1L, 4L)一组

6 与Operator相关的State案例实战

  • 与Key无关的State,与Operator绑定的state,整个operator只对应一个state

  • 保存Operator state的数据结构为ListState

  • 举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射

  • 继承CheckpointedFunction,实现snapshotState和restoreState。

      To use managed operator state, a stateful function can implement either the more general CheckpointedFunction interface, or the ListCheckpointed<T extends Serializable> interface.Whenever a checkpoint has to be performed, snapshotState() is called. The counterpart,initializeState(), is called every time the user-defined function is initialized, be that when the function is first initialized or be that when the function is actuallyrecovering from an earlier checkpoint. Given this,initializeState() is not only the place where different types of state areinitialized, but also where state recoverylogic is included.
    

6.1 BufferingSink案例

    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,CheckpointedFunction {private final int threshold;private transient ListState<Tuple2<String, Integer>> checkpointedState;private List<Tuple2<String, Integer>> bufferedElements;public BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();}@Overridepublic void invoke(Tuple2<String, Integer> value) throws Exception {bufferedElements.add(value);if (bufferedElements.size() == threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}bufferedElements.clear();}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();for (Tuple2<String, Integer> element : bufferedElements) {checkpointedState.add(element);}}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Tuple2<String, Integer>> descriptor =new ListStateDescriptor<>("buffered-elements",TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));checkpointedState = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {for (Tuple2<String, Integer> element : checkpointedState.get()) {bufferedElements.add(element);}}}
}

6.2 Stateful Source案例

    public static class CounterSource  extends RichParallelSourceFunction<Long>implements ListCheckpointed<Long> {/**  current offset for exactly once semantics */private Long offset;/** flag for job cancellation */private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) {final Object lock = ctx.getCheckpointLock();while (isRunning) {// output and state update are atomicsynchronized (lock) {ctx.collect(offset);offset += 1;}}}@Overridepublic void cancel() {isRunning = false;}@Overridepublic List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {return Collections.singletonList(offset);}@Overridepublic void restoreState(List<Long> state) {for (Long s : state)offset = s;}
}

7 checkPoint的配置进一步升华

7.1 checkpoint 开关

  • 默认checkpoint功能是disabled的,想要使用的时候需要先启用
  • checkpoint开启之后,默认的checkPointMode是Exactly-once
  • checkpoint的checkPointMode有两种,Exactly-once和At-least-once
  • Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)

7.2 checkpoint 调优配置(Cancel处理很有意思)

  • 默认checkpoint功能是disabled的,想要使用的时候需要先启用

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】env.enableCheckpointing(1000);// 高级选项:// 设置模式为exactly-once (这是默认值)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只允许进行一个检查点env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);cancel处理选项:(1)ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint(2)ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
    

8 State Backend 状态的后端存储(一剑封喉)

8.1 配置说明

修改State Backend的两种方式

  • 第一种:单任务调整

      修改当前任务代码env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));或者new MemoryStateBackend()或者new RocksDBStateBackend( hdfs->url, true);【需要添加第三方依赖】
    
  • 第二种:全局调整

      修改flink-conf.yamlstate.backend: filesystemstate.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
    

8.2 精彩案例实战

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class SocketWindowWordCountJavaCheckPoint {public static void main(String[] args) throws Exception{//获取需要的端口号int port;try {ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch (Exception e){System.err.println("No port set. use default port 9000--java");port = 9010;}//获取flink的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】env.enableCheckpointing(1000);// 高级选项:// 设置模式为exactly-once (这是默认值)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只允许进行一个检查点env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//设置statebackend//env.setStateBackend(new MemoryStateBackend());//env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));String hostname = "SparkMaster";String delimiter = "\n";//连接socket获取输入的数据DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);// a a c// a 1// a 1// c 1DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {public void flatMap(String value, Collector<WordWithCount> out) throws Exception {String[] splits = value.split("\\s");for (String word : splits) {out.collect(new WordWithCount(word, 1L));}}}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒.sum("count");//在这里使用sum或者reduce都可以/*.reduce(new ReduceFunction<WordWithCount>() {public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word,a.count+b.count);}})*///把数据打印到控制台并且设置并行度windowCounts.print().setParallelism(1);//这一行代码一定要实现,否则程序不执行env.execute("Socket window count");}public static class WordWithCount{public String word;public long count;public  WordWithCount(){}public WordWithCount(String word,long count){this.word = word;this.count = count;}@Overridepublic String toString() {return "作者 : 秦凯新 , 窗大小2秒,滑动1秒       {" +" word='" + word + '\'' +", count=" + count +'}';}}
}

8.3 精彩案例结果

9 华山论剑结束

这里围绕状态管理进行了详细的说明。一篇好文不容易,请发表你的评论,给予作者以肯定,谢谢。后续更精彩!

10 结语

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,如有任何问题,可随时联系。

秦凯新 于深圳 20182136

这篇关于Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/441262

相关文章

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

Java并发编程必备之Synchronized关键字深入解析

《Java并发编程必备之Synchronized关键字深入解析》本文我们深入探索了Java中的Synchronized关键字,包括其互斥性和可重入性的特性,文章详细介绍了Synchronized的三种... 目录一、前言二、Synchronized关键字2.1 Synchronized的特性1. 互斥2.

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

java中反射(Reflection)机制举例详解

《java中反射(Reflection)机制举例详解》Java中的反射机制是指Java程序在运行期间可以获取到一个对象的全部信息,:本文主要介绍java中反射(Reflection)机制的相关资料... 目录一、什么是反射?二、反射的用途三、获取Class对象四、Class类型的对象使用场景1五、Class

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

Redis 中的热点键和数据倾斜示例详解

《Redis中的热点键和数据倾斜示例详解》热点键是指在Redis中被频繁访问的特定键,这些键由于其高访问频率,可能导致Redis服务器的性能问题,尤其是在高并发场景下,本文给大家介绍Redis中的热... 目录Redis 中的热点键和数据倾斜热点键(Hot Key)定义特点应对策略示例数据倾斜(Data S

Python实现将MySQL中所有表的数据都导出为CSV文件并压缩

《Python实现将MySQL中所有表的数据都导出为CSV文件并压缩》这篇文章主要为大家详细介绍了如何使用Python将MySQL数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到... python将mysql数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到另一个

SpringBoot整合jasypt实现重要数据加密

《SpringBoot整合jasypt实现重要数据加密》Jasypt是一个专注于简化Java加密操作的开源工具,:本文主要介绍详细介绍了如何使用jasypt实现重要数据加密,感兴趣的小伙伴可... 目录jasypt简介 jasypt的优点SpringBoot使用jasypt创建mapper接口配置文件加密