Flink sql 之 TopN 与 StreamPhysicalRankRule (源码解析)

2024-05-02 07:18

本文主要是介绍Flink sql 之 TopN 与 StreamPhysicalRankRule (源码解析),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

基于flink1.14的源码做解析

公司内有很多业务方都在使用我们Flink sql平台做TopN的计算,今天同事突然问到我,Flink sql 是怎么实现topN的 ?

蒙圈了,这块源码没看过啊 ,业务要问起来怎么办,赶快打开源码补一下

拿到这个问题先冷静分析一下范围

首先肯定属于Flink sql模块,源码里面肯定是在flink-table-planner包里面,接着topN那不就是ROW_NUMBER嘛,是个函数呀

既然如此那就从flink源码的系统函数作为线索开始找起来,来到 org.apache.calcite.sql.fun.SqlStdOperatorTable类

62cbd27f6d6aeba1a0cdb8652fafd6ff.png

果然找到了,那calcite的某个rule肯定有个地方判断了它,继续查调用链

4a1a0811ae7d09f8432fc4ab6a0699f2.png

不出所料,FlinkLogicalRankRuleBase这个calcite的rule里面果然根据这个function的类型来确定rank的类型了

看下这个rule的匹配条件

8ba4b04a31435316c51a05d1cb564de5.png

 这里也好理解,overAgg的时候会判断这个rank以及对应的类型

ebd8f6c371d7d957d272e34ac347b706.png

这是只是做了一下简单的提取了rank的字段啊,提取谓语啊,提取表达式啊这一些拿信息的操作

然后直接生成新的relNode叫FlinkLogicalRank通过transformTo直接返回了这个等价节点

既然是relNode那肯定又会有calcite的rule去处理它,来找一找

c4fc899e183119a6565b03e2cf6cbda3.png

批处理的就不管了,从名字就可以看出来我们要找的类了

看个不带window的吧

7635ecfcdaaca214f6b1cdfcdc264277.png

 返回StreamPhysicalRank

这个类是一个FlinkPhysicalRel是可以转换成execNode的

这里在多说一句,

这里将partitionKey传入了,就是sql里面的partition by后面的,后面会用这个来创建transformation的keySelecter用来分流数据

6ad46b7648b447a2ed9aa7abd64818ce.png

返回的这个StreamExecRank就是可以转换成具体的Flink的算子了,具体逻辑就在里面了

接下来看下row_number的具体逻辑,找到方法translateToPlanInternal

根据策略主要分为三种类型

AppendFastStrategy  (输入仅包含插入时)

RetractStrategy   (输入包含update和delete)

UpdateFastStrategy     (输入不应包含删除且输入有给定的primaryKeys且按字段排序时)

来看个retractStrategy的吧

c69e6575ec8aad4dca5167a7eac95989.png

先通过sort的字段获取一个用于排序RowData的比较器 ComparableRecordComparator

根据比较器创建 RetractableTopNFunction

这个类还有两个主要的状态数据结构

17f3912aafbb8b203fd36bdff225378d.png

dataState这个map用来存放当key相同的所有数据会放在同一个list里面

treeMap这个可排序的map就是通过上面我们sql里面定义的sort by 来排序数据的,Long是指这个相同的key有多少个record

!!!!!!!!!!!  那就是用java的treeMap排序呗

继续往下看

a8a541f6abc24463e027b40c08c34aa6.png

 主逻辑就是这个了

每进入一条数据,会根据这条数据的类型划分

当数据是Insert , UPDATE_AFTER类型是会走 emitRecordsWithRowNumber()方法

当数据是UPDATE_BEFORE,DELETE类型会走 retractRecordWithRowNumber ()方法

来看下具体逻辑先看INSERT的

452086b9d3bf9bf74d4c612f0325e65d.png

 遍历treeMap

165b92d3bab16cd4c3c67ff0889cd052.png

解读一下,当数据是insert数据的时候

INSERT数据会先放到treeMap里面去,delete则不会

按顺序遍历treeMap

当遍历过程中发现遍历的key与当前数据的key相同时,和当前数据key相同的所有数据数据(dataState中的LIST),全部撤回并且更新他们的rowNumber+1

继续遍历treeMap

之后的数据全部撤回UpdateBefore,并且向下游发送UpdateAfter使rowNumber+1,遍历直到已经到第TopN个数据循环结束

当数据是DELETE类型的时候,会和Insert反过来,当前key之后的数据全部撤回,然后rowNumber-1

整个处理流程差不多就结束了,可以看到rowNumber当N较大且排序变化频繁的时候,性能消耗还是非常大的,极端情况下游的数据会翻很多倍

这个还需要注意在其他两个策略中还有一个参数,table.exec.topn.cache-size

43befe3eb8dbc3270fc476a887adf90b.png

 影响下面这个本地lruCache的大小

70d960ad16f949f02b4f716748f2c5ca.png

 调大可以减少状态的访问,可以按需要添加

本文地址:https://www.cnblogs.com/ljygz/p/15428840.html

end

Flink 从入门到精通 系列文章基于 Apache Flink 的实时监控告警系统
关于数据中台的深度思考与总结(干干货)
日志收集Agent,阴暗潮湿的地底世界

2b3a7dd27d2e596e5596fea658707508.png

701b350afc1517a778ca25978c60c0d5.png

公众号(zhisheng)里回复 面经、ClickHouse、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。
点个赞+在看,少个 bug 👇

这篇关于Flink sql 之 TopN 与 StreamPhysicalRankRule (源码解析)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/953681

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟 开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚 第一站:海量资源,应有尽有 走进“智听

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

[MySQL表的增删改查-进阶]

🌈个人主页:努力学编程’ ⛅个人推荐: c语言从初阶到进阶 JavaEE详解 数据结构 ⚡学好数据结构,刷题刻不容缓:点击一起刷题 🌙心灵鸡汤:总有人要赢,为什么不能是我呢 💻💻💻数据库约束 🔭🔭🔭约束类型 not null: 指示某列不能存储 NULL 值unique: 保证某列的每行必须有唯一的值default: 规定没有给列赋值时的默认值.primary key: