flink udf 介绍

2024-08-28 14:58
文章标签 介绍 flink udf

本文主要是介绍flink udf 介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ScalarFunction:标量函数是实现将0,1,或者多个标量值转化为一个新值TableFunction:一个输入多个行或者多个列AggregateFunction:多个输入一个输出package org.fuwushe.sql;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.fuwushe.sql.udf.FromUnixTimeUDF;
import org.fuwushe.sql.udf.Split;import java.util.Iterator;public class SqlUdfTest {public static void main(String []args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);tableEnv.registerFunction("split", new Split("#"));tableEnv.registerFunction("from_unixtime", new FromUnixTimeUDF());tableEnv.registerFunction("wAvg", new WeightedAvg());DataSet<String> input = env.readTextFile("/load/data/udf.txt");DataSet<UdfData> topInput = input.map(new MapFunction<String,UdfData>() {@Overridepublic UdfData map(String s) throws Exception {return JSONObject.parseObject(s,UdfData.class);}});Table udfTable = tableEnv.fromDataSet(topInput);tableEnv.registerTable("udf_table", udfTable);//ScalarFunctionTable udfResult = tableEnv.sqlQuery(" select from_unixtime(`time`) as creatTime,itemId FROM udf_table order by  creatTime desc  ");tableEnv.toDataSet(udfResult, UdfResult.class).print();//TableFunctionTable udtfResult1 =  tableEnv.sqlQuery("SELECT action, word, length FROM udf_table, LATERAL TABLE(split(action)) as T(word, length)");Table udtfResult2 =  tableEnv.sqlQuery("SELECT  action, word, length FROM udf_table LEFT JOIN LATERAL TABLE(split(action)) as T(word, length) ON TRUE");tableEnv.toDataSet(udtfResult1, UdtfResult.class).print();tableEnv.toDataSet(udtfResult2, UdtfResult.class).print();//AggregateFunction 6 1Table udafResult = tableEnv.sqlQuery("SELECT itemId, wAvg(price,wegiht) AS avgPoints FROM udf_table GROUP BY itemId");tableEnv.toDataSet(udafResult, UdafResult.class).print();}/*** Accumulator for WeightedAvg.*/public static class WeightedAvgAccum {public long sum = 0;public int count = 0;}/*** Weighted Average user-defined aggregate function.*/public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {@Overridepublic WeightedAvgAccum createAccumulator() {return new WeightedAvgAccum();}@Overridepublic Long getValue(WeightedAvgAccum acc) {if (acc.count == 0) {return null;} else {return acc.sum / acc.count;}}public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum += iValue * iWeight;acc.count += iWeight;}public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum -= iValue * iWeight;acc.count -= iWeight;}public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {Iterator<WeightedAvgAccum> iter = it.iterator();while (iter.hasNext()) {WeightedAvgAccum a = iter.next();acc.count += a.count;acc.sum += a.sum;}}public void resetAccumulator(WeightedAvgAccum acc) {acc.count = 0;acc.sum = 0L;}}public static class UdafResult {public UdafResult() {super();}public String itemId;public long avgPoints;public UdafResult(String itemId, long avgPoints) {this.itemId = itemId;this.avgPoints = avgPoints;}@Overridepublic String toString() {return "UdafResult{" + "itemId='" + itemId + '\'' + ", avgPoints=" + avgPoints + '}';}}public static class UdtfResult {public UdtfResult() {super();}public String action;public String word;public int length;public UdtfResult(String action, String word, int length) {this.action = action;this.word = word;this.length = length;}@Overridepublic String toString() {return "UdtfResult{" + "action='" + action + '\'' + ", word='" + word + '\'' + ", length=" + length + '}';}}public static class UdfResult {public UdfResult() {super();}public String itemId;public String creatTime;public UdfResult(String itemId, String creatTime) {this.itemId = itemId;this.creatTime = creatTime;}@Overridepublic String toString() {return "Result{" + "itemId='" + itemId + '\'' + ", creatTime='" + creatTime + '\'' + '}';}}public static class UdfData {public UdfData(String action, String itemId, String time, String unionId, Integer rankIndex, Integer wegiht,long price) {this.action = action;this.itemId = itemId;this.time = time;this.unionId = unionId;this.rankIndex = rankIndex;this.wegiht = wegiht;this.price = price;}public String action;public String itemId;public String time;public String unionId;public Integer rankIndex;public Integer wegiht;public long price;public UdfData() {super();}}
}

这篇关于flink udf 介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1115087

相关文章

Pytest多环境切换的常见方法介绍

《Pytest多环境切换的常见方法介绍》Pytest作为自动化测试的主力框架,如何实现本地、测试、预发、生产环境的灵活切换,本文总结了通过pytest框架实现自由环境切换的几种方法,大家可以根据需要进... 目录1.pytest-base-url2.hooks函数3.yml和fixture结论你是否也遇到过

MySQL中慢SQL优化的不同方式介绍

《MySQL中慢SQL优化的不同方式介绍》慢SQL的优化,主要从两个方面考虑,SQL语句本身的优化,以及数据库设计的优化,下面小编就来给大家介绍一下有哪些方式可以优化慢SQL吧... 目录避免不必要的列分页优化索引优化JOIN 的优化排序优化UNION 优化慢 SQL 的优化,主要从两个方面考虑,SQL 语

C++中函数模板与类模板的简单使用及区别介绍

《C++中函数模板与类模板的简单使用及区别介绍》这篇文章介绍了C++中的模板机制,包括函数模板和类模板的概念、语法和实际应用,函数模板通过类型参数实现泛型操作,而类模板允许创建可处理多种数据类型的类,... 目录一、函数模板定义语法真实示例二、类模板三、关键区别四、注意事项 ‌在C++中,模板是实现泛型编程

Python实现html转png的完美方案介绍

《Python实现html转png的完美方案介绍》这篇文章主要为大家详细介绍了如何使用Python实现html转png功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 1.增强稳定性与错误处理建议使用三层异常捕获结构:try: with sync_playwright(

Java使用多线程处理未知任务数的方案介绍

《Java使用多线程处理未知任务数的方案介绍》这篇文章主要为大家详细介绍了Java如何使用多线程实现处理未知任务数,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 知道任务个数,你可以定义好线程数规则,生成线程数去跑代码说明:1.虚拟线程池:使用 Executors.newVir

JAVA SE包装类和泛型详细介绍及说明方法

《JAVASE包装类和泛型详细介绍及说明方法》:本文主要介绍JAVASE包装类和泛型的相关资料,包括基本数据类型与包装类的对应关系,以及装箱和拆箱的概念,并重点讲解了自动装箱和自动拆箱的机制,文... 目录1. 包装类1.1 基本数据类型和对应的包装类1.2 装箱和拆箱1.3 自动装箱和自动拆箱2. 泛型2

四种Flutter子页面向父组件传递数据的方法介绍

《四种Flutter子页面向父组件传递数据的方法介绍》在Flutter中,如果父组件需要调用子组件的方法,可以通过常用的四种方式实现,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录方法 1:使用 GlobalKey 和 State 调用子组件方法方法 2:通过回调函数(Callb

Python进阶之Excel基本操作介绍

《Python进阶之Excel基本操作介绍》在现实中,很多工作都需要与数据打交道,Excel作为常用的数据处理工具,一直备受人们的青睐,本文主要为大家介绍了一些Python中Excel的基本操作,希望... 目录概述写入使用 xlwt使用 XlsxWriter读取修改概述在现实中,很多工作都需要与数据打交

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.

Python实现NLP的完整流程介绍

《Python实现NLP的完整流程介绍》这篇文章主要为大家详细介绍了Python实现NLP的完整流程,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 编程安装和导入必要的库2. 文本数据准备3. 文本预处理3.1 小写化3.2 分词(Tokenizatio