Flink checkpoint 源码分析- Checkpoint snapshot源码分析

2024-04-30 18:12

本文主要是介绍Flink checkpoint 源码分析- Checkpoint snapshot源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

在上一篇的博客里,大致介绍了flink checkpoint中的触发的大体流程,现在介绍一下触发之后下游的算子是如何做snapshot。

上一篇的文章: Flink checkpoint 源码分析- Flink Checkpoint 触发流程分析-CSDN博客

代码分析

1. 在SubtaskCheckpointCoordinatorImpl中的checkpointState 主要进行了这个操作, source首先构造barrier,然后广播给下游。我们现在跟踪一下barrier的流动。

org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState

 CheckpointBarrier checkpointBarrier =new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());

这个广播实际上是将数据写入到了下游。写的方法实际上就是netty写。

从flush的方法进去可以看到实际上是通知下游数据可用,下游看到数据可用就可以拉数据。因此可以看到这里的数据传递是通过pull的方式。

最后这个方法最后调用的是:org.apache.flink.runtime.io.network.netty.PartitionRequestQueue#notifyReaderNonEmpty方法,通过netty告知下游有数据了。

这些数据是从哪里读取到的呢?其实是在org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel#getNextBuffer里面,flink对netty 进行了封装

从这个方法再往上就可以看到是org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#getNextBufferOrEvent。

这里就是channel读取数据的地方。

这里有一个方法:transformToBufferOrEvent。这里判断里面是数据还是时间。flink中定义的事件如下。

如果这个是barrier 会在这里解析出来:

public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) throws IOException {if (buffer.remaining() < 4) {throw new IOException("Incomplete event");}final ByteOrder bufferOrder = buffer.order();buffer.order(ByteOrder.BIG_ENDIAN);try {final int type = buffer.getInt();if (type == END_OF_PARTITION_EVENT) {return EndOfPartitionEvent.INSTANCE;} else if (type == CHECKPOINT_BARRIER_EVENT) {return deserializeCheckpointBarrier(buffer);} else if (type == END_OF_SUPERSTEP_EVENT) {return EndOfSuperstepEvent.INSTANCE;} else if (type == END_OF_CHANNEL_STATE_EVENT) {return EndOfChannelStateEvent.INSTANCE;} else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {long id = buffer.getLong();return new CancelCheckpointMarker(id);} else if (type == ANNOUNCEMENT_EVENT) {int sequenceNumber = buffer.getInt();AbstractEvent announcedEvent = fromSerializedEvent(buffer, classLoader);return new EventAnnouncement(announcedEvent, sequenceNumber);} else if (type == VIRTUAL_CHANNEL_SELECTOR_EVENT) {return new SubtaskConnectionDescriptor(buffer.getInt(), buffer.getInt());} else if (type == CHANNEL_UNAVAILABLE_EVENT) {int channelIndex = buffer.getInt();return new ChannelUnavailableEvent(channelIndex);} else if (type == OTHER_EVENT) {try {final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);final String className = deserializer.readUTF();final Class<? extends AbstractEvent> clazz;try {clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);} catch (ClassNotFoundException e) {throw new IOException("Could not load event class '" + className + "'.", e);} catch (ClassCastException e) {throw new IOException("The class '" + className + "' is not a valid subclass of '"+ AbstractEvent.class.getName() + "'.", e);}final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);event.read(deserializer);return event;} catch (Exception e) {throw new IOException("Error while deserializing or instantiating event.", e);}} else {throw new IOException("Corrupt byte stream for event");}} finally {buffer.order(bufferOrder);}}

未完待续

这篇关于Flink checkpoint 源码分析- Checkpoint snapshot源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/949565

相关文章

Nginx屏蔽服务器名称与版本信息方式(源码级修改)

《Nginx屏蔽服务器名称与版本信息方式(源码级修改)》本文详解如何通过源码修改Nginx1.25.4,移除Server响应头中的服务类型和版本信息,以增强安全性,需重新配置、编译、安装,升级时需重复... 目录一、背景与目的二、适用版本三、操作步骤修改源码文件四、后续操作提示五、注意事项六、总结一、背景与

Android实现图片浏览功能的示例详解(附带源码)

《Android实现图片浏览功能的示例详解(附带源码)》在许多应用中,都需要展示图片并支持用户进行浏览,本文主要为大家介绍了如何通过Android实现图片浏览功能,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、项目背景详细介绍二、项目需求详细介绍三、相关技术详细介绍四、实现思路详细介绍五、完整实现代码

Android 缓存日志Logcat导出与分析最佳实践

《Android缓存日志Logcat导出与分析最佳实践》本文全面介绍AndroidLogcat缓存日志的导出与分析方法,涵盖按进程、缓冲区类型及日志级别过滤,自动化工具使用,常见问题解决方案和最佳实... 目录android 缓存日志(Logcat)导出与分析全攻略为什么要导出缓存日志?按需过滤导出1. 按

Linux中的HTTPS协议原理分析

《Linux中的HTTPS协议原理分析》文章解释了HTTPS的必要性:HTTP明文传输易被篡改和劫持,HTTPS通过非对称加密协商对称密钥、CA证书认证和混合加密机制,有效防范中间人攻击,保障通信安全... 目录一、什么是加密和解密?二、为什么需要加密?三、常见的加密方式3.1 对称加密3.2非对称加密四、

MySQL中读写分离方案对比分析与选型建议

《MySQL中读写分离方案对比分析与选型建议》MySQL读写分离是提升数据库可用性和性能的常见手段,本文将围绕现实生产环境中常见的几种读写分离模式进行系统对比,希望对大家有所帮助... 目录一、问题背景介绍二、多种解决方案对比2.1 原生mysql主从复制2.2 Proxy层中间件:ProxySQL2.3

python使用Akshare与Streamlit实现股票估值分析教程(图文代码)

《python使用Akshare与Streamlit实现股票估值分析教程(图文代码)》入职测试中的一道题,要求:从Akshare下载某一个股票近十年的财务报表包括,资产负债表,利润表,现金流量表,保存... 目录一、前言二、核心知识点梳理1、Akshare数据获取2、Pandas数据处理3、Matplotl

python panda库从基础到高级操作分析

《pythonpanda库从基础到高级操作分析》本文介绍了Pandas库的核心功能,包括处理结构化数据的Series和DataFrame数据结构,数据读取、清洗、分组聚合、合并、时间序列分析及大数据... 目录1. Pandas 概述2. 基本操作:数据读取与查看3. 索引操作:精准定位数据4. Group

MySQL中EXISTS与IN用法使用与对比分析

《MySQL中EXISTS与IN用法使用与对比分析》在MySQL中,EXISTS和IN都用于子查询中根据另一个查询的结果来过滤主查询的记录,本文将基于工作原理、效率和应用场景进行全面对比... 目录一、基本用法详解1. IN 运算符2. EXISTS 运算符二、EXISTS 与 IN 的选择策略三、性能对比

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

深度解析Nginx日志分析与499状态码问题解决

《深度解析Nginx日志分析与499状态码问题解决》在Web服务器运维和性能优化过程中,Nginx日志是排查问题的重要依据,本文将围绕Nginx日志分析、499状态码的成因、排查方法及解决方案展开讨论... 目录前言1. Nginx日志基础1.1 Nginx日志存放位置1.2 Nginx日志格式2. 499