Flink SQL自定义表值函数(Table Function)

2023-11-10 11:20

本文主要是介绍Flink SQL自定义表值函数(Table Function),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使用场景: 表值函数即 UDTF,⽤于进⼀条数据,出多条数据的场景。

开发流程:

  • 实现 org.apache.flink.table.functions.TableFunction 接⼝
  • 实现⼀个或者多个⾃定义的 eval 函数,名称必须叫做 eval,eval ⽅法签名必须是 public 的
  • eval ⽅法的⼊参是直接体现在 eval 函数签名中,出参是体现在 TableFunction 类的泛型参数 T 中

注意:

eval 是没有返回值的,和标量函数不同,Flink TableFunction 接⼝提供了 collect(T) 来发送输出的数据,如果体现在函数签名上,就成了标量函数,使⽤ collect(T) 能体现出 进⼀条数据 出多条数据。

在 SQL 中是⽤ SQL 中的 LATERAL TABLE() 配合 JOIN 、 LEFT JOIN xxx ON TRUE 使⽤。

开发案例:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;/*** 输入数据:* nc -lk 8888* a,bb,cc* * 输出结果:* * res1=>:5> +I[a,bb,cc, a, 1]* res1=>:7> +I[a,bb,cc, cc, 2]* res1=>:6> +I[a,bb,cc, bb, 2]* res8=>:4> +I[a,bb,cc, a, 1]* res8=>:5> +I[a,bb,cc, bb, 2]* res8=>:6> +I[a,bb,cc, cc, 2]* res4=>:3> +I[a,bb,cc, cc, 2]* res4=>:1> +I[a,bb,cc, a, 1]* res4=>:2> +I[a,bb,cc, bb, 2]* res7=>:8> +I[a,bb,cc, bb, 2]* res7=>:1> +I[a,bb,cc, cc, 2]* res7=>:7> +I[a,bb,cc, a, 1]* res2=>:2> +I[a,bb,cc, cc, 2]* res2=>:8> +I[a,bb,cc, a, 1]* res2=>:1> +I[a,bb,cc, bb, 2]* res6=>:1> +I[a,bb,cc, cc, 2]* res6=>:7> +I[a,bb,cc, a, 1]* res6=>:8> +I[a,bb,cc, bb, 2]* res3=>:6> +I[a,bb,cc, bb, 2]* res3=>:7> +I[a,bb,cc, cc, 2]* res3=>:5> +I[a,bb,cc, a, 1]* res5=>:7> +I[a,bb,cc, bb, 2]* res5=>:8> +I[a,bb,cc, cc, 2]* res5=>:6> +I[a,bb,cc, a, 1]*/
public class TableFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);Table table = tEnv.fromDataStream(source, "field");tEnv.createTemporaryView("SourceTable", table);// 在 Table API ⾥可以直接调⽤ UDFTable res1 = tEnv.from("SourceTable").joinLateral(call(SplitFunction.class, $("field"))).select($("field"), $("word"), $("length"));Table res2 = tEnv.from("SourceTable").leftOuterJoinLateral(call(SplitFunction.class, $("field"))).select($("field"), $("word"), $("length"));// 在 Table API ⾥重命名 UDF 的结果字段Table res3 = tEnv.from("SourceTable").leftOuterJoinLateral(call(SplitFunction.class, $("field"))).as("myField", "newWord", "newLength").select($("myField"), $("newWord"), $("newLength"));// 注册函数tEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);// 在 Table API ⾥调⽤注册好的 UDFTable res4 = tEnv.from("SourceTable").joinLateral(call("SplitFunction", $("field"))).select($("field"), $("word"), $("length"));Table res5 = tEnv.from("SourceTable").leftOuterJoinLateral(call("SplitFunction", $("field"))).select($("field"), $("word"), $("length"));// 在 SQL ⾥调⽤注册好的 UDFTable res6 = tEnv.sqlQuery("SELECT field, word, length " +"FROM SourceTable, LATERAL TABLE(SplitFunction(field))");Table res7 = tEnv.sqlQuery("SELECT field, word, length " +"FROM SourceTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(field)) ON TRUE");// 在 SQL ⾥重命名 UDF 字段Table res8 = tEnv.sqlQuery("SELECT field, newWord, newLength " +"FROM SourceTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(field)) AS T(newWord, newLength) ON TRUE");tEnv.toDataStream(res1).print("res1=>");tEnv.toDataStream(res2).print("res2=>");tEnv.toDataStream(res3).print("res3=>");tEnv.toDataStream(res4).print("res4=>");tEnv.toDataStream(res5).print("res5=>");tEnv.toDataStream(res6).print("res6=>");tEnv.toDataStream(res7).print("res7=>");tEnv.toDataStream(res8).print("res8=>");env.execute();}@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))public static class SplitFunction extends TableFunction<Row> {public void eval(String str) {for (String s : str.split(",")) {// 输出结果collect(Row.of(s, s.length()));}}}
}

