Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试

本文主要是介绍Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,如有任何问题,可随时联系。

1 何为状态

  • 在批处理过程中,数据是划分为块分片去完成的,然后每一个Task去处理一个分片。当分片执行完成后,把输出聚合起来就是最终的结果。在这个过程当中,对于state的需求还是比较小的。

  • 在流计算过程中,对State有非常高的要求,因为在流系统中输入是一个无限制的流,会持续运行从不间断。在这个过程当中,就需要将状态数据很好的管理起来。

  • Flink的失败恢复依赖于“检查点机制+可部分重发的数据源”。

  • 检查点机制:检查点定期触发,产生快照,快照中记录了(1)当前检查点开始时数据源(例如Kafka)中消息的offset,(2)记录了所有有状态的operator当前的状态信息(例如sum中的数值)。

  • 可部分重发的数据源:Flink选择最近完成的检查点K。然后系统重放整个分布式的数据流,然后给予每个operator他们在检查点k快照中的状态。数据源被设置为从位置Sk开始重新读取流。例如在Apache Kafka中,那意味着告诉消费者从偏移量Sk开始重新消费。

  • Flink中有两种基本类型的State,即Keyed State和Operator State。

  • State可以被记录,在失败的情况下数据还可以恢复

一句话的事儿:state一般指一个具体的task/operator的状态【state数据默认保存在java的堆内存中】

2 检查点Checkpoint 与Barrier

一句话的事儿: checkpoint【可以理解为checkpoint是把state数据持久化存储了】,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态

为了保证state的容错性,Flink需要对state进行checkpoint。

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常

Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提是:
持久化的source(如kafka),它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等)
用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)

Flink的检查点机制实现了标准的Chandy-Lamport算法,并用来实现分布式快照。在分布式快照当中,有一个核心的元素:Barrier。

  • 单流的barrier:

    1: 屏障作为数据流的一部分随着记录被注入到数据流中。屏障永远不会赶超通常的流记录,它会严格遵循顺序。

    2: 屏障将数据流中的记录隔离成一系列的记录集合,并将一些集合中的数据加入到当前的快照中,而另一些数据加入到下一个快照中。

    3: 每一个屏障携带着快照的ID,快照记录着ID并且将其放在快照数据的前面。

    4: 屏障不会中断流处理,因此非常轻量级。

  • 并行barrier

    1:不止一个输入流的时的operator,需要在快照屏障上对齐(align)输入流,才会发射出去。
    2:可以看到1,2,3会一直放在Input buffer,直到另一个输入流的快照到达Operator。

3 有状态的Operator工作一览图

Stateful Flink applications are optimized for local state access. Task state 
is always maintained in memory or, if the state size exceeds the available memory,
in access-efficient on-disk data structures. Hence, tasks perform all computations 
by accessing local, often in-memory, state yielding very low processing latencies.
Flink guarantees exactly-once state consistency in case of failures by periodically 
and asynchronously checkpointing the local state to durable storage.

4 状态管理

4.1 原始状态与托管状态

Keyed State和Operator State,可以以两种形式存在:

  • 原始状态(raw state)

  • 托管状态(managed state)

  • 托管状态是由Flink框架管理的状态

  • 原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。

  • 通常在DataStream上的状态推荐使用托管的状态。

  • 当实现一个用户自定义的operator时,会使用到原始状态

4.2 State-Keyed State 是什么?直接上干货。(兄弟 State-Operator State 与key无关)

  • 顾名思义,就是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state。
    stream.keyBy(…)

  • state的数据结构;

    (1) ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值

    (2) ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值

    (3) ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值

    (4) MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素

  • 需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄。实际上:这些状态有三种存储方式:

      MemoryStateBackend:FsStateBackendRockDBStateBackend
    

4.3 State-Keyed State 存储方式?直接上干货

  • MemoryStateBackend

    state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中
    基于内存的state backend在生产环境下不建议使用。

  • FsStateBackend

    state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中,可以使用hdfs等分布式文件系统。

  • RocksDBStateBackend

    RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时RocksDB需要配置一个远端的filesystem。

    uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中。fail over的时候从filesystem中恢复到本地。

    RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用

4.4 State 生成快照

4.5 State 快照恢复

5 与Key相关的状态管理案例实战(以Key分组进行状态管理)

5.1 RichFlatMapFunction 核心代码奉上

