flink的自动类型推导:解决udf的通用类型问题

2024-03-14 20:04

本文主要是介绍flink的自动类型推导:解决udf的通用类型问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

问题背景

一开始编写了一个udf函数:

public class ArrayContains extends ScalarFunction {private static final int EXIST = 1;private static final int NOT_EXIST = -1;// 第一个参数是待检查的数组,第二个参数是待验证元素是否存在于第一个参数中public static int eval(List<Integer> array, List<Integer> targetArray) {if (CollectionUtils.isEmpty(array)) {return NOT_EXIST;}for (Object target : targetArray) {if (array.contains(target)) {return EXIST;}}return NOT_EXIST;}

功能其实很简单:判断数组中是否包含特定内容,包含任意之一就返回1,否则返回-1

之前的参数类型都是List<Integer>

然后新的需求来了:需要传入的参数类型是List<String>

显然,我们不能新建一个udf来处理List<String>的情况,但是如果我们简单改写为:

public class ArrayContains extends ScalarFunction {private static final int EXIST = 1;private static final int NOT_EXIST = -1;// 第一个参数是待检查的数组,第二个参数是待验证元素是否存在于第一个参数中public static int eval(List<Object> array, List<Object> targetArray) {if (CollectionUtils.isEmpty(array)) {return NOT_EXIST;}for (Object target : targetArray) {if (array.contains(target)) {return EXIST;}}return NOT_EXIST;}

会报错:

Could not extract a data type from 'java.util.List<java.lang.Object>' in parameter 0 of method 'eval' in class 'dp.udf.ArrayContains'. Please pass the required data type manually or allow RAW types

Cannot extract a data type from a pure 'java.lang.Object' class. Usually, this indicates that class information is missing or got lost. Please specify a more concrete class or treat it as a RAW type.

所以就要用到flink的自动类型推导,具体来说,有时我们希望一种求值方法可以同时处理多种数据类型,有时又要求对重载的多个求值方法仅声明一次通用的结果类型,就可以用@FunctionHint 注解来提供从入参数据类型到结果数据类型的映射。

(详细具体可以查看:自定义函数 | Apache Flink)

解决思路

具体udf修改如下:


import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.junit.Test;import java.util.Arrays;
import java.util.List;import static org.junit.Assert.assertEquals;/*** 判断数组中是否包含特定内容,包含任意之一就返回1,否则返回-1* flink1.13版本还没有ARRAY_CONTAINS内置函数*/
@FunctionHint(input = {@DataTypeHint("ARRAY<STRING>"), @DataTypeHint("ARRAY<Int>")}, output = @DataTypeHint("Int"))
@FunctionHint(input = {@DataTypeHint("ARRAY<Int>"), @DataTypeHint("ARRAY<Int>")}, output = @DataTypeHint("Int"))
public class ArrayContains extends ScalarFunction {private static final int EXIST = 1;private static final int NOT_EXIST = -1;// 第一个参数是待检查的数组,第二个参数是待验证元素是否存在于第一个参数中public static int eval(Object[] array, Object[] targetArray) {if (array == null) {return NOT_EXIST;}List arrayList = Arrays.asList(array);if (CollectionUtils.isEmpty(arrayList)) {return NOT_EXIST;}for (Object target : targetArray) {if (arrayList.contains(target)) {return EXIST;}}return NOT_EXIST;}

其实就是增加了类上面的 FunctionHint声明

验证

写了个几个测试用例,全部通过:

@Test
public void test() {assertEquals(eval(null, new Integer[]{101, 1}), NOT_EXIST);assertEquals(eval(new Integer[]{}, new Integer[]{101, 1}), NOT_EXIST);assertEquals(eval(new Integer[]{1, 2, 101}, new Integer[]{101, 1}), EXIST);assertEquals(eval(new Integer[]{1, 2, 101}, new Integer[]{1, 2}), EXIST);assertEquals(eval(new Integer[]{1, 2, 101}, new Integer[]{3}), NOT_EXIST);assertEquals(eval(new Integer[]{2}, new Integer[]{1, 2, 3, 4, 5, 6, 8, 100, 101}), EXIST);assertEquals(eval(new Integer[]{3, 100, 101}, new Integer[]{1, 2, 3, 4, 5, 6, 8, 100, 101}), EXIST);assertEquals(eval(new Integer[]{3, 100, 101}, new Integer[]{99, 101}), EXIST);assertEquals(eval(new Integer[]{3, 100, 101}, new Integer[]{99}), NOT_EXIST);assertEquals(eval(null, new String[]{"1", "101"}), NOT_EXIST);assertEquals(eval(new String[]{}, new String[]{"1", "101"}), NOT_EXIST);assertEquals(eval(new String[]{"1", "2", "101"}, new String[]{"1", "101"}), EXIST);assertEquals(eval(new String[]{"1", "2", "101"}, new String[]{"1", "2"}), EXIST);assertEquals(eval(new String[]{"1", "2", "101"}, new String[]{"3"}), NOT_EXIST);assertEquals(eval(new String[]{"2"}, new String[]{"1", "2", "3", "4", "5", "6", "8", "100", "101"}), EXIST);assertEquals(eval(new String[]{"101", "3", "100"}, new String[]{"1", "2", "3", "4", "5", "6", "8", "100", "101"}), EXIST);assertEquals(eval(new String[]{"101", "3", "100"}, new String[]{"101", "99"}), EXIST);assertEquals(eval(new String[]{"101", "3", "100"}, new String[]{"99"}), NOT_EXIST);}

这篇关于flink的自动类型推导:解决udf的通用类型问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/809572

相关文章

linux生产者,消费者问题

pthread_cond_wait() :用于阻塞当前线程,等待别的线程使用pthread_cond_signal()或pthread_cond_broadcast来唤醒它。 pthread_cond_wait() 必须与pthread_mutex 配套使用。pthread_cond_wait()函数一进入wait状态就会自动release mutex。当其他线程通过pthread

问题:第一次世界大战的起止时间是 #其他#学习方法#微信

问题:第一次世界大战的起止时间是 A.1913 ~1918 年 B.1913 ~1918 年 C.1914 ~1918 年 D.1914 ~1919 年 参考答案如图所示

2024.6.24 IDEA中文乱码问题(服务器 控制台 TOMcat)实测已解决

1.问题产生原因: 1.文件编码不一致:如果文件的编码方式与IDEA设置的编码方式不一致,就会产生乱码。确保文件和IDEA使用相同的编码,通常是UTF-8。2.IDEA设置问题:检查IDEA的全局编码设置和项目编码设置是否正确。3.终端或控制台编码问题:如果你在终端或控制台看到乱码,可能是终端的编码设置问题。确保终端使用的是支持你的文件的编码方式。 2.解决方案: 1.File -> S

vcpkg安装opencv中的特殊问题记录(无法找到opencv_corexd.dll)

我是按照网上的vcpkg安装opencv方法进行的(比如这篇:从0开始在visual studio上安装opencv(超详细,针对小白)),但是中间出现了一些别人没有遇到的问题,虽然原因没有找到,但是本人给出一些暂时的解决办法: 问题1: 我在安装库命令行使用的是 .\vcpkg.exe install opencv 我的电脑是x64,vcpkg在这条命令后默认下载的也是opencv2:x6

问题-windows-VPN不正确关闭导致网页打不开

为什么会发生这类事情呢? 主要原因是关机之前vpn没有关掉导致的。 至于为什么没关掉vpn会导致网页打不开,我猜测是因为vpn建立的链接没被更改。 正确关掉vpn的时候,会把ip链接断掉,如果你不正确关掉,ip链接没有断掉,此时你vpn又是没启动的,没有域名解析,所以就打不开网站。 你可以在打不开网页的时候,把vpn打开,你会发现网络又可以登录了。 方法一 注意:方法一虽然方便,但是可能会有

vue同页面多路由懒加载-及可能存在问题的解决方式

先上图,再解释 图一是多路由页面,图二是路由文件。从图一可以看出每个router-view对应的name都不一样。从图二可以看出层路由对应的组件加载方式要跟图一中的name相对应,并且图二的路由层在跟图一对应的页面中要加上components层,多一个s结尾,里面的的方法名就是图一路由的name值,里面还可以照样用懒加载的方式。 页面上其他的路由在路由文件中也跟图二是一样的写法。 附送可能存在

vue+elementui分页输入框回车与页面中@keyup.enter事件冲突解决

解决这个问题的思路只要判断事件源是哪个就好。el分页的回车触发事件是在按下时,抬起并不会再触发。而keyup.enter事件是在抬起时触发。 so,找不到分页的回车事件那就拿keyup.enter事件搞事情。只要判断这个抬起事件的$event中的锚点样式判断不等于分页特有的样式就可以了 @keyup.enter="allKeyup($event)" //页面上的//js中allKeyup(e

vue+elementui--$message提示框被dialog遮罩层挡住问题解决

最近碰到一个先执行this.$message提示内容,然后接着弹出dialog带遮罩层弹框。那么问题来了,message提示框会默认被dialog遮罩层挡住,现在就是要解决这个问题。 由于都是弹框,问题肯定是出在z-index比重问题。由于用$message方式是写在js中而不是写在html中所以不是很好直接去改样式。 不过好在message组件中提供了customClass 属性,我们可以利用

SQL Server中,查询数据库中有多少个表,以及数据库其余类型数据统计查询

sqlserver查询数据库中有多少个表 sql server 数表:select count(1) from sysobjects where xtype='U'数视图:select count(1) from sysobjects where xtype='V'数存储过程select count(1) from sysobjects where xtype='P' SE

Visual Studio中,MSBUild版本问题

假如项目规定了MSBUild版本,那么在安装完Visual Studio后,假如带的MSBUild版本与项目要求的版本不符合要求,那么可以把需要的MSBUild添加到系统中,然后即可使用。步骤如下:            假如项目需要使用V12的MSBUild,而安装的Visual Studio带的MSBUild版本为V14。 ①到MSDN下载V12 MSBUild包,把V12包解压到目录(