Flink事件时间、水印和迟到数据处理

2024-09-06 21:08

本文主要是介绍Flink事件时间、水印和迟到数据处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

事件时间与水印

所谓事件时间,就是Flink DataStream中的数据元素自身带有的、在其实际发生时记录的时间戳,具有业务含义,并与系统时间独立。很显然,由于外部系统产生的数据往往不能及时、按序到达Flink系统,所以事件时间比处理时间有更强的不可预测性。为了能够准确地表达事件时间的处理进度,就必须用到水印。

Flink水印的本质是DataStream中的一种特殊元素,每个水印都携带有一个时间戳。当时间戳为T的水印出现时,表示事件时间t <= T的数据都已经到达,即水印后面应该只能流入事件时间t > T的数据。也就是说,水印是Flink判断迟到数据的标准,同时也是窗口触发的标记。

为了形象地说明水印的作用,参考一下下面的图,是一个乱序的基于事件时间的数据流示例。

https://www.ververica.com/blog/how-apache-flink-enables-new-streaming-applications-part-1图中的方框就是数据元素,其中的数字表示事件时间,W(x)就表示时间戳是x的水印,并有长度为4个时间单位的滚动窗口。假设时间单位为秒,可见事件时间为2、3、1s的元素都会进入区间为[1s, 4s]的窗口,而事件时间为7s的元素会进入区间为[5s, 8s]的窗口。当水印W(4)到达时,表示已经没有t <= 4s的元素了,[1s, 4s]窗口会被触发并计算。同理,水印W(9)到达时,[5s, 8s]窗口会被触发并计算,以此类推。

不过图中暂时没有示出迟到数据。如果事件时间为6的元素出现在W(9)后面,就算是迟到了。迟到数据的处理后面再说。

上面的示例只有一个并行度,那么在有多个并行度的情况下,就会有多个流产生水印,窗口触发时该采用哪个水印呢?答案是所有流入水印中时间戳最小的那个。来自官方文档的图能够说明问题。

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html容易理解,如果所有流入水印中时间戳最小的那个都已经达到或超过了窗口的结束时间,那么所有流的数据肯定已经全部收齐,就可以安全地触发窗口计算了。

提取事件时间、产生水印

上面说了这么多,那么事件时间是如何从数据中提取的,水印又是如何产生的呢?Flink提供了统一的DataStream.assignTimestampsAndWatermarks()方法来提取事件时间并同时产生水印,毕竟它们在处理过程中是紧密联系的。

assignTimestampsAndWatermarks()方法接受的参数类型有AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks两种,分别对应周期性水印和打点(即由事件本身的属性触发)水印,它们的类图如下所示。

周期性水印

顾名思义,使用AssignerWithPeriodicWatermarks时,水印是周期性产生的。该周期默认为200ms,也能通过ExecutionConfig.setAutoWatermarkInterval()方法来指定新的周期。

由类图容易看出,我们需要通过实现extractTimestamp()方法来提取事件时间,实现getCurrentWatermark()方法产生水印。但好在Flink已经提供了3种内置的实现类,所以我们直接用就可以了,省事。

AscendingTimestampExtractor总说话口干舌燥的(?),还是看代码吧。public abstract long extractAscendingTimestamp(T element);

    @Overridepublic final long extractTimestamp(T element, long elementPrevTimestamp) {final long newTimestamp = extractAscendingTimestamp(element);if (newTimestamp >= this.currentTimestamp) {this.currentTimestamp = newTimestamp;return newTimestamp;} else {violationHandler.handleViolation(newTimestamp, this.currentTimestamp);return newTimestamp;}}
    @Overridepublic final Watermark getCurrentWatermark() {return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);}
AscendingTimestampExtractor产生的时间戳和水印必须是单调非递减的,用户通过覆写extractAscendingTimestamp()方法抽取时间戳。如果产生了递减的时间戳,就要使用名为MonotonyViolationHandler的组件处理异常,有两种方式:打印警告日志(默认)和抛出RuntimeException。

单调递增的事件时间并不太符合实际情况,所以AscendingTimestampExtractor用得不多。

BoundedOutOfOrdernessTimestampExtractor它的出镜率就非常高了。还是看代码先。public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {if (maxOutOfOrderness.toMilliseconds() < 0) {throw new RuntimeException("Tried to set the maximum allowed " +"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");}this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;}

    public abstract long extractTimestamp(T element);
    @Overridepublic final Watermark getCurrentWatermark() {long potentialWM = currentMaxTimestamp - maxOutOfOrderness;if (potentialWM >= lastEmittedWatermark) {lastEmittedWatermark = potentialWM;}return new Watermark(lastEmittedWatermark);}
    @Overridepublic final long extractTimestamp(T element, long previousElementTimestamp) {long timestamp = extractTimestamp(element);if (timestamp > currentMaxTimestamp) {currentMaxTimestamp = timestamp;}return timestamp;}
