Flink事件时间、水印和迟到数据处理

2024-09-06 21:08

本文主要是介绍Flink事件时间、水印和迟到数据处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

事件时间与水印

所谓事件时间,就是Flink DataStream中的数据元素自身带有的、在其实际发生时记录的时间戳,具有业务含义,并与系统时间独立。很显然,由于外部系统产生的数据往往不能及时、按序到达Flink系统,所以事件时间比处理时间有更强的不可预测性。为了能够准确地表达事件时间的处理进度,就必须用到水印。

Flink水印的本质是DataStream中的一种特殊元素,每个水印都携带有一个时间戳。当时间戳为T的水印出现时,表示事件时间t <= T的数据都已经到达,即水印后面应该只能流入事件时间t > T的数据。也就是说,水印是Flink判断迟到数据的标准,同时也是窗口触发的标记。

为了形象地说明水印的作用,参考一下下面的图,是一个乱序的基于事件时间的数据流示例。

https://www.ververica.com/blog/how-apache-flink-enables-new-streaming-applications-part-1图中的方框就是数据元素,其中的数字表示事件时间,W(x)就表示时间戳是x的水印,并有长度为4个时间单位的滚动窗口。假设时间单位为秒,可见事件时间为2、3、1s的元素都会进入区间为[1s, 4s]的窗口,而事件时间为7s的元素会进入区间为[5s, 8s]的窗口。当水印W(4)到达时,表示已经没有t <= 4s的元素了,[1s, 4s]窗口会被触发并计算。同理,水印W(9)到达时,[5s, 8s]窗口会被触发并计算,以此类推。

不过图中暂时没有示出迟到数据。如果事件时间为6的元素出现在W(9)后面,就算是迟到了。迟到数据的处理后面再说。

上面的示例只有一个并行度,那么在有多个并行度的情况下,就会有多个流产生水印,窗口触发时该采用哪个水印呢?答案是所有流入水印中时间戳最小的那个。来自官方文档的图能够说明问题。

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html容易理解,如果所有流入水印中时间戳最小的那个都已经达到或超过了窗口的结束时间,那么所有流的数据肯定已经全部收齐,就可以安全地触发窗口计算了。

提取事件时间、产生水印

上面说了这么多,那么事件时间是如何从数据中提取的,水印又是如何产生的呢?Flink提供了统一的DataStream.assignTimestampsAndWatermarks()方法来提取事件时间并同时产生水印,毕竟它们在处理过程中是紧密联系的。

assignTimestampsAndWatermarks()方法接受的参数类型有AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks两种,分别对应周期性水印和打点(即由事件本身的属性触发)水印,它们的类图如下所示。

周期性水印

顾名思义,使用AssignerWithPeriodicWatermarks时,水印是周期性产生的。该周期默认为200ms,也能通过ExecutionConfig.setAutoWatermarkInterval()方法来指定新的周期。

由类图容易看出,我们需要通过实现extractTimestamp()方法来提取事件时间,实现getCurrentWatermark()方法产生水印。但好在Flink已经提供了3种内置的实现类,所以我们直接用就可以了,省事。

AscendingTimestampExtractor总说话口干舌燥的(?),还是看代码吧。public abstract long extractAscendingTimestamp(T element);

    @Overridepublic final long extractTimestamp(T element, long elementPrevTimestamp) {final long newTimestamp = extractAscendingTimestamp(element);if (newTimestamp >= this.currentTimestamp) {this.currentTimestamp = newTimestamp;return newTimestamp;} else {violationHandler.handleViolation(newTimestamp, this.currentTimestamp);return newTimestamp;}}
    @Overridepublic final Watermark getCurrentWatermark() {return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);}
AscendingTimestampExtractor产生的时间戳和水印必须是单调非递减的,用户通过覆写extractAscendingTimestamp()方法抽取时间戳。如果产生了递减的时间戳,就要使用名为MonotonyViolationHandler的组件处理异常,有两种方式:打印警告日志(默认)和抛出RuntimeException。

单调递增的事件时间并不太符合实际情况,所以AscendingTimestampExtractor用得不多。

BoundedOutOfOrdernessTimestampExtractor它的出镜率就非常高了。还是看代码先。public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {if (maxOutOfOrderness.toMilliseconds() < 0) {throw new RuntimeException("Tried to set the maximum allowed " +"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");}this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;}

    public abstract long extractTimestamp(T element);
    @Overridepublic final Watermark getCurrentWatermark() {long potentialWM = currentMaxTimestamp - maxOutOfOrderness;if (potentialWM >= lastEmittedWatermark) {lastEmittedWatermark = potentialWM;}return new Watermark(lastEmittedWatermark);}
    @Overridepublic final long extractTimestamp(T element, long previousElementTimestamp) {long timestamp = extractTimestamp(element);if (timestamp > currentMaxTimestamp) {currentMaxTimestamp = timestamp;}return timestamp;}
