Flink1.4 Fault Tolerance源码解析-1

2024-02-26 12:32

本文主要是介绍Flink1.4 Fault Tolerance源码解析-1,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言:本篇关注Flink,对Fault Tolerance的源码实现进行阐述,主要介绍Api层及Flink现有实现。

本篇文章重点关注以下问题:

  • 具备Fault Tolerance能力的两种对象:Function和Operator
  • 分析两个接口,列举典型实现,并做简要分析

1. 具备Fault Tolerance能力的两种对象

  • Function
  • Operator

1.1 Function对象

org.apache.flink.api.common.functions.Function

作为所有用户自定义函数的基本接口,如已经预定义的FlatMapFunction就是基础自Function,Function并未定义任何方法,只是作为标识接口。
所有Function对象的Fault Tolerance都是通过继承CheckpointedFunction接口实现的,换话说,容错能力是Function的可选项,这点与Operator不同。

1.2 Operator对象

org.apache.flink.streaming.api.operators.StreamOperator

所有Operator的基本接口,如已经预定义的StreamFilter、StreamFlatMap就是StreamOperator的实现。
与Function是标识接口不同,StreamOperator内置了几个和检查点相关的接口方法,因此,在Operator中,容错能力是实现Operator的必选项,这点不难理解,因为Operator处于运行时时,诸如分区信息都是必要要做快照的。


2. CheckpointedFunction

org.apache.flink.streaming.api.checkpoint. CheckpointedFunction

CheckpointedFunction类结构图
CheckpointedFunction接口是有状态转换函数的核心接口,两个接口方法:

  • initializeState:Function初始化的时候调用,一般用作初始化state数据结构。
  • snapshotState:请求state快照时被调用。

snapshotState方法中方法签名中的参数FunctionSnapshotContext可以获取此Function中的所有State信息(快照),通过该上下文,可以获取该Function之前变更所产生的最终结果。

2.1 FlinkKafkaProducerBase

org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase

FlinkKafkaProducerBase
方法签名:

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {}

FlinkKafkaConsumerBase是Flink实现基于Kafka的Source的基类,Kafka提供基于offset并且可重复消费的机制,使其非常容易实现Fault Tolerance机制。

关键代码:

/** Consumer从各topic partitions读取的初始offsets. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;/** 保存已消费的、但是Offset未提交至Broken或Zk的数据. */
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();/*** 如果程序从Checkpoint启动,此变量保存此Consumer上次消费的offset</br>* * <p>此变量主要由 {@link #initializeState(FunctionInitializationContext)} 进行赋值.**/
private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;/** 在state backend上保存的State信息(Offset信息) . */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;@Override
public final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();// 兼容1.2.0版本的State,可无视ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);// 各Partition的offset信息this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));if (context.isRestored() && !restoredFromOldState) {restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());// 兼容1.2.0版本的State,可无视for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {restoredFromOldState = true;unionOffsetStates.add(kafkaOffset);}oldRoundRobinListState.clear();if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {throw new IllegalArgumentException("Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");}// 将待恢复的State信息保存进‘restoredState’变量中,以便程序异常时用于恢复for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {restoredState.put(kafkaOffset.f0, kafkaOffset.f1);}LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);} else {LOG.info("No restore state for FlinkKafkaConsumer.");}
}@Override
public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {// 首先清空state backend对应offset的全局存储(State信息)unionOffsetStates.clear();// KafkaServer的连接器,根据Kafka版本由子类实现final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// 连接器还未初始化,unionOffsetStates的值从 restored offsets 或是 subscribedPartition上读取for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// 如果启用快照时同步提交Offset,则在初始化时,用restoredState给pendingOffsetsToCommit赋值pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {// 通过连接器获取当前消费的OffsetsHashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// 保存当前消费的OffsetpendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}// 给state backend对应offset的全局存储(State信息)赋值for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// pendingOffsetsToCommit的保护机制,最多存储100个元素,正也是此Map需要有序的原因while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}
}

快照总结:

  • initializeState方法从state backend中恢复State,并将相关信息保存入restoredState
  • snapshotState方法将当前准备放入state backend的state信息保存至unionOffsetStates,如果应用需要在快照的同时提交Offset,则将消费的Offset信息保存至pendingOffsetsToCommit。

