Flink中异步AsyncIO的实现 (源码分析)

2024-05-13 21:58

本文主要是介绍Flink中异步AsyncIO的实现 (源码分析),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

先上张图整体了解Flink中的异步io

阿里贡献给flink的,优点就不说了嘛,官网上都有,就是写库不会柱塞性能更好

然后来看一下, Flink 中异步io主要分为两种

  一种是有序Ordered

  一种是无序UNordered

主要区别是往下游output的顺序(注意这里顺序不是写库的顺序既然都异步了写库的顺序自然是无法保证的),有序的会按接收的顺序继续往下游output发送,无序就是谁先处理完谁就先往下游发送

两张图了解这两种模式的实现

有序:record数据会通过异步线程写库,Emitter是一个守护进程,会不停的拉取queue头部的数据,如果头部的数据异步写库完成,Emitter将头数据往下游发送,如果头元素还没有异步写库完成,柱塞 www.wityx.com     

无序:record数据会通过异步线程写库,这里有两个queue,一开始放在uncompleteedQueue,当哪个record异步写库成功后就直接放到completedQueue中,Emitter是一个守护进程,completedQueue只要有数据,会不停的拉取queue数据往下游发送 

可以看到原理还是很简单的,两句话就总结完了,就是利用queue和java的异步线程,现在来看下源码

这里AsyncIO在Flink中被设计成operator中的一种,自然去OneInputStreamOperator的实现类中去找

于是来看一下AsyncWaitOperator.java

看到它的open方法(open方法会在taskmanager启动job的时候全部统一调用,可以翻一下以前的文章)

这里启动了一个守护线程Emitter,来看下线程具体做了什么

 1处拉取数据,2处就是常规的将拉取到的数据往下游emit,Emitter拉取数据,这里先不讲因为分为有序的和无序的

 这里已经知道了这个Emitter的作用是循环的拉取数据往下游发送

 回到AsyncWaitOperator.java在它的open方法初始化了Emitter,那它是如何处理接收到的数据的呢,看它的ProcessElement()方法

 其实主要就是三个个方法

先是!!!将record封装成了一个包装类StreamRecordQueueEntry,主要是这个包装类的构造方法中,创建了一个CompleteableFuture(这个的complete方法其实会等到用户代码执行的时候用户自己决定什么时候完成)

1处主要就是讲元素加入到了对应的queue,这里也分为两种有序和无序的

这里也先不讲这两种模式加入数据的区别

接着2处就是调用用户的代码了,来看看官网的异步io的例子

 给了一个Future作为参数,用户自己起了一个线程(这里思考一下就知道了为什么要新起一个异步线程去执行,因为如果不起线程的话,那processElement方法就柱塞了,无法异步了)去写库读库等,然后调用了这个参数的complete方法(也就是前面那个包装类中的CompleteableFuture)并且传入了一个结果

看下complete方法源码

 这个resultFuture是每个record的包装类StreamRecordQueueEntry的其中一个属性是一个CompletableFuture

 那现在就清楚了,用户代码在自己新起的线程中当自己的逻辑执行完以后会使这个异步线程结束,并输入一个结果

 那这个干嘛用的呢

最开始的图中看到有序和无序实现原理,有序用一个queue,无序用两个queue分别就对应了

OrderedStreamElementQueue类中

 UnorderedStreamElementQueue类中

回到前面有两个地方没有细讲,一是两种模式的Emitter是如何拉取数据的,二是两种模式下数据是如何加入OrderedStreamElementQueue的

有序模式:

1.先来看一下有序模式的,Emitter的数据拉取,和数据的加入

    其tryPut()方法

     onComplete方法

       onCompleteHandler方法

  这里比较绕,先将接收的数据加入queue中,然后onComplete()中当上一个异步线程getFuture() 其实就是每个元素包装类里面的那个CompletableFuture,当他结束时(会在用户方法用户调用complete时结束)异步调用传入的对象的 accept方法,accept方法中调用了onCompleteHandler()方法,onCompleteHandler方法中会判断queue是否为空,以及queue的头元素是否完成了用户的异步方法,当完成的时候,就会将headIsCompleted这个对象signalAll()唤醒

2.接着看有序模式Emitter的拉取数据

   这里有序方式拉取数据的逻辑很清晰,如果为空或者头元素没有完成用户的异步方法,headIsCompleted这个对象会wait住(上面可以知道,当加入元素的到queue且头元素完成异步方法的时候会signalAll())然后将头数据返回,往下游发送

这样就实现了有序发送,因为Emitter只拉取头元素且已经完成用户异步方法的头元素

无序模式: 

  这里和有序模式就大同小异了,只是变成了,接收数据后直接加入uncompletedQueue,当数据完成异步方法的时候就,放到completedQueue里面去并signalAll(),只要completedqueue里面有数据,Emitter就拉取往下发

这样就实现了无序模式,也就是异步写入谁先处理完就直接放到完成队列里面去,然后往下发,不用管接收数据的顺序

这篇关于Flink中异步AsyncIO的实现 (源码分析)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/986939

相关文章

Python如何使用__slots__实现节省内存和性能优化

《Python如何使用__slots__实现节省内存和性能优化》你有想过,一个小小的__slots__能让你的Python类内存消耗直接减半吗,没错,今天咱们要聊的就是这个让人眼前一亮的技巧,感兴趣的... 目录背景:内存吃得满满的类__slots__:你的内存管理小助手举个大概的例子:看看效果如何?1.

Python+PyQt5实现多屏幕协同播放功能

《Python+PyQt5实现多屏幕协同播放功能》在现代会议展示、数字广告、展览展示等场景中,多屏幕协同播放已成为刚需,下面我们就来看看如何利用Python和PyQt5开发一套功能强大的跨屏播控系统吧... 目录一、项目概述:突破传统播放限制二、核心技术解析2.1 多屏管理机制2.2 播放引擎设计2.3 专

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

idea中创建新类时自动添加注释的实现

《idea中创建新类时自动添加注释的实现》在每次使用idea创建一个新类时,过了一段时间发现看不懂这个类是用来干嘛的,为了解决这个问题,我们可以设置在创建一个新类时自动添加注释,帮助我们理解这个类的用... 目录前言:详细操作:步骤一:点击上方的 文件(File),点击&nbmyHIgsp;设置(Setti

SpringBoot实现MD5加盐算法的示例代码

《SpringBoot实现MD5加盐算法的示例代码》加盐算法是一种用于增强密码安全性的技术,本文主要介绍了SpringBoot实现MD5加盐算法的示例代码,文中通过示例代码介绍的非常详细,对大家的学习... 目录一、什么是加盐算法二、如何实现加盐算法2.1 加盐算法代码实现2.2 注册页面中进行密码加盐2.

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

一文详解如何从零构建Spring Boot Starter并实现整合

《一文详解如何从零构建SpringBootStarter并实现整合》SpringBoot是一个开源的Java基础框架,用于创建独立、生产级的基于Spring框架的应用程序,:本文主要介绍如何从... 目录一、Spring Boot Starter的核心价值二、Starter项目创建全流程2.1 项目初始化(

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T

MySQL INSERT语句实现当记录不存在时插入的几种方法

《MySQLINSERT语句实现当记录不存在时插入的几种方法》MySQL的INSERT语句是用于向数据库表中插入新记录的关键命令,下面:本文主要介绍MySQLINSERT语句实现当记录不存在时... 目录使用 INSERT IGNORE使用 ON DUPLICATE KEY UPDATE使用 REPLACE