详解 Flink Table API 和 Flink SQL 之表和 DataStream 的转换

2024-06-11 19:36

本文主要是介绍详解 Flink Table API 和 Flink SQL 之表和 DataStream 的转换,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、表转换为 DataStream

/**Table API 中表到 DataStream 有两种模式:1.追加模式(Append Mode):用于表只会被插入(Insert)操作更改的场景。2.撤回模式(Retract Mode):用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作。得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(老数据,Delete)
*/
public class TableTransformDataStream {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//注册表tableEnv.connect(new FileSystem().path("./sensor.txt")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())).createTemporaryTable("sensor");//获取表Table sensorTable = tableEnv.from("sensor");//简单查询Table resultTable = sensorTable.select("id, temperature").where("id = 'sensor_1'");//聚合查询统计Table aggTable = sensorTable.groupBy("id").select("id, id.count as cnt, temperature.avg as avgTemp");DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);DataStream<Tuple2<Boolean, Row>> aggStream = tableEnv.toRetractStream(aggTable, Row.class);resultStream.print("result");aggStream.print("agg");env.execute();}
}

二、DataStream 转换为表

public class DataStreamTransformTable {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//使用 fromDataStream 将 DataStream 转换为表Table table = tableEnv.fromDataStream(dataStream);//Table table = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature");env.execute();}
}

三、查看执行计划

/**Table API 提供了一种机制来解释(Explain)计算表的逻辑和优化查询计划。这是通过TableEnvironment.explain(table)方法或 TableEnvironment.explain() 方法完成的。explain 方法会返回一个字符串,描述三个计划:1.未优化的逻辑查询计划2.优化后的逻辑查询计划3.实际执行计划
*/
String explaination = tableEnv.explain(resultTable);
System.out.println(explaination);

这篇关于详解 Flink Table API 和 Flink SQL 之表和 DataStream 的转换的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1052090

相关文章

SQL server数据库如何下载和安装

《SQLserver数据库如何下载和安装》本文指导如何下载安装SQLServer2022评估版及SSMS工具,涵盖安装配置、连接字符串设置、C#连接数据库方法和安全注意事项,如混合验证、参数化查... 目录第一步:打开官网下载对应文件第二步:程序安装配置第三部:安装工具SQL Server Manageme

C#连接SQL server数据库命令的基本步骤

《C#连接SQLserver数据库命令的基本步骤》文章讲解了连接SQLServer数据库的步骤,包括引入命名空间、构建连接字符串、使用SqlConnection和SqlCommand执行SQL操作,... 目录建议配合使用:如何下载和安装SQL server数据库-CSDN博客1. 引入必要的命名空间2.

使用Python删除Excel中的行列和单元格示例详解

《使用Python删除Excel中的行列和单元格示例详解》在处理Excel数据时,删除不需要的行、列或单元格是一项常见且必要的操作,本文将使用Python脚本实现对Excel表格的高效自动化处理,感兴... 目录开发环境准备使用 python 删除 Excphpel 表格中的行删除特定行删除空白行删除含指定

全面掌握 SQL 中的 DATEDIFF函数及用法最佳实践

《全面掌握SQL中的DATEDIFF函数及用法最佳实践》本文解析DATEDIFF在不同数据库中的差异,强调其边界计算原理,探讨应用场景及陷阱,推荐根据需求选择TIMESTAMPDIFF或inte... 目录1. 核心概念:DATEDIFF 究竟在计算什么?2. 主流数据库中的 DATEDIFF 实现2.1

MySQL 多列 IN 查询之语法、性能与实战技巧(最新整理)

《MySQL多列IN查询之语法、性能与实战技巧(最新整理)》本文详解MySQL多列IN查询,对比传统OR写法,强调其简洁高效,适合批量匹配复合键,通过联合索引、分批次优化提升性能,兼容多种数据库... 目录一、基础语法:多列 IN 的两种写法1. 直接值列表2. 子查询二、对比传统 OR 的写法三、性能分析

MySQL中的LENGTH()函数用法详解与实例分析

《MySQL中的LENGTH()函数用法详解与实例分析》MySQLLENGTH()函数用于计算字符串的字节长度,区别于CHAR_LENGTH()的字符长度,适用于多字节字符集(如UTF-8)的数据验证... 目录1. LENGTH()函数的基本语法2. LENGTH()函数的返回值2.1 示例1:计算字符串

Spring Boot spring-boot-maven-plugin 参数配置详解(最新推荐)

《SpringBootspring-boot-maven-plugin参数配置详解(最新推荐)》文章介绍了SpringBootMaven插件的5个核心目标(repackage、run、start... 目录一 spring-boot-maven-plugin 插件的5个Goals二 应用场景1 重新打包应用

mybatis执行insert返回id实现详解

《mybatis执行insert返回id实现详解》MyBatis插入操作默认返回受影响行数,需通过useGeneratedKeys+keyProperty或selectKey获取主键ID,确保主键为自... 目录 两种方式获取自增 ID:1. ​​useGeneratedKeys+keyProperty(推

Python通用唯一标识符模块uuid使用案例详解

《Python通用唯一标识符模块uuid使用案例详解》Pythonuuid模块用于生成128位全局唯一标识符,支持UUID1-5版本,适用于分布式系统、数据库主键等场景,需注意隐私、碰撞概率及存储优... 目录简介核心功能1. UUID版本2. UUID属性3. 命名空间使用场景1. 生成唯一标识符2. 数

浅谈mysql的not exists走不走索引

《浅谈mysql的notexists走不走索引》在MySQL中,​NOTEXISTS子句是否使用索引取决于子查询中关联字段是否建立了合适的索引,下面就来介绍一下mysql的notexists走不走索... 在mysql中,​NOT EXISTS子句是否使用索引取决于子查询中关联字段是否建立了合适的索引。以下