Flink系列之Time和WaterMark

2023-12-25 17:38
文章标签 系列 flink time watermark

本文主要是介绍Flink系列之Time和WaterMark,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  当数据进入Flink的时候,数据需要带入相应的时间,根据相应的时间进行处理。

  让咱们想象一个场景,有一个队列,分别带着指定的时间,那么处理的时候,需要根据相应的时间进行处理,比如:统计最近五分钟的访问量,那么就需要知道数据到来的时间。五分钟以内的数据将会被计入,超过五分钟的将会计入下一个计算窗口。

  那么Flink的Time分为三种:

  ProcessingTime : 处理时间,即算子处理数据的机器产生的时间,数据的流程一般是Source -> Transform (Operator,即算子) -> Sink(保存数据)。ProcessingTime出现在Transform,算子计算的点。这个比较好生成,数据一旦到了算子就会生成。如果在分布式系统中或异步系统中,对于一个消息系统,有的机器处理的块,有的机器消费的慢,算子和算子之间的处理速度,还有可能有些机器出故障了,ProcessingTime将不能根据时间产生决定数据的结果,因为什么时候到了算子才会生成这个时间。

  EventTime : 事件时间,此事件一般是产生数据的源头生成的。带着event time的事件进入flink的source之后就可以把事件事件进行提取,提取出来之后可以根据这个时间处理需要一致性和决定性的结果。比如,计算一个小时或者五分钟内的数据访问量。当然数据的EventTime可以是有序的,也可以是无序的。有序的数据大家比较好理解,比如,第一秒到第一条,第二秒到第二条数据。无序的数据,举个例子要计算五秒的数据,假如现在为10:00:00, 那么数据EventTime在[10:00:00 10:00:05), [10:00:05 10:00:10),加入一条数据是04秒产生的,那么由于机器处理的慢,该数据在08秒的时候到了,这个时候我们理解该数据就是无序的。可以通过WaterMark的机制处理无序的数据,一会儿咱们在文章中继续解释。

  IngestionTime : 摄入时间,即数据进入Flink的Source的时候计入的时间。相对于以上两个时间,IngestionTime 介于 ProcessingTime 和 EventTime之间,相比于ProcessingTime,生成的更加方便快捷,ProcessingTime每次进入一个Operator(算子,即map、flatMap、reduce等)都会产生一个时间,而IngestionTime在进入Flink的时候就产生了timestamp。相比于eventTime,它不能处理无序的事件,因为每次进入source产生的时间都是有序的,IngestionTime也无须产生WaterMark,因为会自动生成。

  如果大家还不是特别理解的话,咱们从官网拿一张图来展示,这个会比较一目了然。

 

 

 

   Event Producer 产生数据,这个时候就带上EventTime了,这个时候比如用户访问的记录,访问的时间就是EventTime。然后放入了MessageQueue-消息队列,进入到Flink Source的时候可以生成IngetionTime,也就是被Flink "吞" 进入时的时间,可以这么理解一下。然后由Source再进入到Operator-算子,也就是将数据进行转换,如Map, FlatMap等操作,这个时候每进入一个Operator都会生成一个时间即ProcessingTime。

  IngestionTime和ProcessingTime都是生成的,所以时间是升序的,里边的时间戳timestamp和水位线Watermark都是自动生成的,所以不用考虑这个。而EventTime与其他两个有些差异,它可以是升序的,也可以不是无序的。

  假如一个消息队列来了带着事件时间,时间为: 1, 2,3,4, 5。 这个加入是数据过来的时间顺序,如果需要统计2秒之间的数据的话,那么就会产生的窗口数据为[1,2], [3,4] [5],这个有序时间。

  多数情况下,从消息队列过来的数据一般时间一般没有顺序。比如过来的数据事件时间为 1,3,2,4,5,那么我们想要正确2秒的数据,我们就需要引入Watermark, 水位线一说,这个水位线的含义就是当数据达到了这个水位线的时候才触发真正的数据统计,对于窗口来讲,也就是将窗口进行关闭然后进行统计。假如我们允许最大的延迟时间为1秒,那么这些数据将会分成:

  1, 3, 2 | 水位线 |  4,5 | 水位线 |

  1 -> 分到1-2的窗口中。

  3 -> 新创建一个窗口(3-4),然后分到3-4的窗口中。

  2 -> 分到1-2的窗口看。

  水位线 -> 进行窗口统计和数据汇总。

  4 -> 分到3-4的窗口。

  5 -> 新建一个窗口(5-6),然后分配到新窗口中。

  不知道这样写大家能不能理解呢,如果觉得有问题的话可以给我留言。

  上面的这样是延迟数据的处理机制,当然还有并行流处理的情况,这种情况下有的数据慢,有的数据快,那么eventTime小的数据会先流到下一个算子上,下面事件时间14和29在到window的时候,那么14会先流到window进行处理,

  在Source之后会产生对应的watermark,不同的source接入不同的数据将会分别产生对应的watermark,当然watermark也需要遵循着从小到大进行触发,保证数据的正确处理。

 

 

 

 

 

 

 

 

 

 

 

 

 

  Watermark的设定:

  一种是Punctuated Watermark, 翻译过来应该是“间断的水位线”,咱们来看下原文

  To generate watermarks whenever a certain event indicates that a new watermark might be generated, use AssignerWithPunctuatedWatermarks. For this class Flink will first call the extractTimestamp(...) method to assign the element a timestamp, and then immediately call the checkAndGetNextWatermark(...) method on that element.

  如果数据是间断性的,那么可以使用这个作为产生watermark的方式。如果一直有数据且EventTime是递增的话,那么每条数据就会产生一条数据,这种情况下会对系统造成负载,所以连续产生数据的情况下使用这种不合适。这个方法首先调用的是extractTimestamp用于抽取时间戳,checkAndGetNextWatermark用于检查和生成下一个水位线。

  

  第二种是Periodic Watermark,翻译过来是“周期性水位线”,看下原文

  AssignerWithPeriodicWatermarks assigns timestamps and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time).

  周期性的获取timestamp和生成watermark。可以依赖流元素的时间,比如EventTime或者ProcessingTime。这个接口先调用extractTimestamp方法获取timestamp,接着调用getCurrentWatermark生成相应的时间戳。

  

  这种周期性水位线有如下三种实现:

  1)AscendingTimestampExtractor,如果数据产生的时间是升序的,可以使用这个实现获取timestamp和生成watermark。这种情况下,如果有数据升序中有小于当前时间戳的事件时,比如1,2,3,2,4,在这种情况下数据2将会丢失。丢失的数据可以通过sideOutputLateData获取到。

  2)BoundedOutOfOrdernessTimestampExtractor,如果数据是无需的,可以使用这个实现,指定相应的延迟时间。

  3)IngestionTimeExtractor, 这个是当指定时间特性为IngestionTime时,直接生成时间戳和获取水印。

  

  下面写一个例子,进一步加深理解。以下是通过建立一个socket服务端,通过数据数据进行数据展示,数据分为word和时间戳来演示,首先指定时间特性为EventTime,默认的时间特性为ProcessingTime。将单词和时间戳进行解析拆分进行FlatMap进行数据解析成WordCount类,分配时间戳和生成水印,按word字段进行拆分,统计5秒钟的滚动窗口数据做reduce,最后是打印和输出。

