Flink流批一体计算(21):Flink SQL之Flink DDL

2023-11-29 10:30

本文主要是介绍Flink流批一体计算(21):Flink SQL之Flink DDL,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

执行 CREATE 语句

Python脚本

Java代码

SQL语句

列定义

物理/常规列

元数据列

计算列

WATERMARK

PRIMARY KEY

PARTITIONED BY

AS select_statement


Flink SQL是为了简化计算模型、降低您使用Flink门槛而设计的一套符合标准SQL语义的开发语言。

执行 CREATE 语句

Python脚本
table_env = TableEnvironment.create(...)# 对已经注册的表进行 SQL 查询# 注册名为 “Orders” 的表table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");# 在表上执行 SQL 查询,并把得到的结果作为一个新的表result = table_env.sql_query("SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");# 对已注册的表进行 INSERT 操作# 注册 TableSinktable_env.execute_sql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")# 在表上执行 INSERT 语句并向 TableSink 发出结果table_env \.execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

Java代码
Environmentsettings settings = Environmentsettings.newInstance()...TableEnvironment tableEnv = TableEnvironment.create(settings);// 对已注册的表进行 SQL 查询// 注册名为 “Orders” 的表tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");// 在表上执行 SQL 查询,并把得到的结果作为一个新的表Table result = tableEnv.sqlQuery("SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");// 对已注册的表进行 INSERT 操作// 注册 TableSinktableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");// 在表上执行 INSERT 语句并向 TableSink 发出结果tableEnv.executeSql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

SQL语句
Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);[INFO] Table has been created.Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);[INFO] Table has been created.Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';[INFO] Submitting SQL update statement to the cluster...

列定义

物理/常规列

物理列是数据库中已知的常规列。它们定义物理数据中字段的名称、类型和顺序。因此,物理列表示从外部系统读取和写入的有效数据。其他类型的列可以在物理列之间声明,但不会影响最终的物理模式。

以下语句创建一个仅包含常规列的表:

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `name` STRING

) WITH (

  ...

);

元数据列

元数据列是SQL标准的扩展,元数据列由metadata关键字指示。例如,可以使用元数据列从Kafka记录读取时间戳,并将时间戳写入Kafka,以进行基于时间的操作。连接器和格式文档列出了每个组件的可用元数据字段。但是,在表的模式中声明元数据列是可选的。

以下语句创建一个表,其中包含引用元数据字段时间戳的附加元数据列:

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `name` STRING,

  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'    -- reads and writes a Kafka record's timestamp

) WITH (

  'connector' = 'kafka'

  ...

);

每个元数据字段都由基于字符串的键标识,并具有文档化的数据类型。例如,Kafka连接器公开了一个元数据字段,该字段具有键时间戳和数据类型timestamp_LTZ3),可用于读取和写入记录。

在上面的示例中,元数据列record_time成为表模式的一部分,可以像常规列一样进行转换和存储,为方便起见,如果列名应用作标识元数据键,则可以省略FROM子句。

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `name` STRING,

  `timestamp` TIMESTAMP_LTZ(3) METADATA    -- use column name as metadata key

) WITH (

  'connector' = 'kafka'

  ...

);

计算列

计算列是使用语法column_name AS Computed_column_expression生成的虚拟列。

计算列计算可以引用同一表中声明的其他列的表达式。可以访问物理列和元数据列。列本身不物理存储在表中。列的数据类型是从给定表达式自动派生的,不必手动声明。

例如,计算列可以定义为

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `price` DOUBLE,

  `quantity` DOUBLE,

  `cost` AS price * quanitity,  -- evaluate expression and supply the result to queries

) WITH (

  'connector' = 'kafka'

  ...

);

Flink中通常使用计算列来定义CREATETABLE语句中的时间属性。

使用系统的PROCTIME()函数,可以通过proc AS PROCIME()轻松定义processing time属性。

Event time属性时间戳可以在WATERMARK声明之前预处理。例如,如果原始字段不是TIMESTAMP3)类型或嵌套在JSON字符串中,则可以使用计算列。

WATERMARK

WATERMARK 定义了表的Event time属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 

rowtime_column_name 把一个现有的列定义为一个为表标记event time的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。

watermark_strategy_expression 定义了 watermark 的生成策略。它允许使用包括计算列在内的任意非查询表达式来计算 watermark ;表达式的返回类型必须是 TIMESTAMP(3),表示了从 Epoch 以来的经过的时间。 返回的 watermark 只有当其不为空且其值大于之前发出的本地 watermark 时才会被发出(以保证 watermark 递增)。每条记录的 watermark 生成表达式计算都会由框架完成。 框架会定期发出所生成的最大的 watermark ,如果当前 watermark 仍然与前一个 watermark 相同、为空、或返回的 watermark 的值小于最后一个发出的 watermark ,则新的 watermark 不会被发出。 Watermark 根据 pipeline.auto-watermark-interval 中所配置的间隔发出。 watermark 的间隔是 0ms ,那么每条记录都会产生一个 watermark,且 watermark 会在不为空并大于上一个发出的 watermark 时发出。

使用事件时间语义时,表必须包含事件时间属性和 watermark 策略。

