【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

本文主要是介绍【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

  • 1)函数类(Function Classes)
  • 2)富函数类(Rich Function Classes)

用户自定义函数(user-defined functionUDF),即用户可以根据自身需求,重新实现算子的逻辑。

用户自定义函数分为:函数类匿名函数富函数类

1)函数类(Function Classes)

Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunctionFilterFunctionReduceFunction 等。所以用户可以自定义一个函数类,实现对应的接口。

需求:用来从用户的点击数据中筛选包含“sensor_1”的内容:

方式一:实现 FilterFunction 接口

public class TransFunctionUDF {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));DataStream<String> filter = stream.filter(new UserFilter());filter.print();env.execute();}public static class UserFilter implementsFilterFunction<WaterSensor> {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}
}

方式二:通过匿名类来实现 FilterFunction 接口

DataStream<String> stream = stream.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}});

方式二的优化:为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));DataStream<String> stream = stream.filter(newFilterFunctionImpl("sensor_1"));public static class FilterFunctionImpl implementsFilterFunction<WaterSensor> {private String id;FilterFunctionImpl(String id) {this.id = id;}@Overridepublic boolean filter(WaterSensor value) throws Exception {return thid.id.equals(value.id);}}}

方式三:采用匿名函数(Lambda)

public class TransFunctionUDF {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));
//map 函数使用 Lambda 表达式,不需要进行类型声明SingleOutputStreamOperator<String> filter =stream.filter(sensor -> "sensor_1".equals(sensor.id));filter.print();env.execute();}
}

2)富函数类(Rich Function Classes)

“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其 Rich 版 本 。 富函数类一般是以抽象类的形式出现的。例如:RichMapFunctionRichFilterFunctionRichReduceFunction 等。

与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

Rich Function 有生命周期的概念。典型的生命周期方法有:

  • open() 方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map() 或者 filter() 方法被调用之前,open() 会首先被调用。

  • close() 方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。

需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如 RichMapFunction 中的 map(),在每条数据到来后都会触发一次调用。

来看一个例子说明:

public class RichFunctionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(1, 2, 3, 4).map(new RichMapFunction<Integer, Integer>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println(" 索 引 是 : " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");}@Overridepublic Integer map(Integer integer) throwsException {return integer + 1;}@Overridepublic void close() throws Exception {super.close();System.out.println(" 索 引 是 : " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");}}).print();env.execute();}
}

这篇关于【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/636000

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

hdu1171(母函数或多重背包)

题意:把物品分成两份,使得价值最接近 可以用背包,或者是母函数来解,母函数(1 + x^v+x^2v+.....+x^num*v)(1 + x^v+x^2v+.....+x^num*v)(1 + x^v+x^2v+.....+x^num*v) 其中指数为价值,每一项的数目为(该物品数+1)个 代码如下: #include<iostream>#include<algorithm>

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

自定义类型:结构体(续)

目录 一. 结构体的内存对齐 1.1 为什么存在内存对齐? 1.2 修改默认对齐数 二. 结构体传参 三. 结构体实现位段 一. 结构体的内存对齐 在前面的文章里我们已经讲过一部分的内存对齐的知识,并举出了两个例子,我们再举出两个例子继续说明: struct S3{double a;int b;char c;};int mian(){printf("%zd\n",s

C++操作符重载实例(独立函数)

C++操作符重载实例,我们把坐标值CVector的加法进行重载,计算c3=c1+c2时,也就是计算x3=x1+x2,y3=y1+y2,今天我们以独立函数的方式重载操作符+(加号),以下是C++代码: c1802.cpp源代码: D:\YcjWork\CppTour>vim c1802.cpp #include <iostream>using namespace std;/*** 以独立函数

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

函数式编程思想

我们经常会用到各种各样的编程思想,例如面向过程、面向对象。不过笔者在该博客简单介绍一下函数式编程思想. 如果对函数式编程思想进行概括,就是f(x) = na(x) , y=uf(x)…至于其他的编程思想,可能是y=a(x)+b(x)+c(x)…,也有可能是y=f(x)=f(x)/a + f(x)/b+f(x)/c… 面向过程的指令式编程 面向过程,简单理解就是y=a(x)+b(x)+c(x)

沁恒CH32在MounRiver Studio上环境配置以及使用详细教程

目录 1.  RISC-V简介 2.  CPU架构现状 3.  MounRiver Studio软件下载 4.  MounRiver Studio软件安装 5.  MounRiver Studio软件介绍 6.  创建工程 7.  编译代码 1.  RISC-V简介         RISC就是精简指令集计算机(Reduced Instruction SetCom