【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

本文主要是介绍【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

  • 1)函数类(Function Classes)
  • 2)富函数类(Rich Function Classes)

用户自定义函数(user-defined functionUDF),即用户可以根据自身需求,重新实现算子的逻辑。

用户自定义函数分为:函数类匿名函数富函数类

1)函数类(Function Classes)

Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunctionFilterFunctionReduceFunction 等。所以用户可以自定义一个函数类,实现对应的接口。

需求:用来从用户的点击数据中筛选包含“sensor_1”的内容:

方式一:实现 FilterFunction 接口

public class TransFunctionUDF {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));DataStream<String> filter = stream.filter(new UserFilter());filter.print();env.execute();}public static class UserFilter implementsFilterFunction<WaterSensor> {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}
}

方式二:通过匿名类来实现 FilterFunction 接口

DataStream<String> stream = stream.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}});

方式二的优化:为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));DataStream<String> stream = stream.filter(newFilterFunctionImpl("sensor_1"));public static class FilterFunctionImpl implementsFilterFunction<WaterSensor> {private String id;FilterFunctionImpl(String id) {this.id = id;}@Overridepublic boolean filter(WaterSensor value) throws Exception {return thid.id.equals(value.id);}}}

方式三:采用匿名函数(Lambda)

public class TransFunctionUDF {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));
//map 函数使用 Lambda 表达式,不需要进行类型声明SingleOutputStreamOperator<String> filter =stream.filter(sensor -> "sensor_1".equals(sensor.id));filter.print();env.execute();}
}

2)富函数类(Rich Function Classes)

“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其 Rich 版 本 。 富函数类一般是以抽象类的形式出现的。例如:RichMapFunctionRichFilterFunctionRichReduceFunction 等。

与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

Rich Function 有生命周期的概念。典型的生命周期方法有:

  • open() 方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map() 或者 filter() 方法被调用之前,open() 会首先被调用。

  • close() 方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。

需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如 RichMapFunction 中的 map(),在每条数据到来后都会触发一次调用。

来看一个例子说明:

public class RichFunctionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(1, 2, 3, 4).map(new RichMapFunction<Integer, Integer>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println(" 索 引 是 : " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");}@Overridepublic Integer map(Integer integer) throwsException {return integer + 1;}@Overridepublic void close() throws Exception {super.close();System.out.println(" 索 引 是 : " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");}}).print();env.execute();}
}

这篇关于【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/636000

相关文章

Elasticsearch 在 Java 中的使用教程

《Elasticsearch在Java中的使用教程》Elasticsearch是一个分布式搜索和分析引擎,基于ApacheLucene构建,能够实现实时数据的存储、搜索、和分析,它广泛应用于全文... 目录1. Elasticsearch 简介2. 环境准备2.1 安装 Elasticsearch2.2 J

Linux系统中卸载与安装JDK的详细教程

《Linux系统中卸载与安装JDK的详细教程》本文详细介绍了如何在Linux系统中通过Xshell和Xftp工具连接与传输文件,然后进行JDK的安装与卸载,安装步骤包括连接Linux、传输JDK安装包... 目录1、卸载1.1 linux删除自带的JDK1.2 Linux上卸载自己安装的JDK2、安装2.1

Kotlin 作用域函数apply、let、run、with、also使用指南

《Kotlin作用域函数apply、let、run、with、also使用指南》在Kotlin开发中,作用域函数(ScopeFunctions)是一组能让代码更简洁、更函数式的高阶函数,本文将... 目录一、引言:为什么需要作用域函数?二、作用域函China编程数详解1. apply:对象配置的 “流式构建器”最

使用Sentinel自定义返回和实现区分来源方式

《使用Sentinel自定义返回和实现区分来源方式》:本文主要介绍使用Sentinel自定义返回和实现区分来源方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Sentinel自定义返回和实现区分来源1. 自定义错误返回2. 实现区分来源总结Sentinel自定

Linux卸载自带jdk并安装新jdk版本的图文教程

《Linux卸载自带jdk并安装新jdk版本的图文教程》在Linux系统中,有时需要卸载预装的OpenJDK并安装特定版本的JDK,例如JDK1.8,所以本文给大家详细介绍了Linux卸载自带jdk并... 目录Ⅰ、卸载自带jdkⅡ、安装新版jdkⅠ、卸载自带jdk1、输入命令查看旧jdkrpm -qa

Java实现时间与字符串互相转换详解

《Java实现时间与字符串互相转换详解》这篇文章主要为大家详细介绍了Java中实现时间与字符串互相转换的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、日期格式化为字符串(一)使用预定义格式(二)自定义格式二、字符串解析为日期(一)解析ISO格式字符串(二)解析自定义

Java使用Curator进行ZooKeeper操作的详细教程

《Java使用Curator进行ZooKeeper操作的详细教程》ApacheCurator是一个基于ZooKeeper的Java客户端库,它极大地简化了使用ZooKeeper的开发工作,在分布式系统... 目录1、简述2、核心功能2.1 CuratorFramework2.2 Recipes3、示例实践3

springboot简单集成Security配置的教程

《springboot简单集成Security配置的教程》:本文主要介绍springboot简单集成Security配置的教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录集成Security安全框架引入依赖编写配置类WebSecurityConfig(自定义资源权限规则

在java中如何将inputStream对象转换为File对象(不生成本地文件)

《在java中如何将inputStream对象转换为File对象(不生成本地文件)》:本文主要介绍在java中如何将inputStream对象转换为File对象(不生成本地文件),具有很好的参考价... 目录需求说明问题解决总结需求说明在后端中通过POI生成Excel文件流,将输出流(outputStre

MySQL Workbench 安装教程(保姆级)

《MySQLWorkbench安装教程(保姆级)》MySQLWorkbench是一款强大的数据库设计和管理工具,本文主要介绍了MySQLWorkbench安装教程,文中通过图文介绍的非常详细,对大... 目录前言:详细步骤:一、检查安装的数据库版本二、在官网下载对应的mysql Workbench版本,要是