本文主要是介绍StreamTask数据流:StreamTask能力概述、Flink处理网络数据逻辑,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 一. StreamTask核心组件与能力
- 二. OneInputStreamTask接入网络数据并处理
- 三. 处理数据
- 1. StreamElement类别
- 2. 业务数据处理逻辑
- 四. 小结
先来看数据是如何经过网络写入下游Task节点并通过算子进行处理的
,这里以OneInputStreamTask为例进行说明。
一. StreamTask核心组件与能力
如代码OneInputStreamTask.init()方法包含了初始化StreamTask主要核心组件的逻辑。
OneInputStreamTask
public void init() throws Exception {StreamConfig configuration = getConfiguration();int numberOfInputs = configuration.getNumberOfInputs();if (numberOfInputs > 0) {// 创建CheckpointedInputGateCheckpointedInputGate inputGate = createCheckpointedInputGate();TaskIOMetricGroup taskIOMetricGroup = getEnvironment().getMetricGroup().getIOMetricGroup();taskIOMetricGroup.gauge("checkpointAlignmentTime", inputGate::getAlignmentDurationNanos);// 创建DataOutput组件DataOutput<IN> output = createDataOutput();StreamTaskInput<IN> input = createTaskInput(inputGate, output);// 创建StreamOneInputProcessorinputProcessor = new StreamOneInputProcessor<>(input,output,getCheckpointLock(),operatorChain);}headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
- 创建CheckpointedInputGate:CheckpointedInputGate是对InputGate进行封装,
实现对CheckpointBarrier对齐的功能
。此组件可以接入上游Task实例写入指定InputChannel中的Buffer数据。- 创建DataOutput组件:在StreamTaskInput中会将
接入的数据 通过DataOutput组件输出到算子链的HeaderOperator
中。- 创建StreamTaskInput组件:用于接收数据,将InputGate和DataOutput作为内部成员,完成对数据的接入和输出。
- 创建StreamOneInputProcessor数据处理器:此组件会被Task线程模型调度并执行,实现周期性地从StreamTaskInput组件中读取数据元素并处理。
小结:
OneInputStreamTask初始化过程中,包括创建StreamTaskInput和DataOutput组件。
接下来了解StreamTask如何利用StreamTaskInput和DataOutput完成数据元素的接收并发送到算子链中进行处理。
二. OneInputStreamTask接入网络数据并处理
StreamTask.processInput()方法定义了处理数据的主要流程。
- 数据最终会通过MailboxProcessor调度与执行
- 调用StreamOneInputProcessor.processInput()方法完成数据元素的获取和处理
- 调度StreamOneInputProcessor组件,串联并运行StreamTaskInput组件、DataOutput组件和OperatorChain组件,最终完成数据元素的处理操作。
StreamTask.processInput()
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {InputStatus status = inputProcessor.processInput();// 上游如果还有数据,则继续等待执行if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {return;}// 上游如果没有数据,则发送控制消息到控制器if (status == InputStatus.END_OF_INPUT) {controller.allActionsCompleted();return;}CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();jointFuture.thenRun(suspendedDefaultAction::resume);
}
接下来详细看StreamOneInputProcessor.processInput()
emitNext():通过StreamTaskNetworkInput接收数据元素,并返回InputStatus判断数据元素是否全部消费完毕。emitNext()会将DataOutput作为参数传递到方法内部,用于将数据元素输出到
算子链中
。
public InputStatus processInput() throws Exception {InputStatus status = input.emitNext(output);if (status == InputStatus.END_OF_INPUT) {synchronized (lock) {operatorChain.endHeadOperatorInput(1);}}return status;
}
StreamTaskNetworkInput.emitNext():处理数据逻辑。
//BufferOrEvent代表数据元素可以是Buffer类型,也可以是事件类型,
//比如CheckpointBarrier、TaskEvent等事件。public InputStatus emitNext(DataOutput<T> output) throws Exception {while (true) {// 从Deserializer中获取数据元素if (currentRecordDeserializer != null) {DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);// 如果DeserializationResult对应的Buffer数据已经被消费,则回收Buffer if (result.isBufferConsumed()) {currentRecordDeserializer.getCurrentBuffer().recycleBuffer();currentRecordDeserializer = null;}// 如果result是完整的数据元素,则调用processElement()方法进行处理if (result.isFullRecord()) {processElement(deserializationDelegate.getInstance(), output);return InputStatus.MORE_AVAILABLE;}}// 从checkpointedInputGate中拉取数据//如果bufferOrEvent为空,则判断checkpointedInputGate是否已经关闭,如果已经关闭了则直接返回END_OF_INPUT状态,否则返回NOTHING_AVAILABLE状态。Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();// 如果有数据则调用processBufferOrEvent()方法进行处理if (bufferOrEvent.isPresent()) {processBufferOrEvent(bufferOrEvent.get());} else {// 如果checkpointedInputGate已关闭,则返回END_OF_INPUTif (checkpointedInputGate.isFinished()) {checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");if (!checkpointedInputGate.isEmpty()) {throw new IllegalStateException("Trailing data in checkpoint barrier handler.");}return InputStatus.END_OF_INPUT;}return InputStatus.NOTHING_AVAILABLE;}}
}
三. 处理数据
1. StreamElement类别
StreamElement具体类别有StreamRecord、StreamStatus以及Watermark,其中StreamRecord就是需要处理的业务数据,Watermark则是上游传递下来的Watermark事件。
//StreamTaskNetworkInput.processElement()
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {// StreamRecord类型if (recordOrMark.isRecord()){output.emitRecord(recordOrMark.asRecord());// Watermark类型} else if (recordOrMark.isWatermark()) {statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);// LatencyMarker类型} else if (recordOrMark.isLatencyMarker()) {output.emitLatencyMarker(recordOrMark.asLatencyMarker());// StreamStatus类型} else if (recordOrMark.isStreamStatus()) {statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);} else {throw new UnsupportedOperationException("Unknown type of StreamElement");}
}
2. 业务数据处理逻辑
对于业务数据,调用output.emitRecord(recordOrMark.asRecord())方法进行数据元素的输出操作,然后通过DataOutput输出到算子链
中进行处理。
如下方法调用operator处理,实际就是在创建StreamTaskNetworkOutput时指定的算子链HeaderOperator
。
OneInputStreamTask.StreamTaskNetworkOutput.emitRecord()
public void emitRecord(StreamRecord<IN> record) throws Exception {synchronized (lock) {//累加器计算消费数量numRecordsIn.inc();//通过算子链处理operator.setKeyContextElement1(record);operator.processElement(record);}
}
四. 小结
Flink从InputGate中拉取数据元素并进行反序列化操作,转换成StreamElement类型后,再调用StreamTaskNetworkOutput.emitRecord()方法将数据元素推送到OperatorChain的HeaderOperator中进行处理。
《Flink设计与实现:核心原理与源码解析》 – 张利兵
这篇关于StreamTask数据流:StreamTask能力概述、Flink处理网络数据逻辑的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!