本文主要是介绍Apache Flink 事件时间处理和 Watermarks,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
扫码关注公众号免费阅读全文:冰山烈焰的黑板报
原文地址:Flink Event Time Processing and Watermarks
如果你正在搭建一个实时流程序,事件时间处理是你不久将不得不使用的一个功能之一。因为在现实世界中绝大多数用例的消息都是乱序的,你的系统应该有一个方法应对和处理可能延迟的消息。在这篇博客中,我们将会看到为什么我们需要事件时间处理和我们怎么在 Flink 中使用它。
EvenTime 是一个事件在现实世界中发生时的时间。ProcessingTime 是该事件被 Flink 处理时的时间。为了理解事件时间处理,我们先以一个基于处理时间的系统开始,看看它的缺点。
我们将创建一个大小10秒的滑动窗口(SlidingWindow),每5秒滑动一次。在窗口结束时,系统将提交在此期间收到的一些消息。一旦你理解事件时间处理和 SlidingWindow 的相关工作,就不难理解它和 TumblingWindow 是怎么一起使用的。让我们开始吧。
基于处理时间的系统
在这个例子中,我们期望的消息有这样的格式:vaue,timestamp,其中 value 是消息,timestamp 是这个消息在数据源生成时的时间。因为我们现在在搭建一个基于 ProcessingTime 的系统,下面的代码将忽略 timestamp 这部分。
理解消息生成时包含的信息是很重要的一个方面。Flink 或其他系统不是一个可以自己搞清楚这些的系统。稍后我们将看到事件时间处理提取 timestamp 信息用以处理延迟的消息。
val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }.keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5)).sum(1)
counts.print
senv.execute("ProcessingTime processing example")
示例1:无延迟的消息
假设数据源分别在第13秒、第13秒和第16秒的时候,生成3条 a 类型的消息。(这里使用小时和分钟都可以,因为窗口的大小仅有10秒)。
这些消息将如下所示进入窗口中。在第13秒生成的前两个消息将进入 window1[5s-10s] 和 window2 [10s-20s],在第16秒生成的第三个消息将进入 window2 [10s-20s] 和 window3[15s-25s]。每个窗口提交后最后的统计值将分别是 (a, 2),(a, 3) 和 (a, 1)。
这个输出可以被看作是预期的结果。现在我们将看下其中一条消息延迟进入系统时,会发生什么。
示例2:有延迟消息
现在假设其中一条消息(在第13秒生成的)延迟6秒达到(第19秒),可能由于网络阻塞。你能猜出来这条消息会落入哪一个窗口吗?
这条延迟的消息会落入 window2 和 window3,因为 19秒在 10s-20s 和 15s-25s 中。它不会对 window2 的计算造成任何影响(因为这条消息不管怎样还是落入了该窗口),但是它会影响 window1 和 window3 的结果。我们现在通过使用事件时间处理来修复这个问题。
基于 EventTime 的系统
为了能够使用事件时间处理,我们需要一个提取消息中事件时间信息的提取器。注意消息的格式是 vaue,timestamp。extractTimestamp() 方法获取 timestamp 并作为 Long 类型返回。请先忽略 getCurrentWatermark() 方法,我们将稍后讨论它。
class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {override def extractTimestamp(e: String, prevElementTimestamp: Long) = {e.split(",")(1).toLong }override def getCurrentWatermark(): Watermark = { new Watermark(System.currentTimeMillis)}
}
现在我们需要配置这个 timestamp 提取器,并且配置 TimeCharactersistic 为 EventTime。其余的代码和 ProcessingTime 的一样。
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999).assignTimestampsAndWatermarks(new TimestampExtractor)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }.keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5)).sum(1)
counts.print
senv.execute("EventTime processing example")
上述代码的运行结果如下图所示:
这个结果看起来更好些,window2 和 window3 提交了正确的结果,但是 window1 还是错误的。Flink 不会把这条延迟的消息分配到 window3,因为它检测到这个消息的事件时间,知道它没有落入该窗口。但是,它为什么不分配这条消息到 window1 呢?原因是这条延迟的消息进入系统的时候,window1 已经计算完成了。现在让我们使用 Watermark 来解决这个问题。
注意在 window2 中,这条延迟的消息出现在第19秒的位置,而不是第13秒(它的事件时间)。图中的展示是有意表明在该窗口中的这条消息是不会根据它的事件时间排序的。(这或许在将来会有改变)。
Watermarks
Watermark 是非常重要且有趣的点子。我将尽力给你一个简要的概述。如果你有兴趣了解更多的信息,可以观看 Google 这个非常棒的演讲,以及阅读 dataArtisans 这篇博客。Watermark 本质上是一个时间戳。当 Flink 的一个算子接收到一个 Watermark 时,它知道(假设)不会再有比这个时间戳更晚的消息了(译者注:可以理解为不会再有比这个时间戳更早的事件时间的消息进入系统)。所以 Watermark 也可以被认为是告诉 Flink 在 EventTime 中有多远的一种方式。
就本例而言,可以把他看作是告诉 Flink 一条消息可以延迟多久的方式。最后一次尝试,我们现在设置 Watermark 为 current time - 5 seconds,告诉 Flink 期望消息的最大延时是5秒——这是因为只有当 Watermark 通过窗口时,窗口才会计算。由于我们的 Watermark 是 current time - 5 seconds,第一个窗口 [5s-15s] 将在第20秒的时候计算。类似地,窗口 [10s-20s] 将在第25秒的时候计算,以此类推。
override def getCurrentWatermark(): Watermark = { new Watermark(System.currentTimeMillis - 5000)}
这里我们假设 EventTime 比当前系统时间晚5秒,但并不总是这样。大多数情况下,最好保存到目前为止接收到的最大时间戳(从消息中提取的),然后减去预期的延迟。
修改之后的代码的运行结果:
最终,我们得到了正确的结果,三个窗口都按预期提交统计值——(a, 2),(a, 3) 和 (a, 1)。
Allowed Lateness
在我们使用 Watermark - delay 的早期方法中,窗口不会触发直到 Watermark 超过了 window_length + delay。如果你想适应延迟事件,并且希望窗口能够准时启动,您可以使用 Allowed Lateness。如果 Allowed Lateness 被设置了,Flink 将不会丢弃消息直到它超过 window_end_time + allowed lateness。一旦一条延迟的消息被接收,Flink 将会提取它的时间戳,并检查它是否在 allowed lateness 里,然后它会检查是否触发窗口(根据触发器集)。所以,注意这种方式中,一个窗口可能被多次触发,如果只需要一次处理,您可能希望使sink具有幂等性。
总结
实时流处理系统的重要性日益增长,必须处理延迟消息是构建此类系统的一部分。在这篇博客中,我们看到延迟的消息是怎样影响系统的结果,以及 Flink 的事件时间处理功能是怎么解决这些问题的。
这篇关于Apache Flink 事件时间处理和 Watermarks的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!