This article introduces the SplitEnumerator concept in Flink 1.14 and walks through its source code (part 2 of the series).
Contents
The SplitEnumerator concept
SplitEnumerator source code
void start()
void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);
void addSplitsBack(List<SplitT> splits, int subtaskId);
void addReader(int subtaskId);
CheckpointT snapshotState(long checkpointId) throws Exception;
void close() throws IOException;
default void notifyCheckpointComplete(long checkpointId) throws Exception {}
default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}
StaticFileSplitEnumerator source code
ContinuousFileSplitEnumerator source code
public void start()
private void processDiscoveredSplits(Collection<FileSourceSplit> splits, Throwable error)
private void assignSplits()
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname)
References
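The SplitEnumerator concept
The SplitEnumerator is regarded as the "brain" of the whole Source. A typical SplitEnumerator implementation handles the following:
- SourceReader registration
- SourceReader failure
  - When a SourceReader fails, the addSplitsBack() method is called. The SplitEnumerator should take back the splits that were assigned to the failed SourceReader but not yet acknowledged by it.
- SourceEvent handling
  - SourceEvents are custom events passed back and forth between the SplitEnumerator and the SourceReaders. This mechanism can be used to carry out complex coordination tasks.
- Split discovery and assignment
  - The SplitEnumerator can assign splits to SourceReaders in response to various events, including the discovery of new splits, the registration of a new SourceReader, and the failure of a SourceReader.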
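A SplitEnumerator can do all of the above with the help of the SplitEnumeratorContext, which is provided to the Source when the SplitEnumerator is created or restored. The SplitEnumeratorContext lets the SplitEnumerator retrieve the necessary information about the readers and perform coordination actions. The Source implementation is expected to pass the SplitEnumeratorContext on to the SplitEnumerator instance.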
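A SplitEnumerator implementation can work purely reactively, taking coordination actions only when its methods are called, but some implementations act proactively. For example, a SplitEnumerator may want to discover new splits periodically and assign them to the SourceReaders. The callAsync() method of SplitEnumeratorContext is convenient for this kind of task. The following snippet shows how to do this without the SplitEnumerator having to maintain its own threads.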
class MySplitEnumerator implements SplitEnumerator<MySplit, MyCheckpoint> {
    private final long DISCOVER_INTERVAL = 60_000L;

    /**
     * A method to discover the splits.
     */
    private List<MySplit> discoverSplits() {...}

    @Override
    public void start() {
        ...
        // The handler is a BiConsumer that receives the result and a possible error
        enumContext.callAsync(this::discoverSplits, (splits, thrown) -> {
            Map<Integer, List<MySplit>> assignments = new HashMap<>();
            int parallelism = enumContext.currentParallelism();
            for (MySplit split : splits) {
                // floorMod keeps the owner non-negative even when hashCode() is negative
                int owner = Math.floorMod(split.splitId().hashCode(), parallelism);
                // computeIfAbsent takes a mapping function, not a list instance
                assignments.computeIfAbsent(owner, s -> new ArrayList<>()).add(split);
            }
            enumContext.assignSplits(new SplitsAssignment<>(assignments));
        }, 0L, DISCOVER_INTERVAL);
        ...
    }
    ...
}
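The grouping step inside the callback can be tried without a Flink cluster. The following is a minimal sketch that uses plain strings as split IDs; `OwnerAssignment` and `groupByOwner` are hypothetical names chosen for illustration, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Flink): groups split IDs by owning reader subtask. */
final class OwnerAssignment {

    /** Maps each split ID to subtask floorMod(hashCode, parallelism). */
    static Map<Integer, List<String>> groupByOwner(List<String> splitIds, int parallelism) {
        Map<Integer, List<String>> assignments = new HashMap<>();
        for (String splitId : splitIds) {
            // floorMod keeps the owner in [0, parallelism) even for negative hash codes
            int owner = Math.floorMod(splitId.hashCode(), parallelism);
            // computeIfAbsent expects a mapping function that creates the list on demand
            assignments.computeIfAbsent(owner, k -> new ArrayList<>()).add(splitId);
        }
        return assignments;
    }

    public static void main(String[] args) {
        System.out.println(groupByOwner(List.of("split-a", "split-b", "split-c"), 2));
    }
}
```

Hash-based ownership is only one possible policy; it keeps the same split on the same subtask across discovery rounds as long as the parallelism does not change.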
SplitEnumerator source code
void start()
/**
 * Start the split enumerator.
 *
 * <p>The default behavior does nothing.
 */
// Start method
void start();

// Implementation in ContinuousFileSplitEnumerator.
// Used for streaming reads of an unbounded FileSource: it keeps running the discovery
// mechanism, periodically detecting file splits and assigning them to the readers
@Override
public void start() {
    context.callAsync(
            () -> enumerator.enumerateSplits(paths, 1),
            this::processDiscoveredSplits,
            discoveryInterval,
            discoveryInterval);
}

// NonSplittingRecursiveEnumerator's code for enumerating the files under a directory
@Override
public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
        throws IOException {
    final ArrayList<FileSourceSplit> splits = new ArrayList<>();
    for (Path path : paths) {
        final FileSystem fs = path.getFileSystem();
        final FileStatus status = fs.getFileStatus(path);
        addSplitsForPath(status, fs, splits);
    }
    return splits;
}

private void addSplitsForPath(
        FileStatus fileStatus, FileSystem fs, ArrayList<FileSourceSplit> target)
        throws IOException {
    if (!fileFilter.test(fileStatus.getPath())) {
        return;
    }
    if (!fileStatus.isDir()) {
        convertToSourceSplits(fileStatus, fs, target);
        return;
    }
    final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
    for (FileStatus containedStatus : containedFiles) {
        addSplitsForPath(containedStatus, fs, target);
    }
}
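The start method makes an asynchronous, periodic call: it invokes enumerator.enumerateSplits(paths, 1) on a schedule and hands each result to processDiscoveredSplits. Both the initial delay and the period are discoveryInterval. This behavior is implemented in ContinuousFileSplitEnumerator.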
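The recursive traversal used by the enumerator can be illustrated on the local filesystem. This is a sketch only: it uses java.nio.file instead of Flink's FileSystem and FileStatus, and `RecursiveFileLister` and its methods are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical local-filesystem analogue of the recursive enumeration (not Flink code). */
final class RecursiveFileLister {

    /** Collects every non-directory path under the given roots that passes the filter. */
    static List<Path> listFiles(List<Path> roots, Predicate<Path> filter) throws IOException {
        List<Path> target = new ArrayList<>();
        for (Path root : roots) {
            addFilesForPath(root, filter, target);
        }
        return target;
    }

    private static void addFilesForPath(Path path, Predicate<Path> filter, List<Path> target)
            throws IOException {
        if (!filter.test(path)) {
            return; // a filtered-out directory prunes its whole subtree
        }
        if (!Files.isDirectory(path)) {
            target.add(path); // each file becomes one unit of work
            return;
        }
        // Directory: recurse into every child until only files remain
        try (DirectoryStream<Path> children = Files.newDirectoryStream(path)) {
            for (Path child : children) {
                addFilesForPath(child, filter, target);
            }
        }
    }
}
```

Because the filter is tested before the directory check, excluding a directory also excludes everything beneath it, which mirrors the order of the checks in addSplitsForPath.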
void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);
/**
 * Handles the request for a split. This method is called when the reader with the given subtask
 * id calls the {@link SourceReaderContext#sendSplitRequest()} method.
 *
 * @param subtaskId the subtask id of the source reader who sent the source event.
 * @param requesterHostname Optional, the hostname where the requesting task is running. This
 *     can be used to make split assignments locality-aware.
 */
// Handles a split request. Called when the reader with the given subtask ID calls
// SourceReaderContext#sendSplitRequest()
void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);

// The following methods are implemented in ContinuousFileSplitEnumerator
@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    // A reader that requests a split is idle and waiting, so add it to the set of waiting readers
    readersAwaitingSplit.put(subtaskId, requesterHostname);
    assignSplits();
}

private void assignSplits() {
    final Iterator<Map.Entry<Integer, String>> awaitingReader =
            readersAwaitingSplit.entrySet().iterator();
    while (awaitingReader.hasNext()) {
        // Iterate over the waiting readers
        final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
        // if the reader that requested another split has failed in the meantime, remove
        // it from the list of waiting readers
        if (!context.registeredReaders().containsKey(nextAwaiting.getKey())) {
            awaitingReader.remove();
            continue;
        }
        // Hostname and subtask ID of the reader requesting a split
        final String hostname = nextAwaiting.getValue();
        final int awaitingSubtask = nextAwaiting.getKey();
        // Get the next split
        final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
        if (nextSplit.isPresent()) {
            // Call SplitEnumeratorContext's method for assigning a single split
            context.assignSplit(nextSplit.get(), awaitingSubtask);
            awaitingReader.remove();
        } else {
            break;
        }
    }
}
void addSplitsBack(List<SplitT> splits, int subtaskId);
/**
 * Add a split back to the split enumerator. It will only happen when a {@link SourceReader}
 * fails and there are splits assigned to it after the last successful checkpoint.
 *
 * @param splits The split to add back to the enumerator for reassignment.
 * @param subtaskId The id of the subtask to which the returned splits belong.
 */
void addSplitsBack(List<SplitT> splits, int subtaskId);

// Implementation in ContinuousFileSplitEnumerator
@Override
public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
    LOG.debug("File Source Enumerator adds splits back: {}", splits);
    splitAssigner.addSplits(splits);
}

/**
 * Adds a set of splits to this assigner. This happens for example when some split processing
 * failed and the splits need to be re-added, or when new splits got discovered.
 */
void addSplits(Collection<FileSourceSplit> splits);
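FileSplitAssigner has two implementations (covered in more detail later):
- SimpleSplitAssigner: hands out splits in no particular order, without considering locality.
- LocalityAwareSplitAssigner: prefers to hand out splits that are local to the requesting reader.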
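The difference between the two strategies can be sketched in a few lines. The class below is a hypothetical, heavily simplified assigner written for illustration; it is not Flink's LocalityAwareSplitAssigner, whose implementation is more elaborate. Calling getNext with a null hostname gives the locality-agnostic behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

/** Hypothetical, simplified assigner (not Flink's implementation). */
final class LocalityPreferringAssigner {

    /** A pending split: an ID plus the hosts that store its data. */
    static final class Split {
        final String id;
        final List<String> hosts;

        Split(String id, String... hosts) {
            this.id = id;
            this.hosts = Arrays.asList(hosts);
        }
    }

    private final List<Split> pending = new ArrayList<>();

    void addSplits(List<Split> splits) {
        pending.addAll(splits);
    }

    /** Prefers a split stored on the requesting host; otherwise hands out any pending split. */
    Optional<Split> getNext(String hostname) {
        if (hostname != null) {
            for (Iterator<Split> it = pending.iterator(); it.hasNext(); ) {
                Split candidate = it.next();
                if (candidate.hosts.contains(hostname)) {
                    it.remove();
                    return Optional.of(candidate);
                }
            }
        }
        // No local split (or no locality information): fall back to the last pending split
        return pending.isEmpty()
                ? Optional.empty()
                : Optional.of(pending.remove(pending.size() - 1));
    }
}
```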
void addReader(int subtaskId);
/**
 * Add a new source reader with the given subtask ID.
 *
 * @param subtaskId the subtask ID of the new source reader.
 */
// Adds a new SourceReader with the given subtask ID
void addReader(int subtaskId);

// Implementation in the file source enumerators
@Override
public void addReader(int subtaskId) {
    // this source is purely lazy-pull-based, nothing to do upon registration
}
CheckpointT snapshotState(long checkpointId) throws Exception;
/**
 * Creates a snapshot of the state of this split enumerator, to be stored in a checkpoint.
 *
 * <p>The snapshot should contain the latest state of the enumerator: It should assume that all
 * operations that happened before the snapshot have successfully completed. For example all
 * splits assigned to readers via {@link SplitEnumeratorContext#assignSplit(SourceSplit, int)}
 * and {@link SplitEnumeratorContext#assignSplits(SplitsAssignment)}) don't need to be included
 * in the snapshot anymore.
 *
 * <p>This method takes the ID of the checkpoint for which the state is snapshotted. Most
 * implementations should be able to ignore this parameter, because for the contents of the
 * snapshot, it doesn't matter for which checkpoint it gets created. This parameter can be
 * interesting for source connectors with external systems where those systems are themselves
 * aware of checkpoints; for example in cases where the enumerator notifies that system about a
 * specific checkpoint being triggered.
 *
 * @param checkpointId The ID of the checkpoint for which the snapshot is created.
 * @return an object containing the state of the split enumerator.
 * @throws Exception when the snapshot cannot be taken.
 */
// Creates the checkpoint snapshot of the split enumerator.
// Assume that every operation that happened before the snapshot has completed successfully;
// for example, splits already handed out via assignSplit need not be included in the snapshot
CheckpointT snapshotState(long checkpointId) throws Exception;

// Implementation in ContinuousFileSplitEnumerator.
// To store additional custom data in the snapshot, customize the PendingSplitsCheckpoint
// class that fromCollectionSnapshot creates
@Override
public PendingSplitsCheckpoint<FileSourceSplit> snapshotState(long checkpointId)
        throws Exception {
    final PendingSplitsCheckpoint<FileSourceSplit> checkpoint =
            PendingSplitsCheckpoint.fromCollectionSnapshot(
                    splitAssigner.remainingSplits(), pathsAlreadyProcessed);
    LOG.debug("Source Checkpoint is {}", checkpoint);
    return checkpoint;
}
void close() throws IOException;
/**
 * Called to close the enumerator, in case it holds on to any resources, like threads or network
 * connections.
 */
@Override
void close() throws IOException;

// Implementation in the file source enumerators
@Override
public void close() throws IOException {
    // no resources to close
}
default void notifyCheckpointComplete(long checkpointId) throws Exception {}
/**
 * We have an empty default implementation here because most source readers do not have to
 * implement the method.
 *
 * @see CheckpointListener#notifyCheckpointComplete(long)
 */
// Notification that a checkpoint has completed
@Override
default void notifyCheckpointComplete(long checkpointId) throws Exception {}
This is an empty default method; most implementations do not need to override it.
default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}
/**
 * Handles a custom source event from the source reader.
 *
 * <p>This method has a default implementation that does nothing, because it is only required to
 * be implemented by some sources, which have a custom event protocol between reader and
 * enumerator. The common events for reader registration and split requests are not dispatched
 * to this method, but rather invoke the {@link #addReader(int)} and {@link
 * #handleSplitRequest(int, String)} methods.
 *
 * @param subtaskId the subtask id of the source reader who sent the source event.
 * @param sourceEvent the source event from the source reader.
 */
// Handles a custom SourceEvent sent by a source reader
default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}

// The default implementation is likewise empty; the file source enumerators override it
// only to log the unrecognized event
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
    LOG.error("Received unrecognized event: {}", sourceEvent);
}
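Next, let us look at two concrete SplitEnumerator implementations: ContinuousFileSplitEnumerator and StaticFileSplitEnumerator.
StaticFileSplitEnumerator source code
StaticFileSplitEnumerator is used for bounded/batch reads. The enumerator takes all files present in the configured input directories and assigns them to the readers; once all files have been processed, the source finishes. The class itself is thin: the logic that creates the FileSourceSplits and the logic that decides which reader gets which split live in FileEnumerator and FileSplitAssigner respectively. Incidentally, the StaticFileSplitEnumerator itself is instantiated in the createSplitEnumerator method of AbstractFileSource.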
// Entry point where StaticFileSplitEnumerator is created
if (continuousEnumerationSettings == null) {
    // bounded case
    return castGeneric(new StaticFileSplitEnumerator(fileSplitContext, splitAssigner));
}
/**
 * A SplitEnumerator implementation for bounded / batch {@link FileSource} input.
 *
 * <p>This enumerator takes all files that are present in the configured input directories and
 * assigns them to the readers. Once all files are processed, the source is finished.
 *
 * <p>The implementation of this class is rather thin. The actual logic for creating the set of
 * FileSourceSplits to process, and the logic to decide which reader gets what split, are in {@link
 * FileEnumerator} and in {@link FileSplitAssigner}, respectively.
 */
@Internal
public class StaticFileSplitEnumerator
        implements SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint<FileSourceSplit>> {

    private static final Logger LOG = LoggerFactory.getLogger(StaticFileSplitEnumerator.class);

    private final SplitEnumeratorContext<FileSourceSplit> context;

    private final FileSplitAssigner splitAssigner;

    // ------------------------------------------------------------------------

    public StaticFileSplitEnumerator(
            SplitEnumeratorContext<FileSourceSplit> context, FileSplitAssigner splitAssigner) {
        this.context = checkNotNull(context);
        this.splitAssigner = checkNotNull(splitAssigner);
    }

    @Override
    public void start() {
        // no resources to start
    }

    @Override
    public void close() throws IOException {
        // no resources to close
    }

    @Override
    public void addReader(int subtaskId) {
        // this source is purely lazy-pull-based, nothing to do upon registration
    }

    @Override
    public void handleSplitRequest(int subtask, @Nullable String hostname) {
        if (!context.registeredReaders().containsKey(subtask)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }

        if (LOG.isInfoEnabled()) {
            final String hostInfo =
                    hostname == null ? "(no host locality info)" : "(on host '" + hostname + "')";
            LOG.info("Subtask {} {} is requesting a file source split", subtask, hostInfo);
        }

        final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
        if (nextSplit.isPresent()) {
            final FileSourceSplit split = nextSplit.get();
            context.assignSplit(split, subtask);
            LOG.info("Assigned split to subtask {} : {}", subtask, split);
        } else {
            context.signalNoMoreSplits(subtask);
            LOG.info("No more splits available for subtask {}", subtask);
        }
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        LOG.error("Received unrecognized event: {}", sourceEvent);
    }

    @Override
    public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
        LOG.debug("File Source Enumerator adds splits back: {}", splits);
        splitAssigner.addSplits(splits);
    }

    @Override
    public PendingSplitsCheckpoint<FileSourceSplit> snapshotState(long checkpointId) {
        return PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits());
    }
}
The key method in the source above is handleSplitRequest; start, close and addReader are all empty implementations.
@Override
public void handleSplitRequest(int subtask, @Nullable String hostname) {
    // If the reader is not registered, or was registered but failed after sending the
    // request, ignore the request
    if (!context.registeredReaders().containsKey(subtask)) {
        // reader failed between sending the request and now. skip this request.
        return;
    }

    if (LOG.isInfoEnabled()) {
        final String hostInfo =
                hostname == null ? "(no host locality info)" : "(on host '" + hostname + "')";
        LOG.info("Subtask {} {} is requesting a file source split", subtask, hostInfo);
    }

    // Ask the split assigner (analyzed later) for a split suited to this hostname
    final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
    if (nextSplit.isPresent()) {
        // A split is available: assign it
        final FileSourceSplit split = nextSplit.get();
        context.assignSplit(split, subtask);
        LOG.info("Assigned split to subtask {} : {}", subtask, split);
    } else {
        // No split is available: signal that there are no more splits
        context.signalNoMoreSplits(subtask);
        LOG.info("No more splits available for subtask {}", subtask);
    }
}
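ContinuousFileSplitEnumerator source code
ContinuousFileSplitEnumerator is used for a FileSource read in streaming mode: it runs the discovery mechanism periodically and assigns the discovered splits to the readers.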
public void start()
@Override
public void start() {
    // Periodically call enumerator.enumerateSplits(paths, 1)
    // and hand the result to processDiscoveredSplits.
    // Both the initial delay and the period are discoveryInterval
    context.callAsync(
            () -> enumerator.enumerateSplits(paths, 1),
            this::processDiscoveredSplits,
            discoveryInterval,
            discoveryInterval);
}

// Implementation in NonSplittingRecursiveEnumerator
@Override
public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
        throws IOException {
    final ArrayList<FileSourceSplit> splits = new ArrayList<>();
    for (Path path : paths) {
        final FileSystem fs = path.getFileSystem();
        final FileStatus status = fs.getFileStatus(path);
        addSplitsForPath(status, fs, splits);
    }
    return splits;
}

private void addSplitsForPath(
        FileStatus fileStatus, FileSystem fs, ArrayList<FileSourceSplit> target)
        throws IOException {
    if (!fileFilter.test(fileStatus.getPath())) {
        return;
    }
    // Not a directory: convert the file into a split directly
    if (!fileStatus.isDir()) {
        convertToSourceSplits(fileStatus, fs, target);
        return;
    }
    // A directory: list the status of the entries it contains
    final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
    // Recurse through all subdirectories and files until every file has been visited
    for (FileStatus containedStatus : containedFiles) {
        addSplitsForPath(containedStatus, fs, target);
    }
}

protected void convertToSourceSplits(
        final FileStatus file, final FileSystem fs, final List<FileSourceSplit> target)
        throws IOException {
    // Get the hosts that store the file's blocks (the DataNodes, on HDFS)
    final String[] hosts =
            getHostsFromBlockLocations(fs.getFileBlockLocations(file, 0L, file.getLen()));
    // Create a new FileSourceSplit and add it to the list
    target.add(new FileSourceSplit(getNextId(), file.getPath(), 0, file.getLen(), hosts));
}

// Gets the next ID, returned as a String
protected final String getNextId() {
    // because we just increment numbers, we increment the char representation directly,
    // rather than incrementing an integer and converting it to a string representation
    // every time again (requires quite some expensive conversion logic).
    incrementCharArrayByOne(currentId, currentId.length - 1);
    return new String(currentId);
}

private static String[] getHostsFromBlockLocations(BlockLocation[] blockLocations)
        throws IOException {
    if (blockLocations.length == 0) {
        return StringUtils.EMPTY_STRING_ARRAY;
    }
    if (blockLocations.length == 1) {
        return blockLocations[0].getHosts();
    }
    final LinkedHashSet<String> allHosts = new LinkedHashSet<>();
    for (BlockLocation block : blockLocations) {
        // Add all hosts of each block to the set, so that it is later known
        // which DataNodes hold the data of the split
        allHosts.addAll(Arrays.asList(block.getHosts()));
    }
    return allHosts.toArray(new String[allHosts.size()]);
}

private static void incrementCharArrayByOne(char[] array, int pos) {
    char c = array[pos];
    c++;
    if (c > '9') {
        c = '0';
        incrementCharArrayByOne(array, pos - 1);
    }
    array[pos] = c;
}
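The character-array counter behind getNextId can be exercised in isolation. The sketch below re-creates it in a hypothetical class `SplitIdCounter`; the ten-digit initial value "0000000000" is an assumption made for the example, since the field declaration of currentId is not shown above.

```java
/** Hypothetical re-creation of the ID counter; the initial value is an assumption. */
final class SplitIdCounter {

    private final char[] currentId = "0000000000".toCharArray();

    /** Increments the decimal counter in place and returns it as a String. */
    String nextId() {
        incrementCharArrayByOne(currentId, currentId.length - 1);
        return new String(currentId);
    }

    // Adds one to the decimal digit at pos and carries into pos - 1 when it passes '9'
    private static void incrementCharArrayByOne(char[] array, int pos) {
        char c = array[pos];
        c++;
        if (c > '9') {
            c = '0';
            incrementCharArrayByOne(array, pos - 1);
        }
        array[pos] = c;
    }

    public static void main(String[] args) {
        SplitIdCounter counter = new SplitIdCounter();
        System.out.println(counter.nextId());
        System.out.println(counter.nextId());
    }
}
```

The carry recursion has no lower bound check, so in this sketch the counter would fail with an ArrayIndexOutOfBoundsException once all ten digits are '9'.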
private void processDiscoveredSplits(Collection<FileSourceSplit> splits, Throwable error)
// Callback of the periodic split discovery task
private void processDiscoveredSplits(Collection<FileSourceSplit> splits, Throwable error) {
    // Check whether discovery failed
    if (error != null) {
        LOG.error("Failed to enumerate files", error);
        return;
    }
    // Filter out the splits whose path has already been processed
    final Collection<FileSourceSplit> newSplits =
            splits.stream()
                    .filter((split) -> pathsAlreadyProcessed.add(split.path()))
                    .collect(Collectors.toList());
    // Add the assignable splits to the SplitAssigner; they are fetched via getNext on assignment
    splitAssigner.addSplits(newSplits);
    // Assign splits
    assignSplits();
}
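The filter in processDiscoveredSplits relies on Set.add returning false for an element that is already present, so a single call both tests and records a path. A minimal sketch with paths as plain strings (`NewPathFilter` and `keepNew` are hypothetical names):

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch of the "already processed" filter, with paths as plain strings. */
final class NewPathFilter {

    private final Set<String> pathsAlreadyProcessed = new HashSet<>();

    /** Returns only the paths not seen in earlier rounds and remembers them for later rounds. */
    List<String> keepNew(Collection<String> discovered) {
        // Set.add returns true only when the element was not in the set yet
        return discovered.stream()
                .filter(pathsAlreadyProcessed::add)
                .collect(Collectors.toList());
    }
}
```

Each discovery round rescans the whole directory tree, so without this set every file would be handed out again on every round.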
private void assignSplits()
private void assignSplits() {
    // Iterator over all readers waiting for a split
    final Iterator<Map.Entry<Integer, String>> awaitingReader =
            readersAwaitingSplit.entrySet().iterator();
    while (awaitingReader.hasNext()) {
        // Iterate over the waiting readers
        final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
        // if the reader that requested another split has failed in the meantime, remove
        // it from the list of waiting readers
        if (!context.registeredReaders().containsKey(nextAwaiting.getKey())) {
            awaitingReader.remove();
            continue;
        }
        // Hostname and subtask ID of the reader requesting a split
        final String hostname = nextAwaiting.getValue();
        final int awaitingSubtask = nextAwaiting.getKey();
        // Get the next split
        final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
        if (nextSplit.isPresent()) {
            // Call SplitEnumeratorContext's method for assigning a split
            context.assignSplit(nextSplit.get(), awaitingSubtask);
            // Remove this reader from the map
            awaitingReader.remove();
        } else {
            break;
        }
    }
}
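The behavior of this loop (serve waiting readers in order, drop failed ones, stop when the splits run out) can be checked with a small stand-in that uses no Flink types. `AwaitingReaderLoop` and its parameters are hypothetical names, and a plain queue replaces the split assigner.

```java
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the pull-based assignment loop (no Flink types). */
final class AwaitingReaderLoop {

    /**
     * Serves waiting readers in request order and returns the resulting subtask-to-split map.
     * Served or failed readers are removed from awaiting; readers left without a split
     * stay in it until more splits arrive.
     */
    static Map<Integer, String> assign(
            LinkedHashMap<Integer, String> awaiting, // subtask ID -> hostname (may be null)
            Set<Integer> registered,
            Deque<String> pendingSplits) {
        Map<Integer, String> assigned = new LinkedHashMap<>();
        Iterator<Map.Entry<Integer, String>> it = awaiting.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, String> next = it.next();
            if (!registered.contains(next.getKey())) {
                it.remove(); // the reader failed after sending its request
                continue;
            }
            String split = pendingSplits.poll();
            if (split == null) {
                break; // nothing left to hand out; remaining readers keep waiting
            }
            assigned.put(next.getKey(), split);
            it.remove();
        }
        return assigned;
    }
}
```

Keeping unserved readers in the map is what lets the continuous enumerator satisfy their requests later, when the next discovery round adds new splits and calls the loop again.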
The methods that assign splits are declared in SplitEnumeratorContext:
/**
 * Assign the splits.
 *
 * @param newSplitAssignments the new split assignments to add.
 */
void assignSplits(SplitsAssignment<SplitT> newSplitAssignments);

/**
 * Assigns a single split.
 *
 * <p>When assigning multiple splits, it is more efficient to assign all of them in a single
 * call to the {@link #assignSplits(SplitsAssignment)} method.
 *
 * @param split The new split
 * @param subtask The index of the operator's parallel subtask that shall receive the split.
 */
default void assignSplit(SplitT split, int subtask) {
    assignSplits(new SplitsAssignment<>(split, subtask));
}
The concrete implementation of assignSplits is in SourceCoordinatorContext:
@Override
public void assignSplits(SplitsAssignment<SplitT> assignment) {
    // Ensure the split assignment is done by the coordinator executor.
    callInCoordinatorThread(
            () -> {
                // Ensure all the subtasks in the assignment have registered.
                // An operator registers its reader by sending a ReaderRegistrationEvent, which
                // is handled in SourceCoordinator's handleEventFromOperator method
                for (Integer subtaskId : assignment.assignment().keySet()) {
                    if (!registeredReaders.containsKey(subtaskId)) {
                        throw new IllegalArgumentException(
                                String.format(
                                        "Cannot assign splits %s to subtask %d because the subtask is not registered.",
                                        registeredReaders.get(subtaskId), subtaskId));
                    }
                }
                // Record the assignment in the assignment tracker
                assignmentTracker.recordSplitAssignment(assignment);
                assignment
                        .assignment()
                        .forEach(
                                (id, splits) -> {
                                    // For every assignment, send an AddSplitEvent to the
                                    // subtask through its SubtaskGateway
                                    final OperatorCoordinator.SubtaskGateway gateway =
                                            getGatewayAndCheckReady(id);
                                    final AddSplitEvent<SplitT> addSplitEvent;
                                    try {
                                        addSplitEvent =
                                                new AddSplitEvent<>(splits, splitSerializer);
                                    } catch (IOException e) {
                                        throw new FlinkRuntimeException(
                                                "Failed to serialize splits.", e);
                                    }
                                    gateway.sendEvent(addSplitEvent);
                                });
                return null;
            },
            String.format("Failed to assign splits %s due to ", assignment));
}
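Recording each assignment in a tracker is what later allows splits to be handed back through addSplitsBack when a reader fails. The following is a hypothetical, simplified sketch of that bookkeeping with splits as plain strings; Flink's own tracker differs in detail, and all names here are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified assignment tracker (not Flink's implementation). */
final class AssignmentTrackerSketch {

    // subtask ID -> splits assigned since the most recent snapshot
    private Map<Integer, List<String>> sinceLastSnapshot = new HashMap<>();
    // checkpoint ID -> assignments captured by that checkpoint, kept until it completes
    private final TreeMap<Long, Map<Integer, List<String>>> byCheckpoint = new TreeMap<>();

    void recordAssignment(int subtaskId, String split) {
        sinceLastSnapshot.computeIfAbsent(subtaskId, k -> new ArrayList<>()).add(split);
    }

    /** A snapshot freezes the current batch of assignments under its checkpoint ID. */
    void onSnapshot(long checkpointId) {
        byCheckpoint.put(checkpointId, sinceLastSnapshot);
        sinceLastSnapshot = new HashMap<>();
    }

    /** A completed checkpoint covers all batches up to and including its ID. */
    void onCheckpointComplete(long checkpointId) {
        byCheckpoint.headMap(checkpointId, true).clear();
    }

    /** Collects every split of the failed reader that no completed checkpoint covers. */
    List<String> splitsToAddBack(int failedSubtaskId) {
        List<String> result = new ArrayList<>();
        for (Map<Integer, List<String>> batch : byCheckpoint.values()) {
            List<String> splits = batch.remove(failedSubtaskId);
            if (splits != null) {
                result.addAll(splits);
            }
        }
        List<String> recent = sinceLastSnapshot.remove(failedSubtaskId);
        if (recent != null) {
            result.addAll(recent);
        }
        return result;
    }
}
```

Splits assigned before a completed checkpoint are part of the reader's own checkpointed state, so only the later assignments need to return to the enumerator.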
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname)
@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    // Add the reader that sent the split request to the set of waiting readers,
    // where it queues for a split
    readersAwaitingSplit.put(subtaskId, requesterHostname);
    // Call the split assignment method
    assignSplits();
}
References
Data Sources | Apache Flink
The New Source Architecture in the Flink Source Code - Jianshu (简书)