【大数据】Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function

本文主要是介绍【大数据】Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink SQL 语法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function

  • 1.Lookup Join(维表 Join)
  • 2.Array Expansion(数组列转行)
  • 3.Table Function(自定义列转行)

1.Lookup Join(维表 Join)

Lookup Join 定义(支持 Batch / Streaming):Lookup Join 其实就是维表 Join,比如拿离线数仓来说,常常会有用户画像,设备画像等数据,而对应到实时数仓场景中,这种实时获取外部缓存的 Join 就叫做维表 Join。

应用场景:小伙伴萌会问,我们既然已经有了上面介绍的 Regular Join,Interval Join 等,为啥还需要一种 Lookup Join?因为上面说的这几种 Join 都是 流与流之间的 Join,而 Lookup Join 是流与 Redis,MySQL,HBase 这种存储介质的 Join。Lookup 的意思就是实时查找,而实时的画像数据一般都是存储在 Redis,MySQL,HBase 中,这就是 Lookup Join 的由来。

实际案例:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。

  • 曝光用户日志流(show_log)数据(数据存储在 Kafka 中)
log_id  timestamp            user_id
1       2021-11-01 00:01:03  a
2       2021-11-01 00:03:00  b
3       2021-11-01 00:05:00  c
4       2021-11-01 00:06:00  b
5       2021-11-01 00:07:00  c
  • 用户画像维表(user_profile)数据(数据存储在 Redis 中)
user_id(主键)   age     sex
a               12-18   男
b               18-24   女
c               18-24   男

注意:Redis 中的数据结构存储是按照 Key-Value 去存储的。其中 Key 为 user_id,Value 为 agesex 的 JSON。

具体 SQL:

CREATE TABLE show_log (log_id BIGINT,`timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),user_id STRING,proctime AS PROCTIME()
)
WITH ('connector' = 'datagen','rows-per-second' = '10','fields.user_id.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE user_profile (user_id STRING,age STRING,sex STRING) WITH ('connector' = 'redis','hostname' = '127.0.0.1','port' = '6379','format' = 'json','lookup.cache.max-rows' = '500','lookup.cache.ttl' = '3600','lookup.max-retries' = '1'
);CREATE TABLE sink_table (log_id BIGINT,`timestamp` TIMESTAMP(3),user_id STRING,proctime TIMESTAMP(3),age STRING,sex STRING
) WITH ('connector' = 'print'
);-- lookup join 的 query 逻辑
INSERT INTO sink_table
SELECT s.log_id as log_id, s.`timestamp` as `timestamp`, s.user_id as user_id, s.proctime as proctime, u.sex as sex, u.age as age
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id

输出数据如下:

log_id  timestamp            user_id  age     sex
1       2021-11-01 00:01:03  a        12-182       2021-11-01 00:03:00  b        18-243       2021-11-01 00:05:00  c        18-244       2021-11-01 00:06:00  b        18-245       2021-11-01 00:07:00  c        18-24

注意:实时的 Lookup 维表关联能使用 处理时间 去做关联。

  • 同一条数据关联到的维度数据可能不同:实时数仓中常用的实时维表都是在不断的变化中的,当前流表数据关联完维表数据后,如果同一个 key 的维表的数据发生了变化,已关联到的维表的结果数据不会再同步更新。举个例子,维表中 user_id 1 1 1 的数据在 08 : 00 08:00 08:00age12-18 变为了 18-24,那么当我们的任务在 08 : 01 08:01 08:01 failover 之后从 07 : 59 07:59 07:59 开始回溯数据时,原本应该关联到 12-18 的数据会关联到 18-24age 数据。这是有可能会影响数据质量的。所以小伙伴萌在评估你们的实时任务时要考虑到这一点。
  • 会发生实时的新建及更新的维表博主建议小伙伴萌应该建立起数据延迟的监控机制,防止出现流表数据先于维表数据到达,导致关联不到维表数据。

再说说维表常见的性能问题及优化思路。

所有的维表性能问题都可以总结为:高 QPS 下访问维表存储引擎产生的任务背压,数据产出延迟问题

举个例子:

  • 在没有使用维表的情况下:一条数据从输入 Flink 任务到输出 Flink 任务的时延假如为 0.1 m s 0.1\ ms 0.1 ms,那么并行度为 1 1 1 的任务的吞吐可以达到 1 q u e r y / 0.1 m s = 10000 q p s 1\ query\ /\ 0.1\ ms = 10000\ qps 1 query / 0.1 ms=10000 qps
  • 在使用维表之后:每条数据访问维表的外部存储的时长为 2 m s 2\ ms 2 ms,那么一条数据从输入 Flink 任务到输出 Flink 任务的时延就会变成 2.1 m s 2.1\ ms 2.1 ms,那么同样并行度为 1 的任务的吞吐只能达到 1 q u e r y / 2.1 m s = 476 q p s 1\ query\ /\ 2.1\ ms = 476\ qps 1 query / 2.1 ms=476 qps。两者的吞吐量相差 21 21 21 倍。

这就是为什么维表 Join 的算子会产生背压,任务产出会延迟。

那么当然,解决方案也是有很多的。抛开 Flink SQL 想一下,如果我们使用 DataStream API,甚至是在做一个后端应用,需要访问外部存储时,常用的优化方案有哪些?这里列举一下:

  • 1️⃣ 按照 Redis 维表的 key 分桶 + local cache:通过按照 key 分桶的方式,让大多数据的维表关联的数据访问走之前访问过的 local cache 即可。这样就可以把访问外部存储 2.1 m s 2.1\ ms 2.1 ms 处理一个 Query 变为访问内存的 0.1 m s 0.1\ ms 0.1 ms 处理一个 Query 的时长。
  • 2️⃣ 异步访问外存:DataStream API 有异步算子,可以利用线程池去同时多次请求维表外部存储。这样就可以把 2.1 m s 2.1\ ms 2.1 ms 处理 1 1 1 个 Query 变为 2.1 m s 2.1\ ms 2.1 ms 处理 10 10 10 个 Query。吞吐可变优化到 10 q u e r y / 2.1 m s = 4761 q p s 10\ query\ /\ 2.1\ ms = 4761\ qps 10 query / 2.1 ms=4761 qps
  • 3️⃣ 批量访问外存:除了异步访问之外,我们还可以批量访问外部存储。举一个例子:在访问 Redis 维表的 1 1 1 Query 占用 2.1 m s 2.1\ ms 2.1 ms 时长中,其中可能有 2 m s 2\ ms 2 ms 都是在网络请求上面的耗时 ,其中只有 0.1 m s 0.1\ ms 0.1 ms 是 Redis Server 处理请求的时长。那么我们就可以使用 Redis 提供的 pipeline 能力,在客户端(也就是 Flink 任务 lookup join 算子中),攒一批数据,使用 pipeline 去同时访问 Redis Sever。这样就可以把 2.1 m s 2.1\ ms 2.1 ms 处理 1 1 1 个 Query 变为 7 m s = 2 m s + 50 ∗ 0.1 m s 7\ ms=2\ ms + 50 * 0.1\ ms 7 ms=2 ms+500.1 ms 处理 50 50 50 个 Query。吞吐可变为 50 q u e r y / 7 m s = 7143 q p s 50\ query\ /\ 7\ ms = 7143\ qps 50 query / 7 ms=7143 qps

博主认为上述优化效果中,最好用的是 1️⃣ + 3️⃣,2️⃣ 相比 3️⃣ 还是一条一条发请求,性能会差一些。

既然 DataStream 可以这样做,Flink SQL 必须必的也可以借鉴上面的这些优化方案。具体怎么操作呢?看下文骚操作

  • 1️⃣ 按照 Redis 维表的 key 分桶 + local cache:SQL 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udafuser defined aggregation function)中做访问 Redis 处理,并且 udaf 产出的结果只能是一条,所以这种实现起来非常复杂。我们选择不做 keyby 分桶。但是我们可以直接使用 local cache 去做本地缓存,虽然【直接缓存】的效果比【先按照 key 分桶再做缓存】的效果差,但是也能一定程度上减少访问 Redis 压力。在博主实现的 Redis Connector 中,内置了 local cache 的实现。
  • 2️⃣ 异步访问外存:目前博主实现的 Redis Connector 不支持异步访问,但是官方实现的 HBase Connector 支持这个功能,参考下面链接文章的,点开之后搜索 lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/
  • 3️⃣ 批量访问外存:这玩意官方必然没有实现啊,但是,但是,但是,经过博主周末两天的疯狂 debug,改了改源码,搞定了基于 Redis 的批量访问外存优化的功能。

2.Array Expansion(数组列转行)

应用场景(支持 Batch / Streaming):将表中 ARRAY 类型字段(列)拍平,转为多行。

实际案例:比如某些场景下,日志是合并、攒批上报的,就可以使用这种方式将一个 Array 转为多行。

CREATE TABLE show_log_table (log_id BIGINT,show_params ARRAY<STRING>
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (log_id BIGINT,show_param STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTlog_id,t.show_param as show_param
FROM show_log_table
-- array 炸开语法
CROSS JOIN UNNEST(show_params) AS t (show_param)

show_log_table 原始数据:

+I[7, [a, b, c]]
+I[5, [d, e, f]]

输出结果如下所示:

-- +I[7, [a, b, c]] 一行转为 3 行
+I[7, a]
+I[7, b]
+I[7, b]
-- +I[5, [d, e, f]] 一行转为 3 行
+I[5, d]
+I[5, e]
+I[5, f]

3.Table Function(自定义列转行)

应用场景(支持 Batch / Streaming):这个其实和 Array Expansion 功能类似,但是 Table Function 本质上是个 UDTF 函数,和离线 Hive SQL 一样,我们可以自定义 UDTF 去决定列转行的逻辑。

Table Function 使用分类:

  • Inner Join Table Function:如果 UDTF 返回结果为空,则相当于 1 1 1 行转为 0 0 0 行,这行数据直接被丢弃。
  • Left Join Table Function:如果 UDTF 返回结果为空,折行数据不会被丢弃,只会在结果中填充 null 值。
public class TableFunctionInnerJoin_Test {public static void main(String[] args) throws Exception {FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);String sql = "CREATE FUNCTION user_profile_table_func AS 'flink.examples.sql._07.query._06_joins._06_table_function"+ "._01_inner_join.TableFunctionInnerJoin_Test$UserProfileTableFunction';\n"+ "\n"+ "CREATE TABLE source_table (\n"+ "    user_id BIGINT NOT NULL,\n"+ "    name STRING,\n"+ "    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n"+ "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n"+ ") WITH (\n"+ "  'connector' = 'datagen',\n"+ "  'rows-per-second' = '10',\n"+ "  'fields.name.length' = '1',\n"+ "  'fields.user_id.min' = '1',\n"+ "  'fields.user_id.max' = '10'\n"+ ");\n"+ "\n"+ "CREATE TABLE sink_table (\n"+ "    user_id BIGINT,\n"+ "    name STRING,\n"+ "    age INT,\n"+ "    row_time TIMESTAMP(3)\n"+ ") WITH (\n"+ "  'connector' = 'print'\n"+ ");\n"+ "\n"+ "INSERT INTO sink_table\n"+ "SELECT user_id,\n"+ "       name,\n"+ "       age,\n"+ "       row_time\n"+ "FROM source_table,\n"// Table Function Join 语法对应 LATERAL TABLE+ "LATERAL TABLE(user_profile_table_func(user_id)) t(age)";Arrays.stream(sql.split(";")).forEach(flinkEnv.streamTEnv()::executeSql);}public static class UserProfileTableFunction extends TableFunction<Integer> {public void eval(long userId) {// 自定义输出逻辑if (userId <= 5) {// 一行转 1 行collect(1);} else {// 一行转 3 行collect(1);collect(2);collect(3);}}}
}

执行结果如下:

-- userId <= 5,则只有 1 行结果
+I[3, 7, 1, 2021-05-01T18:23:42.560]
-- userId > 5,则有行 3 结果
+I[8, e, 1, 2021-05-01T18:23:42.560]
+I[8, e, 2, 2021-05-01T18:23:42.560]
+I[8, e, 3, 2021-05-01T18:23:42.560]
-- userId <= 5,则只有 1 行结果
+I[4, 9, 1, 2021-05-01T18:23:42.561]
-- userId > 5,则有行 3 结果
+I[8, c, 1, 2021-05-01T18:23:42.561]
+I[8, c, 2, 2021-05-01T18:23:42.561]
+I[8, c, 3, 2021-05-01T18:23:42.561]

这篇关于【大数据】Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/760981

相关文章

Ubuntu中远程连接Mysql数据库的详细图文教程

《Ubuntu中远程连接Mysql数据库的详细图文教程》Ubuntu是一个以桌面应用为主的Linux发行版操作系统,这篇文章主要为大家详细介绍了Ubuntu中远程连接Mysql数据库的详细图文教程,有... 目录1、版本2、检查有没有mysql2.1 查询是否安装了Mysql包2.2 查看Mysql版本2.

基于SpringBoot+Mybatis实现Mysql分表

《基于SpringBoot+Mybatis实现Mysql分表》这篇文章主要为大家详细介绍了基于SpringBoot+Mybatis实现Mysql分表的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录基本思路定义注解创建ThreadLocal创建拦截器业务处理基本思路1.根据创建时间字段按年进

Python3.6连接MySQL的详细步骤

《Python3.6连接MySQL的详细步骤》在现代Web开发和数据处理中,Python与数据库的交互是必不可少的一部分,MySQL作为最流行的开源关系型数据库管理系统之一,与Python的结合可以实... 目录环境准备安装python 3.6安装mysql安装pymysql库连接到MySQL建立连接执行S

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

MySQL双主搭建+keepalived高可用的实现

《MySQL双主搭建+keepalived高可用的实现》本文主要介绍了MySQL双主搭建+keepalived高可用的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、测试环境准备二、主从搭建1.创建复制用户2.创建复制关系3.开启复制,确认复制是否成功4.同

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

Mysql表的简单操作(基本技能)

《Mysql表的简单操作(基本技能)》在数据库中,表的操作主要包括表的创建、查看、修改、删除等,了解如何操作这些表是数据库管理和开发的基本技能,本文给大家介绍Mysql表的简单操作,感兴趣的朋友一起看... 目录3.1 创建表 3.2 查看表结构3.3 修改表3.4 实践案例:修改表在数据库中,表的操作主要

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

mysql出现ERROR 2003 (HY000): Can‘t connect to MySQL server on ‘localhost‘ (10061)的解决方法

《mysql出现ERROR2003(HY000):Can‘tconnecttoMySQLserveron‘localhost‘(10061)的解决方法》本文主要介绍了mysql出现... 目录前言:第一步:第二步:第三步:总结:前言:当你想通过命令窗口想打开mysql时候发现提http://www.cpp

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分