详述FlinkSql Join操作

2024-02-08 06:12
文章标签 操作 join 详述 flinksql

本文主要是介绍详述FlinkSql Join操作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 

FlinkSql 的 Join

Flink 官网将其分为了 Joins 和 Window Joins两个大类,其中里面又分了很多 Join 方式

参考文档:

Joins | Apache Flink

Window JOIN | Apache Flink

Joins

官网介绍共有6种方式:

  1. Regular Join:流与流的 Join,包括 Inner Join、Outer Equal Join

  2. Interval Join:流与流的 Join,两条流一段时间区间内的 Join

  3. Temporal Join:流与流的 Join,包括事件时间,处理时间的 Temporal Join,类似于离线中的快照 Join

  4. Lookup Join:流与外部维表的 Join

  5. Array Expansion:表字段的列转行,类似于 Hive 的 explode 数据炸开的列转行

  6. Table Function:自定义函数的表字段的列转行,支持 Inner Join 和 Left Outer Join

Regular Join

写法上和传统数据库没有区别,关联条件支持等值和非等值Join,有Inner Join 和 Outer Join(Left Join、Right Join、FULL JOIN)

有人问我为什么要特别区分内外连接,后面会用到

内连接是通过匹配两个表之间的共同列,返回满足连接条件的行。只有在连接条件匹配的情况下,才会返回结果。

外连接是在内连接的基础上,还包括了不满足连接条件的行。

SELECT order_id, uid, price, user_name 
FROM order a
Left JOIN user b
ON a.uid = b.uid

顺便了解一下流是怎么 Join 的:

和离线不同,离线是一批数据一起运算的,完成后输出结果

FlinkSql是Dynamic Table的概念,数据在 State 里面,每来一条数据就会对左右两边的数据进行关联

Regular Join 的 State 默认是永久保存的,为了避免 State 无限膨胀,可以根据情况决定是否设置状态清理:table.exec.state.ttl(目前是根据更新时间来判断是否过期,而非访问时间)

再来看看几种 Join ,其中outer Join产生的回撤流是和传统离线方式有很大区别的:

首先不考虑数据源有回撤的情况,Regular Join在 Outer Join 时会产生回撤流,L-左表、R-右表

  •  Inner Join:两条流 Join 到才输出 +[L, R],关联不上不会输出

  •  Left Join:当左流数据到达之后就会直接输出

        可以 Join 到右流则输出 +[L,R],Join 不到右流输出 +[L,null]

        如果之后右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据

        则会发起回撤流,先输出 -[L,null],然后在输出一条 +[L,R]

  •  Right Join:有 Left Join 一样,只是逻辑相反

  • Full Join:和Left原理一样,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出,如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流

        对右流来说:Join 到输出 +[L,R],没 Join 到输出 +[null,R],左流数据到达后回撤 -[null,R],输出 +[L,R]

        对左流来说:Join 到输出 +[L,R],没 Join 到输出 +[L,null]),右流数据到达后回撤 -[L, null],输出 +[L,R]     

图解:

Regular Join 过程图

inner join 和 lef join 输出结果示例:

inner join
+I[5, d, 5, f]
+I[5, d, 5, 8]
+I[3, 4, 3, 0]
left join
+I[3, 4ab, null, null]
+I[5, f3c, 5, c05]
+I[5, 6e2, 5, c05]
-D[3, 4ab, null, null]
+I[3, 4ab, 3, 765]

关于 Regular Join 的注意事项:

  • 实时 Regular Join 可以不是 等值 join等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联

  •  Join 的流程是左流新来一条数据之后,会和右流中符合条件的所有数据做 Join,然后输出,如果是outer join会立即输出之后产生回撤流

  • 流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大。

Interval Join

Interval Join 只支持普通 Append 数据流,不支持含 Retract 的动态表

Interval Join 左右表仅在某个时间范围(给定上界和下界)内进行关联,这个时间区间支持event time 和 processing time两种语义,如果是 event time,会根据区间和Watermark自动清理状态

场景示例:用户下单产生订单信息,用户必须在下单后一个小时以内付款,输出付款的订单信息

