本文主要是介绍Flink checkpoint 源码分析- Checkpoint snapshot源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
背景
在上一篇的博客里,大致介绍了flink checkpoint中的触发的大体流程,现在介绍一下触发之后下游的算子是如何做snapshot。
上一篇的文章: Flink checkpoint 源码分析- Flink Checkpoint 触发流程分析-CSDN博客
代码分析
1. 在SubtaskCheckpointCoordinatorImpl中的checkpointState 主要进行了这个操作, source首先构造barrier,然后广播给下游。我们现在跟踪一下barrier的流动。
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState
CheckpointBarrier checkpointBarrier =new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());
这个广播实际上是将数据写入到了下游。写的方法实际上就是netty写。
从flush的方法进去可以看到实际上是通知下游数据可用,下游看到数据可用就可以拉数据。因此可以看到这里的数据传递是通过pull的方式。
最后这个方法最后调用的是:org.apache.flink.runtime.io.network.netty.PartitionRequestQueue#notifyReaderNonEmpty方法,通过netty告知下游有数据了。
这些数据是从哪里读取到的呢?其实是在org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel#getNextBuffer里面,flink对netty 进行了封装
从这个方法再往上就可以看到是org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#getNextBufferOrEvent。
这里就是channel读取数据的地方。
这里有一个方法:transformToBufferOrEvent。这里判断里面是数据还是时间。flink中定义的事件如下。
如果这个是barrier 会在这里解析出来:
public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) throws IOException {if (buffer.remaining() < 4) {throw new IOException("Incomplete event");}final ByteOrder bufferOrder = buffer.order();buffer.order(ByteOrder.BIG_ENDIAN);try {final int type = buffer.getInt();if (type == END_OF_PARTITION_EVENT) {return EndOfPartitionEvent.INSTANCE;} else if (type == CHECKPOINT_BARRIER_EVENT) {return deserializeCheckpointBarrier(buffer);} else if (type == END_OF_SUPERSTEP_EVENT) {return EndOfSuperstepEvent.INSTANCE;} else if (type == END_OF_CHANNEL_STATE_EVENT) {return EndOfChannelStateEvent.INSTANCE;} else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {long id = buffer.getLong();return new CancelCheckpointMarker(id);} else if (type == ANNOUNCEMENT_EVENT) {int sequenceNumber = buffer.getInt();AbstractEvent announcedEvent = fromSerializedEvent(buffer, classLoader);return new EventAnnouncement(announcedEvent, sequenceNumber);} else if (type == VIRTUAL_CHANNEL_SELECTOR_EVENT) {return new SubtaskConnectionDescriptor(buffer.getInt(), buffer.getInt());} else if (type == CHANNEL_UNAVAILABLE_EVENT) {int channelIndex = buffer.getInt();return new ChannelUnavailableEvent(channelIndex);} else if (type == OTHER_EVENT) {try {final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);final String className = deserializer.readUTF();final Class<? extends AbstractEvent> clazz;try {clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);} catch (ClassNotFoundException e) {throw new IOException("Could not load event class '" + className + "'.", e);} catch (ClassCastException e) {throw new IOException("The class '" + className + "' is not a valid subclass of '"+ AbstractEvent.class.getName() + "'.", e);}final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);event.read(deserializer);return event;} catch (Exception e) {throw new IOException("Error while deserializing or instantiating event.", e);}} else {throw new IOException("Corrupt byte stream for event");}} finally {buffer.order(bufferOrder);}}
未完待续
这篇关于Flink checkpoint 源码分析- Checkpoint snapshot源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!