Apache Flink 事件时间处理和 Watermarks

2024-05-12 23:38

本文主要是介绍Apache Flink 事件时间处理和 Watermarks,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

扫码关注公众号免费阅读全文:冰山烈焰的黑板报
在这里插入图片描述

原文地址:Flink Event Time Processing and Watermarks

如果你正在搭建一个实时流程序,事件时间处理是你不久将不得不使用的一个功能之一。因为在现实世界中绝大多数用例的消息都是乱序的,你的系统应该有一个方法应对和处理可能延迟的消息。在这篇博客中,我们将会看到为什么我们需要事件时间处理和我们怎么在 Flink 中使用它。

EvenTime 是一个事件在现实世界中发生时的时间。ProcessingTime 是该事件被 Flink 处理时的时间。为了理解事件时间处理,我们先以一个基于处理时间的系统开始,看看它的缺点。

我们将创建一个大小10秒的滑动窗口(SlidingWindow),每5秒滑动一次。在窗口结束时,系统将提交在此期间收到的一些消息。一旦你理解事件时间处理和 SlidingWindow 的相关工作,就不难理解它和 TumblingWindow 是怎么一起使用的。让我们开始吧。

基于处理时间的系统

在这个例子中,我们期望的消息有这样的格式:vaue,timestamp,其中 value 是消息,timestamp 是这个消息在数据源生成时的时间。因为我们现在在搭建一个基于 ProcessingTime 的系统,下面的代码将忽略 timestamp 这部分。

理解消息生成时包含的信息是很重要的一个方面。Flink 或其他系统不是一个可以自己搞清楚这些的系统。稍后我们将看到事件时间处理提取 timestamp 信息用以处理延迟的消息。

val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }.keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5)).sum(1)
counts.print
senv.execute("ProcessingTime processing example")

示例1:无延迟的消息

假设数据源分别在第13秒、第13秒和第16秒的时候,生成3条 a 类型的消息。(这里使用小时和分钟都可以,因为窗口的大小仅有10秒)。
在这里插入图片描述
这些消息将如下所示进入窗口中。在第13秒生成的前两个消息将进入 window1[5s-10s] 和 window2 [10s-20s],在第16秒生成的第三个消息将进入 window2 [10s-20s] 和 window3[15s-25s]。每个窗口提交后最后的统计值将分别是 (a, 2),(a, 3) 和 (a, 1)。
在这里插入图片描述
这个输出可以被看作是预期的结果。现在我们将看下其中一条消息延迟进入系统时,会发生什么。

示例2:有延迟消息

现在假设其中一条消息(在第13秒生成的)延迟6秒达到(第19秒),可能由于网络阻塞。你能猜出来这条消息会落入哪一个窗口吗?
在这里插入图片描述
这条延迟的消息会落入 window2 和 window3,因为 19秒在 10s-20s 和 15s-25s 中。它不会对 window2 的计算造成任何影响(因为这条消息不管怎样还是落入了该窗口),但是它会影响 window1 和 window3 的结果。我们现在通过使用事件时间处理来修复这个问题。

基于 EventTime 的系统

为了能够使用事件时间处理,我们需要一个提取消息中事件时间信息的提取器。注意消息的格式是 vaue,timestamp。extractTimestamp() 方法获取 timestamp 并作为 Long 类型返回。请先忽略 getCurrentWatermark() 方法,我们将稍后讨论它。

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {override def extractTimestamp(e: String, prevElementTimestamp: Long) = {e.split(",")(1).toLong }override def getCurrentWatermark(): Watermark = { new Watermark(System.currentTimeMillis)}
}

现在我们需要配置这个 timestamp 提取器,并且配置 TimeCharactersistic 为 EventTime。其余的代码和 ProcessingTime 的一样。

senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999).assignTimestampsAndWatermarks(new TimestampExtractor) 
val counts = text.map {(m: String) => (m.split(",")(0), 1) }.keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5)).sum(1)
counts.print
senv.execute("EventTime processing example")

上述代码的运行结果如下图所示:
在这里插入图片描述
这个结果看起来更好些,window2 和 window3 提交了正确的结果,但是 window1 还是错误的。Flink 不会把这条延迟的消息分配到 window3,因为它检测到这个消息的事件时间,知道它没有落入该窗口。但是,它为什么不分配这条消息到 window1 呢?原因是这条延迟的消息进入系统的时候,window1 已经计算完成了。现在让我们使用 Watermark 来解决这个问题。

注意在 window2 中,这条延迟的消息出现在第19秒的位置,而不是第13秒(它的事件时间)。图中的展示是有意表明在该窗口中的这条消息是不会根据它的事件时间排序的。(这或许在将来会有改变)。

Watermarks

Watermark 是非常重要且有趣的点子。我将尽力给你一个简要的概述。如果你有兴趣了解更多的信息,可以观看 Google 这个非常棒的演讲,以及阅读 dataArtisans 这篇博客。Watermark 本质上是一个时间戳。当 Flink 的一个算子接收到一个 Watermark 时,它知道(假设)不会再有比这个时间戳更晚的消息了(译者注:可以理解为不会再有比这个时间戳更早的事件时间的消息进入系统)。所以 Watermark 也可以被认为是告诉 Flink 在 EventTime 中有多远的一种方式。

