1.18.2.8 Integrating with the DataStream and DataSet APIs: Scala implicit conversions, creating views from a DataSet or DataStream, and converting between a DataStream/DataSet and a Table


1.18.2.8. Integrating with the DataStream and DataSet APIs
1.18.2.8.1. Scala implicit conversions
1.18.2.8.2. Creating a view from a DataSet or DataStream
1.18.2.8.3. Converting a DataStream or DataSet into a Table
1.18.2.8.4. Converting a Table into a DataStream or DataSet
1.18.2.8.5. Converting a Table into a DataStream
1.18.2.8.5.1. Converting a Table into a DataSet
1.18.2.8.6. Mapping of data types to Table schema
1.18.2.8.6.1. Atomic types
1.18.2.8.6.2. Tuple types (Scala and Java) and case class types (Scala only)
1.18.2.8.6.3. POJO types (Java and Scala)
1.18.2.8.6.4. Row type

1.18.2.8. Integrating with the DataStream and DataSet APIs

For stream processing, both planners can integrate with the DataStream API. Only the old planner can integrate with the DataSet API. For batch processing, the Blink planner cannot be combined with the DataSet API.

**Note:** The DataSet API discussed below is only relevant for the old planner.

Table API and SQL queries can be easily integrated with and embedded into DataStream and DataSet programs. For instance, it is possible to query an external table (for example, from an RDBMS), do some pre-processing such as filtering, projecting, aggregating, or joining with metadata, and then further process the data with the DataStream or DataSet API (and any of the libraries built on top of these APIs, such as CEP or Gelly). Conversely, a Table API or SQL query can also be applied to the result of a DataStream or DataSet program.

This interaction is achieved by converting a DataStream or DataSet into a Table and vice versa.

1.18.2.8.1. Scala implicit conversions

The Scala Table API features implicit conversions for the DataSet, DataStream, and Table classes. These conversions are enabled by importing the package org.apache.flink.table.api.bridge.scala._ in addition to org.apache.flink.api.scala._ for the Scala DataStream API.

1.18.2.8.2. Creating a view from a DataSet or DataStream

A DataStream or DataSet can be registered in a TableEnvironment as a view. The schema of the resulting view depends on the data type of the registered DataStream or DataSet. Please refer to the section "Mapping of data types to Table schema" (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/common.html#mapping-of-data-types-to-table-schema) for details.

**Note:** Views created from a DataStream or DataSet can only be registered as temporary views.
Java version

package com.toto.demo.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        // get a StreamTableEnvironment
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        // registration of a DataSet in a BatchTableEnvironment is equivalent

        DataStream<Tuple2<Long, String>> stream = ...

        // register the DataStream as view "myTable" with fields "f0", "f1"
        tableEnv.createTemporaryView("myTable", stream);

        // register the DataStream as view "myTable2" with fields "myLong", "myString"
        tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));
    }
}

Scala version

package com.toto.learn.sql

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object Demo {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
    // registration of a DataSet is equivalent

    val stream: DataStream[(Long, String)] = ...

    // register the DataStream as view "myTable" with fields "_1", "_2"
    tableEnv.createTemporaryView("myTable", stream)

    // register the DataStream as view "myTable2" with fields "myLong", "myString"
    tableEnv.createTemporaryView("myTable2", stream, 'myLong, 'myString)
  }
}
1.18.2.8.3. Converting a DataStream or DataSet into a Table

Instead of registering a DataStream or DataSet in a TableEnvironment, it can also be converted directly into a Table. This is convenient if you want to use the Table in a Table API query.
Java version

package com.toto.demo.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        // get a StreamTableEnvironment
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        DataStream<Tuple2<Long, String>> stream = ...

        // convert the DataStream into a Table with default fields "f0", "f1"
        Table table1 = tableEnv.fromDataStream(stream);

        // convert the DataStream into a Table with fields "myLong", "myString"
        Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
    }
}

Scala version

package com.toto.learn.sql

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object Demo {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    val stream: DataStream[(Long, String)] = ...

    // convert the DataStream into a Table with default fields "_1", "_2"
    val table1: Table = tableEnv.fromDataStream(stream)

    // convert the DataStream into a Table with fields "myLong", "myString"
    val table2: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")
  }
}
1.18.2.8.4. Converting a Table into a DataStream or DataSet

A Table can be converted into a DataStream or DataSet. This way, custom DataStream or DataSet programs can be run on the result of a Table API or SQL query.

When converting a Table into a DataStream or DataSet, you need to specify the data type of the resulting DataStream or DataSet, i.e., the data type into which each row of the Table is to be converted. Often the most convenient conversion type is Row. The following list gives an overview of the features of the different options:
Row: fields are mapped by position, arbitrary number of fields, support for null values, no type-safe access.
POJO: fields are mapped by name (POJO fields must be named like the Table fields), arbitrary number of fields, support for null values, type-safe access.
Case Class: fields are mapped by position, no support for null values, type-safe access.
Tuple: fields are mapped by position, limited to 22 (Scala) or 25 (Java) fields, no support for null values, type-safe access.
Atomic Type: the Table must have a single field, no support for null values, type-safe access.
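The trade-offs in this list can be illustrated with plain JDK types (our own sketch, not Flink code; `Object[]` stands in for Row and `Map.Entry` for a two-field tuple):

```java
import java.util.AbstractMap;
import java.util.Map;

public class TargetTypeSketch {
    // Row-like access: arbitrary arity, null allowed, cast required (not type-safe)
    static String rowName() {
        Object[] row = { "Alice", null };     // a null "age" field is representable
        return (String) row[0];               // untyped access needs a cast
    }

    // Tuple-like access: typed fields give compile-time checks (type-safe),
    // but a null Integer field would throw on unboxing, so null is effectively unsupported
    static int tupleAge() {
        Map.Entry<String, Integer> tuple = new AbstractMap.SimpleEntry<>("Bob", 42);
        return tuple.getValue();
    }

    public static void main(String[] args) {
        System.out.println(rowName() + " " + tupleAge());
    }
}
```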

1.18.2.8.5. Converting a Table into a DataStream

The result Table of a streaming query is updated dynamically, i.e., it changes as new records arrive on the query's input streams. Hence, converting such a dynamic query result into a DataStream requires encoding the Table's updates.

There are two modes to convert a Table into a DataStream:
1. Append Mode: this mode can only be used if the dynamic Table is modified by INSERT changes only, i.e., it is append-only and previously emitted results are never updated.
2. Retract Mode: this mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.
Java code

package com.toto.demo.sql;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Demo {
    public static void main(String[] args) {
        // get a StreamTableEnvironment
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        Table table = ...

        // convert the Table into an append DataStream of Row by specifying the class
        DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

        // convert the Table into an append DataStream of Tuple2<String, Integer>
        //   via a TypeInformation
        TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING, Types.INT);
        DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType);

        // convert the Table into a retract DataStream of Row.
        // A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
        // The boolean field indicates the type of change:
        // true is INSERT, false is DELETE.
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
    }
}

Scala code

package com.toto.learn.sql

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object Demo {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    val table: Table = ...

    // convert the Table into an append DataStream of Row
    val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

    // convert the Table into an append DataStream of Tuple2[String, Int]
    val dsTuple: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table)

    // convert the Table into a retract DataStream of Row.
    // A retract stream of type X is a DataStream[(Boolean, X)].
    // The boolean field indicates the type of change:
    // true is INSERT, false is DELETE.
    val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
  }
}

Note: The "Dynamic Tables" documentation gives a detailed discussion of dynamic tables and their properties.
**Note:** Once the Table is converted to a DataStream, the DataStream job must be executed with the execute method of StreamExecutionEnvironment.
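To make the retract encoding concrete, here is a small plain-Java model (our own illustration using only the JDK, not the Flink API; class and method names are made up for the sketch) of the (flag, result) messages a retract stream would carry for a grouped word count:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractModel {
    // models the (Boolean, result) messages of a retract stream for a word count
    public static List<String> retractMessages(List<String> words) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String w : words) {
            Integer old = counts.get(w);
            if (old != null) {
                out.add("(false," + w + "," + old + ")"); // DELETE: retract the old count
            }
            int updated = (old == null ? 1 : old + 1);
            counts.put(w, updated);
            out.add("(true," + w + "," + updated + ")");  // INSERT: emit the new count
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(retractMessages(java.util.Arrays.asList("a", "b", "a")));
    }
}
```

For the input a, b, a, the second a first retracts the old result (false,a,1) and then inserts the updated one (true,a,2), which is the boolean encoding described above.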

1.18.2.8.5.1. Converting a Table into a DataSet

A Table is converted into a DataSet as follows:
Java code

package com.toto.demo.sql;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class Demo {
    public static void main(String[] args) {
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(fbEnv);

        // Table with two fields (String name, Integer age)
        Table table = ...

        // convert the Table into a DataSet of Row by specifying a class
        DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

        // convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
        TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING, Types.INT);
        DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);
    }
}

Scala code:

package com.toto.learn.sql

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.types.Row

object Demo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)

    // Table with two fields (String name, Integer age)
    val table: Table = ...

    // convert the Table into a DataSet of Row
    val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

    // convert the Table into a DataSet of Tuple2[String, Int]
    val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
  }
}

**Note:** Once the Table is converted to a DataSet, the DataSet job must be executed with the execute method of ExecutionEnvironment.

1.18.2.8.6. Mapping of data types to Table schema

Flink's DataStream and DataSet APIs support very diverse types. Composite types such as Tuples (built-in Scala and Flink Java tuples), POJOs, Scala case classes, and Flink's Row type allow for nested data structures with multiple fields that can be accessed in table expressions. Other types are treated as atomic types. In the following, we describe how the Table API converts these types into an internal row representation and show examples of converting a DataStream into a Table.

The mapping of a data type to a table schema can happen in two ways: based on field positions or based on field names.

Position-based mapping
Position-based mapping can be used to give fields more meaningful names while keeping the field order. This mapping is available for composite data types with a defined field order as well as for atomic types. Composite data types such as tuples, rows, and case classes have such a field order. However, the fields of a POJO must be mapped by name (see the next section). Fields can be projected out, but cannot be renamed with an alias using as.

When defining a position-based mapping, the specified names must not exist in the input data type, otherwise the API assumes that the mapping should happen based on the field names. If no field names are specified, the default field names and field order of the composite type are used, or f0 for atomic types.
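The decision rule above can be sketched in a few lines of plain Java (our own model, not Flink code): a requested field list triggers name-based mapping only when every requested name already exists in the input type; otherwise the names are taken as positional renames.

```java
import java.util.Arrays;
import java.util.List;

public class MappingModeSketch {
    // returns "name-based" if all requested names exist in the input type,
    // otherwise "position-based" (the requested names then rename fields in order)
    public static String mappingMode(List<String> originalNames, List<String> requestedNames) {
        return originalNames.containsAll(requestedNames) ? "name-based" : "position-based";
    }

    public static void main(String[] args) {
        List<String> tupleFields = Arrays.asList("f0", "f1");
        // "myLong" is not an original field name -> rename by position
        System.out.println(mappingMode(tupleFields, Arrays.asList("myLong")));
        // "f1" is an original field name -> project/reorder by name
        System.out.println(mappingMode(tupleFields, Arrays.asList("f1")));
    }
}
```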

Java code

package com.toto.demo.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        DataStream<Tuple2<Long, Integer>> stream = ...

        // convert the DataStream into a Table with default field names "f0" and "f1"
        Table table = tableEnv.fromDataStream(stream);

        // convert the DataStream into a Table with field "myLong" only
        Table table2 = tableEnv.fromDataStream(stream, $("myLong"));

        // convert the DataStream into a Table with field names "myLong" and "myInt"
        Table table3 = tableEnv.fromDataStream(stream, $("myLong"), $("myInt"));
    }
}

Scala code

package com.toto.learn.sql

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object Demo {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    val stream: DataStream[(Long, Int)] = ...

    // convert the DataStream into a Table with default field names "_1" and "_2"
    val table1: Table = tableEnv.fromDataStream(stream)

    // convert the DataStream into a Table with field "myLong" only
    val table2: Table = tableEnv.fromDataStream(stream, $"myLong")

    // convert the DataStream into a Table with field names "myLong" and "myInt"
    val table3: Table = tableEnv.fromDataStream(stream, $"myLong", $"myInt")
  }
}

Name-based mapping
Name-based mapping can be used for any data type, including POJOs. It is the most flexible way of defining a table schema mapping. All fields in the mapping are referenced by name and can be renamed with an alias using as. Fields can be reordered and projected out.

If no field names are specified, the default field names and field order of the composite type are used, or f0 for atomic types.
Java code

package com.toto.demo.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        DataStream<Tuple2<Long, Integer>> stream = ...

        // convert the DataStream into a Table with default field names "f0" and "f1"
        Table table = tableEnv.fromDataStream(stream);

        // convert the DataStream into a Table with swapped fields
        Table table2 = tableEnv.fromDataStream(stream, $("f1"), $("f0"));

        // convert the DataStream into a Table with swapped fields and field names "myInt" and "myLong"
        Table table3 = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));
    }
}

Scala code

package com.toto.learn.sql

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object Demo {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    val stream: DataStream[(Long, Int)] = ...

    // convert the DataStream into a Table with default field names "_1" and "_2"
    val table1: Table = tableEnv.fromDataStream(stream)

    // convert the DataStream into a Table with field "_2" only
    val table2: Table = tableEnv.fromDataStream(stream, $"_2")

    // convert the DataStream into a Table with swapped fields
    val table3: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")

    // convert the DataStream into a Table with swapped fields and field names "myInt" and "myLong"
    val table4: Table = tableEnv.fromDataStream(stream, $"_2" as "myInt", $"_1" as "myLong")
  }
}
1.18.2.8.6.1. Atomic types

Flink treats primitives (Integer, Double, String) and generic types (types that cannot be analyzed and decomposed) as atomic types. A DataStream or DataSet of an atomic type is converted into a Table with a single attribute. The type of the attribute is inferred from the atomic type, and the attribute can be renamed.
Java code:

package com.toto.demo.sql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        DataStream<Long> stream = ...

        // convert the DataStream into a Table with default field name "f0"
        Table table = tableEnv.fromDataStream(stream);

        // convert the DataStream into a Table with field name "myLong"
        Table table2 = tableEnv.fromDataStream(stream, $("myLong"));
    }
}

Scala code

package com.toto.learn.sql

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object Demo {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    val stream: DataStream[Long] = ...

    // convert the DataStream into a Table with default field name "f0"
    val table1: Table = tableEnv.fromDataStream(stream)

    // convert the DataStream into a Table with field name "myLong"
    val table2: Table = tableEnv.fromDataStream(stream, $"myLong")
  }
}
1.18.2.8.6.2. Tuple types (Scala and Java) and case class types (Scala only)

Flink supports Scala's built-in tuples and provides its own tuple classes for Java. DataStreams and DataSets of both kinds of tuples can be converted into tables. Fields can be renamed by providing names for all fields (mapping based on position). If no field names are specified, the default field names are used. If the original field names (f0, f1, ... for Flink tuples and _1, _2, ... for Scala tuples) are referenced, the API assumes that the mapping is name-based instead of position-based. Name-based mapping allows for reordering fields and projection with as.
Java code:

package com.toto.demo.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        DataStream<Tuple2<Long, String>> stream = ...

        // convert the DataStream into a Table with default field names "f0", "f1"
        Table table1 = tableEnv.fromDataStream(stream);

        // convert the DataStream into a Table with renamed field names "myLong", "myString" (position-based)
        Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));

        // convert the DataStream into a Table with reordered fields "f1", "f0" (name-based)
        Table table3 = tableEnv.fromDataStream(stream, $("f1"), $("f0"));

        // convert the DataStream into a Table with projected field "f1" (name-based)
        Table table4 = tableEnv.fromDataStream(stream, $("f1"));

        // convert the DataStream into a Table with reordered and aliased fields "myString", "myLong" (name-based)
        Table table5 = tableEnv.fromDataStream(stream, $("f1").as("myString"), $("f0").as("myLong"));
    }
}

Scala code:

package com.toto.learn.sql

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object Demo {
  // define case class
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    val stream: DataStream[(Long, String)] = ...

    // convert the DataStream into a Table with default field names "_1", "_2"
    val table1: Table = tableEnv.fromDataStream(stream)

    // convert the DataStream into a Table with field names "myLong", "myString" (position-based)
    val table2: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")

    // convert the DataStream into a Table with reordered fields "_2", "_1" (name-based)
    val table3: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")

    // convert the DataStream into a Table with projected field "_2" (name-based)
    val table4: Table = tableEnv.fromDataStream(stream, $"_2")

    // convert the DataStream into a Table with reordered and aliased fields "myString", "myLong" (name-based)
    val table5: Table = tableEnv.fromDataStream(stream, $"_2" as "myString", $"_1" as "myLong")

    val streamCC: DataStream[Person] = ...

    // convert the DataStream into a Table with default field names "name", "age"
    val table6 = tableEnv.fromDataStream(streamCC)

    // convert the DataStream into a Table with field names "myName", "myAge" (position-based)
    val table7 = tableEnv.fromDataStream(streamCC, $"myName", $"myAge")

    // convert the DataStream into a Table with reordered and aliased fields "myAge", "myName" (name-based)
    val table8: Table = tableEnv.fromDataStream(streamCC, $"age" as "myAge", $"name" as "myName")
  }
}
1.18.2.8.6.3. POJO types (Java and Scala)

Flink supports POJOs as composite types.

When converting a POJO DataStream or DataSet into a Table without specifying field names, the names of the original POJO fields are used. The name mapping requires the original names and cannot be done by position. Fields can be renamed using an alias (with the as keyword), reordered, and projected out.

Java code:

package com.toto.demo.sql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        // Person is a POJO with fields "name" and "age"
        DataStream<Person> stream = ...

        // convert the DataStream into a Table with default field names "age", "name" (fields are ordered by name!)
        Table table1 = tableEnv.fromDataStream(stream);

        // convert the DataStream into a Table with renamed fields "myAge", "myName" (name-based)
        Table table2 = tableEnv.fromDataStream(stream, $("age").as("myAge"), $("name").as("myName"));

        // convert the DataStream into a Table with projected field "name" (name-based)
        Table table3 = tableEnv.fromDataStream(stream, $("name"));

        // convert the DataStream into a Table with projected and renamed field "myName" (name-based)
        Table table4 = tableEnv.fromDataStream(stream, $("name").as("myName"));
    }
}

Scala code:

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...

// convert the DataStream into a Table with default field names "age", "name" (fields are ordered by name!)
val table1: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with renamed fields "myAge", "myName" (name-based)
val table2: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as "myName")

// convert the DataStream into a Table with projected field "name" (name-based)
val table3: Table = tableEnv.fromDataStream(stream, $"name")

// convert the DataStream into a Table with projected and renamed field "myName" (name-based)
val table4: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
1.18.2.8.6.4. Row type

The Row type supports an arbitrary number of fields and fields with null values. Field names can be specified via a RowTypeInfo or when converting a Row DataStream or DataSet into a Table. The Row type supports both position-based and name-based field mapping. Fields can be renamed by providing names for all fields (position-based mapping) or selected individually for projection/reordering/renaming (name-based mapping).
Java code

package com.toto.demo.sql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {
    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
        DataStream<Row> stream = ...

        // convert the DataStream into a Table with default field names "name", "age"
        Table table1 = tableEnv.fromDataStream(stream);

        // convert the DataStream into a Table with renamed field names "myName", "myAge" (position-based)
        Table table2 = tableEnv.fromDataStream(stream, $("myName"), $("myAge"));

        // convert the DataStream into a Table with renamed fields "myName", "myAge" (name-based)
        Table table3 = tableEnv.fromDataStream(stream, $("name").as("myName"), $("age").as("myAge"));

        // convert the DataStream into a Table with projected field "name" (name-based)
        Table table4 = tableEnv.fromDataStream(stream, $("name"));

        // convert the DataStream into a Table with projected and renamed field "myName" (name-based)
        Table table5 = tableEnv.fromDataStream(stream, $("name").as("myName"));
    }
}

Scala code

package com.toto.learn.sql

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object Demo {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
    val stream: DataStream[Row] = ...

    // convert the DataStream into a Table with default field names "name", "age"
    val table1: Table = tableEnv.fromDataStream(stream)

    // convert the DataStream into a Table with renamed field names "myName", "myAge" (position-based)
    val table2: Table = tableEnv.fromDataStream(stream, $"myName", $"myAge")

    // convert the DataStream into a Table with renamed fields "myName", "myAge" (name-based)
    val table3: Table = tableEnv.fromDataStream(stream, $"name" as "myName", $"age" as "myAge")

    // convert the DataStream into a Table with projected field "name" (name-based)
    val table4: Table = tableEnv.fromDataStream(stream, $"name")

    // convert the DataStream into a Table with projected and renamed field "myName" (name-based)
    val table5: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
  }
}




