Flink checkpoint 源码分析- Checkpoint snapshot 处理流程

2024-05-08 18:28

本文主要是介绍Flink checkpoint 源码分析- Checkpoint snapshot 处理流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

在上一篇博客中我们分析了代码中barrier的是如何流动改的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客

最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 

现在我们接着跟踪相应代码,观察是如何算子接受到了barrier是如何进行下一步代码处理的。以及了解flink应对不同的消费语义(At least once, exactly once)对于checkpoint的影响是怎样的。

代码分析

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 中我们主要关注对于checkpointBarrier的处理流程。

processBarrier方法实现上就可以看出,flink barrier的处理分成两种。

在这里我们需要跟踪一下barrierHandler 是如何生成的才能知道后面所要走的流程是哪一步。

通过往上追溯barrierHandler的生成,我们跟踪到方法:org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createCheckpointBarrierHandler 从代码中我们可以看到 如果是 EXACTLY_ONCE 那么生成的就SingleCheckpointBarrierHandler, 如果checkpoint 模式是AT_LEAST_ONCE, 生成对应的handler就是CheckpointBarrierTracker。 但是从代码中,EXACTLY_ONCE似乎不是简单的new 一个SingleCheckpointBarrierHandler, 而是通过一个方法来生成。因此需要进一步的观察这个方法是如何实现的。

org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createBarrierHandler

这里针对checkpoint类型做了区分,主要是分为aligned checkpoint 和 unaliged checkpoint的差异。这里可以进一步观察一下这两类checkpoint之前的差异。

对比这两个方法参数的差异,发现主要就是两处处参数有差异。subTaskCheckpointCoordinator、barrierHandlerState。这两个的差异主要体现在flink 在aligned checkpoint超时时,会切换为unaligned checkpoint。这里可以先按下不表,回到最开始的处理历程。

总结一下就是如果是flink 设置了at least once是使用的是CheckpointBarrierTracker, 当flink模式为exactly once时是SingleCheckpointBarrierHandler。 当为exactly once时checkpoint 类型又可以分为是aligned checkpoint还是unaligned checkpoint。

At least once 下 barrier是如何处理的

at least once 下对于barrier的处理是在以下的方法中实现的。

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker#processBarrier

public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException {final long barrierId = receivedBarrier.getId();// fast path for single channel trackersif (totalNumberOfInputChannels == 1) {markAlignmentStartAndEnd(receivedBarrier.getTimestamp());notifyCheckpoint(receivedBarrier);return;}// general path for multiple input channelsif (LOG.isDebugEnabled()) {LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelInfo);}// find the checkpoint barrier in the queue of pending barriersCheckpointBarrierCount barrierCount = null;int pos = 0;for (CheckpointBarrierCount next : pendingCheckpoints) {if (next.checkpointId == barrierId) {barrierCount = next;break;}pos++;}if (barrierCount != null) {// add one to the count to that barrier and check for completionint numBarriersNew = barrierCount.incrementBarrierCount();if (numBarriersNew == totalNumberOfInputChannels) {// checkpoint can be triggered (or is aborted and all barriers have been seen)// first, remove this checkpoint and all all prior pending// checkpoints (which are now subsumed)for (int i = 0; i <= pos; i++) {pendingCheckpoints.pollFirst();}// notify the listenerif (!barrierCount.isAborted()) {if (LOG.isDebugEnabled()) {LOG.debug("Received all barriers for checkpoint {}", barrierId);}markAlignmentEnd();notifyCheckpoint(receivedBarrier);}}}else {// first barrier for that checkpoint ID// add it only if it is newer than the latest checkpoint.// if it is not newer than the latest checkpoint ID, then there cannot be a// successful checkpoint for that ID anywaysif (barrierId > latestPendingCheckpointID) {markAlignmentStart(receivedBarrier.getTimestamp());latestPendingCheckpointID = barrierId;pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));// make sure we do not track too many checkpointsif (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {pendingCheckpoints.pollFirst();}}}}

如果只有一个inputchannel的情况下,在收到这一个barrier的时候,就可以做snapshot.

在这个中间会经过triggerCheckpointOnBarrier 等方法, 最后实际还是调到了org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState ,看到这里其实这很长的链路实际是一个循环,下一个算子会生成barrier,接着传递这个barrier。

实际情况是作业并行度不唯一,一个subtask往往是有多个inputchannel. 可以继续看看是如何处理的。

这里面当收取到第一个barrier,会将这个barrier信息存在个队列中。

当收取到时非第一个barrier的时候会进行计数,当收取到的是最后一个barrier的时候就会将barrier队列中在这个barrier之前的barrier全部清除,之后就可以通知做checkpoint snapshot, 这个流程就和之前的一个信道的checkpoint流程是一致的。

总结而言:at least 类型的checkpoint是在收到最后一个barrier的时候开始做snapshot的。

Exactly once checkpoint是如何处理的

首先看这一段的代码

@Overridepublic void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {long barrierId = barrier.getId();LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);if (currentCheckpointId > barrierId|| (currentCheckpointId == barrierId && !isCheckpointPending())) {if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()) {inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);}return;}checkNewCheckpoint(barrier);checkState(currentCheckpointId == barrierId);if (numBarriersReceived++ == 0) {if (getNumOpenChannels() == 1) {markAlignmentStartAndEnd(barrier.getTimestamp());} else {markAlignmentStart(barrier.getTimestamp());}}// we must mark alignment end before calling currentState.barrierReceived which might// trigger a checkpoint with unfinished future for alignment durationif (numBarriersReceived == numOpenChannels) {if (getNumOpenChannels() > 1) {markAlignmentEnd();}}try {currentState = currentState.barrierReceived(context, channelInfo, barrier);} catch (CheckpointException e) {abortInternal(barrier.getId(), e);} catch (Exception e) {ExceptionUtils.rethrowIOException(e);}if (numBarriersReceived == numOpenChannels) {numBarriersReceived = 0;lastCancelledOrCompletedCheckpointId = currentCheckpointId;LOG.debug("{}: Received all barriers for checkpoint {}.", taskName, currentCheckpointId);resetAlignmentTimer();allBarriersReceivedFuture.complete(null);}}

