解析 flink sql 转化成flink job

2024-06-24 01:44

本文主要是介绍解析 flink sql 转化成flink job,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 背景
    • 流程
    • flink实例
    • 实现细节
      • 定义的规则
      • 定义的物理算子
      • 定义的flink exec node

背景

在很多计算引擎里,都会把sql 这种标准语言,转成计算引擎下底层实际的算子,因此理解此转换的流程对于理解整个过程非常重要

流程

在这里插入图片描述

flink实例

public class BatchExample {public static void main(String[] args) {// 设置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);// 创建一个内置示例源表String sourceDDL = "CREATE TABLE users (\n" +"    id INT,\n" +"    name STRING,\n" +"    age INT\n" +") WITH (\n" +"    'connector' = 'filesystem',\n" +"    'path' = 'file:///Users/leishuiyu/IdeaProjects/SpringFlink/data.csv',\n" +"    'format' = 'csv'\n" +");";tableEnv.executeSql(sourceDDL);Table table = tableEnv.sqlQuery("select * from users limit 1 ");String explanation = tableEnv.explainSql("select * from users limit 1 ");System.out.println(explanation);table.execute().print();}
}

输出结果

== Abstract Syntax Tree ==
LogicalSort(fetch=[1])
+- LogicalProject(id=[$0], name=[$1], age=[$2])+- LogicalTableScan(table=[[default_catalog, default_database, users]])== Optimized Physical Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])+- Limit(offset=[0], fetch=[1], global=[false])+- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])== Optimized Execution Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])+- Limit(offset=[0], fetch=[1], global=[false])+- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])

实现细节

主要是三个地方,在优化那一步,就把原生的relnode 转化成了自定义的relnode,自定义的relnode 就可以带物理转化的内容了,比如上面的LogicalTableScan 转成BatchPhysicalTableSourceScan 这个relnode

定义的规则

class BatchPhysicalTableSourceScanRule(config: Config) extends ConverterRule(config) {/** Rule must only match if TableScan targets a bounded [[ScanTableSource]] *///规则只匹配有界的ScanTableSourceoverride def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0).asInstanceOf[TableScan]val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])tableSourceTable match {case tst: TableSourceTable =>tst.tableSource match {case sts: ScanTableSource =>sts.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE).isBoundedcase _ => false}case _ => false}}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)//在这里转成自定义的relnode new BatchPhysicalTableSourceScan(rel.getCluster,newTrait,scan.getHints,scan.getTable.asInstanceOf[TableSourceTable])}
}

定义的物理算子

也是一个relnode,实现类BatchPhysicalTableSourceScan

class BatchPhysicalTableSourceScan(cluster: RelOptCluster,traitSet: RelTraitSet,hints: util.List[RelHint],tableSourceTable: TableSourceTable)extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)with BatchPhysicalRel {
//主要是这个方法,转成 flink exec算子override def translateToExecNode(): ExecNode[_] = {val tableSourceSpec = new DynamicTableSourceSpec(tableSourceTable.contextResolvedTable,util.Arrays.asList(tableSourceTable.abilitySpecs: _*))tableSourceSpec.setTableSource(tableSourceTable.tableSource)new BatchExecTableSourceScan(unwrapTableConfig(this),tableSourceSpec,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)}
}

定义的flink exec node

BatchExecTableSourceScan 类

 /// 主要是这个方法,看下下面的实现就比较熟悉了public Transformation<RowData> createInputFormatTransformation(StreamExecutionEnvironment env,InputFormat<RowData, ?> inputFormat,InternalTypeInfo<RowData> outputTypeInfo,String operatorName) {// env.createInput will use ContinuousFileReaderOperator, but it do not support multiple// paths. If read partitioned source, after partition pruning, we need let InputFormat// to read multiple partitions which are multiple paths.// We can use InputFormatSourceFunction directly to support InputFormat.final InputFormatSourceFunction<RowData> function =new InputFormatSourceFunction<>(inputFormat, outputTypeInfo);return env.addSource(function, operatorName, outputTypeInfo).getTransformation();}

