一文弄明白KeyedProcessFunction函数

2024-02-23 20:20

本文主要是介绍一文弄明白KeyedProcessFunction函数,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

引言

KeyedProcessFunction是Flink用于处理KeyedStream的数据集合,它比ProcessFunction拥有更多特性,例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧

正文

了解一个函数怎么用最权威的地方就是 官方文档 以及注解,KeyedProcessFunction的注解如下

/*** A keyed function that processes elements of a stream.** <p>For every element in the input stream {@link #processElement(Object, Context, Collector)} is* invoked. This can produce zero or more elements as output. Implementations can also query the* time and set timers through the provided {@link Context}. For firing timers {@link #onTimer(long,* OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as* output and register further timers.** <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only* available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.** <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a {@link* org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {@link* org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown* methods can be implemented. See {@link* org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}* and {@link org.apache.flink.api.common.functions.RichFunction#close()}.*/

上面简单来说就是以下四点

  1. Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用
  2. 通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法
  3. 由于KeyedProcessFunction实现了RichFunction接口,因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放
  4. 需要注意的是,只有在KeyedStream里才能够访问state和定时器,通俗点来说就是这个函数要用在keyBy这个函数的后面

processElement方法解析

  1. Flink会调用processElement方法处理输入流中的每一条数据
  2. KeyedProcessFunction.Context参数可以用来读取以及更新内部状态state
  3. 这个KeyedProcessFunction跟其他function一样通过参数中的Collector对象以回写的方式返回数据

onTimer方法解析:在启用TimerService服务时会定时触发此方法,一般会在processElement方法中开启TimerService服务

以上就是这个函数的基本知识,接下来就通过实战来熟悉下它的使用

实战简介

本次实战的目标是学习KeyedProcessFunction,内容如下:

  1. 监听本机7777端口读取字符串
  2. 将每个字符串用空格分隔,转成Tuple2实例,f0是分隔后的单词,f1等于1
  3. 将Tuple2实例集合通过f0字段分区,得到KeyedStream
  4. KeyedSteam通过自定义KeyedProcessFunction处理
  5. 自定义KeyedProcessFunction的作用,是记录每个单词最新一次出现的时间,然后建一个十秒的定时器进行触发

使用代码例子

首先定义pojo类

public class CountWithTimestampNew {private String key;private long count;private long lastQuestTimestamp;public long getAndIncrementCount() {return ++count;}public String getKey() {return key;}public void setKey(String key) {this.key = key;}public long getCount() {return count;}public void setCount(long count) {this.count = count;}public long getLastQuestTimestamp() {return lastQuestTimestamp;}public void setLastQuestTimestamp(long lastQuestTimestamp) {this.lastQuestTimestamp = lastQuestTimestamp;}
}

接着实现KeyedProcessFunction类

public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {private ValueState<CountWithTimestampNew> state;@Overridepublic void open(Configuration parameters) throws Exception {state = getRuntimeContext().getState(new ValueStateDescriptor<CountWithTimestampNew>("sherlock-state", CountWithTimestampNew.class));}// 实现数据处理逻辑的地方@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {Tuple currentKey = ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew = state.value();if (countWithTimestampNew == null) {countWithTimestampNew = new CountWithTimestampNew();countWithTimestampNew.setKey(value.f0);}countWithTimestampNew.getAndIncrementCount();//更新这个单词最后一次出现的时间countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());//单词之间不会互相覆盖吗?推测state对象是跟key绑定,针对每一个不同的key KeyedProcessFunction会创建其对应的state对象state.update(countWithTimestampNew);//给当前单词创建定时器,十秒后触发long timer = countWithTimestampNew.getLastQuestTimestamp()+10000;//尝试注释掉看看是否还会触发onTimer方法ctx.timerService().registerProcessingTimeTimer(timer);//打印所有信息,用于确保数据准确性System.out.println(String.format(" 触发processElement方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s",currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timer)));}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {Tuple currentKey = ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew = state.value();//标记当前元素是否已经连续10s未出现boolean isTimeout = false;if (timestamp >= countWithTimestampNew.getLastQuestTimestamp()+10000 ) {//out.collect(new Tuple2<>(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));isTimeout = true;}//打印所有信息,用于确保数据准确性System.out.println(String.format(" 触发onTimer方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s, 当前单词是否已超过10秒没有再请求: %s",currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timestamp),String.valueOf(isTimeout)));}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}
}

最后是启动类

public class KeyedProcessFunctionDemo2 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 并行度1env.setParallelism(1);// 处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 监听本地9999端口,读取字符串DataStream<String> socketDataStream = env.socketTextStream("localhost", 7777);// 所有输入的单词,如果超过10秒没有再次出现,都可以通过CountWithTimeoutFunction得到DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream// 对收到的字符串用空格做分割,得到多个单词.flatMap(new SplitterFlatMapFunction())// 设置时间戳分配器,用当前时间作为时间戳.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {@Overridepublic long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {// 使用当前系统时间作为时间戳return System.currentTimeMillis();}@Overridepublic Watermark getCurrentWatermark() {// 本例不需要watermark,返回nullreturn null;}})// 将单词作为key分区.keyBy(0)// 按单词分区后的数据,交给自定义KeyedProcessFunction处理.process(new CountWithTimeoutKeyProcessFunctionNew());// 所有输入的单词,如果超过10秒没有再次出现,就在此打印出来timeOutWord.print();env.execute("ProcessFunction demo : KeyedProcessFunction");}
}