FlinkKafkaConsumerBase继承了CheckpointListener接口,此接口是一个监听接口,以便当快照完成时通知Function进行一些必要处理;FlinkKafkaConsumerBase借用此接口来提交Offset,代码如下:

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {if (!running) {LOG.debug("notifyCheckpointComplete() called on closed source");return;}final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {LOG.debug("notifyCheckpointComplete() called on uninitialized source");return;}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {try {// 在pendingOffsetsToCommit中找出checkpointId对应的offset信息final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);if (posInMap == -1) {LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);return;}@SuppressWarnings("unchecked")// 取出checkpointId对应的Offset信息Map<KafkaTopicPartition, Long> offsets =(Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);// 将该checkpointId之前的Offset信息移除(pendingOffsetsToCommit有序的原因)for (int i = 0; i < posInMap; i++) {pendingOffsetsToCommit.remove(0);}if (offsets == null || offsets.size() == 0) {LOG.debug("Checkpoint state was empty.");return;}// 通过连接器向Broken或Zk提交Offset信息fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);} catch (Exception e) {if (running) {throw e;}}}
}

2.2 其他实现

因项目目前只涉及Kafka,故只研究了KafkaConsumerFunction的容错处理实现,其他诸如StatefulSequenceSource、MessageAcknowledgingSourceBase实现类似。


3. StreamOperator

org.apache.flink.streaming.api.operators.StreamOperator

StreamOperator
StreamOperator内置了我们上面谈到的几个跟检查点相关的接口方法:

  • initializeState
  • snapshotState
  • notifyOfCompletedCheckpoint

正因为快照相关方法都已内置在StreamOperator这个顶层接口中,所以operator中快照机制由可选项变成了必选项。

这里需要注意的是snapshotState方法,它返回值为OperatorSnapshotResult。它是一个可以存储四种State类型的容器:

  • keyedStateManagedFuture
  • keyedStateRawFuture
  • operatorStateManagedFuture
  • operatorStateRawFuture

有关四种State类型不是本节重点,可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/state.html
下面以Flink内置的一个Operator(StreamFlatMap)为切入点,介绍一些常用类。

3.1 AbstractStreamOperator

org.apache.flink.streaming.api.operators.AbstractStreamOperator

AbstractStreamOperator是StreamOperator的抽象类,为operator的实现提供模板,当然也为以上的三个跟快照相关的接口方法的实现提供了模板。

3.2 AbstractUdfStreamOperator

org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator

该抽象类继承自AbstractStreamOperator,用于进一步为operator的实现提供模板,不过从类名可以看出来,它主要是为用户定义函数(udf)的operator提供模板。

值得注意的是,方法snapshotState中,有如下代码:

if (userFunction instanceof CheckpointedFunction) {  ((CheckpointedFunction) userFunction).snapshotState(context);  return true;  
} 

Operator中出现了CheckpointedFunction,这是因为function只是静态的函数,它的运行还必须借助于operator,因此其状态也必须借助于operator来帮助其与Flink的运行时交互以达到最终的持久化的目的。
3.3 StreamFlatMap
StreamFlatMap代码较为简单,专注于使用FlatMap对应的Function实现业务逻辑。

if (userFunction instanceof CheckpointedFunction) {  ((CheckpointedFunction) userFunction).snapshotState(context);  return true;  
} 

4. Function和StreamOperator之间的关联

观察AbstractUdfStreamOperator的构造函数:

public AbstractUdfStreamOperator(F userFunction) {  this.userFunction = requireNonNull(userFunction);  checkUdfCheckpointingPreconditions();  
}  

可以发现,所有UDF的Operator都内嵌了对应的Function,这是因为Function仅仅是一个静态的函数,其真正需要发挥作用依赖于Operator,以便在Flink运行时进行交互达到持久化目的。


小结

本篇剖析了Flink针对Function以及Operator如何做快照以及如何恢复的实现。虽然,还没有涉及到fault tolerance的最终实现机制,但是这是我们的入口。

这篇关于Flink1.4 Fault Tolerance源码解析-1的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/748885

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

OWASP十大安全漏洞解析

OWASP(开放式Web应用程序安全项目)发布的“十大安全漏洞”列表是Web应用程序安全领域的权威指南,它总结了Web应用程序中最常见、最危险的安全隐患。以下是对OWASP十大安全漏洞的详细解析: 1. 注入漏洞(Injection) 描述:攻击者通过在应用程序的输入数据中插入恶意代码,从而控制应用程序的行为。常见的注入类型包括SQL注入、OS命令注入、LDAP注入等。 影响:可能导致数据泄

从状态管理到性能优化:全面解析 Android Compose

文章目录 引言一、Android Compose基本概念1.1 什么是Android Compose?1.2 Compose的优势1.3 如何在项目中使用Compose 二、Compose中的状态管理2.1 状态管理的重要性2.2 Compose中的状态和数据流2.3 使用State和MutableState处理状态2.4 通过ViewModel进行状态管理 三、Compose中的列表和滚动

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

CSP 2023 提高级第一轮 CSP-S 2023初试题 完善程序第二题解析 未完

一、题目阅读 (最大值之和)给定整数序列 a0,⋯,an−1,求该序列所有非空连续子序列的最大值之和。上述参数满足 1≤n≤105 和 1≤ai≤108。 一个序列的非空连续子序列可以用两个下标 ll 和 rr(其中0≤l≤r<n0≤l≤r<n)表示,对应的序列为 al,al+1,⋯,ar​。两个非空连续子序列不同,当且仅当下标不同。 例如,当原序列为 [1,2,1,2] 时,要计算子序列 [