Flink1.14.* 各种算子在StreamTask控制下如何调用的源码

2024-08-31 08:12

本文主要是介绍Flink1.14.* 各种算子在StreamTask控制下如何调用的源码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  • 前言:
  • 一、StreamTask执行算子的生命周期
  • 二、 Source的streamTask用的是SourceStreamTask
  • 三、基础转换操作,窗口用的是OneInputStreamTask
    • 1、初始化OneInputStreamTask
    • 2、StreamTask运行invoke调用的是StreamTask的processInput方法
    • 3、从缓冲区获取数据放入到内存中
    • 4、调用算子的processElement方法处理数据,
  • 四、sink的streamTask用的也是OneInputStreamTask
  • 五、OneInputStreamTask和SourceStreamTask类关系图

前言:

在 Apache Flink 中,StreamTask 类是处理流数据的核心执行单元。
它负责管理算子的生命周期,并调用算子的处理方法。StreamTask 类的全路径(即完整的包名和类名)如下:
StreamTask 类位于 flink-streaming-java 模块中,具体的包结构为 org.apache.flink.streaming.runtime.tasks
全路径如下

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java

一、StreamTask执行算子的生命周期

先看StreamTask大体执行流程(忽略实现类的细节)

public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> implements TaskInvokable, CheckpointableTask, CoordinatedTask, AsyncExceptionHandler, ContainingTaskDetails {protected OP mainOperator;private boolean mailboxLoopRunning;//第一步构造函数,把processInput赋值给mailboxProcessorprotected StreamTask(Environment environment, @Nullable TimerService timerService, UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor actionExecutor, TaskMailbox mailbox) throws Exception {this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);//默认为truethis.mailboxLoopRunning = true;}//第三步StreamTask执行public final void invoke() throws Exception {//省略代码this.runMailboxLoop(); }//SourceStreamTask会重写这个方法,OneInputStreamTask不会重写protected void processInput(Controller controller) throws Exception {//删除干扰代码,}
}
public class MailboxProcessor implements Closeable {protected final MailboxDefaultAction mailboxDefaultAction;//第二步 构造函数把processInput方法赋值给mailboxDefaultActionpublic MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, TaskMailbox mailbox, StreamTaskActionExecutor actionExecutor) {//这里mailboxDefaultAction传的是this::processInputthis.mailboxDefaultAction = (MailboxDefaultAction)Preconditions.checkNotNull(mailboxDefaultAction);}//第四步,public void runMailboxLoop() throws Exception {//suspended默认是falsethis.suspended = !this.mailboxLoopRunning;//this.isNextLoopPossible默认是truewhile(this.isNextLoopPossible()) {this.mailboxProcessor.runMailboxLoop();}}private boolean isNextLoopPossible() {return !this.suspended;}//第五步,调用processInputpublic void runMailboxLoop() throws Exception {//这个执行的是processInput方法this.mailboxDefaultAction.runDefaultAction(defaultActionContext);}
}

不同的实现类都是按照上面的步骤初始化执行的

二、 Source的streamTask用的是SourceStreamTask