package xuwei.tech.streaming;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;/*** qinkaixin 2018 11 24 */
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {/*** The ValueState handle. The first field is the count, the second field a running sum.*/private transient ValueState<Tuple2<Long, Long>> sum;@Overridepublic void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {// access the state valueTuple2<Long, Long> currentSum = sum.value();// update the countcurrentSum.f0 += 1;// add the second field of the input valuecurrentSum.f1 += input.f1;// update the statesum.update(currentSum);// if the count reaches 2, emit the average and clear the stateif (currentSum.f0 >= 2) {out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));sum.clear();}}@Overridepublic void open(Configuration config) {ValueStateDescriptor<Tuple2<Long, Long>> descriptor =new ValueStateDescriptor<>("average", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), Tuple2.of(0L, 0L)); // default value of the state, if nothing was setsum = getRuntimeContext().getState(descriptor);}
}

5.2 RichFlatMapFunction 执行操作

public static void main(String[] args) throws Exception{//获取Flink的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)).keyBy(0).flatMap(new CountWindowAverage()).print();env.execute("StafulOperator");System.out.println("***********");
}

5.3 最终结果为什么是这样的?

  • if the count reaches 2, emit the average and clear the state
  • 所以Tuple2.of(1L, 3L), Tuple2.of(1L, 5L) 一组
  • 所以Tuple2.of(1L, 7L),Tuple2.of(1L, 4L)一组

6 与Operator相关的State案例实战

  • 与Key无关的State,与Operator绑定的state,整个operator只对应一个state

  • 保存Operator state的数据结构为ListState

  • 举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射

  • 继承CheckpointedFunction,实现snapshotState和restoreState。

      To use managed operator state, a stateful function can implement either the more general CheckpointedFunction interface, or the ListCheckpointed<T extends Serializable> interface.Whenever a checkpoint has to be performed, snapshotState() is called. The counterpart,initializeState(), is called every time the user-defined function is initialized, be that when the function is first initialized or be that when the function is actuallyrecovering from an earlier checkpoint. Given this,initializeState() is not only the place where different types of state areinitialized, but also where state recoverylogic is included.
    

6.1 BufferingSink案例

    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,CheckpointedFunction {private final int threshold;private transient ListState<Tuple2<String, Integer>> checkpointedState;private List<Tuple2<String, Integer>> bufferedElements;public BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();}@Overridepublic void invoke(Tuple2<String, Integer> value) throws Exception {bufferedElements.add(value);if (bufferedElements.size() == threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}bufferedElements.clear();}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();for (Tuple2<String, Integer> element : bufferedElements) {checkpointedState.add(element);}}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Tuple2<String, Integer>> descriptor =new ListStateDescriptor<>("buffered-elements",TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));checkpointedState = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {for (Tuple2<String, Integer> element : checkpointedState.get()) {bufferedElements.add(element);}}}
}

6.2 Stateful Source案例

    public static class CounterSource  extends RichParallelSourceFunction<Long>implements ListCheckpointed<Long> {/**  current offset for exactly once semantics */private Long offset;/** flag for job cancellation */private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) {final Object lock = ctx.getCheckpointLock();while (isRunning) {// output and state update are atomicsynchronized (lock) {ctx.collect(offset);offset += 1;}}}@Overridepublic void cancel() {isRunning = false;}@Overridepublic List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {return Collections.singletonList(offset);}@Overridepublic void restoreState(List<Long> state) {for (Long s : state)offset = s;}
}

7 checkPoint的配置进一步升华

7.1 checkpoint 开关

  • 默认checkpoint功能是disabled的,想要使用的时候需要先启用
  • checkpoint开启之后,默认的checkPointMode是Exactly-once
  • checkpoint的checkPointMode有两种,Exactly-once和At-least-once
  • Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)

7.2 checkpoint 调优配置(Cancel处理很有意思)

  • 默认checkpoint功能是disabled的,想要使用的时候需要先启用

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】env.enableCheckpointing(1000);// 高级选项:// 设置模式为exactly-once (这是默认值)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只允许进行一个检查点env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);cancel处理选项:(1)ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint(2)ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
    

8 State Backend 状态的后端存储(一剑封喉)

8.1 配置说明