演示

在启动服务前,先通过linux指令监听端口 nc -lk 7777

  1. 启动Flink服务后,往7777端口里面发送数据
    在这里插入图片描述

  2. 通过IDEA的终端可以看到有日志输出,可以看到在发送消息的时候第一条日志立马打印出来并在10秒后输出第二条日志
    在这里插入图片描述

  3. 那么咱们尝试连续发送两条Hello呢,可以看到累加器会持续累加,并且会触发两次onTimer方法,也就是每一条消息都会触发一次。由于连续发送两条,因此可以看得到第三行日志的末尾是false,说明收到第一条后的10秒内又有相同的消息进来。第二条是ture说明在收到第二条消息后的10秒内没有消息进来
    在这里插入图片描述

  4. 再输入点其他的试试
    在这里插入图片描述

  5. 通过输出可以看到这些单词的计数器又从0开始,说明每一个Key都对应一个状态
    在这里插入图片描述

思考题

  1. open方法会在哪里进行调用,KeyedProcessFunction整个类的完整调用逻辑是怎么样的
  2. registerProcessingTimeTimer和registerEventTimeTimer的差异是什么

参考资料

  1. https://blog.csdn.net/boling_cavalry/article/details/106299167
  2. https://blog.csdn.net/lujisen/article/details/105510532
  3. https://blog.csdn.net/qq_31866793/article/details/102831731

这篇关于一文弄明白KeyedProcessFunction函数的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/739822

相关文章

Python内置函数之classmethod函数使用详解

《Python内置函数之classmethod函数使用详解》:本文主要介绍Python内置函数之classmethod函数使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 类方法定义与基本语法2. 类方法 vs 实例方法 vs 静态方法3. 核心特性与用法(1编程客

Python函数作用域示例详解

《Python函数作用域示例详解》本文介绍了Python中的LEGB作用域规则,详细解析了变量查找的四个层级,通过具体代码示例,展示了各层级的变量访问规则和特性,对python函数作用域相关知识感兴趣... 目录一、LEGB 规则二、作用域实例2.1 局部作用域(Local)2.2 闭包作用域(Enclos

MySQL count()聚合函数详解

《MySQLcount()聚合函数详解》MySQL中的COUNT()函数,它是SQL中最常用的聚合函数之一,用于计算表中符合特定条件的行数,本文给大家介绍MySQLcount()聚合函数,感兴趣的朋... 目录核心功能语法形式重要特性与行为如何选择使用哪种形式?总结深入剖析一下 mysql 中的 COUNT

一文详解Git中分支本地和远程删除的方法

《一文详解Git中分支本地和远程删除的方法》在使用Git进行版本控制的过程中,我们会创建多个分支来进行不同功能的开发,这就容易涉及到如何正确地删除本地分支和远程分支,下面我们就来看看相关的实现方法吧... 目录技术背景实现步骤删除本地分支删除远程www.chinasem.cn分支同步删除信息到其他机器示例步骤

MySQL 中 ROW_NUMBER() 函数最佳实践

《MySQL中ROW_NUMBER()函数最佳实践》MySQL中ROW_NUMBER()函数,作为窗口函数为每行分配唯一连续序号,区别于RANK()和DENSE_RANK(),特别适合分页、去重... 目录mysql 中 ROW_NUMBER() 函数详解一、基础语法二、核心特点三、典型应用场景1. 数据分

MySQL数据库的内嵌函数和联合查询实例代码

《MySQL数据库的内嵌函数和联合查询实例代码》联合查询是一种将多个查询结果组合在一起的方法,通常使用UNION、UNIONALL、INTERSECT和EXCEPT关键字,下面:本文主要介绍MyS... 目录一.数据库的内嵌函数1.1聚合函数COUNT([DISTINCT] expr)SUM([DISTIN

Python get()函数用法案例详解

《Pythonget()函数用法案例详解》在Python中,get()是字典(dict)类型的内置方法,用于安全地获取字典中指定键对应的值,它的核心作用是避免因访问不存在的键而引发KeyError错... 目录简介基本语法一、用法二、案例:安全访问未知键三、案例:配置参数默认值简介python是一种高级编

python 常见数学公式函数使用详解(最新推荐)

《python常见数学公式函数使用详解(最新推荐)》文章介绍了Python的数学计算工具,涵盖内置函数、math/cmath标准库及numpy/scipy/sympy第三方库,支持从基础算术到复杂数... 目录python 数学公式与函数大全1. 基本数学运算1.1 算术运算1.2 分数与小数2. 数学函数

一文详解Java Stream的sorted自定义排序

《一文详解JavaStream的sorted自定义排序》Javastream中的sorted方法是用于对流中的元素进行排序的方法,它可以接受一个comparator参数,用于指定排序规则,sorte... 目录一、sorted 操作的基础原理二、自定义排序的实现方式1. Comparator 接口的 Lam

一文深入详解Python的secrets模块

《一文深入详解Python的secrets模块》在构建涉及用户身份认证、权限管理、加密通信等系统时,开发者最不能忽视的一个问题就是“安全性”,Python在3.6版本中引入了专门面向安全用途的secr... 目录引言一、背景与动机:为什么需要 secrets 模块?二、secrets 模块的核心功能1. 基