【Flink实战】Flink hint更灵活、更细粒度的设置Flink sql行为与简化hive连接器参数设置

本文主要是介绍【Flink实战】Flink hint更灵活、更细粒度的设置Flink sql行为与简化hive连接器参数设置,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 一. create table hints
    • 1. 语法
    • 2. 示例
    • 3. 注意
  • 二. 实战:简化hive连接器参数设置
  • 三. select hints(ing)

SQL 提示(SQL Hints)是和 SQL 语句一起使用来改变执行计划的。本章介绍如何使用 SQL 提示来实现各种干预。

SQL 提示一般可以用于以下:

  • 增强 planner:没有完美的 planner, SQL 提示让用户更好地控制执行;
  • 增加元数据(或者统计信息):如"已扫描的表索引"和"一些混洗键(shuffle keys)的倾斜信息"的一些统计数据对于查询来说是动态的,用提示来配置它们会非常方便,因为我们从 planner
    获得的计划元数据通常不那么准确;
  • 算子(Operator)资源约束:在许多情况下,我们会为执行算子提供默认的资源配置,即最小并行度或托管内存(UDF 资源消耗)或特殊资源需求(GPU 或 SSD 磁盘)等,可以使用 SQL 提示非常灵活地为每个查询(非作业)配置资源

 

一. create table hints

动态表选项允许动态地指定或覆盖表选项,不同于用 SQL DDL 或 连接 API 定义的静态表选项,这些选项可以在每个查询的每个表范围内灵活地指定。

因此,它非常适合用于交互式终端中的特定查询,例如,在 SQL-CLI 中,你可以通过添加动态选项/*+ OPTIONS('csv.ignore-parse-errors'='true') */来指定忽略 CSV 源的解析错误。

 

1. 语法

为了不破坏 SQL 兼容性,我们使用 Oracle 风格的 SQL hints 语法:

table_path /*+ OPTIONS(key=val [, key=val]*) */key: string字符
val: string字符

 

2. 示例


CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);-- `覆盖`查询语句中源表的选项
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;-- 覆盖 join 中源表的选项
select * fromkafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1joinkafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2on t1.id = t2.id;-- 覆盖插入语句中结果表的选项
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;

 

3. 注意

create table hints 传递的连接器中catalog的相关参数,即create table with下参数,具体到源代码是:context.getCatalogTable().getOptions()

 

如果传参无效且在日志中看到参数已经设置成功,那

可能将context.getConfiguration()中的参数传递到with参数下,比如:
hive连接器下:table.exec.hive.sink.statistic-auto-gather.enable 参数由DefaultDynamicTableContext的configuration来接收。此参数为flink sql的全局参数,此时可以通过set table.exec.hive.sink.statistic-auto-gather.enable=false 语法来设定参数。

 

二. 实战:简化hive连接器参数设置

对于hive连接器,Flink实现了通过catalog的方式来管理hive表,在使用hive表时需要使用hive相关语法,此时需要声明,hive dialect,如下:


CREATE CATALOG myhive WITH ('type' = 'hive','default-database' = 'aaa','hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'
);SET table.sql-dialect=hive;-- 因为需要使用hive连接器中的写特性,所以需要create table ,此时sql语法为hive语法
CREATE TABLE hive_table (user_id STRING,order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ('partition.time-extractor.timestamp-pattern'='$dt $hr:00:00','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='metastore,success-file'
);-- 对于某些框架例如chunjun,此处不能很好的适配:
--
SET table.sql-dialect=default;
CREATE TABLE kafka_table (user_id STRING,order_amount DOUBLE,log_ts TIMESTAMP(3),WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- 在 TIMESTAMP 列声明 watermark。
) WITH (...);-- streaming sql, insert into hive table
INSERT INTO TABLE myhive.aaa.hive_table 
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;

如下可以把写hive的一些行为通过sql hint方式,放到Flink sql语句中,如下整个Flink sql 会清爽很多。

CREATE CATALOG myhive WITH ('type' = 'hive','default-database' = 'database_name','hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'
);CREATE TABLE source_kafka (`pv` string,`uv` string,`p_day_id` string
) WITH ('connector' = 'kafka-x','topic' = 'hive_kafka','properties.bootstrap.servers' = 'xxx:9092','properties.group.id' = 'luna_g','scan.startup.mode' = 'earliest-offset','json.timestamp-format.standard' = 'SQL','json.ignore-parse-errors' = 'true','format' = 'json','scan.parallelism' = '1');insert into myhive.database_name.table_name /*+ OPTIONS('partition.time-extractor.timestamp-pattern'='$p_day_id:00:00','sink.partition-commit.policy.kind'='metastore,success-file','sink.partition-commit.success-file.name'='_SUCCESS_gao111') */select *  from source_kafka; 

 

三. select hints(ing)

查询提示(Query Hints)用于为优化器修改执行计划提供建议,该修改只能在当前查询提示所在的查询块中生效(Query block, 什么是查询块)。 目前,Flink 查询提示只支持联接提示(Join Hints)。

具体见:官网

https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/sql/queries/hints/#%E6%9F%A5%E8%AF%A2%E6%8F%90%E7%A4%BA

 

这篇关于【Flink实战】Flink hint更灵活、更细粒度的设置Flink sql行为与简化hive连接器参数设置的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/850265

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

高效+灵活,万博智云全球发布AWS无代理跨云容灾方案!

摘要 近日,万博智云推出了基于AWS的无代理跨云容灾解决方案,并与拉丁美洲,中东,亚洲的合作伙伴面向全球开展了联合发布。这一方案以AWS应用环境为基础,将HyperBDR平台的高效、灵活和成本效益优势与无代理功能相结合,为全球企业带来实现了更便捷、经济的数据保护。 一、全球联合发布 9月2日,万博智云CEO Michael Wong在线上平台发布AWS无代理跨云容灾解决方案的阐述视频,介绍了

wolfSSL参数设置或配置项解释

1. wolfCrypt Only 解释:wolfCrypt是一个开源的、轻量级的、可移植的加密库,支持多种加密算法和协议。选择“wolfCrypt Only”意味着系统或应用将仅使用wolfCrypt库进行加密操作,而不依赖其他加密库。 2. DTLS Support 解释:DTLS(Datagram Transport Layer Security)是一种基于UDP的安全协议,提供类似于

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount