【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

本文主要是介绍【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

  • 1)函数类(Function Classes)
  • 2)富函数类(Rich Function Classes)

用户自定义函数(user-defined functionUDF),即用户可以根据自身需求,重新实现算子的逻辑。

用户自定义函数分为:函数类匿名函数富函数类

1)函数类(Function Classes)

Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunctionFilterFunctionReduceFunction 等。所以用户可以自定义一个函数类,实现对应的接口。

需求:用来从用户的点击数据中筛选包含“sensor_1”的内容:

方式一:实现 FilterFunction 接口

public class TransFunctionUDF {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));DataStream<String> filter = stream.filter(new UserFilter());filter.print();env.execute();}public static class UserFilter implementsFilterFunction<WaterSensor> {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}
}

方式二:通过匿名类来实现 FilterFunction 接口

DataStream<String> stream = stream.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}});

方式二的优化:为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));DataStream<String> stream = stream.filter(newFilterFunctionImpl("sensor_1"));public static class FilterFunctionImpl implementsFilterFunction<WaterSensor> {private String id;FilterFunctionImpl(String id) {this.id = id;}@Overridepublic boolean filter(WaterSensor value) throws Exception {return thid.id.equals(value.id);}}}

方式三:采用匿名函数(Lambda)

public class TransFunctionUDF {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));
//map 函数使用 Lambda 表达式,不需要进行类型声明SingleOutputStreamOperator<String> filter =stream.filter(sensor -> "sensor_1".equals(sensor.id));filter.print();env.execute();}
}

2)富函数类(Rich Function Classes)

“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其 Rich 版 本 。 富函数类一般是以抽象类的形式出现的。例如:RichMapFunctionRichFilterFunctionRichReduceFunction 等。

与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

Rich Function 有生命周期的概念。典型的生命周期方法有:

  • open() 方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map() 或者 filter() 方法被调用之前,open() 会首先被调用。

  • close() 方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。

需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如 RichMapFunction 中的 map(),在每条数据到来后都会触发一次调用。

来看一个例子说明:

public class RichFunctionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(1, 2, 3, 4).map(new RichMapFunction<Integer, Integer>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println(" 索 引 是 : " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");}@Overridepublic Integer map(Integer integer) throwsException {return integer + 1;}@Overridepublic void close() throws Exception {super.close();System.out.println(" 索 引 是 : " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");}}).print();env.execute();}
}

这篇关于【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/636000

相关文章

MySQL 中的 CAST 函数详解及常见用法

《MySQL中的CAST函数详解及常见用法》CAST函数是MySQL中用于数据类型转换的重要函数,它允许你将一个值从一种数据类型转换为另一种数据类型,本文给大家介绍MySQL中的CAST... 目录mysql 中的 CAST 函数详解一、基本语法二、支持的数据类型三、常见用法示例1. 字符串转数字2. 数字

Python内置函数之classmethod函数使用详解

《Python内置函数之classmethod函数使用详解》:本文主要介绍Python内置函数之classmethod函数使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 类方法定义与基本语法2. 类方法 vs 实例方法 vs 静态方法3. 核心特性与用法(1编程客

Python函数作用域示例详解

《Python函数作用域示例详解》本文介绍了Python中的LEGB作用域规则,详细解析了变量查找的四个层级,通过具体代码示例,展示了各层级的变量访问规则和特性,对python函数作用域相关知识感兴趣... 目录一、LEGB 规则二、作用域实例2.1 局部作用域(Local)2.2 闭包作用域(Enclos

关于集合与数组转换实现方法

《关于集合与数组转换实现方法》:本文主要介绍关于集合与数组转换实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、Arrays.asList()1.1、方法作用1.2、内部实现1.3、修改元素的影响1.4、注意事项2、list.toArray()2.1、方

MySQL count()聚合函数详解

《MySQLcount()聚合函数详解》MySQL中的COUNT()函数,它是SQL中最常用的聚合函数之一,用于计算表中符合特定条件的行数,本文给大家介绍MySQLcount()聚合函数,感兴趣的朋... 目录核心功能语法形式重要特性与行为如何选择使用哪种形式?总结深入剖析一下 mysql 中的 COUNT

MySQL 中 ROW_NUMBER() 函数最佳实践

《MySQL中ROW_NUMBER()函数最佳实践》MySQL中ROW_NUMBER()函数,作为窗口函数为每行分配唯一连续序号,区别于RANK()和DENSE_RANK(),特别适合分页、去重... 目录mysql 中 ROW_NUMBER() 函数详解一、基础语法二、核心特点三、典型应用场景1. 数据分

MySQL数据库的内嵌函数和联合查询实例代码

《MySQL数据库的内嵌函数和联合查询实例代码》联合查询是一种将多个查询结果组合在一起的方法,通常使用UNION、UNIONALL、INTERSECT和EXCEPT关键字,下面:本文主要介绍MyS... 目录一.数据库的内嵌函数1.1聚合函数COUNT([DISTINCT] expr)SUM([DISTIN

深度解析Spring AOP @Aspect 原理、实战与最佳实践教程

《深度解析SpringAOP@Aspect原理、实战与最佳实践教程》文章系统讲解了SpringAOP核心概念、实现方式及原理,涵盖横切关注点分离、代理机制(JDK/CGLIB)、切入点类型、性能... 目录1. @ASPect 核心概念1.1 AOP 编程范式1.2 @Aspect 关键特性2. 完整代码实

Python get()函数用法案例详解

《Pythonget()函数用法案例详解》在Python中,get()是字典(dict)类型的内置方法,用于安全地获取字典中指定键对应的值,它的核心作用是避免因访问不存在的键而引发KeyError错... 目录简介基本语法一、用法二、案例:安全访问未知键三、案例:配置参数默认值简介python是一种高级编

python 常见数学公式函数使用详解(最新推荐)

《python常见数学公式函数使用详解(最新推荐)》文章介绍了Python的数学计算工具,涵盖内置函数、math/cmath标准库及numpy/scipy/sympy第三方库,支持从基础算术到复杂数... 目录python 数学公式与函数大全1. 基本数学运算1.1 算术运算1.2 分数与小数2. 数学函数