@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>> extends StreamTask<OUT, OP> {private final SourceStreamTask<OUT, SRC, OP>.LegacySourceFunctionThread sourceThread;protected void init() {//这个mainOperator是StreamTask的字段,,SourceFunction<?> source = (SourceFunction)((StreamSource)this.mainOperator).getUserFunction();}    protected void processInput(Controller controller) throws Exception {//这里启动线程的run方法this.sourceThread.start();}private class LegacySourceFunctionThread extends Thread {private final CompletableFuture<Void> completionFuture = new CompletableFuture();LegacySourceFunctionThread() {}public void run() {try {if (!SourceStreamTask.this.operatorChain.isTaskDeployedAsFinished()) {StreamTask.LOG.debug("Legacy source {} skip execution since the task is finished on restore", SourceStreamTask.this.getTaskNameWithSubtaskAndId());((StreamSource)SourceStreamTask.this.mainOperator).run(SourceStreamTask.this.lock, SourceStreamTask.this.operatorChain);}//删除干扰代码} catch (Throwable var2) {//删除干扰代码}}}
}

第一点需要注意的是由于SourceStreamTask重写了streamTaskprocessInput方法,所以streamTaskinvoke方法执行的是子类的SourceStreamTaskprocessInput方法

第二点看一下init方法,这里(SourceFunction)((StreamSource)this.mainOperator).getUserFunction()就是获取的source算子,不清楚的可以看一下kafkaSource这篇文章Flink 1.14.*版本kafkaSource源码
由这里来触发SourceFunctionrun方法,即FlinkKafkaConsumerBaserun方法

三、基础转换操作,窗口用的是OneInputStreamTask

这种一般都是中间算子,或者最后一个算子(例如kafkaSink),所以主要涉及到从输入源获取数据,处理数据,并将结果写入输出中
如果连着看下面两篇文章,你就会知道为什么sink也是用的OneInputStreamTask
Flink 1.14.*中flatMap,filter等基本转换函数源码
Flink 1.14.* 版本kafkaSink源码

1、初始化OneInputStreamTask

public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {public void init() throws Exception {//output是私有类StreamTaskNetworkOutput对象DataOutput<IN> output = this.createDataOutput(numRecordsIn);StreamTaskInput<IN> input = this.createTaskInput(inputGate);//这个inputProcessor字段是给父类StreamTask初始化的,这时候父类inputProcessor=StreamOneInputProcessorthis.inputProcessor = new StreamOneInputProcessor(input, output, this.operatorChain);}private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate inputGate) {int numberOfInputChannels = inputGate.getNumberOfInputChannels();StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(numberOfInputChannels);TypeSerializer<IN> inSerializer = this.configuration.getTypeSerializerIn1(this.getUserCodeClassLoader());return StreamTaskNetworkInputFactory.create(inputGate, inSerializer, this.getEnvironment().getIOManager(), statusWatermarkValve, 0, this.getEnvironment().getTaskStateManager().getInputRescalingDescriptor(), (gateIndex) -> {return ((StreamEdge)this.configuration.getInPhysicalEdges(this.getUserCodeClassLoader()).get(gateIndex)).getPartitioner();}, this.getEnvironment().getTaskInfo());}//返回的是下面私有类StreamTaskNetworkOutput对象private DataOutput<IN> createDataOutput(Counter numRecordsIn) {return new OneInputStreamTask.StreamTaskNetworkOutput(this.operatorChain.getFinishedOnRestoreInputOrDefault((Input)this.mainOperator), this.inputWatermarkGauge, numRecordsIn);}    //私有内部类,对应上面init中的outputprivate static class StreamTaskNetworkOutput<IN> implements DataOutput<IN> {private final Input<IN> operator;public void emitRecord(StreamRecord<IN> record) throws Exception {//调用的算子的processElement方法this.operator.processElement(record);}}}

这里是调用init初始化,StreamOneInputProcessor一起初始化了

public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {private StreamTaskInput<IN> input;private DataOutput<IN> output;public StreamOneInputProcessor(StreamTaskInput<IN> input, DataOutput<IN> output, BoundedMultiInput endOfInputAware) {//此input就是StreamTaskNetworkInputthis.input = (StreamTaskInput)Preconditions.checkNotNull(input);//此output就是OneInputStreamTask里的私有类StreamTaskNetworkOutput对象this.output = (DataOutput)Preconditions.checkNotNull(output);this.endOfInputAware = (BoundedMultiInput)Preconditions.checkNotNull(endOfInputAware);}public DataInputStatus processInput() throws Exception {DataInputStatus status = this.input.emitNext(this.output);//删除干扰代码return status;}}

后面看到this.inputProcessor.processInput其实就是调用的上面类的processInput方法

下面简单介绍一下StreamTaskNetworkInputFactory的创建的两种不同的StreamTaskInput,也可以不用看

public class StreamTaskNetworkInputFactory {public StreamTaskNetworkInputFactory() {}//这里只看返回StreamTaskNetworkInputpublic static <T> StreamTaskInput<T> create(CheckpointedInputGate checkpointedInputGate, TypeSerializer<T> inputSerializer, IOManager ioManager, StatusWatermarkValve statusWatermarkValve, int inputIndex, InflightDataRescalingDescriptor rescalingDescriptorinflightDataRescalingDescriptor, Function<Integer, StreamPartitioner<?>> gatePartitioners, TaskInfo taskInfo) {return (StreamTaskInput)(rescalingDescriptorinflightDataRescalingDescriptor.equals(InflightDataRescalingDescriptor.NO_RESCALE) ? new StreamTaskNetworkInput(checkpointedInputGate, inputSerializer, ioManager, statusWatermarkValve, inputIndex) : new RescalingStreamTaskNetworkInput(checkpointedInputGate, inputSerializer, ioManager, statusWatermarkValve, inputIndex, rescalingDescriptorinflightDataRescalingDescriptor, gatePartitioners, taskInfo));}
}

StreamTaskNetworkInputFlink 中用于从网络接收数据并将其传递给任务处理的基本组件。它实现了 StreamInput 接口,并负责从网络缓冲区中读取数据,将数据反序列化为 StreamRecord,然后传递给下游的处理逻辑。
主要功能:

  1. 从网络接收数据:读取来自上游任务通过网络发送的数据。
  2. 数据反序列化:将接收到的字节数据反序列化为 StreamRecord 对象
  3. 调用下游处理逻辑:将反序列化后的 StreamRecord 对象传递给下游的处理逻辑(如操作符的 processElement 方法)。

RescalingStreamTaskNetworkInputStreamTaskNetworkInput 的一个扩展,用于处理任务重新缩放(rescaling)场景下的数据接收。任务重新缩放是指在运行时动态调整任务并行度,以适应负载变化。RescalingStreamTaskNetworkInput 主要用于确保在重新缩放过程中数据能够正确地重新分配和处理。
主要功能:

  1. 处理重新缩放场景:在任务重新缩放期间,确保数据能够正确地重新分配和处理。
  2. 数据重分配逻辑:在接收数据时,可能需要根据新的并行度进行数据重分配,以确保数据能够被正确处理。
  3. 继承自 StreamTaskNetworkInput:继承了 StreamTaskNetworkInput 的基本功能,同时增加了处理重新缩放场景的逻辑。

这样初始化部分就完成了

2、StreamTask运行invoke调用的是StreamTask的processInput方法

通过上面第一章节介绍StreamTask的,知道StreamTaskinvoke方法最终执行的是processInput方法,因为OneInputStreamTask不像SourceStreamTask重写了processInput方法,所以调用的还是父类StreamTaskprocessInput方法

public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> implements TaskInvokable, CheckpointableTask, CoordinatedTask, AsyncExceptionHandler, ContainingTaskDetails {protected void processInput(Controller controller) throws Exception {DataInputStatus status = this.inputProcessor.processInput();}
}

这时候this.inputProcessor=StreamOneInputProcessor,调用processInput即调用StreamOneInputProcessorprocessInput方法

//从OneInputStreamTask初始化章节粘贴过来的,方便
public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {private StreamTaskInput<IN> input;private DataOutput<IN> output;public StreamOneInputProcessor(StreamTaskInput<IN> input, DataOutput<IN> output, BoundedMultiInput endOfInputAware) {//此input就是StreamTaskNetworkInputthis.input = (StreamTaskInput)Preconditions.checkNotNull(input);//此output就是OneInputStreamTask里的私有类StreamTaskNetworkOutput对象this.output = (DataOutput)Preconditions.checkNotNull(output);this.endOfInputAware = (BoundedMultiInput)Preconditions.checkNotNull(endOfInputAware);}public DataInputStatus processInput() throws Exception {DataInputStatus status = this.input.emitNext(this.output);//删除干扰代码return status;}}

StreamOneInputProcessor.processInput中会调this.input.emitNext(this.output),因为构造StreamOneInputProcessor对象时已经赋值

所以processInput方法中DataInputStatus status = this.input.emitNext(this.output) 调用的是StreamTaskNetworkInputemitNext方法;

public final class StreamTaskNetworkInput<T> extends AbstractStreamTaskNetworkInput<T, SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>> {}
public abstract class AbstractStreamTaskNetworkInput<T, R extends RecordDeserializer<DeserializationDelegate<StreamElement>>> implements StreamTaskInput<T> {//从缓冲区读取到当前内存中private R currentRecordDeserializer = null;public DataInputStatus emitNext(DataOutput<T> output) throws Exception {while(true) {//当前内存有缓冲区的数据if (this.currentRecordDeserializer != null) {DeserializationResult result;try {//从deserializationDelegate尝试获取下一个记录result = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);} catch (IOException var4) {throw new IOException(String.format("Can't get next record for channel %s", this.lastChannel), var4);}if (result.isFullRecord()) {//处理该记录并返回this.processElement((StreamElement)this.deserializationDelegate.getInstance(), output);return DataInputStatus.MORE_AVAILABLE;}}//通过pollNext()方法从checkpointedInputGate中获取下一个元素,并将其封装在Optional中。Optional<BufferOrEvent> bufferOrEvent = this.checkpointedInputGate.pollNext();//然后检查bufferOrEvent是否存在if (bufferOrEvent.isPresent()) {//如果是缓冲区,则调用processBuffer方法进行处理if (((BufferOrEvent)bufferOrEvent.get()).isBuffer()) {this.processBuffer((BufferOrEvent)bufferOrEvent.get());continue;}//如果是事件,则调用processEvent方法进行处理并返回结果return this.processEvent((BufferOrEvent)bufferOrEvent.get());}}}
}

最终调的是父类AbstractStreamTaskNetworkInputemitNext方法

3、从缓冲区获取数据放入到内存中

通过上面emitNext实现,while循环中先判断当前内存区是否有缓冲区的数据,有则处理结束此次emitNext方法,如果没有则从缓冲区获取数据到当前内存区,再跳过本次循环,让下一个循环开始执行处理内存区数据的方法
this.checkpointedInputGate.pollNext()这个就不看了,你就知道从缓冲区返回数据就行了,

看一下processBuffer方法

protected void processBuffer(BufferOrEvent bufferOrEvent) throws IOException {//获取缓存管道信息this.lastChannel = bufferOrEvent.getChannelInfo();Preconditions.checkState(this.lastChannel != null);//可以理解为给currentRecordDeserializer初始化,选定类型this.currentRecordDeserializer = this.getActiveSerializer(bufferOrEvent.getChannelInfo());Preconditions.checkState(this.currentRecordDeserializer != null, "currentRecordDeserializer has already been released");//把缓冲区的数据写入到当前内存区this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
}

4、调用算子的processElement方法处理数据,

通过StreamOneInputProcessor初始化知道,入参output实际上是OneInputStreamTask里的私有类StreamTaskNetworkOutput对象

private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {if (recordOrMark.isRecord()) {//这里就调用了OneInputStreamTask里的私有类StreamTaskNetworkOutput中的emitRecord方法output.emitRecord(recordOrMark.asRecord());}
}
private static class StreamTaskNetworkOutput<IN> implements DataOutput<IN> {private final Input<IN> operator;public void emitRecord(StreamRecord<IN> record) throws Exception {//调用的算子的processElement方法this.operator.processElement(record);}
}

emitRecord方法就会调用算子的processElement方法,之后就可以看基础转换函数和窗口函数文章中,他们是被调用processElement触发的
如果不清楚可以看Flink 1.14.*中flatMap,filter等基本转换函数源码

四、sink的streamTask用的也是OneInputStreamTask

sink可以看成是一个像flatMapfilter、窗口一样的算子,通过OneInputStreamTask触发到sinkFuncitionprocessElement方法,执行流程都是一样的,

不懂的可以看下面两篇文章,比对一下,sink和基本转换、窗口算子触发方式是否一样
Flink 1.14.*中flatMap,filter等基本转换函数源码
Flink 1.14.* 版本kafkaSink源码

五、OneInputStreamTask和SourceStreamTask类关系图

在这里插入图片描述

在这里插入图片描述

比对两个关系图,SourceStreamTask多了SourceFunction接口和streamSource

这篇关于Flink1.14.* 各种算子在StreamTask控制下如何调用的源码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1123357

相关文章

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

如何在页面调用utility bar并传递参数至lwc组件

1.在app的utility item中添加lwc组件: 2.调用utility bar api的方式有两种: 方法一,通过lwc调用: import {LightningElement,api ,wire } from 'lwc';import { publish, MessageContext } from 'lightning/messageService';import Ca

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

【LabVIEW学习篇 - 21】:DLL与API的调用

文章目录 DLL与API调用DLLAPIDLL的调用 DLL与API调用 LabVIEW虽然已经足够强大,但不同的语言在不同领域都有着自己的优势,为了强强联合,LabVIEW提供了强大的外部程序接口能力,包括DLL、CIN(C语言接口)、ActiveX、.NET、MATLAB等等。通过DLL可以使用户很方便地调用C、C++、C#、VB等编程语言写的程序以及windows自带的大