package com.hqs.flink;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import javax.annotation.Nullable;/*** @author huangqingshi* @Date 2020-01-11*/
public class SocketEventTime {public static void main(String[] args) throws Exception{//创建envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置流的时间特性,
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//设置并行度env.setParallelism(1);//设置监听localhost:9000端口,以回车分割DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");DataStream<SocketWindowCount.WordCount> wordCountStream = text.flatMap(new FlatMapFunction<String, SocketWindowCount.WordCount>() {@Overridepublic void flatMap(String value, Collector<SocketWindowCount.WordCount> out) throws Exception {String[] args = value.split(",");out.collect(SocketWindowCount.WordCount.of(args[0], args[1]));}}).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SocketWindowCount.WordCount>() {long currentTimeStamp = 0L;//允许的最大延迟时间,单位为毫秒long maxDelayAllowed = 0L;long currentWaterMark;@Nullable@Overridepublic Watermark getCurrentWatermark() {currentWaterMark = currentTimeStamp - maxDelayAllowed;
//                        System.out.println("当前waterMark:" + currentWaterMark);return new Watermark(currentWaterMark);}@Overridepublic long extractTimestamp(SocketWindowCount.WordCount wordCount, long l) {long timeStamp = Long.parseLong(wordCount.timestamp);currentTimeStamp = Math.max(currentTimeStamp, timeStamp);System.out.println("Key:" + wordCount.word + ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark+ ",当前水位线:" + (currentTimeStamp - maxDelayAllowed));return timeStamp;}});DataStream<SocketWindowCount.WordCount> windowsCounts = wordCountStream.keyBy("word").window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<SocketWindowCount.WordCount>() {@Overridepublic SocketWindowCount.WordCount reduce(SocketWindowCount.WordCount wordCount, SocketWindowCount.WordCount t1) throws Exception {//                                System.out.println("reduce:" + wordCount.timestamp + "," + t1.timestamp);t1.timestamp = wordCount.timestamp + "," + t1.timestamp;return t1;}});//将结果集进行打印
        windowsCounts.print();//提交所设置的执行env.execute("EventTime Example");}public static class WordCount {public String word;public String timestamp;public static SocketWindowCount.WordCount of(String word, String timestamp) {SocketWindowCount.WordCount wordCount = new SocketWindowCount.WordCount();wordCount.word = word;wordCount.timestamp = timestamp;return wordCount;}@Overridepublic String toString() {return "word:" + word + " timestamp:" + timestamp;}}}

