4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表)

本文主要是介绍4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1.基本程序结构

2.创建表环境

2.1 配置老版本 planner

2.2 配置 blink planner

3.在 Catalog 中注册表

3.1 表的概念

3.2 连接到文件系统(Csv 格式)

3.3 连接到 Kafka

4.代码示例


1.基本程序结构

        Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么 几步:首先创建执行环境,然后定义 source、transform 和 sink。具体操作流程如下:

StreamTableEnvironment tableEnv = ... // 创建表的执行环境// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable");// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);// 通过 SQL 查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");// 将结果表写入输出表中
result.insertInto("outputTable");

2.创建表环境

        创建表环境最简单的方式,就是基于流处理执行环境,调 create 方法直接创建:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        表环境(TableEnvironment)是 flink 中集成 Table API & SQL 的核心概念。它负责:⚫ 注册 catalog在内部 catalog 中注册表执行 SQL 查询注册用户自定义函数UDF ⚫ 将 DataStream 或 DataSet 转换为表 ⚫ 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

        在创建 TableEnv 的时候,可以多传入一个 EnvironmentSettings 或者 TableConfig 参数, 可以用来配置 TableEnvironment 的一些特性。

2.1 配置老版本 planner

2.2 配置 blink planner

3.在 Catalog 中注册表

3.1 表的概念

        TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个 Catalog-Table 表之间的 map。

        表(Table)是由一个“标识符”来指定的,由 3 部分组成:Catalog 名、数据库(database) 名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。

        表可以是常规的(Table,表),或者虚拟的(View,视图)。常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream 转 换而来。视图可以从现有的表中创建,通常是 table API 或者 SQL 查询的一个结果。

3.2 连接到文件系统(Csv 格式)

        连接外部系统在 Catalog 中注册表,直接调用 tableEnv.connect()就可以,里面参数要传入一个 ConnectorDescriptor,也就是 connector 描述器。对于文件系统的 connector 而言,flink 内部已经提供了,就叫做 FileSystem()。代码如下:

tableEnv.connect( new FileSystem().path("sensor.txt")) // 定义表数据来源,外部连接.withFormat(new OldCsv()) // 定义从外部系统读取数据之后的格式化方法 .withSchema( new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())) // 定义表结构.createTemporaryTable("inputTable"); // 创建临时表

        这是旧版本的 csv 格式描述器。由于它是非标的,跟外部系统对接并不通用,所以将被 弃用,以后会被一个符合 RFC-4180 标准的新 format 描述器取代。新的描述器就叫 Csv(),但 flink 没有直接提供,需要引入依赖 flink-csv:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.10.1</version>
</dependency>

        代码非常类似,只需要把 withFormat 里的 OldCsv 改成 Csv 就可以了。

3.3 连接到 Kafka

        kafka 的连接器 flink-kafka-connector 中,1.10 版本的已经提供了 Table API 的支持。我们 可以在 connect 方法中直接传入一个叫做 Kafka 的类,这就是 kafka 连接器的描述器 ConnectorDescriptor。

