Flink DataStream时间水印机制

2024-06-03 13:48

本文主要是介绍Flink DataStream时间水印机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

专栏原创出处:github-源笔记文件 ,github-源码 ,欢迎 Star,转载请附上原文出处链接和本声明。
本节内容对应官方文档 ,本节内容对应示例源码

1 Time(时间)

所有由 Flink 事件-时间流应用生成的条目都必须伴随着一个时间戳。时间戳将一个条目与一个特定的时间点关联起来,一般这个时间点表示的是这条 record 发生的时间。不过 application 可以随意选择时间戳的含义,只要流中条目的时间戳是随着流的前进而递增即可。

支持的时间模型:

  • EventTime 是事件创建的时间,即事件产生时自带时间戳,在 Flink 处理计算中,事件时间难免有延迟,为了处理延迟,必须指定 Watermark 的生成方式
  • IngestionTime 是事件进入 Flink 的时间,即进入 source operator 时获取所在主机时间
  • ProcessingTime 是每一个算子操作的获取所在主机时间

时间模型比较

  • 性能: ProcessingTime> IngestTime> EventTime
  • 延迟: ProcessingTime< IngestTime< EventTime
  • 确定性: EventTime> IngestTime> ProcessingTime

注意:Flink 从数据流模型中实现了许多技术。有关事件时间和水印的一个很好的介绍,请查看下

  • 流式处理概念:时间域、窗口化 , 中文译文
  • 流式处理概念:水印、触发器、积累模式 , 中文译文
  • 流式处理概念:会话窗口 , 中文译文

如何设置时间域?

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 默认使用 TimeCharacteristic.ProcessTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)// 可选的:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2 Watermark(水印/水位线)

实时系统中,由于各种原因造成的延时,造成某些消息发到 flink 的时间延时于事件产生的时间。
如果基于event time构建window,但是对于迟到的事件,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是 Watermark。

Watermark 作为数据处理流中的一部分进行传输,并且携带一个时间戳 t。 一个 Watermark(t) 表示流中应该不再有事件时间比 t 小的元素(某个事件的时间戳比 Watermark 时间大)

水印有两个基本属性:

  1. 它们必须单调递增,以确保任务的 event-time 时钟向前推进,而不是向后
  2. 它们与记录的时间戳是相关的。一个时间戳为 T 的水印表示的是:在它之后接下来的所有记录的时间戳,都必须大于 T

Watermarks(水印) 处理机制如下:

  1. 参考 google 的 DataFlow。
  2. 是 event time 处理进度的标志。
  3. 表示比 watermark 更早 (更老) 的事件都已经到达 (没有比水位线更低的数据 )。
  4. 基于 watermark 来进行窗口触发计算的判断。

2.1 有序流中 Watermarks

某些情况下,基于 Event Time 的数据流是有续的 (相对 event time)。
在有序流中,watermark 就是一个简单的周期性标记。

stream_watermark_in_order

2.2 乱序流中 Watermarks

在更多场景下,基于 Event Time 的数据流是无续的 (相对 event time)。

在无序流中,Watermarks 至关重要,他告诉 operator 比 Watermarks 更早 (更老/时间戳更小) 的事件已经到达,operator 可以将内部事件时间提前到 Watermarks 的时间戳 (可以触发 window 计算)

stream_watermark_out_of_order

2.3 并行流中的 Watermarks

通常情况下, watermark 在源函数中或源函数后生成。如果指定多次 watermark,后面指定的 watermark 会覆盖前面的值。 源函数的每个 sub-task 独立生成水印。

随着水印在算子操作中的流动,它们会提前到达其到达的算子操作的事件时间。每当算子操作提前其事件时间时,同时算子操作会为下游生成一个新的 watermark。

一些算子消耗多个输入流;例如,keyBy(…) or partition(…) function。这样的算子的当前事件时间是其输入流的事件时间中的最小值。随着其输入流更新其事件时间,算子也将更新。

现在详细的解释一下,一个 task 如何释放一个水印,并在收到一个新的水印时如何更新它自身的 event-time 时钟(clock)。Flink 会将数据流分成不同的分区(partition),对于每个分区,都会有不同的 operator task 处理,这些 task 并行工作处理整个数据流。每个分区都是记录(包含时间戳)与水印的数据流。对于一个 operator,基于它与上游/下游 operators 连接的方式,它的 tasks 可以从一个或多个输入分区接受 records 和水印,并释放 records 和水印到一个或多个输出分区。下面我们会详细的介绍一个 task 如何释放水印到多个 output tasks,以及它如何根据(从输入 tasks)收到的水印,推进它自身的 event-time 时钟。

一个 task 对每个输入分区,都维护了一个分区水印。当 task 从一个分区收到一个水印,它会将对应分区的水印,更新为收到的水印最大值,并设置为当前值。然后,task 更新它的 event-time 时钟为所有分区水印中的最小值。如果 event-time 时钟相较之前有增加,则 task 处理所有被触发的计时器,并最终广播它的新事件-时间到所有下游 task,此操作通过释放一个对应的水印到所有连接的输出分区完成。