如名字所述,BoundedOutOfOrdernessTimestampExtractor产生的时间戳和水印是允许“有界乱序”的,构造它时传入的参数maxOutOfOrderness就是乱序区间的长度,而实际发射的水印为通过覆写extractTimestamp()方法提取出来的时间戳减去乱序区间,相当于让水印把步调“放慢一点”。这是Flink为迟到数据提供的第一重保障。

当然,乱序区间的长度要根据实际环境谨慎设定,设定得太短会丢较多的数据,设定得太长会导致窗口触发延迟,实时性减弱。

IngestionTimeExtractor@Overridepublic long extractTimestamp(T element, long previousElementTimestamp) {final long now = Math.max(System.currentTimeMillis(), maxTimestamp);maxTimestamp = now;return now;}

    @Overridepublic Watermark getCurrentWatermark() {final long now = Math.max(System.currentTimeMillis(), maxTimestamp);maxTimestamp = now;return new Watermark(now - 1);}
IngestionTimeExtractor基于当前系统时钟生成时间戳和水印,其实就是Flink三大时间特征里的摄入时间了。

打点水印

打点水印比周期性水印用的要少不少,并且Flink没有内置的实现,那么就写个最简单的栗子吧。

    sourceStream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<UserActionRecord>() {@Nullable@Overridepublic Watermark checkAndGetNextWatermark(UserActionRecord lastElement, long extractedTimestamp) {return lastElement.getUserId().endsWith("0") ? new Watermark(extractedTimestamp - 1) : null;}
      @Overridepublic long extractTimestamp(UserActionRecord element, long previousElementTimestamp) {return element.getTimestamp();}});
AssignerWithPunctuatedWatermarks适用于需要依赖于事件本身的某些属性决定是否发射水印的情况。我们实现checkAndGetNextWatermark()方法来产生水印,产生的时机完全由用户控制。上面例子中是收取到用户ID末位为0的数据时才发射。

还有三点需要提醒:

不管使用哪种方式产生水印,都不能过于频繁。因为Watermark对象是会全部流向下游的,也会实打实地占用内存,水印过多会造成系统性能下降。水印的生成要尽量早,一般是在接入Source之后就产生,或者在Source经过简单的变换(map、filter等)之后产生。如果需求方对事件时间carry的业务意义并不关心,可以直接使用处理时间,简单方便。迟到数据处理

如上所述,水印的乱序区间能够保证一些迟到数据不被丢弃,但是乱序区间往往不很长,那些真正迟到了的数据该怎么办呢?有两种方法来兜底,可以说是Flink为迟到数据提供的第二重保障。

窗口允许延迟

Flink提供了WindowedStream.allowedLateness()方法来设定窗口的允许延迟。也就是说,正常情况下窗口触发计算完成之后就会被销毁,但是设定了允许延迟之后,窗口会等待allowedLateness的时长后再销毁。在该区间内的迟到数据仍然可以进入窗口中,并触发新的计算。当然,窗口也是吃资源大户,所以allowedLateness的值要适当。给个完整的代码示例如下。

      sourceStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {private static final long serialVersionUID = 1L;@Overridepublic long extractTimestamp(UserActionRecord record) {return record.getTimestamp();}}).keyBy("platform").window(TumblingEventTimeWindows.of(Time.minutes(1))).allowedLateness(Time.seconds(30)).aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())// ......
allowedLateness机制实际上就是DataFlow模型中的回填(backfill)策略的实现。对于滑动窗口和滚动窗口是累积(accumulating)策略,对于会话窗口则是累积与回撤(accumulating & retracting)策略。之前讲DataFlow模型时提到过,不废话了。

侧输出迟到数据

侧输出(side output)是Flink的分流机制。迟到数据本身可以当做特殊的流,我们通过调用WindowedStream.sideOutputLateData()方法将迟到数据发送到指定OutputTag的侧输出流里去,再进行下一步处理(比如存到外部存储或消息队列)。代码如下。

      // 侧输出的OutputTagOutputTag<UserActionRecord> lateOutputTag = new OutputTag<>("late_data_output_tag");
      sourceStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {private static final long serialVersionUID = 1L;@Overridepublic long extractTimestamp(UserActionRecord record) {return record.getTimestamp();}}).keyBy("platform").window(TumblingEventTimeWindows.of(Time.minutes(1))).allowedLateness(Time.seconds(30)).sideOutputLateData(lateOutputTag)   // 侧输出.aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())// ......
      // 获取迟到数据并写入对应Sinkstream.getSideOutput(lateOutputTag).addSink(lateDataSink);

The End

作者:LittleMagic链接:https://www.jianshu.com/p/c612e95a5028声明:本号所有文章除特殊注明,都为原创,公众号读者拥有优先阅读权,未经作者本人允许不得转载,否则追究侵权责任。

关注我的公众号,后台回复【JAVAPDF】获取200页面试题!5万人关注的大数据成神之路,不来了解一下吗?5万人关注的大数据成神之路,真的不来了解一下吗?5万人关注的大数据成神之路,确定真的不来了解一下吗?

