本文主要是介绍详解 Flink Table API 和 Flink SQL 之表和 DataStream 的转换,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、表转换为 DataStream
/**Table API 中表到 DataStream 有两种模式:1.追加模式(Append Mode):用于表只会被插入(Insert)操作更改的场景。2.撤回模式(Retract Mode):用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作。得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(老数据,Delete)
*/
public class TableTransformDataStream {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//注册表tableEnv.connect(new FileSystem().path("./sensor.txt")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())).createTemporaryTable("sensor");//获取表Table sensorTable = tableEnv.from("sensor");//简单查询Table resultTable = sensorTable.select("id, temperature").where("id = 'sensor_1'");//聚合查询统计Table aggTable = sensorTable.groupBy("id").select("id, id.count as cnt, temperature.avg as avgTemp");DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);DataStream<Tuple2<Boolean, Row>> aggStream = tableEnv.toRetractStream(aggTable, Row.class);resultStream.print("result");aggStream.print("agg");env.execute();}
}
二、DataStream 转换为表
public class DataStreamTransformTable {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//使用 fromDataStream 将 DataStream 转换为表Table table = tableEnv.fromDataStream(dataStream);//Table table = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature");env.execute();}
}
三、查看执行计划
/**Table API 提供了一种机制来解释(Explain)计算表的逻辑和优化查询计划。这是通过TableEnvironment.explain(table)方法或 TableEnvironment.explain() 方法完成的。explain 方法会返回一个字符串,描述三个计划:1.未优化的逻辑查询计划2.优化后的逻辑查询计划3.实际执行计划
*/
String explaination = tableEnv.explain(resultTable);
System.out.println(explaination);
这篇关于详解 Flink Table API 和 Flink SQL 之表和 DataStream 的转换的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!