对于有多个输入流的(例如 Union 或 CoFlatMap 操作)operators,它们的 tasks 也会计算它们自身的 event-time 时钟,并作为所有分区水印的最小值– 他们并不(从不同的输入流中)区分 partition watermarks。这样做的结果是,两个不同的输入流中的数据会根据同一 event-time 时钟进行处理。但是,如果一个 application 的各个输入流的事件时间并不是一致的,则这个行为会导致问题。

Flink 的水印处理以及传播算法,确保了 operator task 恰当地释放一致时间戳的记录和水印。然而它依赖的基础是:所有分区持续提供递增的水印。一旦一个分区的水印不再递增,或者完全空闲(不再发送任何记录与水印),则 task 的事件-时间时钟不会再向前推进,并且 task 的计时器也不会被触发。在基于时间的、依赖于向前(advancing)时钟执行计算(并做清理)的 operators 中,便会造成问题。最终会导致处理延时、state 大小剧增(如果没有定期从所有的输入任务中接收到新的水印)。

若是两个输入流的水印差异太大,也会造成类似的影响。在有两个输入流的 task 中,它的事件-时钟会对应于较慢的流,并且较快的流的 records 或是中间结果一般会缓存到 state 中,直到 event-time 时钟允许处理它们。

parallel_streams_watermarks

3 指定 Timestamp 与生成 Watermarks

3.1 SourceFunction 直接定义

class GameSourceFunction[T <: GameModel](seq: Seq[T], millis: Long = 0) extends SourceFunction[T] {private var counter = 0private var isRunning = trueoverride def run(ctx: SourceFunction.SourceContext[T]): Unit = {while (isRunning && counter < seq.length) {// ctx.collect(seq(counter))val next = seq(counter)ctx.collectWithTimestamp(next, next.eventTimestamp) // 毫秒时间戳//      if (next.hasWatermarkTime) {//        ctx.emitWatermark(new Watermark(next.getWatermarkTime))//      }counter = counter + 1Thread.sleep(millis)}}override def cancel(): Unit = {isRunning = false}
}

3.2 通过 Flink 的 Timestamp Assigner 指定

Flink 提供了两个接口用于指定 Timestamp 与 Watermarks

  • AssignerWithPeriodicWatermarks 按时间间隔周期性生成 Watermarks
  • AssignerWithPunctuatedWatermarks 根据接入的事件触发条件生成 Watermarks

生成类图关系如下:
timestampAssigner_uml

DataStream支持指定Timestamp 与 Watermarks API

def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] // 已废弃 @Deprecated
def assignAscendingTimestamps(extractor: T => Long): DataStream[T] // 底层转换为 AssignerWithPeriodicWatermarks
def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]
def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T]

简单示例:

sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 设置时间域
stream.assignTimestampsAndWatermarks(new GameAscendingTimestampExtractor[UserLogin]) // 设置水印生成器

####3.2.1 AssignerWithPeriodicWatermarks(周期性水印生成器)
通过定义生成水印的间隔(每 n 毫秒) ExecutionConfig.setAutoWatermarkInterval(...)
调用AssignerWithPeriodicWatermarksgetCurrentWatermark()方法,如果返回的水印非空且大于前一个水印,则覆盖以前的水印。

总结为:

  • 基于 Timer
  • ExecutionConfig.setAutoWatermarkInterval(msec) (默认是 200ms, 设置 Watermarks 发送的周期)
  • 实现 AssignerWithPeriodicWatermarks 接口
3.2.1.1 Flink-API 提供:时间戳单调递增的分配器

适用于 event-time 戳单调递增的场景,数据没有太多延时。

底层实现为 AscendingTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T>

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )

3.2.1.2 Flink-API 提供:允许固定延迟的分配器

适用于预先知道最大延迟的场景 (例如最多比之前的元素延迟 3000ms)。

底层实现为 BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T>

3.2.1.3 自定义实现 AssignerWithPeriodicWatermarks 示例