欢迎您关注《大数据成神之路》

大数据技术与架构

这篇关于Flink事件时间、水印和迟到数据处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143125

相关文章

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

禁止平板,iPad长按弹出默认菜单事件

通过监控按下抬起时间差来禁止弹出事件,把以下代码写在要禁止的页面的页面加载事件里面即可     var date;document.addEventListener('touchstart', event => {date = new Date().getTime();});document.addEventListener('touchend', event => {if (new

MiniGPT-3D, 首个高效的3D点云大语言模型,仅需一张RTX3090显卡,训练一天时间,已开源

项目主页:https://tangyuan96.github.io/minigpt_3d_project_page/ 代码:https://github.com/TangYuan96/MiniGPT-3D 论文:https://arxiv.org/pdf/2405.01413 MiniGPT-3D在多个任务上取得了SoTA,被ACM MM2024接收,只拥有47.8M的可训练参数,在一张RTX

批处理以当前时间为文件名创建文件

批处理以当前时间为文件名创建文件 批处理创建空文件 有时候,需要创建以当前时间命名的文件,手动输入当然可以,但是有更省心的方法吗? 假设我是 windows 操作系统,打开命令行。 输入以下命令试试: echo %date:~0,4%_%date:~5,2%_%date:~8,2%_%time:~0,2%_%time:~3,2%_%time:~6,2% 输出类似: 2019_06

【MRI基础】TR 和 TE 时间概念

重复时间 (TR) 磁共振成像 (MRI) 中的 TR(重复时间,repetition time)是施加于同一切片的连续脉冲序列之间的时间间隔。具体而言,TR 是施加一个 RF(射频)脉冲与施加下一个 RF 脉冲之间的持续时间。TR 以毫秒 (ms) 为单位,主要控制后续脉冲之前的纵向弛豫程度(T1 弛豫),使其成为显著影响 MRI 中的图像对比度和信号特性的重要参数。 回声时间 (TE)

FreeRTOS内部机制学习03(事件组内部机制)

文章目录 事件组使用的场景事件组的核心以及Set事件API做的事情事件组的特殊之处事件组为什么不关闭中断xEventGroupSetBitsFromISR内部是怎么做的? 事件组使用的场景 学校组织秋游,组长在等待: 张三:我到了 李四:我到了 王五:我到了 组长说:好,大家都到齐了,出发! 秋游回来第二天就要提交一篇心得报告,组长在焦急等待:张三、李四、王五谁先写好就交谁的

Python:豆瓣电影商业数据分析-爬取全数据【附带爬虫豆瓣,数据处理过程,数据分析,可视化,以及完整PPT报告】

**爬取豆瓣电影信息,分析近年电影行业的发展情况** 本文是完整的数据分析展现,代码有完整版,包含豆瓣电影爬取的具体方式【附带爬虫豆瓣,数据处理过程,数据分析,可视化,以及完整PPT报告】   最近MBA在学习《商业数据分析》,大实训作业给了数据要进行数据分析,所以先拿豆瓣电影练练手,网络上爬取豆瓣电影TOP250较多,但对于豆瓣电影全数据的爬取教程很少,所以我自己做一版。 目

【经验交流】修复系统事件查看器启动不能时出现的4201错误

方法1,取得『%SystemRoot%\LogFiles』文件夹和『%SystemRoot%\System32\wbem』文件夹的权限(包括这两个文件夹的所有子文件夹的权限),简单点说,就是使你当前的帐户拥有这两个文件夹以及它们的子文件夹的绝对控制权限。这是最简单的方法,不少老外说,这样一弄,倒是解决了问题。不过对我的系统,没用; 方法2,以不带网络的安全模式启动,运行命令行,输入“ne

BT天堂网站挂马事件后续:“大灰狼”远控木马分析及幕后真凶调查

9月初安全团队披露bt天堂网站挂马事件,该网站被利用IE神洞CVE-2014-6332挂马,如果用户没有打补丁或开启安全软件防护,电脑会自动下载执行大灰狼远控木马程序。 鉴于bt天堂电影下载网站访问量巨大,此次挂马事件受害者甚众,安全团队专门针对该木马进行严密监控,并对其幕后真凶进行了深入调查。 一、“大灰狼”的伪装 以下是10月30日一天内大灰狼远控的木马样本截图,可以看到该木马变种数量不

LeetCode:64. 最大正方形 动态规划 时间复杂度O(nm)

64. 最大正方形 题目链接 题目描述 给定一个由 0 和 1 组成的二维矩阵,找出只包含 1 的最大正方形,并返回其面积。 示例1: 输入: 1 0 1 0 01 0 1 1 11 1 1 1 11 0 0 1 0输出: 4 示例2: 输入: 0 1 1 0 01 1 1 1 11 1 1 1 11 1 1 1 1输出: 9 解题思路 这道题的思路是使用动态规划