这里的转换是多种方式,一种是现成的比如source 这种,还有的是函数这种,要通过代码生成的方法实现。flink代码生成

这篇关于解析 flink sql 转化成flink job的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1088847

相关文章

java解析jwt中的payload的用法

《java解析jwt中的payload的用法》:本文主要介绍java解析jwt中的payload的用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java解析jwt中的payload1. 使用 jjwt 库步骤 1:添加依赖步骤 2:解析 JWT2. 使用 N

Python中__init__方法使用的深度解析

《Python中__init__方法使用的深度解析》在Python的面向对象编程(OOP)体系中,__init__方法如同建造房屋时的奠基仪式——它定义了对象诞生时的初始状态,下面我们就来深入了解下_... 目录一、__init__的基因图谱二、初始化过程的魔法时刻继承链中的初始化顺序self参数的奥秘默认

SQL BETWEEN 的常见用法小结

《SQLBETWEEN的常见用法小结》BETWEEN操作符是SQL中非常有用的工具,它允许你快速选取某个范围内的值,本文给大家介绍SQLBETWEEN的常见用法,感兴趣的朋友一起看看吧... 在SQL中,BETWEEN是一个操作符,用于选取介于两个值之间的数据。它包含这两个边界值。BETWEEN操作符常用

MySQL索引的优化之LIKE模糊查询功能实现

《MySQL索引的优化之LIKE模糊查询功能实现》:本文主要介绍MySQL索引的优化之LIKE模糊查询功能实现,本文通过示例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录一、前缀匹配优化二、后缀匹配优化三、中间匹配优化四、覆盖索引优化五、减少查询范围六、避免通配符开头七、使用外部搜索引擎八、分

MySql match against工具详细用法

《MySqlmatchagainst工具详细用法》在MySQL中,MATCH……AGAINST是全文索引(Full-Textindex)的查询语法,它允许你对文本进行高效的全文搜素,支持自然语言搜... 目录一、全文索引的基本概念二、创建全文索引三、自然语言搜索四、布尔搜索五、相关性排序六、全文索引的限制七

Java 正则表达式URL 匹配与源码全解析

《Java正则表达式URL匹配与源码全解析》在Web应用开发中,我们经常需要对URL进行格式验证,今天我们结合Java的Pattern和Matcher类,深入理解正则表达式在实际应用中... 目录1.正则表达式分解:2. 添加域名匹配 (2)3. 添加路径和查询参数匹配 (3) 4. 最终优化版本5.设计思

使用Java将DOCX文档解析为Markdown文档的代码实现

《使用Java将DOCX文档解析为Markdown文档的代码实现》在现代文档处理中,Markdown(MD)因其简洁的语法和良好的可读性,逐渐成为开发者、技术写作者和内容创作者的首选格式,然而,许多文... 目录引言1. 工具和库介绍2. 安装依赖库3. 使用Apache POI解析DOCX文档4. 将解析

Java字符串处理全解析(String、StringBuilder与StringBuffer)

《Java字符串处理全解析(String、StringBuilder与StringBuffer)》:本文主要介绍Java字符串处理全解析(String、StringBuilder与StringBu... 目录Java字符串处理全解析:String、StringBuilder与StringBuffer一、St

数据库面试必备之MySQL中的乐观锁与悲观锁

《数据库面试必备之MySQL中的乐观锁与悲观锁》:本文主要介绍数据库面试必备之MySQL中乐观锁与悲观锁的相关资料,乐观锁适用于读多写少的场景,通过版本号检查避免冲突,而悲观锁适用于写多读少且对数... 目录一、引言二、乐观锁(一)原理(二)应用场景(三)示例代码三、悲观锁(一)原理(二)应用场景(三)示例

Spring Boot循环依赖原理、解决方案与最佳实践(全解析)

《SpringBoot循环依赖原理、解决方案与最佳实践(全解析)》循环依赖指两个或多个Bean相互直接或间接引用,形成闭环依赖关系,:本文主要介绍SpringBoot循环依赖原理、解决方案与最... 目录一、循环依赖的本质与危害1.1 什么是循环依赖?1.2 核心危害二、Spring的三级缓存机制2.1 三