  使用nc命令建立一个socket连接并且输入数据,前边为单词,后边为timestamp时间戳,大家可以转换为时间:      

huangqingshideMacBook-Pro:~ huangqingshi$ nc -lk 9000
hello,1553503185000
hello,1553503186000
hello,1553503187000
hello,1553503188000
hello,1553503189000
hello,1553503190000
hello,1553503187000
hello,1553503191000
hello,1553503192000
hello,1553503193000
hello,1553503194000
hello,1553503195000

  输出的结果如下,从上边我们看到最大延迟时间maxDelayAllowed为0秒,也就意味着采用升序的获取,等于使用AscendingTimestampExtractor,每来一条数据即生成一个时间戳和水位。因为中间有一条数据为155350318700,小于上边的数据,所以这条数据丢失了。当5秒的时候触发一个window时间,即数据的结果输出。

Key:hello,EventTime:1553503185000,前一条数据的水位线:0,当前水位线:1553503185000
Key:hello,EventTime:1553503186000,前一条数据的水位线:1553503185000,当前水位线:1553503186000
Key:hello,EventTime:1553503187000,前一条数据的水位线:1553503186000,当前水位线:1553503187000
Key:hello,EventTime:1553503188000,前一条数据的水位线:1553503187000,当前水位线:1553503188000
Key:hello,EventTime:1553503189000,前一条数据的水位线:1553503188000,当前水位线:1553503189000
Key:hello,EventTime:1553503190000,前一条数据的水位线:1553503189000,当前水位线:1553503190000
word:hello timestamp:1553503185000,1553503186000,1553503187000,1553503188000,1553503189000
Key:hello,EventTime:1553503187000,前一条数据的水位线:1553503190000,当前水位线:1553503190000
Key:hello,EventTime:1553503191000,前一条数据的水位线:1553503190000,当前水位线:1553503191000
Key:hello,EventTime:1553503192000,前一条数据的水位线:1553503191000,当前水位线:1553503192000
Key:hello,EventTime:1553503193000,前一条数据的水位线:1553503192000,当前水位线:1553503193000
Key:hello,EventTime:1553503194000,前一条数据的水位线:1553503193000,当前水位线:1553503194000
Key:hello,EventTime:1553503195000,前一条数据的水位线:1553503194000,当前水位线:1553503195000
word:hello timestamp:1553503190000,1553503191000,1553503192000,1553503193000,1553503194000

  下面咱们调整下最大延迟时间代码:

//允许的最大延迟时间,单位为毫秒
long maxDelayAllowed = 5000L;