注意: 如果使⽤ Scala 实现函数,不要使⽤ Scala 中 object 实现 UDF,Scala object 是单例的,可能会导致并发问题。

测试结果:

在这里插入图片描述

这篇关于Flink SQL自定义表值函数(Table Function)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/382321

相关文章

C/C++错误信息处理的常见方法及函数

《C/C++错误信息处理的常见方法及函数》C/C++是两种广泛使用的编程语言,特别是在系统编程、嵌入式开发以及高性能计算领域,:本文主要介绍C/C++错误信息处理的常见方法及函数,文中通过代码介绍... 目录前言1. errno 和 perror()示例:2. strerror()示例:3. perror(

Ubuntu中远程连接Mysql数据库的详细图文教程

《Ubuntu中远程连接Mysql数据库的详细图文教程》Ubuntu是一个以桌面应用为主的Linux发行版操作系统,这篇文章主要为大家详细介绍了Ubuntu中远程连接Mysql数据库的详细图文教程,有... 目录1、版本2、检查有没有mysql2.1 查询是否安装了Mysql包2.2 查看Mysql版本2.

基于SpringBoot+Mybatis实现Mysql分表

《基于SpringBoot+Mybatis实现Mysql分表》这篇文章主要为大家详细介绍了基于SpringBoot+Mybatis实现Mysql分表的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录基本思路定义注解创建ThreadLocal创建拦截器业务处理基本思路1.根据创建时间字段按年进

Python3.6连接MySQL的详细步骤

《Python3.6连接MySQL的详细步骤》在现代Web开发和数据处理中,Python与数据库的交互是必不可少的一部分,MySQL作为最流行的开源关系型数据库管理系统之一,与Python的结合可以实... 目录环境准备安装python 3.6安装mysql安装pymysql库连接到MySQL建立连接执行S

Kotlin 作用域函数apply、let、run、with、also使用指南

《Kotlin作用域函数apply、let、run、with、also使用指南》在Kotlin开发中,作用域函数(ScopeFunctions)是一组能让代码更简洁、更函数式的高阶函数,本文将... 目录一、引言:为什么需要作用域函数?二、作用域函China编程数详解1. apply:对象配置的 “流式构建器”最

MySQL双主搭建+keepalived高可用的实现

《MySQL双主搭建+keepalived高可用的实现》本文主要介绍了MySQL双主搭建+keepalived高可用的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、测试环境准备二、主从搭建1.创建复制用户2.创建复制关系3.开启复制,确认复制是否成功4.同

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

使用Sentinel自定义返回和实现区分来源方式

《使用Sentinel自定义返回和实现区分来源方式》:本文主要介绍使用Sentinel自定义返回和实现区分来源方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Sentinel自定义返回和实现区分来源1. 自定义错误返回2. 实现区分来源总结Sentinel自定

Mysql表的简单操作(基本技能)

《Mysql表的简单操作(基本技能)》在数据库中,表的操作主要包括表的创建、查看、修改、删除等,了解如何操作这些表是数据库管理和开发的基本技能,本文给大家介绍Mysql表的简单操作,感兴趣的朋友一起看... 目录3.1 创建表 3.2 查看表结构3.3 修改表3.4 实践案例:修改表在数据库中,表的操作主要

mysql出现ERROR 2003 (HY000): Can‘t connect to MySQL server on ‘localhost‘ (10061)的解决方法

《mysql出现ERROR2003(HY000):Can‘tconnecttoMySQLserveron‘localhost‘(10061)的解决方法》本文主要介绍了mysql出现... 目录前言:第一步:第二步:第三步:总结:前言:当你想通过命令窗口想打开mysql时候发现提http://www.cpp