Flink 提供了几种常用的 watermark 策略。

  • 严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column

发出到目前为止已观察到的最大时间戳的 watermark ,时间戳大于最大时间戳的行被认为没有迟到。

  • 递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

发出到目前为止已观察到的最大时间戳减 1 watermark ,时间戳大于或等于最大时间戳的行被认为没有迟到。

  • 有界乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit

发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark ,例如, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND 是一个 5 秒延迟的 watermark 策略。

CREATE TABLE Orders (

    `user` BIGINT,

    product STRING,

    order_time TIMESTAMP(3),

    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND

) WITH (......);

PRIMARY KEY

主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。

主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。

有效性检查

SQL 标准主键限制可以有两种模式:ENFORCED 或者 NOT ENFORCED 它申明了是否输入/出数据会做合法性检查(是否唯一)。Flink 不存储数据因此只支持 NOT ENFORCED 模式,即不做检查,用户需要自己保证唯一性。

Flink 假设声明了主键的列都是不包含 Null 值的,Connector 在处理数据时需要自己保证语义正确。

Notes:  CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。

PARTITIONED BY

根据指定的列对已经创建的表进行分区。若表使用 filesystem sink ,则将会为每个分区创建一个目录。

AS select_statement

表也可以通过一个 CTAS 语句中的查询结果来创建并填充数据,CTAS 是一种简单、快捷的创建表并插入数据的方法。

CTAS 有两个部分,SELECT 部分可以是 Flink SQL 支持的任何 SELECT 查询 CREATE 部分从 SELECT 查询中获取列信息,并创建目标表。  CREATE TABLE 类似,CTAS 要求必须在目标表的 WITH 子句中指定必填的表属性。

CTAS 的建表操作需要依赖目标 Catalog。比如,Hive Catalog 会自动在 Hive 中创建物理表。但是基于内存的 Catalog 只会将表的元信息注册在执行 SQL Client 的内存中。

CREATE TABLE my_ctas_table

WITH (

    'connector' = 'kafka',

    ...

)

AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

注意 CTAS 有如下约束:

  • 暂不支持创建临时表。
  • 暂不支持指定列信息。
  • 暂不支持指定 Watermark。
  • 暂不支持创建分区表。
  • 暂不支持主键约束。

注意 目前,CTAS 创建的目标表是非原子性的,如果在向表中插入数据时发生错误,该表不会被自动删除。

这篇关于Flink流批一体计算(21):Flink SQL之Flink DDL的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/432479

相关文章

Ubuntu中远程连接Mysql数据库的详细图文教程

《Ubuntu中远程连接Mysql数据库的详细图文教程》Ubuntu是一个以桌面应用为主的Linux发行版操作系统,这篇文章主要为大家详细介绍了Ubuntu中远程连接Mysql数据库的详细图文教程,有... 目录1、版本2、检查有没有mysql2.1 查询是否安装了Mysql包2.2 查看Mysql版本2.

基于SpringBoot+Mybatis实现Mysql分表

《基于SpringBoot+Mybatis实现Mysql分表》这篇文章主要为大家详细介绍了基于SpringBoot+Mybatis实现Mysql分表的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录基本思路定义注解创建ThreadLocal创建拦截器业务处理基本思路1.根据创建时间字段按年进

Python3.6连接MySQL的详细步骤

《Python3.6连接MySQL的详细步骤》在现代Web开发和数据处理中,Python与数据库的交互是必不可少的一部分,MySQL作为最流行的开源关系型数据库管理系统之一,与Python的结合可以实... 目录环境准备安装python 3.6安装mysql安装pymysql库连接到MySQL建立连接执行S

MySQL双主搭建+keepalived高可用的实现

《MySQL双主搭建+keepalived高可用的实现》本文主要介绍了MySQL双主搭建+keepalived高可用的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、测试环境准备二、主从搭建1.创建复制用户2.创建复制关系3.开启复制,确认复制是否成功4.同

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

Mysql表的简单操作(基本技能)

《Mysql表的简单操作(基本技能)》在数据库中,表的操作主要包括表的创建、查看、修改、删除等,了解如何操作这些表是数据库管理和开发的基本技能,本文给大家介绍Mysql表的简单操作,感兴趣的朋友一起看... 目录3.1 创建表 3.2 查看表结构3.3 修改表3.4 实践案例:修改表在数据库中,表的操作主要

mysql出现ERROR 2003 (HY000): Can‘t connect to MySQL server on ‘localhost‘ (10061)的解决方法

《mysql出现ERROR2003(HY000):Can‘tconnecttoMySQLserveron‘localhost‘(10061)的解决方法》本文主要介绍了mysql出现... 目录前言:第一步:第二步:第三步:总结:前言:当你想通过命令窗口想打开mysql时候发现提http://www.cpp

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

MySQL错误代码2058和2059的解决办法

《MySQL错误代码2058和2059的解决办法》:本文主要介绍MySQL错误代码2058和2059的解决办法,2058和2059的错误码核心都是你用的客户端工具和mysql版本的密码插件不匹配,... 目录1. 前置理解2.报错现象3.解决办法(敲重点!!!)1. php前置理解2058和2059的错误

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T