Flink checkpoint 源码分析- Checkpoint snapshot源码分析

2024-04-30 18:12

本文主要是介绍Flink checkpoint 源码分析- Checkpoint snapshot源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

在上一篇的博客里,大致介绍了flink checkpoint中的触发的大体流程,现在介绍一下触发之后下游的算子是如何做snapshot。

上一篇的文章: Flink checkpoint 源码分析- Flink Checkpoint 触发流程分析-CSDN博客

代码分析

1. 在SubtaskCheckpointCoordinatorImpl中的checkpointState 主要进行了这个操作, source首先构造barrier,然后广播给下游。我们现在跟踪一下barrier的流动。

org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState

 CheckpointBarrier checkpointBarrier =new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());

这个广播实际上是将数据写入到了下游。写的方法实际上就是netty写。

从flush的方法进去可以看到实际上是通知下游数据可用,下游看到数据可用就可以拉数据。因此可以看到这里的数据传递是通过pull的方式。

最后这个方法最后调用的是:org.apache.flink.runtime.io.network.netty.PartitionRequestQueue#notifyReaderNonEmpty方法,通过netty告知下游有数据了。

这些数据是从哪里读取到的呢?其实是在org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel#getNextBuffer里面,flink对netty 进行了封装

从这个方法再往上就可以看到是org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#getNextBufferOrEvent。

这里就是channel读取数据的地方。

这里有一个方法:transformToBufferOrEvent。这里判断里面是数据还是时间。flink中定义的事件如下。

如果这个是barrier 会在这里解析出来:

public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) throws IOException {if (buffer.remaining() < 4) {throw new IOException("Incomplete event");}final ByteOrder bufferOrder = buffer.order();buffer.order(ByteOrder.BIG_ENDIAN);try {final int type = buffer.getInt();if (type == END_OF_PARTITION_EVENT) {return EndOfPartitionEvent.INSTANCE;} else if (type == CHECKPOINT_BARRIER_EVENT) {return deserializeCheckpointBarrier(buffer);} else if (type == END_OF_SUPERSTEP_EVENT) {return EndOfSuperstepEvent.INSTANCE;} else if (type == END_OF_CHANNEL_STATE_EVENT) {return EndOfChannelStateEvent.INSTANCE;} else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {long id = buffer.getLong();return new CancelCheckpointMarker(id);} else if (type == ANNOUNCEMENT_EVENT) {int sequenceNumber = buffer.getInt();AbstractEvent announcedEvent = fromSerializedEvent(buffer, classLoader);return new EventAnnouncement(announcedEvent, sequenceNumber);} else if (type == VIRTUAL_CHANNEL_SELECTOR_EVENT) {return new SubtaskConnectionDescriptor(buffer.getInt(), buffer.getInt());} else if (type == CHANNEL_UNAVAILABLE_EVENT) {int channelIndex = buffer.getInt();return new ChannelUnavailableEvent(channelIndex);} else if (type == OTHER_EVENT) {try {final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);final String className = deserializer.readUTF();final Class<? extends AbstractEvent> clazz;try {clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);} catch (ClassNotFoundException e) {throw new IOException("Could not load event class '" + className + "'.", e);} catch (ClassCastException e) {throw new IOException("The class '" + className + "' is not a valid subclass of '"+ AbstractEvent.class.getName() + "'.", e);}final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);event.read(deserializer);return event;} catch (Exception e) {throw new IOException("Error while deserializing or instantiating event.", e);}} else {throw new IOException("Corrupt byte stream for event");}} finally {buffer.order(bufferOrder);}}

未完待续

这篇关于Flink checkpoint 源码分析- Checkpoint snapshot源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/949565

相关文章

C++ Sort函数使用场景分析

《C++Sort函数使用场景分析》sort函数是algorithm库下的一个函数,sort函数是不稳定的,即大小相同的元素在排序后相对顺序可能发生改变,如果某些场景需要保持相同元素间的相对顺序,可使... 目录C++ Sort函数详解一、sort函数调用的两种方式二、sort函数使用场景三、sort函数排序

Java调用C++动态库超详细步骤讲解(附源码)

《Java调用C++动态库超详细步骤讲解(附源码)》C语言因其高效和接近硬件的特性,时常会被用在性能要求较高或者需要直接操作硬件的场合,:本文主要介绍Java调用C++动态库的相关资料,文中通过代... 目录一、直接调用C++库第一步:动态库生成(vs2017+qt5.12.10)第二步:Java调用C++

kotlin中const 和val的区别及使用场景分析

《kotlin中const和val的区别及使用场景分析》在Kotlin中,const和val都是用来声明常量的,但它们的使用场景和功能有所不同,下面给大家介绍kotlin中const和val的区别,... 目录kotlin中const 和val的区别1. val:2. const:二 代码示例1 Java

Go标准库常见错误分析和解决办法

《Go标准库常见错误分析和解决办法》Go语言的标准库为开发者提供了丰富且高效的工具,涵盖了从网络编程到文件操作等各个方面,然而,标准库虽好,使用不当却可能适得其反,正所谓工欲善其事,必先利其器,本文将... 目录1. 使用了错误的time.Duration2. time.After导致的内存泄漏3. jsO

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

找不到Anaconda prompt终端的原因分析及解决方案

《找不到Anacondaprompt终端的原因分析及解决方案》因为anaconda还没有初始化,在安装anaconda的过程中,有一行是否要添加anaconda到菜单目录中,由于没有勾选,导致没有菜... 目录问题原因问http://www.chinasem.cn题解决安装了 Anaconda 却找不到 An

Spring定时任务只执行一次的原因分析与解决方案

《Spring定时任务只执行一次的原因分析与解决方案》在使用Spring的@Scheduled定时任务时,你是否遇到过任务只执行一次,后续不再触发的情况?这种情况可能由多种原因导致,如未启用调度、线程... 目录1. 问题背景2. Spring定时任务的基本用法3. 为什么定时任务只执行一次?3.1 未启用

C++ 各种map特点对比分析

《C++各种map特点对比分析》文章比较了C++中不同类型的map(如std::map,std::unordered_map,std::multimap,std::unordered_multima... 目录特点比较C++ 示例代码 ​​​​​​代码解释特点比较1. std::map底层实现:基于红黑

Spring、Spring Boot、Spring Cloud 的区别与联系分析

《Spring、SpringBoot、SpringCloud的区别与联系分析》Spring、SpringBoot和SpringCloud是Java开发中常用的框架,分别针对企业级应用开发、快速开... 目录1. Spring 框架2. Spring Boot3. Spring Cloud总结1. Sprin