如名字所述,BoundedOutOfOrdernessTimestampExtractor产生的时间戳和水印是允许“有界乱序”的,构造它时传入的参数maxOutOfOrderness就是乱序区间的长度,而实际发射的水印为通过覆写extractTimestamp()方法提取出来的时间戳减去乱序区间,相当于让水印把步调“放慢一点”。这是Flink为迟到数据提供的第一重保障。

当然,乱序区间的长度要根据实际环境谨慎设定,设定得太短会丢较多的数据,设定得太长会导致窗口触发延迟,实时性减弱。

IngestionTimeExtractor@Overridepublic long extractTimestamp(T element, long previousElementTimestamp) {final long now = Math.max(System.currentTimeMillis(), maxTimestamp);maxTimestamp = now;return now;}

    @Overridepublic Watermark getCurrentWatermark() {final long now = Math.max(System.currentTimeMillis(), maxTimestamp);maxTimestamp = now;return new Watermark(now - 1);}
IngestionTimeExtractor基于当前系统时钟生成时间戳和水印,其实就是Flink三大时间特征里的摄入时间了。

打点水印

打点水印比周期性水印用的要少不少,并且Flink没有内置的实现,那么就写个最简单的栗子吧。

    sourceStream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<UserActionRecord>() {@Nullable@Overridepublic Watermark checkAndGetNextWatermark(UserActionRecord lastElement, long extractedTimestamp) {return lastElement.getUserId().endsWith("0") ? new Watermark(extractedTimestamp - 1) : null;}
      @Overridepublic long extractTimestamp(UserActionRecord element, long previousElementTimestamp) {return element.getTimestamp();}});
AssignerWithPunctuatedWatermarks适用于需要依赖于事件本身的某些属性决定是否发射水印的情况。我们实现checkAndGetNextWatermark()方法来产生水印,产生的时机完全由用户控制。上面例子中是收取到用户ID末位为0的数据时才发射。

还有三点需要提醒:

不管使用哪种方式产生水印,都不能过于频繁。因为Watermark对象是会全部流向下游的,也会实打实地占用内存,水印过多会造成系统性能下降。水印的生成要尽量早,一般是在接入Source之后就产生,或者在Source经过简单的变换(map、filter等)之后产生。如果需求方对事件时间carry的业务意义并不关心,可以直接使用处理时间,简单方便。迟到数据处理

如上所述,水印的乱序区间能够保证一些迟到数据不被丢弃,但是乱序区间往往不很长,那些真正迟到了的数据该怎么办呢?有两种方法来兜底,可以说是Flink为迟到数据提供的第二重保障。

窗口允许延迟

Flink提供了WindowedStream.allowedLateness()方法来设定窗口的允许延迟。也就是说,正常情况下窗口触发计算完成之后就会被销毁,但是设定了允许延迟之后,窗口会等待allowedLateness的时长后再销毁。在该区间内的迟到数据仍然可以进入窗口中,并触发新的计算。当然,窗口也是吃资源大户,所以allowedLateness的值要适当。给个完整的代码示例如下。

      sourceStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {private static final long serialVersionUID = 1L;@Overridepublic long extractTimestamp(UserActionRecord record) {return record.getTimestamp();}}).keyBy("platform").window(TumblingEventTimeWindows.of(Time.minutes(1))).allowedLateness(Time.seconds(30)).aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())// ......
allowedLateness机制实际上就是DataFlow模型中的回填(backfill)策略的实现。对于滑动窗口和滚动窗口是累积(accumulating)策略,对于会话窗口则是累积与回撤(accumulating & retracting)策略。之前讲DataFlow模型时提到过,不废话了。

侧输出迟到数据

