4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表)

本文主要是介绍4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1.基本程序结构

2.创建表环境

2.1 配置老版本 planner

2.2 配置 blink planner

3.在 Catalog 中注册表

3.1 表的概念

3.2 连接到文件系统(Csv 格式)

3.3 连接到 Kafka

4.代码示例


1.基本程序结构

        Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么 几步:首先创建执行环境,然后定义 source、transform 和 sink。具体操作流程如下:

StreamTableEnvironment tableEnv = ... // 创建表的执行环境// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable");// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);// 通过 SQL 查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");// 将结果表写入输出表中
result.insertInto("outputTable");

2.创建表环境

        创建表环境最简单的方式,就是基于流处理执行环境,调 create 方法直接创建:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        表环境(TableEnvironment)是 flink 中集成 Table API & SQL 的核心概念。它负责:⚫ 注册 catalog在内部 catalog 中注册表执行 SQL 查询注册用户自定义函数UDF ⚫ 将 DataStream 或 DataSet 转换为表 ⚫ 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

        在创建 TableEnv 的时候,可以多传入一个 EnvironmentSettings 或者 TableConfig 参数, 可以用来配置 TableEnvironment 的一些特性。

2.1 配置老版本 planner

2.2 配置 blink planner

3.在 Catalog 中注册表

3.1 表的概念

        TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个 Catalog-Table 表之间的 map。

        表(Table)是由一个“标识符”来指定的,由 3 部分组成:Catalog 名、数据库(database) 名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。

        表可以是常规的(Table,表),或者虚拟的(View,视图)。常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream 转 换而来。视图可以从现有的表中创建,通常是 table API 或者 SQL 查询的一个结果。

3.2 连接到文件系统(Csv 格式)

        连接外部系统在 Catalog 中注册表,直接调用 tableEnv.connect()就可以,里面参数要传入一个 ConnectorDescriptor,也就是 connector 描述器。对于文件系统的 connector 而言,flink 内部已经提供了,就叫做 FileSystem()。代码如下:

tableEnv.connect( new FileSystem().path("sensor.txt")) // 定义表数据来源,外部连接.withFormat(new OldCsv()) // 定义从外部系统读取数据之后的格式化方法 .withSchema( new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())) // 定义表结构.createTemporaryTable("inputTable"); // 创建临时表

        这是旧版本的 csv 格式描述器。由于它是非标的,跟外部系统对接并不通用,所以将被 弃用,以后会被一个符合 RFC-4180 标准的新 format 描述器取代。新的描述器就叫 Csv(),但 flink 没有直接提供,需要引入依赖 flink-csv:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.10.1</version>
</dependency>

        代码非常类似,只需要把 withFormat 里的 OldCsv 改成 Csv 就可以了。

3.3 连接到 Kafka

        kafka 的连接器 flink-kafka-connector 中,1.10 版本的已经提供了 Table API 的支持。我们 可以在 connect 方法中直接传入一个叫做 Kafka 的类,这就是 kafka 连接器的描述器 ConnectorDescriptor。