tableEnv.connect(new Kafka().version("0.11") // 定义 kafka 的版本.topic("sensor") // 定义主题 .property("zookeeper.connect", "localhost:2181").property("bootstrap.servers", "localhost:9092")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable");

        当然也可以连接到 ElasticSearch、MySql、HBase、Hive 等外部系统,实现方式基本上是类似的。

4.代码示例

package com.atguigu.apitest.tableapi;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;public class TableTest2_CommonApi {public static void main(String[] args) throws Exception{// 1. 创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 1.1 基于老版本planner的流处理EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);// 1.2 基于老版本planner的批处理ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);// 1.3 基于Blink的流处理EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);// 1.4 基于Blink的批处理EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);// 2. 表的创建:连接外部系统,读取数据// 2.1 读取文件String filePath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt";tableEnv.connect( new FileSystem().path(filePath)).withFormat( new Csv()).withSchema( new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temp", DataTypes.DOUBLE())).createTemporaryTable("inputTable");Table inputTable = tableEnv.from("inputTable");
//        inputTable.printSchema();
//        tableEnv.toAppendStream(inputTable, Row.class).print();// 3. 查询转换// 3.1 Table API// 简单转换Table resultTable = inputTable.select("id, temp").filter("id === 'sensor_6'");// 聚合统计Table aggTable = inputTable.groupBy("id").select("id, id.count as count, temp.avg as avgTemp");// 3.2 SQLtableEnv.sqlQuery("select id, temp from inputTable where id = 'senosr_6'");Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");// 打印输出tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(aggTable, Row.class).print("agg");tableEnv.toRetractStream(sqlAggTable, Row.class).print("sqlagg");env.execute();}
}

 

这篇关于4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/754485

相关文章

mysqld_multi在Linux服务器上运行多个MySQL实例

《mysqld_multi在Linux服务器上运行多个MySQL实例》在Linux系统上使用mysqld_multi来启动和管理多个MySQL实例是一种常见的做法,这种方式允许你在同一台机器上运行多个... 目录1. 安装mysql2. 配置文件示例配置文件3. 创建数据目录4. 启动和管理实例启动所有实例

5分钟获取deepseek api并搭建简易问答应用

《5分钟获取deepseekapi并搭建简易问答应用》本文主要介绍了5分钟获取deepseekapi并搭建简易问答应用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1、获取api2、获取base_url和chat_model3、配置模型参数方法一:终端中临时将加

使用DeepSeek API 结合VSCode提升开发效率

《使用DeepSeekAPI结合VSCode提升开发效率》:本文主要介绍DeepSeekAPI与VisualStudioCode(VSCode)结合使用,以提升软件开发效率,具有一定的参考价值... 目录引言准备工作安装必要的 VSCode 扩展配置 DeepSeek API1. 创建 API 请求文件2.

解决IDEA使用springBoot创建项目,lombok标注实体类后编译无报错,但是运行时报错问题

《解决IDEA使用springBoot创建项目,lombok标注实体类后编译无报错,但是运行时报错问题》文章详细描述了在使用lombok的@Data注解标注实体类时遇到编译无误但运行时报错的问题,分析... 目录问题分析问题解决方案步骤一步骤二步骤三总结问题使用lombok注解@Data标注实体类,编译时

VScode连接远程Linux服务器环境配置图文教程

《VScode连接远程Linux服务器环境配置图文教程》:本文主要介绍如何安装和配置VSCode,包括安装步骤、环境配置(如汉化包、远程SSH连接)、语言包安装(如C/C++插件)等,文中给出了详... 目录一、安装vscode二、环境配置1.中文汉化包2.安装remote-ssh,用于远程连接2.1安装2

将sqlserver数据迁移到mysql的详细步骤记录

《将sqlserver数据迁移到mysql的详细步骤记录》:本文主要介绍将SQLServer数据迁移到MySQL的步骤,包括导出数据、转换数据格式和导入数据,通过示例和工具说明,帮助大家顺利完成... 目录前言一、导出SQL Server 数据二、转换数据格式为mysql兼容格式三、导入数据到MySQL数据

C++中使用vector存储并遍历数据的基本步骤

《C++中使用vector存储并遍历数据的基本步骤》C++标准模板库(STL)提供了多种容器类型,包括顺序容器、关联容器、无序关联容器和容器适配器,每种容器都有其特定的用途和特性,:本文主要介绍C... 目录(1)容器及简要描述‌php顺序容器‌‌关联容器‌‌无序关联容器‌(基于哈希表):‌容器适配器‌:(

MySQL分表自动化创建的实现方案

《MySQL分表自动化创建的实现方案》在数据库应用场景中,随着数据量的不断增长,单表存储数据可能会面临性能瓶颈,例如查询、插入、更新等操作的效率会逐渐降低,分表是一种有效的优化策略,它将数据分散存储在... 目录一、项目目的二、实现过程(一)mysql 事件调度器结合存储过程方式1. 开启事件调度器2. 创

SQL Server使用SELECT INTO实现表备份的代码示例

《SQLServer使用SELECTINTO实现表备份的代码示例》在数据库管理过程中,有时我们需要对表进行备份,以防数据丢失或修改错误,在SQLServer中,可以使用SELECTINT... 在数据库管理过程中,有时我们需要对表进行备份,以防数据丢失或修改错误。在 SQL Server 中,可以使用 SE

一文详解Python中数据清洗与处理的常用方法

《一文详解Python中数据清洗与处理的常用方法》在数据处理与分析过程中,缺失值、重复值、异常值等问题是常见的挑战,本文总结了多种数据清洗与处理方法,文中的示例代码简洁易懂,有需要的小伙伴可以参考下... 目录缺失值处理重复值处理异常值处理数据类型转换文本清洗数据分组统计数据分箱数据标准化在数据处理与分析过