【大数据】Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

2024-03-01 00:20

本文主要是介绍【大数据】Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink SQL 语法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

  • 1.EXPLAIN 子句
  • 2.USE 子句
  • 3.SHOW 子句
  • 4.LOAD、UNLOAD 子句
  • 5.SET、RESET 子句
  • 6.SQL Hints

1.EXPLAIN 子句

EXPLAIN 子句其实就是用于查看当前这个 SQL 查询的逻辑计划以及优化的执行计划。

SQL 语法标准:

EXPLAIN PLAN FOR <query_statement_or_insert_statement>

实际案例:

public class Explain_Test {public static void main(String[] args) throws Exception {FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);flinkEnv.env().setParallelism(1);String sql = "CREATE TABLE source_table (\n"+ "    user_id BIGINT COMMENT '用户 id',\n"+ "    name STRING COMMENT '用户姓名',\n"+ "    server_timestamp BIGINT COMMENT '用户访问时间戳',\n"+ "    proctime AS PROCTIME()\n"+ ") WITH (\n"+ "  'connector' = 'datagen',\n"+ "  'rows-per-second' = '1',\n"+ "  'fields.name.length' = '1',\n"+ "  'fields.user_id.min' = '1',\n"+ "  'fields.user_id.max' = '10',\n"+ "  'fields.server_timestamp.min' = '1',\n"+ "  'fields.server_timestamp.max' = '100000'\n"+ ");\n"+ "\n"+ "CREATE TABLE sink_table (\n"+ "    user_id BIGINT,\n"+ "    name STRING,\n"+ "    server_timestamp BIGINT\n"+ ") WITH (\n"+ "  'connector' = 'print'\n"+ ");\n"+ "\n"+ "EXPLAIN PLAN FOR\n"+ "INSERT INTO sink_table\n"+ "select user_id,\n"+ "       name,\n"+ "       server_timestamp\n"+ "from (\n"+ "      SELECT\n"+ "          user_id,\n"+ "          name,\n"+ "          server_timestamp,\n"+ "          row_number() over(partition by user_id order by proctime) as rn\n"+ "      FROM source_table\n"+ ")\n"+ "where rn = 1";/*** 算子 {@link org.apache.flink.streaming.api.operators.KeyedProcessOperator}*      -- {@link org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction}*/for (String innerSql : sql.split(";")) {TableResult tableResult = flinkEnv.streamTEnv().executeSql(innerSql);tableResult.print();}}
}

上述代码执行结果如下:

1. 抽象语法树
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2])+- LogicalFilter(condition=[=($3, 1)])+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])+- LogicalTableScan(table=[[default_catalog, default_database, source_table]])2. 优化后的物理计划
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- Calc(select=[user_id, name, server_timestamp])+- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])+- Exchange(distribution=[hash[user_id]])+- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])+- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])3. 优化后的执行计划
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- Calc(select=[user_id, name, server_timestamp])+- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])+- Exchange(distribution=[hash[user_id]])+- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])+- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])

2.USE 子句

如果熟悉 MySQL 的同学会非常熟悉这个子句,在 MySQL 中,USE 子句通常被用于切换库,那么在 Flink SQL 体系中,它的作用也是和 MySQL 中 USE 子句的功能基本一致,用于切换 Catalog,DataBase,使用 Module。

  • 切换 Catalog
USE CATALOG catalog_name
  • 使用 Module
USE MODULES module_name1[, module_name2, ...]
  • 切换 Database
USE db名称

实际案例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// create a catalog
tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");
tEnv.executeSql("SHOW CATALOGS").print();
// +-----------------+
// |    catalog name |
// +-----------------+
// | default_catalog |
// | cat1            |
// +-----------------+// change default catalog
tEnv.executeSql("USE CATALOG cat1");tEnv.executeSql("SHOW DATABASES").print();
// databases are empty
// +---------------+
// | database name |
// +---------------+
// +---------------+// create a database
tEnv.executeSql("CREATE DATABASE db1 WITH (...)");
tEnv.executeSql("SHOW DATABASES").print();
// +---------------+
// | database name |
// +---------------+
// |        db1    |
// +---------------+// change default database
tEnv.executeSql("USE db1");// change module resolution order and enabled status
tEnv.executeSql("USE MODULES hive");
tEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        hive |  true |
// |        core | false |
// +-------------+-------+

3.SHOW 子句

如果熟悉 MySQL 的同学会非常熟悉这个子句,在 MySQL 中,SHOW 子句常常用于查询库、表、函数等,在 Flink SQL 体系中也类似。Flink SQL 支持 SHOW 以下内容。