// 设置水印生成器
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator[UserLogin]())
stream.assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator[UserLogin]())/**
周期性水印生成器 = 示例 1
此生成器生成的水印支持处理给定延迟时间范围内的数据
支持延迟的时间动态计算 = 当前处理事件中的最大时间 - 支持最大延迟时间*/
class BoundedOutOfOrdernessGenerator[T <: GameModel] extends AssignerWithPeriodicWatermarks[T] {val maxOutOfOrderness = 3500L // 支持最大延迟时间 3.5 secondsvar currentMaxTimestamp: Long = _ // 当前最大时间override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {val timestamp = element.eventTimestampcurrentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)timestamp}// 返回水印为当前最大时间减去支持最大延迟时间override def getCurrentWatermark: Watermark =new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}/**
周期性水印生成器 = 示例 2
此生成器生成的水印支持处理给定延迟时间范围内的数据。
支持延迟的时间动态计算 = 当前系统时间 - 支持最大延迟时间*/
class TimeLagWatermarkGenerator[T <: GameModel] extends AssignerWithPeriodicWatermarks[T] {val maxTimeLag = 5000L // 支持最大延迟时间 5 secondsoverride def extractTimestamp(element: T, previousElementTimestamp: Long): Long =element.eventTimestamp// 返回水印为当前时间减去支持最大延迟时间override def getCurrentWatermark: Watermark =new Watermark(System.currentTimeMillis() - maxTimeLag)
}
3.2.2 AssignerWithPunctuatedWatermarks(条件水印生成器)

使用 AssignerWithPunctuatedWatermarks接口。
首先调用该 extractTimestamp(...)方法为元素分配时间戳,然后立即调用checkAndGetNextWatermark(...)方法。
如果返回的水印非空且大于前一个水印,则覆盖以前的水印。

总结为:

  • 实现 AssignerWithPunctuatedWatermarks 接口
  • 生成水印逻辑自定义

注意:可以在每个事件上生成水印。但是,由于每个水印都会在下游引起一些计算,因此过多的水印会降低性能。

// 设置水印生成器
stream.assignTimestampsAndWatermarks(new PunctuatedAssigner[UserLogin]())/** 带条件的水印生成器 = 示例
在特定事件规则,可能会生成新的水印时生成水印*/
class PunctuatedAssigner[T <: GameModel] extends AssignerWithPunctuatedWatermarks[T] {override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {element.eventTimestamp}override def checkAndGetNextWatermark(lastElement: T, extractedTimestamp: Long): Watermark = {if (lastElement.hasWatermarkMarker) new Watermark(extractedTimestamp) else null}
}

4.为每个 Kafka 分区分配时间戳/水印

当 kafka 作为数据源时,kafka 的每个 Partition 分区里面时间戳可能是升序或者乱序模式。通常情况,我们会多个 Partition 分区并行处理,我们可以为 kafka 配置水印。
kafka 内部为每个 Partition 分区维护一个水印,并且在流进行 shuffle 时以2.3 并行流中的 Watermarks进行水印合并

val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})val stream: DataStream[MyType] = env.addSource(kafkaSource)

这篇关于Flink DataStream时间水印机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1027166

相关文章

Spring使用@Retryable实现自动重试机制

《Spring使用@Retryable实现自动重试机制》在微服务架构中,服务之间的调用可能会因为一些暂时性的错误而失败,例如网络波动、数据库连接超时或第三方服务不可用等,在本文中,我们将介绍如何在Sp... 目录引言1. 什么是 @Retryable?2. 如何在 Spring 中使用 @Retryable

如何用Java结合经纬度位置计算目标点的日出日落时间详解

《如何用Java结合经纬度位置计算目标点的日出日落时间详解》这篇文章主详细讲解了如何基于目标点的经纬度计算日出日落时间,提供了在线API和Java库两种计算方法,并通过实际案例展示了其应用,需要的朋友... 目录前言一、应用示例1、天安门升旗时间2、湖南省日出日落信息二、Java日出日落计算1、在线API2

如何使用 Bash 脚本中的time命令来统计命令执行时间(中英双语)

《如何使用Bash脚本中的time命令来统计命令执行时间(中英双语)》本文介绍了如何在Bash脚本中使用`time`命令来测量命令执行时间,包括`real`、`user`和`sys`三个时间指标,... 使用 Bash 脚本中的 time 命令来统计命令执行时间在日常的开发和运维过程中,性能监控和优化是不

python中的与时间相关的模块应用场景分析

《python中的与时间相关的模块应用场景分析》本文介绍了Python中与时间相关的几个重要模块:`time`、`datetime`、`calendar`、`timeit`、`pytz`和`dateu... 目录1. time 模块2. datetime 模块3. calendar 模块4. timeit

Java将时间戳转换为Date对象的方法小结

《Java将时间戳转换为Date对象的方法小结》在Java编程中,处理日期和时间是一个常见需求,特别是在处理网络通信或者数据库操作时,本文主要为大家整理了Java中将时间戳转换为Date对象的方法... 目录1. 理解时间戳2. Date 类的构造函数3. 转换示例4. 处理可能的异常5. 考虑时区问题6.

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。

【Tools】大模型中的自注意力机制

摇来摇去摇碎点点的金黄 伸手牵来一片梦的霞光 南方的小巷推开多情的门窗 年轻和我们歌唱 摇来摇去摇着温柔的阳光 轻轻托起一件梦的衣裳 古老的都市每天都改变模样                      🎵 方芳《摇太阳》 自注意力机制(Self-Attention)是一种在Transformer等大模型中经常使用的注意力机制。该机制通过对输入序列中的每个元素计算与其他元素之间的相似性,