  咱们来看下输出的结果,这次数据有了上边丢失的数据了。

Key:hello,EventTime:1553503185000,前一条数据的水位线:-5000,当前水位线:1553503180000
Key:hello,EventTime:1553503186000,前一条数据的水位线:1553503180000,当前水位线:1553503181000
Key:hello,EventTime:1553503187000,前一条数据的水位线:1553503181000,当前水位线:1553503182000
Key:hello,EventTime:1553503188000,前一条数据的水位线:1553503182000,当前水位线:1553503183000
Key:hello,EventTime:1553503189000,前一条数据的水位线:1553503183000,当前水位线:1553503184000
Key:hello,EventTime:1553503190000,前一条数据的水位线:1553503184000,当前水位线:1553503185000
Key:hello,EventTime:1553503187000,前一条数据的水位线:1553503185000,当前水位线:1553503185000
Key:hello,EventTime:1553503191000,前一条数据的水位线:1553503185000,当前水位线:1553503186000
Key:hello,EventTime:1553503191000,前一条数据的水位线:1553503186000,当前水位线:1553503186000
Key:hello,EventTime:1553503192000,前一条数据的水位线:1553503186000,当前水位线:1553503187000
Key:hello,EventTime:1553503193000,前一条数据的水位线:1553503187000,当前水位线:1553503188000
Key:hello,EventTime:1553503194000,前一条数据的水位线:1553503188000,当前水位线:1553503189000
Key:hello,EventTime:1553503195000,前一条数据的水位线:1553503189000,当前水位线:1553503190000
word:hello timestamp:1553503185000,1553503186000,1553503187000,1553503188000,1553503189000,1553503187000

  下面咱们来分析下上面的结果,第一条数据的时间为45秒整,上边的数据基本上是连续的,只有一条数据 1553503187000为47秒的时候出现了乱序中。再来回忆一下上边的代码,上边的数据延迟为5秒,统计的数据为5秒的滚动窗口的数据,将时间戳合起来。

  那么第一个汇总的窗口为[2019-03-25 16:39:45 2019-03-25 16:39:50),那么数据在什么时间触发窗口呢,也就是在输入1553503195000的时候进行的窗口汇总, 这条数据的时间为2019-03-25 16:39:55,水位线为2019-03-25 16:39:50,由此我们得出结论:

  当统计时间window窗口中有数据的时候,watermark时间 >= 窗口的结束时间时进行触发。