SQL 语法标准:

  • SHOW CATALOGS:展示所有 Catalog
  • SHOW CURRENT CATALOG:展示当前的 Catalog
  • SHOW DATABASES:展示当前 Catalog 下所有 Database
  • SHOW CURRENT DATABASE:展示当前的 Database
  • SHOW TABLES:展示当前 Database 下所有表
  • SHOW VIEWS:展示所有视图
  • SHOW FUNCTIONS:展示所有的函数
  • SHOW MODULES:展示所有的 Module(Module 是用于 UDF 扩展)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// show catalogs
tEnv.executeSql("SHOW CATALOGS").print();
// +-----------------+
// |    catalog name |
// +-----------------+
// | default_catalog |
// +-----------------+// show current catalog
tEnv.executeSql("SHOW CURRENT CATALOG").print();
// +----------------------+
// | current catalog name |
// +----------------------+
// |      default_catalog |
// +----------------------+// show databases
tEnv.executeSql("SHOW DATABASES").print();
// +------------------+
// |    database name |
// +------------------+
// | default_database |
// +------------------+// show current database
tEnv.executeSql("SHOW CURRENT DATABASE").print();
// +-----------------------+
// | current database name |
// +-----------------------+
// |      default_database |
// +-----------------------+// create a table
tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)");
// show tables
tEnv.executeSql("SHOW TABLES").print();
// +------------+
// | table name |
// +------------+
// |   my_table |
// +------------+// create a view
tEnv.executeSql("CREATE VIEW my_view AS ...");
// show views
tEnv.executeSql("SHOW VIEWS").print();
// +-----------+
// | view name |
// +-----------+
// |   my_view |
// +-----------+// show functions
tEnv.executeSql("SHOW FUNCTIONS").print();
// +---------------+
// | function name |
// +---------------+
// |           mod |
// |        sha256 |
// |           ... |
// +---------------+// create a user defined function
tEnv.executeSql("CREATE FUNCTION f1 AS ...");
// show user defined functions
tEnv.executeSql("SHOW USER FUNCTIONS").print();
// +---------------+
// | function name |
// +---------------+
// |            f1 |
// |           ... |
// +---------------+// show modules
tEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        core |
// +-------------+// show full modules
tEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        core |  true |
// |        hive | false |
// +-------------+-------+

4.LOAD、UNLOAD 子句

我们可以使用 LOAD 子句去加载 Flink SQL 体系内置的或者用户自定义的 Module,UNLOAD 子句去卸载 Flink SQL 体系内置的或者用户自定义的 Module。

SQL 语法标准:

-- 加载
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]-- 卸载
UNLOAD MODULE module_name
  • LOAD 案例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 加载 Flink SQL 体系内置的 Hive module
tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
tEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        core |
// |        hive |
// +-------------+
  • UNLOAD 案例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 卸载唯一的一个 CoreModule
tEnv.executeSql("UNLOAD MODULE core");
tEnv.executeSql("SHOW MODULES").print();
// 结果啥 Moudle 都没有了

5.SET、RESET 子句

SET 子句可以用于修改一些 Flink SQL 的环境配置,RESET 子句是可以将所有的环境配置恢复成默认配置,但只能在 SQL CLI 中进行使用,主要是为了让用户更纯粹的使用 SQL 而不必使用其他方式或者切换系统环境。

SET (key = value)?RESET (key)?

启动一个 SQL CLI 之后,在 SQL CLI 中可以进行以下 SET 设置:

Flink SQL> SET table.planner = blink;
[INFO] Session property has been set.Flink SQL> SET;
table.planner=blink;Flink SQL> RESET table.planner;
[INFO] Session property has been reset.Flink SQL> RESET;
[INFO] All session properties have been set to their default values.

6.SQL Hints

Hints(提示)是一种机制,用来告诉优化器按照我们的告诉它的方式生成执行计划。

比如有一个 Kafka 数据源表 kafka_table1,用户想直接从 latest-offset Select 一些数据出来预览,其元数据已经存储在 Hive MetaStore 中,但是 Hive MetaStore 中存储的配置中的 scan.startup.modeearliest-offset,通过 SQL Hints,用户可以在 DML 语句中将 scan.startup.mode 改为 latest-offset 查询,因此可以看出 SQL Hints 常用语这种比较临时的参数修改,比如 Ad-hoc 这种临时查询中,方便用户使用自定义的新的表参数而不是 Catalog 中已有的表参数。

以下 DML SQL 中的 /*+ OPTIONS(key=val [, key=val]*) */ 就是 SQL Hints。

SELECT *
FROM table_path /*+ OPTIONS(key=val [, key=val]*) */

启动一个 SQL CLI 之后,在 SQL CLI 中可以进行以下 SET 设置:

CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);-- 1. 使用 'scan.startup.mode'='earliest-offset' 覆盖原来的 scan.startup.mode
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;-- 2. 使用 'scan.startup.mode'='earliest-offset' 覆盖原来的 scan.startup.mode
select * fromkafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1joinkafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2on t1.id = t2.id;-- 3. 使用 'sink.partitioner'='round-robin' 覆盖原来的 Sink 表的 sink.partitioner
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;

这篇关于【大数据】Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/760578

相关文章

Java中注解与元数据示例详解

《Java中注解与元数据示例详解》Java注解和元数据是编程中重要的概念,用于描述程序元素的属性和用途,:本文主要介绍Java中注解与元数据的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参... 目录一、引言二、元数据的概念2.1 定义2.2 作用三、Java 注解的基础3.1 注解的定义3.2 内

将sqlserver数据迁移到mysql的详细步骤记录

《将sqlserver数据迁移到mysql的详细步骤记录》:本文主要介绍将SQLServer数据迁移到MySQL的步骤,包括导出数据、转换数据格式和导入数据,通过示例和工具说明,帮助大家顺利完成... 目录前言一、导出SQL Server 数据二、转换数据格式为mysql兼容格式三、导入数据到MySQL数据

C++中使用vector存储并遍历数据的基本步骤

《C++中使用vector存储并遍历数据的基本步骤》C++标准模板库(STL)提供了多种容器类型,包括顺序容器、关联容器、无序关联容器和容器适配器,每种容器都有其特定的用途和特性,:本文主要介绍C... 目录(1)容器及简要描述‌php顺序容器‌‌关联容器‌‌无序关联容器‌(基于哈希表):‌容器适配器‌:(

C#提取PDF表单数据的实现流程

《C#提取PDF表单数据的实现流程》PDF表单是一种常见的数据收集工具,广泛应用于调查问卷、业务合同等场景,凭借出色的跨平台兼容性和标准化特点,PDF表单在各行各业中得到了广泛应用,本文将探讨如何使用... 目录引言使用工具C# 提取多个PDF表单域的数据C# 提取特定PDF表单域的数据引言PDF表单是一

MySQL分表自动化创建的实现方案

《MySQL分表自动化创建的实现方案》在数据库应用场景中,随着数据量的不断增长,单表存储数据可能会面临性能瓶颈,例如查询、插入、更新等操作的效率会逐渐降低,分表是一种有效的优化策略,它将数据分散存储在... 目录一、项目目的二、实现过程(一)mysql 事件调度器结合存储过程方式1. 开启事件调度器2. 创

SQL Server使用SELECT INTO实现表备份的代码示例

《SQLServer使用SELECTINTO实现表备份的代码示例》在数据库管理过程中,有时我们需要对表进行备份,以防数据丢失或修改错误,在SQLServer中,可以使用SELECTINT... 在数据库管理过程中,有时我们需要对表进行备份,以防数据丢失或修改错误。在 SQL Server 中,可以使用 SE

一文详解Python中数据清洗与处理的常用方法

《一文详解Python中数据清洗与处理的常用方法》在数据处理与分析过程中,缺失值、重复值、异常值等问题是常见的挑战,本文总结了多种数据清洗与处理方法,文中的示例代码简洁易懂,有需要的小伙伴可以参考下... 目录缺失值处理重复值处理异常值处理数据类型转换文本清洗数据分组统计数据分箱数据标准化在数据处理与分析过

大数据小内存排序问题如何巧妙解决

《大数据小内存排序问题如何巧妙解决》文章介绍了大数据小内存排序的三种方法:数据库排序、分治法和位图法,数据库排序简单但速度慢,对设备要求高;分治法高效但实现复杂;位图法可读性差,但存储空间受限... 目录三种方法:方法概要数据库排序(http://www.chinasem.cn对数据库设备要求较高)分治法(常

mysql外键创建不成功/失效如何处理

《mysql外键创建不成功/失效如何处理》文章介绍了在MySQL5.5.40版本中,创建带有外键约束的`stu`和`grade`表时遇到的问题,发现`grade`表的`id`字段没有随着`studen... 当前mysql版本:SELECT VERSION();结果为:5.5.40。在复习mysql外键约

SQL注入漏洞扫描之sqlmap详解

《SQL注入漏洞扫描之sqlmap详解》SQLMap是一款自动执行SQL注入的审计工具,支持多种SQL注入技术,包括布尔型盲注、时间型盲注、报错型注入、联合查询注入和堆叠查询注入... 目录what支持类型how---less-1为例1.检测网站是否存在sql注入漏洞的注入点2.列举可用数据库3.列举数据库