Flink DataStream ProcessFunction

2024-06-03 13:48

本文主要是介绍Flink DataStream ProcessFunction,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

专栏原创出处:github-源笔记文件 ,github-源码 ,欢迎 Star,转载请附上原文出处链接和本声明。
本节内容对应官方文档

1 ProcessFunction 是什么

ProcessFunction 是一个低阶的流处理操作,它可以访问流处理程序的基础构建模块:

  1. 事件 (event)(流元素)。
  2. 状态 (state)(容错性,一致性,仅在keyed stream中)。
  3. 定时器 (timers)(event time 和 processing time, 仅在keyed stream中)。

ProcessFunction 可以看作是一个具有 keyed state 和 timers 访问权的 FlatMapFunction

  1. 通过 RuntimeContext 访问 keyed state 。
  2. 计时器允许应用程序对处理时间和事件时间中的更改作出响应。对 processElement(…) 函数的每次调用都获得一个 Context 对象,该对象可以访问元素的 event time timestamp 和 TimerService。
  3. TimerService 可用于为将来的 event/process time 注册回调。当定时器的达到定时时间时,会调用 onTimer(…) 方法。

注意如果要访问 keyed state 和 timers,则必须 ProcessFunction 在keyed stream上应用:
stream.keyBy(...).process(new MyProcessFunction())

2 应用实例-低阶 Join

要在两个输入上实现低阶 JOIN 操作,应用程序可以使用CoProcessFunctionKeyedCoProcessFunction
此函数绑定到两个不同的输入,并从两个不同的输入中获取单独的调用,processElement1(...)/processElement2(...)

该操作遵循以下模式:

  • 为一个输入(或两个输入)创建一个状态对象
  • 接收到来自其输入的元素时,更新状态
  • 接收到来自其他输入的元素后,探测状态并生成 join 的结果

例如:当为客户数据保存状态时,你可能会 join 客户数据和财务交易

3 应用实例-代码实战

在以下示例中,KeyedProcessFunction 为每个键维护一个计数,并且会把一分钟 (事件时间) 内没有更新的键/值对输出

  • 计数,键以及最后更新的时间戳会存储在 ValueState 中,ValueState 由 key 隐含定义。
  • 对于每条记录,KeyedProcessFunction 增加计数器并修改最后的时间戳。
  • 该函数还会在一分钟后调用回调(事件时间)。
  • 每次调用回调时,都会检查存储计数的最后修改时间与回调的事件时间时间戳,如果匹配则发送键/计数键值对(即在一分钟内没有更新)

这个简单的例子可以用会话窗口实现。在这里使用 KeyedProcessFunction 只是用来说明它的基本模式。对应示例源码

object ProcessFunction extends StreamExecutionEnvironmentApp {val stream = GameData.DataStream.userLogin(this, 10)stream.map(o => (o.uid, o.status)) // 转换为元组模式// .filter(_._1 == "2|2527").startNewChain() // 过滤数据,加快调试.keyBy(0).process(new CountWithTimeoutFunction()).print()sEnv.execute(this.getClass.getName)
}/*** 状态中存储的数据类型定义*/
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)/*** 维护计数和超时的 ProcessFunction 的实现*/
class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] {/** 此过程功能所维护的状态 */lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))override def processElement(value: (String, String),ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context,out: Collector[(String, Long)]): Unit = {// 初始化或检索/更新状态val current: CountWithTimestamp = state.value match {case null =>CountWithTimestamp(value._1, 1, ctx.timestamp) // 使用 ProcessingTime 时 ctx.timestamp 可能为 nullcase CountWithTimestamp(key, count, _) =>CountWithTimestamp(key, count + 1, ctx.timestamp)}// 写回状态state.update(current)// 从当前事件时间开始注册一个的定时器ctx.timerService.registerProcessingTimeTimer(current.lastModified + 10)}override def onTimer(timestamp: Long,ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#OnTimerContext,out: Collector[(String, Long)]): Unit = {state.value match {// 检查定时器是过时定时器还是最新定时器case CountWithTimestamp(key, count, lastModified) if timestamp == lastModified + 10 =>out.collect((key, count))case _ =>}}
}

4 定时器

TimerService 在内部维护两种类型的定时器(处理时间和事件时间定时器)并排队执行。

TimerService 会删除每个键和时间戳重复的定时器,即每个键在每个时间戳上最多有一个定时器。如果为同一时间戳注册了多个定时器,则只会调用一次 onTimer() 方法。

Flink 同步调用 onTimer() 和 processElement() 方法。因此,不必担心状态的并发修改。

5 容错

定时器具有容错能力,并且与应用程序的状态一起进行快照。如果故障恢复或从保存点启动应用程序,就会恢复定时器。

在故障恢复之前应该触发的处理时间定时器会被立即触发。当应用程序从故障中恢复或从保存点启动时,可能会发生这种情况。

6 定时器合并

由于 Flink 仅为每个键和时间戳维护一个定时器,因此可以通过降低定时器的频率来进行合并以减少定时器的数量。

对于频率为 1 秒的定时器(事件时间或处理时间),我们可以将目标时间向下舍入为整秒数。定时器最多提前 1 秒触发,但不会迟于我们的要求,精确到毫秒。因此,每个键每秒最多有一个定时器。

val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
ctx.timerService.registerProcessingTimeTimer(coalescedTime)

由于事件时间定时器仅当 Watermark 到达时才会触发,因此我们可以将当前 Watermark 与下一个 Watermark 的定时器一起调度和合并:

val coalescedTime = ctx.timerService.currentWatermark + 1
ctx.timerService.registerEventTimeTimer(coalescedTime)

按以下方式停止和删除计时器:

val timestampOfTimerToStop = ...
ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop) // 停止处理时间计时器
ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop) // 停止事件时间计时器

这篇关于Flink DataStream ProcessFunction的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1027167

相关文章

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

大数据之Flink(二)

4、部署模式 flink部署模式: 会话模式(Session Mode)单作业模式(Per-Job Mode)应用模式(Application Mode) 区别在于集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行。 4.1、会话模式 先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时资源已经确定,所有提交的作业会晶振集群中的资源。适合规模小、执

是谁还不会flink的checkpoint呀~

1、State Vs Checkpoint State:状态,是Flink中某一个Operator在某一个时刻的状态,如maxBy/sum,注意State存的是历史数据/状态,存在内存中。 Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息 一句话概括: Checkpoint就是State的快照 目的:假设作业停止了,下次启动的

Flink读取kafka数据并以parquet格式写入HDFS

《2021年最新版大数据面试题全面开启更新》 《2021年最新版大数据面试题全面开启更新》 大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中; 目前基于spark进行计算比较主流,需要读取hdfs上的数据,可以通过读取parquet:spark.read

Apache-Flink深度解析-State

来源:https://dwz.cn/xrMCqbk5 Flink系列精华文章合集入门篇: Flink入门Flink DataSet&DataSteam APIFlink集群部署Flink重启策略Flink分布式缓存Flink重启策略Flink中的TimeFlink中的窗口Flink的时间戳和水印Flink广播变量Flink-Kafka-connetorFlink-Table&SQLFlink

Apache-Flink深度解析-Temporal-Table-JOIN

在《JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家详细介绍什么是Temporal Table JOIN。在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点所有的数据改动,Temporal Table的工作流程如下:

Flink 原理与实现:Operator Chain原理

硬刚大数据系列文章链接: 2021年从零到大数据专家的学习指南(全面升级版) 2021年从零到大数据专家面试篇之Hadoop/HDFS/Yarn篇 2021年从零到大数据专家面试篇之SparkSQL篇 2021年从零到大数据专家面试篇之消息队列篇 2021年从零到大数据专家面试篇之Spark篇 2021年从零到大数据专家面试篇之Hbase篇

Flink原理与实现:如何生成ExecutionGraph及物理执行图

硬刚大数据系列文章链接: 2021年从零到大数据专家的学习指南(全面升级版) 2021年从零到大数据专家面试篇之Hadoop/HDFS/Yarn篇 2021年从零到大数据专家面试篇之SparkSQL篇 2021年从零到大数据专家面试篇之消息队列篇 2021年从零到大数据专家面试篇之Spark篇 2021年从零到大数据专家面试篇之Hbase篇

Flink原理与实现:Window的实现原理

硬刚大数据系列文章链接: 2021年从零到大数据专家的学习指南(全面升级版) 2021年从零到大数据专家面试篇之Hadoop/HDFS/Yarn篇 2021年从零到大数据专家面试篇之SparkSQL篇 2021年从零到大数据专家面试篇之消息队列篇 2021年从零到大数据专家面试篇之Spark篇 2021年从零到大数据专家面试篇之Hbase篇

Flink原理与实现:详解Flink中的状态管理

硬刚大数据系列文章链接: 2021年从零到大数据专家的学习指南(全面升级版) 2021年从零到大数据专家面试篇之Hadoop/HDFS/Yarn篇 2021年从零到大数据专家面试篇之SparkSQL篇 2021年从零到大数据专家面试篇之消息队列篇 2021年从零到大数据专家面试篇之Spark篇 2021年从零到大数据专家面试篇之Hbase篇