【大数据】Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function

本文主要是介绍【大数据】Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink SQL 语法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function

  • 1.Lookup Join(维表 Join)
  • 2.Array Expansion(数组列转行)
  • 3.Table Function(自定义列转行)

1.Lookup Join(维表 Join)

Lookup Join 定义(支持 Batch / Streaming):Lookup Join 其实就是维表 Join,比如拿离线数仓来说,常常会有用户画像,设备画像等数据,而对应到实时数仓场景中,这种实时获取外部缓存的 Join 就叫做维表 Join。

应用场景:小伙伴萌会问,我们既然已经有了上面介绍的 Regular Join,Interval Join 等,为啥还需要一种 Lookup Join?因为上面说的这几种 Join 都是 流与流之间的 Join,而 Lookup Join 是流与 Redis,MySQL,HBase 这种存储介质的 Join。Lookup 的意思就是实时查找,而实时的画像数据一般都是存储在 Redis,MySQL,HBase 中,这就是 Lookup Join 的由来。

实际案例:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。

  • 曝光用户日志流(show_log)数据(数据存储在 Kafka 中)
log_id  timestamp            user_id
1       2021-11-01 00:01:03  a
2       2021-11-01 00:03:00  b
3       2021-11-01 00:05:00  c
4       2021-11-01 00:06:00  b
5       2021-11-01 00:07:00  c
  • 用户画像维表(user_profile)数据(数据存储在 Redis 中)
user_id(主键)   age     sex
a               12-18   男
b               18-24   女
c               18-24   男

注意:Redis 中的数据结构存储是按照 Key-Value 去存储的。其中 Key 为 user_id,Value 为 agesex 的 JSON。

具体 SQL:

CREATE TABLE show_log (log_id BIGINT,`timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),user_id STRING,proctime AS PROCTIME()
)
WITH ('connector' = 'datagen','rows-per-second' = '10','fields.user_id.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE user_profile (user_id STRING,age STRING,sex STRING) WITH ('connector' = 'redis','hostname' = '127.0.0.1','port' = '6379','format' = 'json','lookup.cache.max-rows' = '500','lookup.cache.ttl' = '3600','lookup.max-retries' = '1'
);CREATE TABLE sink_table (log_id BIGINT,`timestamp` TIMESTAMP(3),user_id STRING,proctime TIMESTAMP(3),age STRING,sex STRING
) WITH ('connector' = 'print'
);-- lookup join 的 query 逻辑
INSERT INTO sink_table
SELECT s.log_id as log_id, s.`timestamp` as `timestamp`, s.user_id as user_id, s.proctime as proctime, u.sex as sex, u.age as age
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id

输出数据如下:

log_id  timestamp            user_id  age     sex
1       2021-11-01 00:01:03  a        12-182       2021-11-01 00:03:00  b        18-243       2021-11-01 00:05:00  c        18-244       2021-11-01 00:06:00  b        18-245       2021-11-01 00:07:00  c        18-24

注意:实时的 Lookup 维表关联能使用 处理时间 去做关联。

  • 同一条数据关联到的维度数据可能不同:实时数仓中常用的实时维表都是在不断的变化中的,当前流表数据关联完维表数据后,如果同一个 key 的维表的数据发生了变化,已关联到的维表的结果数据不会再同步更新。举个例子,维表中 user_id 1 1 1 的数据在 08 : 00 08:00 08:00age12-18 变为了 18-24,那么当我们的任务在 08 : 01 08:01 08:01 failover 之后从 07 : 59 07:59 07:59 开始回溯数据时,原本应该关联到 12-18 的数据会关联到 18-24age 数据。这是有可能会影响数据质量的。所以小伙伴萌在评估你们的实时任务时要考虑到这一点。
  • 会发生实时的新建及更新的维表博主建议小伙伴萌应该建立起数据延迟的监控机制,防止出现流表数据先于维表数据到达,导致关联不到维表数据。

再说说维表常见的性能问题及优化思路。

所有的维表性能问题都可以总结为:高 QPS 下访问维表存储引擎产生的任务背压,数据产出延迟问题

举个例子:

  • 在没有使用维表的情况下:一条数据从输入 Flink 任务到输出 Flink 任务的时延假如为 0.1 m s 0.1\ ms 0.1 ms,那么并行度为 1 1 1 的任务的吞吐可以达到 1 q u e r y / 0.1 m s = 10000 q p s 1\ query\ /\ 0.1\ ms = 10000\ qps 1 query / 0.1 ms=10000 qps
  • 在使用维表之后:每条数据访问维表的外部存储的时长为 2 m s 2\ ms 2 ms,那么一条数据从输入 Flink 任务到输出 Flink 任务的时延就会变成 2.1 m s 2.1\ ms 2.1 ms,那么同样并行度为 1 的任务的吞吐只能达到 1 q u e r y / 2.1 m s = 476 q p s 1\ query\ /\ 2.1\ ms = 476\ qps 1 query / 2.1 ms=476 qps。两者的吞吐量相差 21 21 21 倍。

这就是为什么维表 Join 的算子会产生背压,任务产出会延迟。

那么当然,解决方案也是有很多的。抛开 Flink SQL 想一下,如果我们使用 DataStream API,甚至是在做一个后端应用,需要访问外部存储时,常用的优化方案有哪些?这里列举一下:

  • 1️⃣ 按照 Redis 维表的 key 分桶 + local cache:通过按照 key 分桶的方式,让大多数据的维表关联的数据访问走之前访问过的 local cache 即可。这样就可以把访问外部存储 2.1 m s 2.1\ ms 2.1 ms 处理一个 Query 变为访问内存的 0.1 m s 0.1\ ms 0.1 ms 处理一个 Query 的时长。
  • 2️⃣ 异步访问外存:DataStream API 有异步算子,可以利用线程池去同时多次请求维表外部存储。这样就可以把 2.1 m s 2.1\ ms 2.1 ms 处理 1 1 1 个 Query 变为 2.1 m s 2.1\ ms 2.1 ms 处理 10 10 10 个 Query。吞吐可变优化到 10 q u e r y / 2.1 m s = 4761 q p s 10\ query\ /\ 2.1\ ms = 4761\ qps 10 query / 2.1 ms=4761 qps
  • 3️⃣ 批量访问外存:除了异步访问之外,我们还可以批量访问外部存储。举一个例子:在访问 Redis 维表的 1 1 1 Query 占用 2.1 m s 2.1\ ms 2.1 ms 时长中,其中可能有 2 m s 2\ ms 2 ms 都是在网络请求上面的耗时 ,其中只有 0.1 m s 0.1\ ms 0.1 ms 是 Redis Server 处理请求的时长。那么我们就可以使用 Redis 提供的 pipeline 能力,在客户端(也就是 Flink 任务 lookup join 算子中),攒一批数据,使用 pipeline 去同时访问 Redis Sever。这样就可以把 2.1 m s 2.1\ ms 2.1 ms 处理 1 1 1 个 Query 变为 7 m s = 2 m s + 50 ∗ 0.1 m s 7\ ms=2\ ms + 50 * 0.1\ ms 7 ms=2 ms+500.1 ms 处理 50 50 50 个 Query。吞吐可变为 50 q u e r y / 7 m s = 7143 q p s 50\ query\ /\ 7\ ms = 7143\ qps 50 query / 7 ms=7143 qps

博主认为上述优化效果中,最好用的是 1️⃣ + 3️⃣,2️⃣ 相比 3️⃣ 还是一条一条发请求,性能会差一些。

既然 DataStream 可以这样做,Flink SQL 必须必的也可以借鉴上面的这些优化方案。具体怎么操作呢?看下文骚操作

  • 1️⃣ 按照 Redis 维表的 key 分桶 + local cache:SQL 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udafuser defined aggregation function)中做访问 Redis 处理,并且 udaf 产出的结果只能是一条,所以这种实现起来非常复杂。我们选择不做 keyby 分桶。但是我们可以直接使用 local cache 去做本地缓存,虽然【直接缓存】的效果比【先按照 key 分桶再做缓存】的效果差,但是也能一定程度上减少访问 Redis 压力。在博主实现的 Redis Connector 中,内置了 local cache 的实现。
  • 2️⃣ 异步访问外存:目前博主实现的 Redis Connector 不支持异步访问,但是官方实现的 HBase Connector 支持这个功能,参考下面链接文章的,点开之后搜索 lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/
  • 3️⃣ 批量访问外存:这玩意官方必然没有实现啊,但是,但是,但是,经过博主周末两天的疯狂 debug,改了改源码,搞定了基于 Redis 的批量访问外存优化的功能。

2.Array Expansion(数组列转行)

应用场景(支持 Batch / Streaming):将表中 ARRAY 类型字段(列)拍平,转为多行。

实际案例:比如某些场景下,日志是合并、攒批上报的,就可以使用这种方式将一个 Array 转为多行。

CREATE TABLE show_log_table (log_id BIGINT,show_params ARRAY<STRING>
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (log_id BIGINT,show_param STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTlog_id,t.show_param as show_param
FROM show_log_table
-- array 炸开语法
CROSS JOIN UNNEST(show_params) AS t (show_param)

show_log_table 原始数据:

+I[7, [a, b, c]]
+I[5, [d, e, f]]

输出结果如下所示:

-- +I[7, [a, b, c]] 一行转为 3 行
+I[7, a]
+I[7, b]
+I[7, b]
-- +I[5, [d, e, f]] 一行转为 3 行
+I[5, d]
+I[5, e]
+I[5, f]

3.Table Function(自定义列转行)

应用场景(支持 Batch / Streaming):这个其实和 Array Expansion 功能类似,但是 Table Function 本质上是个 UDTF 函数,和离线 Hive SQL 一样,我们可以自定义 UDTF 去决定列转行的逻辑。

Table Function 使用分类:

  • Inner Join Table Function:如果 UDTF 返回结果为空,则相当于 1 1 1 行转为 0 0 0 行,这行数据直接被丢弃。
  • Left Join Table Function:如果 UDTF 返回结果为空,折行数据不会被丢弃,只会在结果中填充 null 值。
public class TableFunctionInnerJoin_Test {public static void main(String[] args) throws Exception {FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);String sql = "CREATE FUNCTION user_profile_table_func AS 'flink.examples.sql._07.query._06_joins._06_table_function"+ "._01_inner_join.TableFunctionInnerJoin_Test$UserProfileTableFunction';\n"+ "\n"+ "CREATE TABLE source_table (\n"+ "    user_id BIGINT NOT NULL,\n"+ "    name STRING,\n"+ "    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n"+ "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n"+ ") WITH (\n"+ "  'connector' = 'datagen',\n"+ "  'rows-per-second' = '10',\n"+ "  'fields.name.length' = '1',\n"+ "  'fields.user_id.min' = '1',\n"+ "  'fields.user_id.max' = '10'\n"+ ");\n"+ "\n"+ "CREATE TABLE sink_table (\n"+ "    user_id BIGINT,\n"+ "    name STRING,\n"+ "    age INT,\n"+ "    row_time TIMESTAMP(3)\n"+ ") WITH (\n"+ "  'connector' = 'print'\n"+ ");\n"+ "\n"+ "INSERT INTO sink_table\n"+ "SELECT user_id,\n"+ "       name,\n"+ "       age,\n"+ "       row_time\n"+ "FROM source_table,\n"// Table Function Join 语法对应 LATERAL TABLE+ "LATERAL TABLE(user_profile_table_func(user_id)) t(age)";Arrays.stream(sql.split(";")).forEach(flinkEnv.streamTEnv()::executeSql);}public static class UserProfileTableFunction extends TableFunction<Integer> {public void eval(long userId) {// 自定义输出逻辑if (userId <= 5) {// 一行转 1 行collect(1);} else {// 一行转 3 行collect(1);collect(2);collect(3);}}}
}

执行结果如下:

-- userId <= 5,则只有 1 行结果
+I[3, 7, 1, 2021-05-01T18:23:42.560]
-- userId > 5,则有行 3 结果
+I[8, e, 1, 2021-05-01T18:23:42.560]
+I[8, e, 2, 2021-05-01T18:23:42.560]
+I[8, e, 3, 2021-05-01T18:23:42.560]
-- userId <= 5,则只有 1 行结果
+I[4, 9, 1, 2021-05-01T18:23:42.561]
-- userId > 5,则有行 3 结果
+I[8, c, 1, 2021-05-01T18:23:42.561]
+I[8, c, 2, 2021-05-01T18:23:42.561]
+I[8, c, 3, 2021-05-01T18:23:42.561]

这篇关于【大数据】Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/760981

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间