  如果想使用IngestionTime设置为时间特性的话,只需要更改几行代码即可。  

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<SocketWindowCount.WordCount> wordCountStream = text.flatMap(new FlatMapFunction<String, SocketWindowCount.WordCount>() {@Overridepublic void flatMap(String value, Collector<SocketWindowCount.WordCount> out) throws Exception {String[] args = value.split(",");out.collect(SocketWindowCount.WordCount.of(args[0], args[1]));}}).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());DataStream<SocketWindowCount.WordCount> windowsCounts = wordCountStream.keyBy("word").timeWindow(Time.seconds(5L)).reduce(new ReduceFunction<SocketWindowCount.WordCount>() {@Overridepublic SocketWindowCount.WordCount reduce(SocketWindowCount.WordCount wordCount, SocketWindowCount.WordCount t1) throws Exception {//                                System.out.println("reduce:" + wordCount.timestamp + "," + t1.timestamp);t1.timestamp = wordCount.timestamp + "," + t1.timestamp;return t1;}});

  如果要使用ProcessingTime,同理把时间特性改一下即可。完整的代码如下,红色的代码为改变的代码。

package com.hqs.flink;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import javax.annotation.Nullable;
import java.sql.Timestamp;/*** @author huangqingshi* @Date 2020-01-11*/
public class SocketIngestionTime {public static void main(String[] args) throws Exception {//创建envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置流的时间特性,
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);//设置并行度env.setParallelism(1);//设置监听localhost:9000端口,以回车分割DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");DataStream<SocketWindowCount.WordCount> wordCountStream = text.flatMap(new FlatMapFunction<String, SocketWindowCount.WordCount>() {@Overridepublic void flatMap(String value, Collector<SocketWindowCount.WordCount> out) throws Exception {String[] args = value.split(",");out.collect(SocketWindowCount.WordCount.of(args[0], args[1]));}});DataStream<SocketWindowCount.WordCount> windowsCounts = wordCountStream.keyBy("word").window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<SocketWindowCount.WordCount>() {@Overridepublic SocketWindowCount.WordCount reduce(SocketWindowCount.WordCount wordCount, SocketWindowCount.WordCount t1) throws Exception {//                                System.out.println("reduce:" + wordCount.timestamp + "," + t1.timestamp);t1.timestamp = wordCount.timestamp + "," + t1.timestamp;return t1;}});//将结果集进行打印
        windowsCounts.print();//提交所设置的执行env.execute("EventTime Example");}public static class WordCount {public String word;public String timestamp;public static SocketWindowCount.WordCount of(String word, String timestamp) {SocketWindowCount.WordCount wordCount = new SocketWindowCount.WordCount();wordCount.word = word;wordCount.timestamp = timestamp;return wordCount;}@Overridepublic String toString() {return "word:" + word + " timestamp:" + timestamp;}}
}

  好了,如果有什么问题,可以留言或加我微信与我联系。

 

 

  

  

  

 

  

  

这篇关于Flink系列之Time和WaterMark的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/536360

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

科研绘图系列:R语言扩展物种堆积图(Extended Stacked Barplot)

介绍 R语言的扩展物种堆积图是一种数据可视化工具,它不仅展示了物种的堆积结果,还整合了不同样本分组之间的差异性分析结果。这种图形表示方法能够直观地比较不同物种在各个分组中的显著性差异,为研究者提供了一种有效的数据解读方式。 加载R包 knitr::opts_chunk$set(warning = F, message = F)library(tidyverse)library(phyl

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

flume系列之:查看flume系统日志、查看统计flume日志类型、查看flume日志

遍历指定目录下多个文件查找指定内容 服务器系统日志会记录flume相关日志 cat /var/log/messages |grep -i oom 查找系统日志中关于flume的指定日志 import osdef search_string_in_files(directory, search_string):count = 0

GPT系列之:GPT-1,GPT-2,GPT-3详细解读

一、GPT1 论文:Improving Language Understanding by Generative Pre-Training 链接:https://cdn.openai.com/research-covers/languageunsupervised/language_understanding_paper.pdf 启发点:生成loss和微调loss同时作用,让下游任务来适应预训

linux 下Time_wait过多问题解决

转自:http://blog.csdn.net/jaylong35/article/details/6605077 问题起因: 自己开发了一个服务器和客户端,通过短连接的方式来进行通讯,由于过于频繁的创建连接,导致系统连接数量被占用,不能及时释放。看了一下18888,当时吓到了。 现象: 1、外部机器不能正常连接SSH 2、内向外不能够正常的ping通过,域名也不能正常解析。

Java基础回顾系列-第七天-高级编程之IO

Java基础回顾系列-第七天-高级编程之IO 文件操作字节流与字符流OutputStream字节输出流FileOutputStream InputStream字节输入流FileInputStream Writer字符输出流FileWriter Reader字符输入流字节流与字符流的区别转换流InputStreamReaderOutputStreamWriter 文件复制 字符编码内存操作流(

Java基础回顾系列-第五天-高级编程之API类库

Java基础回顾系列-第五天-高级编程之API类库 Java基础类库StringBufferStringBuilderStringCharSequence接口AutoCloseable接口RuntimeSystemCleaner对象克隆 数字操作类Math数学计算类Random随机数生成类BigInteger/BigDecimal大数字操作类 日期操作类DateSimpleDateForma

Java基础回顾系列-第三天-Lambda表达式

Java基础回顾系列-第三天-Lambda表达式 Lambda表达式方法引用引用静态方法引用实例化对象的方法引用特定类型的方法引用构造方法 内建函数式接口Function基础接口DoubleToIntFunction 类型转换接口Consumer消费型函数式接口Supplier供给型函数式接口Predicate断言型函数式接口 Stream API 该篇博文需重点了解:内建函数式

Java基础回顾系列-第二天-面向对象编程

面向对象编程 Java类核心开发结构面向对象封装继承多态 抽象类abstract接口interface抽象类与接口的区别深入分析类与对象内存分析 继承extends重写(Override)与重载(Overload)重写(Override)重载(Overload)重写与重载之间的区别总结 this关键字static关键字static变量static方法static代码块 代码块String类特