4.2.3 Flink-流处理框架-Table API 与 SQL-流转表+表转流+创建临时视图(Temporary View)

本文主要是介绍4.2.3 Flink-流处理框架-Table API 与 SQL-流转表+表转流+创建临时视图(Temporary View),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1.DataStream流转表Table

1.1 基本概念

1.2 实现代码

1.3 数据类型与Table schema 的对应

2.表转流

2.1 基本概念

2.2 Table API 中表到 DataStream 有两种模式

2.3 实现代码

3.创建临时视图(Temporary View)


1.DataStream流转表Table

1.1 基本概念

        Flink 允许我们把 Table 和 DataStream 做转换:我们可以基于一个 DataStream,先流式 地读取数据源,然后 map 成 POJO,再把它转成 Table。Table 的列字段(column fields),就是 POJO 里的字段,这样就不用再麻烦地定义 schema 了。

1.2 实现代码

        代码中实现非常简单,直接用 tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次 map 操作(或者 Table API 的 select 操作)。

        代码具体如下:

DataStream<String> inputStream = env.readTextFile("sensor.txt");
DataStream<SensorReading> dataStream = inputStream.map( line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));} );
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature");

1.3 数据类型与Table schema 的对应

        在上节的例子中,DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照类中的字段名来对应的(name-based mapping),所以还可以用 as 做重命名。

Table sensorTable = tableEnv.fromDataStream(dataStream, "timestamp as ts, id as myId, temperature");

        Flink 的 DataStream 和 DataSet API 支持多种类型。

        组合类型,比如元组(内置 Scala 和 Java 元组)、POJO、Scala case 类和 Flink 的 Row 类型等,允许具有多个字段的嵌套数据结构,这些字段可以在 Table 的表达式中访问。其他类型,则被视为原子类型。

2.表转流

2.1 基本概念

        表可以转换为 DataStream 或 DataSet。这样,自定义流处理或批处理程序就可以继续在 Table API 或 SQL 查询的结果上运行了。将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型。通常,最方便的转换类型就是 Row。当然,因为结果的所有字段类型都是明确的,我们也经常会用元组类型来表示。

2.2 Table API 中表到 DataStream 有两种模式

        表作为流式查询的结果,是动态更新的。所以,将这种动态查询转换成的数据流,同样 需要对表的更新操作进行编码,进而有不同的转换模式。

2.3 实现代码

        没有经过 groupby 之类聚合操作,可以直接用 toAppendStream 来转换;而如果经过了聚合,有更新操作,一般就必须用 toRetractDstream

DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable, Row.class);resultStream.print("result");
aggResultStream.print("aggResult");

3.创建临时视图(Temporary View)

        创建临时视图的第一种方式,就是直接从 DataStream 转换而来。同样,可以直接对应字段转换;也可以在转换的时候,指定相应的字段。代码如下:

tableEnv.createTemporaryView("sensorView", dataStream);
tableEnv.createTemporaryView("sensorView", dataStream, "id, temperature, timestamp as ts");

        另外,当然还可以基于 Table 创建视图

tableEnv.createTemporaryView("sensorView", sensorTable);

这篇关于4.2.3 Flink-流处理框架-Table API 与 SQL-流转表+表转流+创建临时视图(Temporary View)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/754486

相关文章

Java 中的 @SneakyThrows 注解使用方法(简化异常处理的利与弊)

《Java中的@SneakyThrows注解使用方法(简化异常处理的利与弊)》为了简化异常处理,Lombok提供了一个强大的注解@SneakyThrows,本文将详细介绍@SneakyThro... 目录1. @SneakyThrows 简介 1.1 什么是 Lombok?2. @SneakyThrows

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息

使用Python和Pyecharts创建交互式地图

《使用Python和Pyecharts创建交互式地图》在数据可视化领域,创建交互式地图是一种强大的方式,可以使受众能够以引人入胜且信息丰富的方式探索地理数据,下面我们看看如何使用Python和Pyec... 目录简介Pyecharts 简介创建上海地图代码说明运行结果总结简介在数据可视化领域,创建交互式地

Pandas透视表(Pivot Table)的具体使用

《Pandas透视表(PivotTable)的具体使用》透视表用于在数据分析和处理过程中进行数据重塑和汇总,本文就来介绍一下Pandas透视表(PivotTable)的具体使用,感兴趣的可以了解一下... 目录前言什么是透视表?使用步骤1. 引入必要的库2. 读取数据3. 创建透视表4. 查看透视表总结前言

MySQL 中的 JSON 查询案例详解

《MySQL中的JSON查询案例详解》:本文主要介绍MySQL的JSON查询的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录mysql 的 jsON 路径格式基本结构路径组件详解特殊语法元素实际示例简单路径复杂路径简写操作符注意MySQL 的 J

Python Transformers库(NLP处理库)案例代码讲解

《PythonTransformers库(NLP处理库)案例代码讲解》本文介绍transformers库的全面讲解,包含基础知识、高级用法、案例代码及学习路径,内容经过组织,适合不同阶段的学习者,对... 目录一、基础知识1. Transformers 库简介2. 安装与环境配置3. 快速上手示例二、核心模

Windows 上如果忘记了 MySQL 密码 重置密码的两种方法

《Windows上如果忘记了MySQL密码重置密码的两种方法》:本文主要介绍Windows上如果忘记了MySQL密码重置密码的两种方法,本文通过两种方法结合实例代码给大家介绍的非常详细,感... 目录方法 1:以跳过权限验证模式启动 mysql 并重置密码方法 2:使用 my.ini 文件的临时配置在 Wi

一文详解Java异常处理你都了解哪些知识

《一文详解Java异常处理你都了解哪些知识》:本文主要介绍Java异常处理的相关资料,包括异常的分类、捕获和处理异常的语法、常见的异常类型以及自定义异常的实现,文中通过代码介绍的非常详细,需要的朋... 目录前言一、什么是异常二、异常的分类2.1 受检异常2.2 非受检异常三、异常处理的语法3.1 try-

MySQL重复数据处理的七种高效方法

《MySQL重复数据处理的七种高效方法》你是不是也曾遇到过这样的烦恼:明明系统测试时一切正常,上线后却频频出现重复数据,大批量导数据时,总有那么几条不听话的记录导致整个事务莫名回滚,今天,我就跟大家分... 目录1. 重复数据插入问题分析1.1 问题本质1.2 常见场景图2. 基础解决方案:使用异常捕获3.