SELECTo.orderId,o.productName,p.payType,o.orderTime,cast(payTime as timestamp) as payTime
FROM Orders o 
JOIN Payment p 
ON  o.orderId = p.orderId 
AND p.payTime BETWEEN orderTime AND orderTime + INTERVAL ‘1’ HOUR

Interval Join 几种方式,需要注意 Interval Join 不会产生回撤流:

  •  Inner Join:只有两条流 Join 到才输出,输出 +[L, R]

  • Left Join:和 Regular Join 不同,左流数据到达之后,如果没有 Join 到右流的数据,就会等待(放在 State 中等),如果之后右流之后数据到达之后,发现能和刚刚那条左流数据 Join 到,这时输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间)。如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,然后输出 +[L, null](这时候其实已经延迟了),如果右流 State 中的数据过期了,就直接从 State 中删除

  • Right Join:同 Left Join,逻辑相反

  • Full Join:流任务中,左流或者右流的数据到达之后,如果没有 Join 到另外一条流的数据,就会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),如果之后另一条流数据到达之后,发现能和刚刚那条数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据能够过期了,就将这些数据从 State 中删除并且输出(左流过期输出 +[L, null],右流过期输出 -[null, R]

图解:

图片来自阿里云社区

inner join不用多说,看看 left join 输出结果示例:

+I[6, e, 6, 7]
+I[11, d, null, null]
+I[7, b, null, null]
+I[8, 0, 8, 3]
+I[13, 6, null, null]

关于 Interval Join 的注意事项:

  • 实时 Interval Join 可以不是 等值 join。等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,然后将满足条件的数据进行关联输出

  •  outer join 不会产生回撤流,关联不上会在 State 过期时发送数据,会有延迟

Temporal Joins

这种关联方式同样是传统数据库没有的,但是会发现和数仓的拉链表Join有点类似

Temporal Join 支持和 Verisoned Table 进行关联,也支持 event time 和 processing time 两种语义,支持inner join 和 left join 两种方式

事件时间 ,在解决多版本问题时有奇效:

  1.  事件时间的 Temporal Join 一定要给左右两张表都设置 Watermark

  2. 事件时间的 Temporal Join 一定要把 Versioned Table 的主键包含在 Join on 的条件中

--官网案例
CREATE TABLE orders (order_id    STRING,price       DECIMAL(32,2),currency    STRING,order_time  TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
) WITH (/* ... */);-- 必须定义一个 versioned table
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,PRIMARY KEY(currency) NOT ENFORCED
) WITH ('connector' = 'kafka'/* ... */
);SELECT order_id,price,orders.currency,conversion_rate,order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;order_id  price  currency  conversion_rate  order_time
========  =====  ========  ===============  =========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00

Flink SQL 会为 Versioned Table 维护 Primary Key 下的所有历史时间版本的数据,然后根据左表Orders的事件时间关联到对应时间的 Versioned Table 的汇率

Processing Time,由于是处理时间,只维护了最新的状态数据,不需要关心历史版本的数据,直接根据LeftTable数据到达的时间关联最新的数据

另外还支持 Temporal Table Functionv Join,但是一般不怎么用(至少我基本不这样写)

SELECTo_amount, r_rate
FROMOrders,LATERAL TABLE (Rates(o_proctime))
WHEREr_currency = o_currency

Lookup Join

Lookup Join 通常用于关联外部系统数据(比如Mysql、Hbase等),但是目前只支持 processing time,只能以处理时间关联最新的数据(这个最新是有代价的)

实际用起来其实会发现功能上和 version table 的processing 类似

-- 官网案例,需要定义一个外部存储的表
CREATE TEMPORARY TABLE Customers (id INT,name STRING,country STRING,zip STRING
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://mysqlhost:3306/customerdb','table-name' = 'customers'
);-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS oJOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS cON o.customer_id = c.id;

待办:lookup支持cache,cache的异步查询原理,数据更新的延迟,参数调优等等

Array Expansion

常见的用法就是类似Spark 的 lateral view expload(arr)

SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tagArray) AS t (tag)

Table Function 

