本文主要是介绍4.2.3 Flink-流处理框架-Table API 与 SQL-流转表+表转流+创建临时视图(Temporary View),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
1.DataStream流转表Table
1.1 基本概念
1.2 实现代码
1.3 数据类型与Table schema 的对应
2.表转流
2.1 基本概念
2.2 Table API 中表到 DataStream 有两种模式
2.3 实现代码
3.创建临时视图(Temporary View)
1.DataStream流转表Table
1.1 基本概念
Flink 允许我们把 Table 和 DataStream 做转换:我们可以基于一个 DataStream,先流式 地读取数据源,然后 map 成 POJO,再把它转成 Table。Table 的列字段(column fields),就是 POJO 里的字段,这样就不用再麻烦地定义 schema 了。
1.2 实现代码
代码中实现非常简单,直接用 tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次 map 操作(或者 Table API 的 select 操作)。
代码具体如下:
DataStream<String> inputStream = env.readTextFile("sensor.txt");
DataStream<SensorReading> dataStream = inputStream.map( line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));} );
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature");
1.3 数据类型与Table schema 的对应
在上节的例子中,DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照类中的字段名来对应的(name-based mapping),所以还可以用 as 做重命名。
Table sensorTable = tableEnv.fromDataStream(dataStream, "timestamp as ts, id as myId, temperature");
Flink 的 DataStream 和 DataSet API 支持多种类型。
组合类型,比如元组(内置 Scala 和 Java 元组)、POJO、Scala case 类和 Flink 的 Row 类型等,允许具有多个字段的嵌套数据结构,这些字段可以在 Table 的表达式中访问。其他类型,则被视为原子类型。
2.表转流
2.1 基本概念
表可以转换为 DataStream 或 DataSet。这样,自定义流处理或批处理程序就可以继续在 Table API 或 SQL 查询的结果上运行了。将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型。通常,最方便的转换类型就是 Row。当然,因为结果的所有字段类型都是明确的,我们也经常会用元组类型来表示。
2.2 Table API 中表到 DataStream 有两种模式
表作为流式查询的结果,是动态更新的。所以,将这种动态查询转换成的数据流,同样 需要对表的更新操作进行编码,进而有不同的转换模式。
2.3 实现代码
没有经过 groupby 之类聚合操作,可以直接用 toAppendStream 来转换;而如果经过了聚合,有更新操作,一般就必须用 toRetractDstream。
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable, Row.class);resultStream.print("result");
aggResultStream.print("aggResult");
3.创建临时视图(Temporary View)
创建临时视图的第一种方式,就是直接从 DataStream 转换而来。同样,可以直接对应字段转换;也可以在转换的时候,指定相应的字段。代码如下:
tableEnv.createTemporaryView("sensorView", dataStream);
tableEnv.createTemporaryView("sensorView", dataStream, "id, temperature, timestamp as ts");
另外,当然还可以基于 Table 创建视图:
tableEnv.createTemporaryView("sensorView", sensorTable);
这篇关于4.2.3 Flink-流处理框架-Table API 与 SQL-流转表+表转流+创建临时视图(Temporary View)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!