就本例而言,可以把他看作是告诉 Flink 一条消息可以延迟多久的方式。最后一次尝试,我们现在设置 Watermark 为 current time - 5 seconds,告诉 Flink 期望消息的最大延时是5秒——这是因为只有当 Watermark 通过窗口时,窗口才会计算。由于我们的 Watermark 是 current time - 5 seconds,第一个窗口 [5s-15s] 将在第20秒的时候计算。类似地,窗口 [10s-20s] 将在第25秒的时候计算,以此类推。

override def getCurrentWatermark(): Watermark = { new Watermark(System.currentTimeMillis - 5000)}

这里我们假设 EventTime 比当前系统时间晚5秒,但并不总是这样。大多数情况下,最好保存到目前为止接收到的最大时间戳(从消息中提取的),然后减去预期的延迟。

修改之后的代码的运行结果:
在这里插入图片描述
最终,我们得到了正确的结果,三个窗口都按预期提交统计值——(a, 2),(a, 3) 和 (a, 1)。

Allowed Lateness

在我们使用 Watermark - delay 的早期方法中,窗口不会触发直到 Watermark 超过了 window_length + delay。如果你想适应延迟事件,并且希望窗口能够准时启动,您可以使用 Allowed Lateness。如果 Allowed Lateness 被设置了,Flink 将不会丢弃消息直到它超过 window_end_time + allowed lateness。一旦一条延迟的消息被接收,Flink 将会提取它的时间戳,并检查它是否在 allowed lateness 里,然后它会检查是否触发窗口(根据触发器集)。所以,注意这种方式中,一个窗口可能被多次触发,如果只需要一次处理,您可能希望使sink具有幂等性。

总结

实时流处理系统的重要性日益增长,必须处理延迟消息是构建此类系统的一部分。在这篇博客中,我们看到延迟的消息是怎样影响系统的结果,以及 Flink 的事件时间处理功能是怎么解决这些问题的。

这篇关于Apache Flink 事件时间处理和 Watermarks的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/984061

相关文章

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

如何利用Java获取当天的开始和结束时间

《如何利用Java获取当天的开始和结束时间》:本文主要介绍如何使用Java8的LocalDate和LocalDateTime类获取指定日期的开始和结束时间,展示了如何通过这些类进行日期和时间的处... 目录前言1. Java日期时间API概述2. 获取当天的开始和结束时间代码解析运行结果3. 总结前言在J

修改若依框架Token的过期时间问题

《修改若依框架Token的过期时间问题》本文介绍了如何修改若依框架中Token的过期时间,通过修改`application.yml`文件中的配置来实现,默认单位为分钟,希望此经验对大家有所帮助,也欢迎... 目录修改若依框架Token的过期时间修改Token的过期时间关闭Token的过期时js间总结修改若依

Go Mongox轻松实现MongoDB的时间字段自动填充

《GoMongox轻松实现MongoDB的时间字段自动填充》这篇文章主要为大家详细介绍了Go语言如何使用mongox库,在插入和更新数据时自动填充时间字段,从而提升开发效率并减少重复代码,需要的可以... 目录前言时间字段填充规则Mongox 的安装使用 Mongox 进行插入操作使用 Mongox 进行更

使用C++将处理后的信号保存为PNG和TIFF格式

《使用C++将处理后的信号保存为PNG和TIFF格式》在信号处理领域,我们常常需要将处理结果以图像的形式保存下来,方便后续分析和展示,C++提供了多种库来处理图像数据,本文将介绍如何使用stb_ima... 目录1. PNG格式保存使用stb_imagephp_write库1.1 安装和包含库1.2 代码解

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed

Redis如何使用zset处理排行榜和计数问题

《Redis如何使用zset处理排行榜和计数问题》Redis的ZSET数据结构非常适合处理排行榜和计数问题,它可以在高并发的点赞业务中高效地管理点赞的排名,并且由于ZSET的排序特性,可以轻松实现根据... 目录Redis使用zset处理排行榜和计数业务逻辑ZSET 数据结构优化高并发的点赞操作ZSET 结

深入理解Apache Airflow 调度器(最新推荐)

《深入理解ApacheAirflow调度器(最新推荐)》ApacheAirflow调度器是数据管道管理系统的关键组件,负责编排dag中任务的执行,通过理解调度器的角色和工作方式,正确配置调度器,并... 目录什么是Airflow 调度器?Airflow 调度器工作机制配置Airflow调度器调优及优化建议最

微服务架构之使用RabbitMQ进行异步处理方式

《微服务架构之使用RabbitMQ进行异步处理方式》本文介绍了RabbitMQ的基本概念、异步调用处理逻辑、RabbitMQ的基本使用方法以及在SpringBoot项目中使用RabbitMQ解决高并发... 目录一.什么是RabbitMQ?二.异步调用处理逻辑:三.RabbitMQ的基本使用1.安装2.架构