【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)

本文主要是介绍【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、示例:表的join操作(内联接、外联接以及联接自定义函数等)


本文介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

一、maven依赖

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、示例:表的join操作(内联接、外联接以及联接自定义函数等)

本部分介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。
关于自定义函数的联接将在flink 自定义函数中介绍,因为使用函数和联接本身关系不是非常密切。
19、Flink 的Table API 和 SQL 中的自定义函数(2)

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
public class TestTableAPIJoinOperationDemo {@Data@NoArgsConstructor@AllArgsConstructorpublic static class User {private long id;private String name;private double balance;private Long rowtime;}@Data@NoArgsConstructor@AllArgsConstructorpublic static class Order {private long id;private long user_id;private double amount;private Long rowtime;}final static List<User> userList = Arrays.asList(new User(1L, "alan", 18, 1698742358391L), new User(2L, "alan", 19, 1698742359396L), new User(3L, "alan", 25, 1698742360407L),new User(4L, "alanchan", 28, 1698742361409L), new User(5L, "alanchan", 29, 1698742362424L));final static List<Order> orderList = Arrays.asList(new Order(1L, 1, 18, 1698742358391L), new Order(2L, 2, 19, 1698742359396L), new Order(3L, 1, 25, 1698742360407L),new Order(4L, 3, 28, 1698742361409L), new Order(5L, 1, 29, 1698742362424L),new Order(6L, 4, 49, 1698742362424L));static void testInnerJoin() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStream<User> users = env.fromCollection(userList);Table usersTable = tenv.fromDataStream(users, $("id"), $("name"),$("balance"),$("rowtime"));DataStream<Order> orders = env.fromCollection(orderList);Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));Table left = usersTable.select($("id").as("userId"), $("name"), $("balance"),$("rowtime").as("u_rowtime"));Table right = ordersTable.select($("id").as("orderId"), $("user_id"), $("amount"),$("rowtime").as("o_rowtime"));Table result = left.join(right).where($("user_id").isEqual($("userId"))).select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"));DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);resultDS.print();
//		15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
//		12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
//		3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
//		12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
//		12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
//		12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])env.execute();}/*** 和 SQL LEFT/RIGHT/FULL OUTER JOIN 子句类似。 关联两张表。 两张表必须有不同的字段名,并且必须定义至少一个等式连接谓词。* @throws Exception*/static void testOuterJoin() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStream<User> users = env.fromCollection(userList);Table usersTable = tenv.fromDataStream(users, $("id"), $("name"),$("balance"),$("rowtime"));DataStream<Order> orders = env.fromCollection(orderList);Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));Table left = usersTable.select($("id").as("userId"), $("name"), $("balance"),$("rowtime").as("u_rowtime"));Table right = ordersTable.select($("id").as("orderId"), $("user_id"), $("amount"),$("rowtime").as("o_rowtime"));Table leftOuterResult = left.leftOuterJoin(right, $("user_id").isEqual($("userId"))).select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"));DataStream<Tuple2<Boolean, Row>> leftOuterResultDS = tenv.toRetractStream(leftOuterResult, Row.class);
//		leftOuterResultDS.print();
//		12> (true,+I[null, null, null, null, alan, 18])
//		3> (true,+I[null, null, null, null, alanchan, 28])
//		12> (false,-D[null, null, null, null, alan, 18])
//		12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
//		15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
//		12> (true,+I[null, null, null, null, alan, 19])
//		3> (false,-D[null, null, null, null, alanchan, 28])
//		12> (false,-D[null, null, null, null, alan, 19])
//		3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
//		12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
//		12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
//		3> (true,+I[null, null, null, null, alanchan, 29])
//		12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])Table rightOuterResult = left.rightOuterJoin(right, $("user_id").isEqual($("userId"))).select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"));DataStream<Tuple2<Boolean, Row>> rightOuterResultDS = tenv.toRetractStream(rightOuterResult, Row.class);
//		rightOuterResultDS.print();
//		12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
//		3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
//		15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
//		12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
//		12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
//		12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])Table fullOuterResult = left.fullOuterJoin(right, $("user_id").isEqual($("userId"))).select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"));DataStream<Tuple2<Boolean, Row>> fullOuterResultDS = tenv.toRetractStream(fullOuterResult, Row.class);fullOuterResultDS.print();
//		3> (true,+I[6, 4, 49.0, 1698742362424, null, null])
//		12> (true,+I[1, 1, 18.0, 1698742358391, null, null])
//		15> (true,+I[4, 3, 28.0, 1698742361409, null, null])
//		12> (false,-D[1, 1, 18.0, 1698742358391, null, null])
//		3> (false,-D[6, 4, 49.0, 1698742362424, null, null])
//		12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18])
//		3> (true,+I[6, 4, 49.0, 1698742362424, alanchan, 28])
//		3> (true,+I[null, null, null, null, alanchan, 29])
//		12> (true,+I[2, 2, 19.0, 1698742359396, null, null])
//		12> (false,-D[2, 2, 19.0, 1698742359396, null, null])
//		12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19])
//		15> (false,-D[4, 3, 28.0, 1698742361409, null, null])
//		12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18])
//		15> (true,+I[4, 3, 28.0, 1698742361409, alan, 25])
//		12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18])env.execute();}/*** Interval join 是可以通过流模式处理的常规 join 的子集。* Interval join 至少需要一个 equi-join 谓词和一个限制双方时间界限的 join 条件。* 这种条件可以由两个合适的范围谓词(<、<=、>=、>)或一个比较两个输入表相同时间属性(即处理时间或事件时间)的等值谓词来定义。* @throws Exception*/static void testIntervalJoin() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStream<User> users = env.fromCollection(userList);Table usersTable = tenv.fromDataStream(users, $("id"), $("name"),$("balance"),$("rowtime"));DataStream<Order> orders = env.fromCollection(orderList);Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));Table left = usersTable.select($("id").as("userId"), $("name"), $("balance"),$("rowtime").as("u_rowtime"));Table right = ordersTable.select($("id").as("orderId"), $("user_id"), $("amount"),$("rowtime").as("o_rowtime"));Table result = left.join(right).where(and($("user_id").isEqual($("userId")),$("user_id").isLess(3)
//					        $("u_rowtime").isGreaterOrEqual($("o_rowtime").minus(lit(5).minutes())),
//					        $("u_rowtime").isLess($("o_rowtime").plus(lit(10).minutes())))).select($("orderId"), $("user_id"), $("amount"),$("o_rowtime"),$("name"),$("balance"));result.printSchema();DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);resultDS.print();
//		12> (true,+I[1, 1, 18.0, 1698742358391, alan, 18.0])
//		12> (true,+I[2, 2, 19.0, 1698742359396, alan, 19.0])
//		12> (true,+I[3, 1, 25.0, 1698742360407, alan, 18.0])
//		12> (true,+I[5, 1, 29.0, 1698742362424, alan, 18.0])env.execute();}/*** join 表和表函数的结果。左(外部)表的每一行都会 join 表函数相应调用产生的所有行。 * 如果表函数调用返回空结果,则删除左侧(外部)表的一行。* 该示例为示例性的,具体的验证将在自定义函数中进行说明* * @throws Exception*/static void testInnerJoinWithUDTF() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 注册 User-Defined Table FunctionTableFunction<Tuple3<String,String,String>> split = new SplitFunction();tenv.registerFunction("split", split);// joinDataStream<Order> orders = env.fromCollection(orderList);Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));Table result = ordersTable.joinLateral(call("split", $("c")).as("s", "t", "v")).select($("a"), $("b"), $("s"), $("t"), $("v"));env.execute();}/*** join 表和表函数的结果。左(外部)表的每一行都会 join 表函数相应调用产生的所有行。* 如果表函数调用返回空结果,则保留相应的 outer(外部连接)行并用空值填充右侧结果。* 目前,表函数左外连接的谓词只能为空或字面(常量)真。* 该示例为示例性的,具体的验证将在自定义函数中进行说明* * @throws Exception*/static void testLeftOuterJoinWithUDTF() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 注册 User-Defined Table FunctionTableFunction<Tuple3<String,String,String>> split = new SplitFunction();tenv.registerFunction("split", split);// joinDataStream<Order> orders = env.fromCollection(orderList);Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));Table result = ordersTable.leftOuterJoinLateral(call("split", $("c")).as("s", "t", "v")).select($("a"), $("b"), $("s"), $("t"), $("v"));env.execute();}/*** Temporal table 是跟踪随时间变化的表。* Temporal table 函数提供对特定时间点 temporal table 状态的访问。* 表与 temporal table 函数进行 join 的语法和使用表函数进行 inner join 的语法相同。* 目前仅支持与 temporal table 的 inner join。* * @throws Exception*/static void testJoinWithTemporalTable() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);Table ratesHistory = tenv.from("RatesHistory");// 注册带有时间属性和主键的 temporal table functionTemporalTableFunction rates = ratesHistory.createTemporalTableFunction($("r_proctime"),$("r_currency"));tenv.registerFunction("rates", rates);// 基于时间属性和键与“Orders”表关联Table orders = tenv.from("Orders");Table result = orders.joinLateral(call("rates", $("o_proctime")), $("o_currency").isEqual($("r_currency")));env.execute();}/*** @param args* @throws Exception */public static void main(String[] args) throws Exception {
//		testInnerJoin();
//		testOuterJoin();
//		testIntervalJoin();testInnerJoinWithUDTF();}static class SplitFunction extends TableFunction<Tuple3<String,String,String>>{public void eval(Tuple3<String,String,String> tp) {//		    for (String s : str.split(",")) {
//		      // use collect(...) to emit a rowcollect(Row.of(s, s.length()));
//		    }}}
}

