4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表)

本文主要是介绍4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1.基本程序结构

2.创建表环境

2.1 配置老版本 planner

2.2 配置 blink planner

3.在 Catalog 中注册表

3.1 表的概念

3.2 连接到文件系统(Csv 格式)

3.3 连接到 Kafka

4.代码示例


1.基本程序结构

        Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么 几步:首先创建执行环境,然后定义 source、transform 和 sink。具体操作流程如下:

StreamTableEnvironment tableEnv = ... // 创建表的执行环境// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable");// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);// 通过 SQL 查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");// 将结果表写入输出表中
result.insertInto("outputTable");

2.创建表环境

        创建表环境最简单的方式,就是基于流处理执行环境,调 create 方法直接创建:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        表环境(TableEnvironment)是 flink 中集成 Table API & SQL 的核心概念。它负责:⚫ 注册 catalog在内部 catalog 中注册表执行 SQL 查询注册用户自定义函数UDF ⚫ 将 DataStream 或 DataSet 转换为表 ⚫ 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

        在创建 TableEnv 的时候,可以多传入一个 EnvironmentSettings 或者 TableConfig 参数, 可以用来配置 TableEnvironment 的一些特性。

2.1 配置老版本 planner

2.2 配置 blink planner

3.在 Catalog 中注册表

3.1 表的概念

        TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个 Catalog-Table 表之间的 map。

        表(Table)是由一个“标识符”来指定的,由 3 部分组成:Catalog 名、数据库(database) 名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。

        表可以是常规的(Table,表),或者虚拟的(View,视图)。常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream 转 换而来。视图可以从现有的表中创建,通常是 table API 或者 SQL 查询的一个结果。

3.2 连接到文件系统(Csv 格式)

        连接外部系统在 Catalog 中注册表,直接调用 tableEnv.connect()就可以,里面参数要传入一个 ConnectorDescriptor,也就是 connector 描述器。对于文件系统的 connector 而言,flink 内部已经提供了,就叫做 FileSystem()。代码如下:

tableEnv.connect( new FileSystem().path("sensor.txt")) // 定义表数据来源,外部连接.withFormat(new OldCsv()) // 定义从外部系统读取数据之后的格式化方法 .withSchema( new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())) // 定义表结构.createTemporaryTable("inputTable"); // 创建临时表

        这是旧版本的 csv 格式描述器。由于它是非标的,跟外部系统对接并不通用,所以将被 弃用,以后会被一个符合 RFC-4180 标准的新 format 描述器取代。新的描述器就叫 Csv(),但 flink 没有直接提供,需要引入依赖 flink-csv:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.10.1</version>
</dependency>

        代码非常类似,只需要把 withFormat 里的 OldCsv 改成 Csv 就可以了。

3.3 连接到 Kafka

        kafka 的连接器 flink-kafka-connector 中,1.10 版本的已经提供了 Table API 的支持。我们 可以在 connect 方法中直接传入一个叫做 Kafka 的类,这就是 kafka 连接器的描述器 ConnectorDescriptor。