修改State Backend的两种方式

  • 第一种:单任务调整

      修改当前任务代码env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));或者new MemoryStateBackend()或者new RocksDBStateBackend( hdfs->url, true);【需要添加第三方依赖】
    
  • 第二种:全局调整

      修改flink-conf.yamlstate.backend: filesystemstate.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
    

8.2 精彩案例实战

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class SocketWindowWordCountJavaCheckPoint {public static void main(String[] args) throws Exception{//获取需要的端口号int port;try {ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch (Exception e){System.err.println("No port set. use default port 9000--java");port = 9010;}//获取flink的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】env.enableCheckpointing(1000);// 高级选项:// 设置模式为exactly-once (这是默认值)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只允许进行一个检查点env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//设置statebackend//env.setStateBackend(new MemoryStateBackend());//env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));String hostname = "SparkMaster";String delimiter = "\n";//连接socket获取输入的数据DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);// a a c// a 1// a 1// c 1DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {public void flatMap(String value, Collector<WordWithCount> out) throws Exception {String[] splits = value.split("\\s");for (String word : splits) {out.collect(new WordWithCount(word, 1L));}}}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒.sum("count");//在这里使用sum或者reduce都可以/*.reduce(new ReduceFunction<WordWithCount>() {public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word,a.count+b.count);}})*///把数据打印到控制台并且设置并行度windowCounts.print().setParallelism(1);//这一行代码一定要实现,否则程序不执行env.execute("Socket window count");}public static class WordWithCount{public String word;public long count;public  WordWithCount(){}public WordWithCount(String word,long count){this.word = word;this.count = count;}@Overridepublic String toString() {return "作者 : 秦凯新 , 窗大小2秒,滑动1秒       {" +" word='" + word + '\'' +", count=" + count +'}';}}
}

8.3 精彩案例结果

9 华山论剑结束

这里围绕状态管理进行了详细的说明。一篇好文不容易,请发表你的评论,给予作者以肯定,谢谢。后续更精彩!

10 结语

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,如有任何问题,可随时联系。

秦凯新 于深圳 20182136

这篇关于Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/441262

相关文章

Python将大量遥感数据的值缩放指定倍数的方法(推荐)

《Python将大量遥感数据的值缩放指定倍数的方法(推荐)》本文介绍基于Python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处理,并将所得处理后数据保存为新的遥感影像... 本文介绍基于python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动

Oracle数据库使用 listagg去重删除重复数据的方法汇总

《Oracle数据库使用listagg去重删除重复数据的方法汇总》文章介绍了在Oracle数据库中使用LISTAGG和XMLAGG函数进行字符串聚合并去重的方法,包括去重聚合、使用XML解析和CLO... 目录案例表第一种:使用wm_concat() + distinct去重聚合第二种:使用listagg,

一文带你理解Python中import机制与importlib的妙用

《一文带你理解Python中import机制与importlib的妙用》在Python编程的世界里,import语句是开发者最常用的工具之一,它就像一把钥匙,打开了通往各种功能和库的大门,下面就跟随小... 目录一、python import机制概述1.1 import语句的基本用法1.2 模块缓存机制1.

高效管理你的Linux系统: Debian操作系统常用命令指南

《高效管理你的Linux系统:Debian操作系统常用命令指南》在Debian操作系统中,了解和掌握常用命令对于提高工作效率和系统管理至关重要,本文将详细介绍Debian的常用命令,帮助读者更好地使... Debian是一个流行的linux发行版,它以其稳定性、强大的软件包管理和丰富的社区资源而闻名。在使用

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

深入理解C语言的void*

《深入理解C语言的void*》本文主要介绍了C语言的void*,包括它的任意性、编译器对void*的类型检查以及需要显式类型转换的规则,具有一定的参考价值,感兴趣的可以了解一下... 目录一、void* 的类型任意性二、编译器对 void* 的类型检查三、需要显式类型转换占用的字节四、总结一、void* 的

Redis缓存问题与缓存更新机制详解

《Redis缓存问题与缓存更新机制详解》本文主要介绍了缓存问题及其解决方案,包括缓存穿透、缓存击穿、缓存雪崩等问题的成因以及相应的预防和解决方法,同时,还详细探讨了缓存更新机制,包括不同情况下的缓存更... 目录一、缓存问题1.1 缓存穿透1.1.1 问题来源1.1.2 解决方案1.2 缓存击穿1.2.1