以上,本文介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

这篇关于【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/553865

相关文章

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

hdu1171(母函数或多重背包)

题意:把物品分成两份,使得价值最接近 可以用背包,或者是母函数来解,母函数(1 + x^v+x^2v+.....+x^num*v)(1 + x^v+x^2v+.....+x^num*v)(1 + x^v+x^2v+.....+x^num*v) 其中指数为价值,每一项的数目为(该物品数+1)个 代码如下: #include<iostream>#include<algorithm>

自定义类型:结构体(续)

目录 一. 结构体的内存对齐 1.1 为什么存在内存对齐? 1.2 修改默认对齐数 二. 结构体传参 三. 结构体实现位段 一. 结构体的内存对齐 在前面的文章里我们已经讲过一部分的内存对齐的知识,并举出了两个例子,我们再举出两个例子继续说明: struct S3{double a;int b;char c;};int mian(){printf("%zd\n",s

C++操作符重载实例(独立函数)

C++操作符重载实例,我们把坐标值CVector的加法进行重载,计算c3=c1+c2时,也就是计算x3=x1+x2,y3=y1+y2,今天我们以独立函数的方式重载操作符+(加号),以下是C++代码: c1802.cpp源代码: D:\YcjWork\CppTour>vim c1802.cpp #include <iostream>using namespace std;/*** 以独立函数

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

函数式编程思想

我们经常会用到各种各样的编程思想,例如面向过程、面向对象。不过笔者在该博客简单介绍一下函数式编程思想. 如果对函数式编程思想进行概括,就是f(x) = na(x) , y=uf(x)…至于其他的编程思想,可能是y=a(x)+b(x)+c(x)…,也有可能是y=f(x)=f(x)/a + f(x)/b+f(x)/c… 面向过程的指令式编程 面向过程,简单理解就是y=a(x)+b(x)+c(x)

Oracle type (自定义类型的使用)

oracle - type   type定义: oracle中自定义数据类型 oracle中有基本的数据类型,如number,varchar2,date,numeric,float....但有时候我们需要特殊的格式, 如将name定义为(firstname,lastname)的形式,我们想把这个作为一个表的一列看待,这时候就要我们自己定义一个数据类型 格式 :create or repla

zeroclipboard 粘贴板的应用示例, 兼容 Chrome、IE等多浏览器

zeroclipboard单个复制按钮和多个复制按钮的实现方法 最近网站改版想让复制代码功能在多个浏览器上都可以实现,最近看网上不少说我们的代码复制功能不好用的,我们最近将会增加代码高亮等功能,希望大家多多支持我们 zeroclipboard是一个跨浏览器的库类 它利用 Flash 进行复制,所以只要浏览器装有 Flash 就可以运行,而且比 IE 的

【LabVIEW学习篇 - 21】:DLL与API的调用

文章目录 DLL与API调用DLLAPIDLL的调用 DLL与API调用 LabVIEW虽然已经足够强大,但不同的语言在不同领域都有着自己的优势,为了强强联合,LabVIEW提供了强大的外部程序接口能力,包括DLL、CIN(C语言接口)、ActiveX、.NET、MATLAB等等。通过DLL可以使用户很方便地调用C、C++、C#、VB等编程语言写的程序以及windows自带的大

动手学深度学习【数据操作+数据预处理】

import osos.makedirs(os.path.join('.', 'data'), exist_ok=True)data_file = os.path.join('.', 'data', 'house_tiny.csv')with open(data_file, 'w') as f:f.write('NumRooms,Alley,Price\n') # 列名f.write('NA