侧输出(side output)是Flink的分流机制。迟到数据本身可以当做特殊的流,我们通过调用WindowedStream.sideOutputLateData()方法将迟到数据发送到指定OutputTag的侧输出流里去,再进行下一步处理(比如存到外部存储或消息队列)。代码如下。

      // 侧输出的OutputTagOutputTag<UserActionRecord> lateOutputTag = new OutputTag<>("late_data_output_tag");
      sourceStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {private static final long serialVersionUID = 1L;@Overridepublic long extractTimestamp(UserActionRecord record) {return record.getTimestamp();}}).keyBy("platform").window(TumblingEventTimeWindows.of(Time.minutes(1))).allowedLateness(Time.seconds(30)).sideOutputLateData(lateOutputTag)   // 侧输出.aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())// ......
      // 获取迟到数据并写入对应Sinkstream.getSideOutput(lateOutputTag).addSink(lateDataSink);

The End

作者:LittleMagic链接:https://www.jianshu.com/p/c612e95a5028声明:本号所有文章除特殊注明,都为原创,公众号读者拥有优先阅读权,未经作者本人允许不得转载,否则追究侵权责任。

关注我的公众号,后台回复【JAVAPDF】获取200页面试题!5万人关注的大数据成神之路,不来了解一下吗?5万人关注的大数据成神之路,真的不来了解一下吗?5万人关注的大数据成神之路,确定真的不来了解一下吗?

欢迎您关注《大数据成神之路》

大数据技术与架构

这篇关于Flink事件时间、水印和迟到数据处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143125

相关文章

Python xmltodict实现简化XML数据处理

《Pythonxmltodict实现简化XML数据处理》Python社区为提供了xmltodict库,它专为简化XML与Python数据结构的转换而设计,本文主要来为大家介绍一下如何使用xmltod... 目录一、引言二、XMLtodict介绍设计理念适用场景三、功能参数与属性1、parse函数2、unpa

Python数据处理之导入导出Excel数据方式

《Python数据处理之导入导出Excel数据方式》Python是Excel数据处理的绝佳工具,通过Pandas和Openpyxl等库可以实现数据的导入、导出和自动化处理,从基础的数据读取和清洗到复杂... 目录python导入导出Excel数据开启数据之旅:为什么Python是Excel数据处理的最佳拍档

Python 标准库time时间的访问和转换问题小结

《Python标准库time时间的访问和转换问题小结》time模块为Python提供了处理时间和日期的多种功能,适用于多种与时间相关的场景,包括获取当前时间、格式化时间、暂停程序执行、计算程序运行时... 目录模块介绍使用场景主要类主要函数 - time()- sleep()- localtime()- g

如何用Java结合经纬度位置计算目标点的日出日落时间详解

《如何用Java结合经纬度位置计算目标点的日出日落时间详解》这篇文章主详细讲解了如何基于目标点的经纬度计算日出日落时间,提供了在线API和Java库两种计算方法,并通过实际案例展示了其应用,需要的朋友... 目录前言一、应用示例1、天安门升旗时间2、湖南省日出日落信息二、Java日出日落计算1、在线API2

如何使用 Bash 脚本中的time命令来统计命令执行时间(中英双语)

《如何使用Bash脚本中的time命令来统计命令执行时间(中英双语)》本文介绍了如何在Bash脚本中使用`time`命令来测量命令执行时间,包括`real`、`user`和`sys`三个时间指标,... 使用 Bash 脚本中的 time 命令来统计命令执行时间在日常的开发和运维过程中,性能监控和优化是不

python中的与时间相关的模块应用场景分析

《python中的与时间相关的模块应用场景分析》本文介绍了Python中与时间相关的几个重要模块:`time`、`datetime`、`calendar`、`timeit`、`pytz`和`dateu... 目录1. time 模块2. datetime 模块3. calendar 模块4. timeit

Java将时间戳转换为Date对象的方法小结

《Java将时间戳转换为Date对象的方法小结》在Java编程中,处理日期和时间是一个常见需求,特别是在处理网络通信或者数据库操作时,本文主要为大家整理了Java中将时间戳转换为Date对象的方法... 目录1. 理解时间戳2. Date 类的构造函数3. 转换示例4. 处理可能的异常5. 考虑时区问题6.

Python中的异步:async 和 await以及操作中的事件循环、回调和异常

《Python中的异步:async和await以及操作中的事件循环、回调和异常》在现代编程中,异步操作在处理I/O密集型任务时,可以显著提高程序的性能和响应速度,Python提供了asyn... 目录引言什么是异步操作?python 中的异步编程基础async 和 await 关键字asyncio 模块理论

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

禁止平板,iPad长按弹出默认菜单事件

通过监控按下抬起时间差来禁止弹出事件,把以下代码写在要禁止的页面的页面加载事件里面即可     var date;document.addEventListener('touchstart', event => {date = new Date().getTime();});document.addEventListener('touchend', event => {if (new