tableEnv.connect(new Kafka().version("0.11") // 定义 kafka 的版本.topic("sensor") // 定义主题 .property("zookeeper.connect", "localhost:2181").property("bootstrap.servers", "localhost:9092")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable");

        当然也可以连接到 ElasticSearch、MySql、HBase、Hive 等外部系统,实现方式基本上是类似的。

4.代码示例

package com.atguigu.apitest.tableapi;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;public class TableTest2_CommonApi {public static void main(String[] args) throws Exception{// 1. 创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 1.1 基于老版本planner的流处理EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);// 1.2 基于老版本planner的批处理ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);// 1.3 基于Blink的流处理EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);// 1.4 基于Blink的批处理EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);// 2. 表的创建:连接外部系统,读取数据// 2.1 读取文件String filePath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt";tableEnv.connect( new FileSystem().path(filePath)).withFormat( new Csv()).withSchema( new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temp", DataTypes.DOUBLE())).createTemporaryTable("inputTable");Table inputTable = tableEnv.from("inputTable");
//        inputTable.printSchema();
//        tableEnv.toAppendStream(inputTable, Row.class).print();// 3. 查询转换// 3.1 Table API// 简单转换Table resultTable = inputTable.select("id, temp").filter("id === 'sensor_6'");// 聚合统计Table aggTable = inputTable.groupBy("id").select("id, id.count as count, temp.avg as avgTemp");// 3.2 SQLtableEnv.sqlQuery("select id, temp from inputTable where id = 'senosr_6'");Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");// 打印输出tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(aggTable, Row.class).print("agg");tableEnv.toRetractStream(sqlAggTable, Row.class).print("sqlagg");env.execute();}
}

 

这篇关于4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/754485

相关文章

浅析Java中如何优雅地处理null值

《浅析Java中如何优雅地处理null值》这篇文章主要为大家详细介绍了如何结合Lambda表达式和Optional,让Java更优雅地处理null值,感兴趣的小伙伴可以跟随小编一起学习一下... 目录场景 1:不为 null 则执行场景 2:不为 null 则返回,为 null 则返回特定值或抛出异常场景

Python如何自动生成环境依赖包requirements

《Python如何自动生成环境依赖包requirements》:本文主要介绍Python如何自动生成环境依赖包requirements问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录生成当前 python 环境 安装的所有依赖包1、命令2、常见问题只生成当前 项目 的所有依赖包1、

SQL表间关联查询实例详解

《SQL表间关联查询实例详解》本文主要讲解SQL语句中常用的表间关联查询方式,包括:左连接(leftjoin)、右连接(rightjoin)、全连接(fulljoin)、内连接(innerjoin)、... 目录简介样例准备左外连接右外连接全外连接内连接交叉连接自然连接简介本文主要讲解SQL语句中常用的表

SQL server配置管理器找不到如何打开它

《SQLserver配置管理器找不到如何打开它》最近遇到了SQLserver配置管理器打不开的问题,尝试在开始菜单栏搜SQLServerManager无果,于是将自己找到的方法总结分享给大家,对SQ... 目录方法一:桌面图标进入方法二:运行窗口进入方法三:查找文件路径方法四:检查 SQL Server 安

MySQL 中的 LIMIT 语句及基本用法

《MySQL中的LIMIT语句及基本用法》LIMIT语句用于限制查询返回的行数,常用于分页查询或取部分数据,提高查询效率,:本文主要介绍MySQL中的LIMIT语句,需要的朋友可以参考下... 目录mysql 中的 LIMIT 语句1. LIMIT 语法2. LIMIT 基本用法(1) 获取前 N 行数据(

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

springboot项目中常用的工具类和api详解

《springboot项目中常用的工具类和api详解》在SpringBoot项目中,开发者通常会依赖一些工具类和API来简化开发、提高效率,以下是一些常用的工具类及其典型应用场景,涵盖Spring原生... 目录1. Spring Framework 自带工具类(1) StringUtils(2) Coll

MySQL 分区与分库分表策略应用小结

《MySQL分区与分库分表策略应用小结》在大数据量、复杂查询和高并发的应用场景下,单一数据库往往难以满足性能和扩展性的要求,本文将详细介绍这两种策略的基本概念、实现方法及优缺点,并通过实际案例展示如... 目录mysql 分区与分库分表策略1. 数据库水平拆分的背景2. MySQL 分区策略2.1 分区概念

MySQL高级查询之JOIN、子查询、窗口函数实际案例

《MySQL高级查询之JOIN、子查询、窗口函数实际案例》:本文主要介绍MySQL高级查询之JOIN、子查询、窗口函数实际案例的相关资料,JOIN用于多表关联查询,子查询用于数据筛选和过滤,窗口函... 目录前言1. JOIN(连接查询)1.1 内连接(INNER JOIN)1.2 左连接(LEFT JOI

MySQL 中查询 VARCHAR 类型 JSON 数据的问题记录

《MySQL中查询VARCHAR类型JSON数据的问题记录》在数据库设计中,有时我们会将JSON数据存储在VARCHAR或TEXT类型字段中,本文将详细介绍如何在MySQL中有效查询存储为V... 目录一、问题背景二、mysql jsON 函数2.1 常用 JSON 函数三、查询示例3.1 基本查询3.2