其实和 Array Expansion 功能类似,但是 Table Function 本质上是个 UDTF 函数,并且支持自定义函数

Window Joins

见 FlinkSql 窗口函数

语法示例:

SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id,COALESCE(L.window_start, R.window_start) as window_start,COALESCE(L.window_end, R.window_end) as window_end
FROM (SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) L
INNER JOIN (SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) R
ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
SELECT *
FROM (SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) L WHERE EXISTS (SELECT * FROM (SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end);

这篇关于详述FlinkSql Join操作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/690122

相关文章

动手学深度学习【数据操作+数据预处理】

import osos.makedirs(os.path.join('.', 'data'), exist_ok=True)data_file = os.path.join('.', 'data', 'house_tiny.csv')with open(data_file, 'w') as f:f.write('NumRooms,Alley,Price\n') # 列名f.write('NA

线程的四种操作

所属专栏:Java学习        1. 线程的开启 start和run的区别: run:描述了线程要执行的任务,也可以称为线程的入口 start:调用系统函数,真正的在系统内核中创建线程(创建PCB,加入到链表中),此处的start会根据不同的系统,分别调用不同的api,创建好之后的线程,再单独去执行run(所以说,start的本质是调用系统api,系统的api

Java IO 操作——个人理解

之前一直Java的IO操作一知半解。今天看到一个便文章觉得很有道理( 原文章),记录一下。 首先,理解Java的IO操作到底操作的什么内容,过程又是怎么样子。          数据来源的操作: 来源有文件,网络数据。使用File类和Sockets等。这里操作的是数据本身,1,0结构。    File file = new File("path");   字

MySQL——表操作

目录 一、创建表 二、查看表 2.1 查看表中某成员的数据 2.2 查看整个表中的表成员 2.3 查看创建表时的句柄 三、修改表 alter 3.1 重命名 rename 3.2 新增一列 add 3.3 更改列属性 modify 3.4 更改列名称 change 3.5 删除某列 上一篇博客介绍了库的操作,接下来来看一下表的相关操作。 一、创建表 create

封装MySQL操作时Where条件语句的组织

在对数据库进行封装的过程中,条件语句应该是相对难以处理的,毕竟条件语句太过于多样性。 条件语句大致分为以下几种: 1、单一条件,比如:where id = 1; 2、多个条件,相互间关系统一。比如:where id > 10 and age > 20 and score < 60; 3、多个条件,相互间关系不统一。比如:where (id > 10 OR age > 20) AND sco

PHP7扩展开发之流操作

前言 啥是流操作?简单来讲就是对一些文件,网络的IO操作。PHP已经把这些IO操作,封装成流操作。这节,我们将使用PHP扩展实现一个目录遍历的功能。PHP示例代码如下: <?phpfunction list_dir($dir) {if (is_dir($dir) === false) {return;} $dh = opendir($dir);if ($dh == false) {ret

浙大数据结构:树的定义与操作

四种遍历 #include<iostream>#include<queue>using namespace std;typedef struct treenode *BinTree;typedef BinTree position;typedef int ElementType;struct treenode{ElementType data;BinTree left;BinTre

浙大数据结构:04-树7 二叉搜索树的操作集

这道题答案都在PPT上,所以先学会再写的话并不难。 1、BinTree Insert( BinTree BST, ElementType X ) 递归实现,小就进左子树,大就进右子树。 为空就新建结点插入。 BinTree Insert( BinTree BST, ElementType X ){if(!BST){BST=(BinTree)malloc(sizeof(struct TNo

hibernate修改数据库已有的对象【简化操作】

陈科肇 直接上代码: /*** 更新新的数据并并未修改旧的数据* @param oldEntity 数据库存在的实体* @param newEntity 更改后的实体* @throws IllegalAccessException * @throws IllegalArgumentException */public void updateNew(T oldEntity,T newEntity

mysql中导入txt文件数据的操作指令

1 表tt的格式:    CREATE TABLE `tt` (   `ind` int NOT NULL auto_increment,   `name` char(100) default NULL,   PRIMARY KEY  (`ind`)  )   2 文件d.txt的内容示例:  1,a  2,b  3,c