这里需要关注一下currentState, 在最开始我们看了他的构造函数AlternatingWaitingForFirstBarrier, 因此可以可以看这个方法具体是现实。

这里可以看到这里会block 住收到barrier的信道,如果barrier 都收齐了,之后会检查是不是unaligned的checkpoint, 如果不是可以直接做一次checkpoint。这个checkpoint和之前的流程是一致的。

这里的下一个分支是超时转化,比如设置为30s,前30s是做aligned checkpoint, 如果30s还没有完成,就会转化为unaligned checkpoint。 当然,你如果不想有超时时间,可以直接设置为0.

如果是unaligned checkpoint, 会将channel 里面的数据也写会到远端。

这个中间会有一些状态转化,每次barrier的到达都会触发不同的状态变化。其中我们看到对于uc来说,uc的第一个barrier到达了,就会触发一次global checkpoint。org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned#barrierReceived

org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriersUnaligned#barrierReceived

最后如果收到所有的barrier之后会finished checkpoint。状态恢复到原位。

总结一下:在exactly once的语义下,aligned checkpoint的做法是,收到一个barrier的时候会将对应的channel block住。当收到最后一个barrier的时候再做一次checkpoint。

unaligned的做法是,收到barrier的时候,第一步就会触发一次checkpoint, 之后会不断上传channel state, 当收到最后一个barrier则表示checkpoint结束。

这篇关于Flink checkpoint 源码分析- Checkpoint snapshot 处理流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/971085

相关文章

0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型的操作流程

《0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeekR1模型的操作流程》DeepSeekR1模型凭借其强大的自然语言处理能力,在未来具有广阔的应用前景,有望在多个领域发... 目录0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型,3步搞定一个应

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

使用C++将处理后的信号保存为PNG和TIFF格式

《使用C++将处理后的信号保存为PNG和TIFF格式》在信号处理领域,我们常常需要将处理结果以图像的形式保存下来,方便后续分析和展示,C++提供了多种库来处理图像数据,本文将介绍如何使用stb_ima... 目录1. PNG格式保存使用stb_imagephp_write库1.1 安装和包含库1.2 代码解

springboot启动流程过程

《springboot启动流程过程》SpringBoot简化了Spring框架的使用,通过创建`SpringApplication`对象,判断应用类型并设置初始化器和监听器,在`run`方法中,读取配... 目录springboot启动流程springboot程序启动入口1.创建SpringApplicat

通过prometheus监控Tomcat运行状态的操作流程

《通过prometheus监控Tomcat运行状态的操作流程》文章介绍了如何安装和配置Tomcat,并使用Prometheus和TomcatExporter来监控Tomcat的运行状态,文章详细讲解了... 目录Tomcat安装配置以及prometheus监控Tomcat一. 安装并配置tomcat1、安装

MySQL的cpu使用率100%的问题排查流程

《MySQL的cpu使用率100%的问题排查流程》线上mysql服务器经常性出现cpu使用率100%的告警,因此本文整理一下排查该问题的常规流程,文中通过代码示例讲解的非常详细,对大家的学习或工作有一... 目录1. 确认CPU占用来源2. 实时分析mysql活动3. 分析慢查询与执行计划4. 检查索引与表

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed

Redis如何使用zset处理排行榜和计数问题

《Redis如何使用zset处理排行榜和计数问题》Redis的ZSET数据结构非常适合处理排行榜和计数问题,它可以在高并发的点赞业务中高效地管理点赞的排名,并且由于ZSET的排序特性,可以轻松实现根据... 目录Redis使用zset处理排行榜和计数业务逻辑ZSET 数据结构优化高并发的点赞操作ZSET 结

微服务架构之使用RabbitMQ进行异步处理方式

《微服务架构之使用RabbitMQ进行异步处理方式》本文介绍了RabbitMQ的基本概念、异步调用处理逻辑、RabbitMQ的基本使用方法以及在SpringBoot项目中使用RabbitMQ解决高并发... 目录一.什么是RabbitMQ?二.异步调用处理逻辑:三.RabbitMQ的基本使用1.安装2.架构