tableEnv.connect(new Kafka().version("0.11") // 定义 kafka 的版本.topic("sensor") // 定义主题 .property("zookeeper.connect", "localhost:2181").property("bootstrap.servers", "localhost:9092")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable");

        当然也可以连接到 ElasticSearch、MySql、HBase、Hive 等外部系统,实现方式基本上是类似的。

4.代码示例

package com.atguigu.apitest.tableapi;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;public class TableTest2_CommonApi {public static void main(String[] args) throws Exception{// 1. 创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 1.1 基于老版本planner的流处理EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);// 1.2 基于老版本planner的批处理ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);// 1.3 基于Blink的流处理EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);// 1.4 基于Blink的批处理EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);// 2. 表的创建:连接外部系统,读取数据// 2.1 读取文件String filePath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt";tableEnv.connect( new FileSystem().path(filePath)).withFormat( new Csv()).withSchema( new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temp", DataTypes.DOUBLE())).createTemporaryTable("inputTable");Table inputTable = tableEnv.from("inputTable");
//        inputTable.printSchema();
//        tableEnv.toAppendStream(inputTable, Row.class).print();// 3. 查询转换// 3.1 Table API// 简单转换Table resultTable = inputTable.select("id, temp").filter("id === 'sensor_6'");// 聚合统计Table aggTable = inputTable.groupBy("id").select("id, id.count as count, temp.avg as avgTemp");// 3.2 SQLtableEnv.sqlQuery("select id, temp from inputTable where id = 'senosr_6'");Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");// 打印输出tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(aggTable, Row.class).print("agg");tableEnv.toRetractStream(sqlAggTable, Row.class).print("sqlagg");env.execute();}
}

 

这篇关于4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/754485

相关文章

MySQL 8 中的一个强大功能 JSON_TABLE示例详解

《MySQL8中的一个强大功能JSON_TABLE示例详解》JSON_TABLE是MySQL8中引入的一个强大功能,它允许用户将JSON数据转换为关系表格式,从而可以更方便地在SQL查询中处理J... 目录基本语法示例示例查询解释应用场景不适用场景1. ‌jsON 数据结构过于复杂或动态变化‌2. ‌性能要

MySQL字符串常用函数详解

《MySQL字符串常用函数详解》本文给大家介绍MySQL字符串常用函数,本文结合实例代码给大家介绍的非常详细,对大家学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录mysql字符串常用函数一、获取二、大小写转换三、拼接四、截取五、比较、反转、替换六、去空白、填充MySQL字符串常用函数一、

MySQL中比较运算符的具体使用

《MySQL中比较运算符的具体使用》本文介绍了SQL中常用的符号类型和非符号类型运算符,符号类型运算符包括等于(=)、安全等于(=)、不等于(/!=)、大小比较(,=,,=)等,感兴趣的可以了解一下... 目录符号类型运算符1. 等于运算符=2. 安全等于运算符<=>3. 不等于运算符<>或!=4. 小于运

虚拟机Centos7安装MySQL数据库实践

《虚拟机Centos7安装MySQL数据库实践》用户分享在虚拟机安装MySQL的全过程及常见问题解决方案,包括处理GPG密钥、修改密码策略、配置远程访问权限及防火墙设置,最终通过关闭防火墙和停止Net... 目录安装mysql数据库下载wget命令下载MySQL安装包安装MySQL安装MySQL服务安装完成

Java堆转储文件之1.6G大文件处理完整指南

《Java堆转储文件之1.6G大文件处理完整指南》堆转储文件是优化、分析内存消耗的重要工具,:本文主要介绍Java堆转储文件之1.6G大文件处理的相关资料,文中通过代码介绍的非常详细,需要的朋友可... 目录前言文件为什么这么大?如何处理这个文件?分析文件内容(推荐)删除文件(如果不需要)查看错误来源如何避

MySQL进行数据库审计的详细步骤和示例代码

《MySQL进行数据库审计的详细步骤和示例代码》数据库审计通过触发器、内置功能及第三方工具记录和监控数据库活动,确保安全、完整与合规,Java代码实现自动化日志记录,整合分析系统提升监控效率,本文给大... 目录一、数据库审计的基本概念二、使用触发器进行数据库审计1. 创建审计表2. 创建触发器三、Java

MySQL逻辑删除与唯一索引冲突解决方案

《MySQL逻辑删除与唯一索引冲突解决方案》本文探讨MySQL逻辑删除与唯一索引冲突问题,提出四种解决方案:复合索引+时间戳、修改唯一字段、历史表、业务层校验,推荐方案1和方案3,适用于不同场景,感兴... 目录问题背景问题复现解决方案解决方案1.复合唯一索引 + 时间戳删除字段解决方案2:删除后修改唯一字

Zabbix在MySQL性能监控方面的运用及最佳实践记录

《Zabbix在MySQL性能监控方面的运用及最佳实践记录》Zabbix通过自定义脚本和内置模板监控MySQL核心指标(连接、查询、资源、复制),支持自动发现多实例及告警通知,结合可视化仪表盘,可有效... 目录一、核心监控指标及配置1. 关键监控指标示例2. 配置方法二、自动发现与多实例管理1. 实践步骤

使用Python构建一个高效的日志处理系统

《使用Python构建一个高效的日志处理系统》这篇文章主要为大家详细讲解了如何使用Python开发一个专业的日志分析工具,能够自动化处理、分析和可视化各类日志文件,大幅提升运维效率,需要的可以了解下... 目录环境准备工具功能概述完整代码实现代码深度解析1. 类设计与初始化2. 日志解析核心逻辑3. 文件处

MySQL 主从复制部署及验证(示例详解)

《MySQL主从复制部署及验证(示例详解)》本文介绍MySQL主从复制部署步骤及学校管理数据库创建脚本,包含表结构设计、示例数据插入和查询语句,用于验证主从同步功能,感兴趣的朋友一起看看吧... 目录mysql 主从复制部署指南部署步骤1.环境准备2. 主服务器配置3. 创建复制用户4. 获取主服务器状态5