Flink中的时间语义与Watermark概念

2023-12-25 08:38

本文主要是介绍Flink中的时间语义与Watermark概念,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、时间语义

1.1 时间语义类型

在这里插入图片描述

  • Event Time:事件创建的时间
  • Ingestion Time:数据进入Flink的时间
  • Processing Time:执行操作算子的本地系统时间,与机器相关

问题:哪种时间语义更重要?

不同的时间语义有不同的应用场合,通常更关心的是事件时间
在这里插入图片描述
某些应用场合,不应该使用Processing Time。Event Time可以从日志数据的时间戳(timestamp)中提取
在这里插入图片描述

1.2 实际应用

public class WindowTest3_EventTimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //默认是处理时间// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型// java8 中的lamda表达式DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});env.execute();}
}

二、水位线(Watermark)

乱序数据的影响

  • 当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子
  • 由于网络、分布式等原因,会导致乱序数据的产生
    在这里插入图片描述

怎样避免乱序数据带来的计算不正确问题呢?

遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口

  • Watermark是一种衡量Event Time进展的机制,可以设定延迟触发
  • Watermark是用于处理乱序事件的,处理乱序事件正确的方法,通常是用Watermark机制结合window来实现
  • 数据流中的Watermark用于表示 timestamp小于Watermark 的数据,都已经到达了。因此,window的执行也是由Watermark触发的
  • watermark用来让程序自己平衡延迟和结果正确性

2.1 Flink三种方法保证数据准确性(三重保证)

(1)Watermark,可以保证 几百毫秒内 的乱序数据的准确性
(2)在(1)的基础上,可以再使用 allowedLateness 设置等待时间
(3)在(2)的基础上,可以再使用侧输出流

2.2 Watermark的特点

  • Watermark是一条特殊的数据记录
  • Watermark必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退
  • Watermark与数据的时间戳相关
    在这里插入图片描述

2.3 Watermark的传递

多个分区Watermark不同时,取最小值的Watermark,再将新的Watermark广播给下游算子;Watermark不更新时,不用广播
在这里插入图片描述

2.4 Watermark的引入

Event Time的使用一定要指定数据源中的时间戳;
调用dataStream.assignTimestampsAndWatermarks方法,传入一个BoundedOutOfOrdernessTimestampExtractor,即可指定Watermark

public class WindowTest4_UDFTimeStampAssigner {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //默认是处理时间// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型// java8 中的lamda表达式DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//升序数据设置事件时间和watermarkdataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {@Overridepublic long extractAscendingTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});//乱序数据设置时间戳和watermark//BoundedOutOfOrdernessTimestampExtractor 有界乱序时间戳提取器dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading sensorReading) {return sensorReading.getTimestamp() * 1000L;}});env.execute();}
}

自定义的周期分配器:

public static class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks<SensorReading>{private Long bound = 60 * 1000L; //延迟一分钟private Long maxTs = Long.MIN_VALUE; //当前最大时间戳@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(maxTs - bound);}@Overridepublic long extractTimestamp(SensorReading element, long previousElementTimestamp) {maxTs = Math.max(maxTs, element.getTimestamp());return element.getTimestamp();}
}

自定义的断点分配器:

//断点分配器
public static class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<SensorReading>{private Long bound = 60 * 1000L; //延迟一分钟@Nullable@Overridepublic Watermark checkAndGetNextWatermark(SensorReading lastElement, long extractedTimestamp) {if(lastElement.getId().equals("sensor_1")){return new Watermark(extractedTimestamp - bound);}else {return null;}}@Overridepublic long extractTimestamp(SensorReading element, long previousElementTimestamp) {return element.getTimestamp();}
}

2.5 Watermark的设定原则

  • 在Flink中,watermark由应用程序开发人员生成,这种通常需要对相应的领域有一定的了解
  • 如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果
  • 如果watermark到达太早,则可能使收到错误结果,不过Flink处理迟到数据的机制可以解决这个问题

事件时间语义下的窗口测试代码1:

public class WindowTest3_EventTimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //默认是处理时间// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型,分配时间戳和watermark// java8 中的lamda表达式DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});/*   //升序数据设置事件时间和watermarkdataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {@Overridepublic long extractAscendingTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});*///乱序数据设置时间戳和watermark//BoundedOutOfOrdernessTimestampExtractor 有界乱序时间戳提取器dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading sensorReading) {return sensorReading.getTimestamp() * 1000L;}});// 基于事件时间的开窗聚合,统计15秒内温度的最小值SingleOutputStreamOperator<SensorReading> minTempStream  = dataStream.keyBy("id").timeWindow(Time.seconds(15)).minBy("temperature");minTempStream.print("minTemp");env.execute();}
}

事件时间语义下的窗口测试代码2----迟到数据处理:

public class WindowTest3_EventTimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //默认是处理时间// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型,分配时间戳和watermark// java8 中的lamda表达式DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});/*   //升序数据设置事件时间和watermarkdataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {@Overridepublic long extractAscendingTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});*///乱序数据设置时间戳和watermark//BoundedOutOfOrdernessTimestampExtractor 有界乱序时间戳提取器dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading sensorReading) {return sensorReading.getTimestamp() * 1000L;}});OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};// 基于事件时间的开窗聚合,统计15秒内温度的最小值SingleOutputStreamOperator<SensorReading> minTempStream  = dataStream.keyBy("id").timeWindow(Time.seconds(15)).allowedLateness(Time.minutes(1)).sideOutputLateData(outputTag).minBy("temperature");minTempStream.print("minTemp");minTempStream.getSideOutput(outputTag).print("late");env.execute();}
}

这篇关于Flink中的时间语义与Watermark概念的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/534895

相关文章

如何用Java结合经纬度位置计算目标点的日出日落时间详解

《如何用Java结合经纬度位置计算目标点的日出日落时间详解》这篇文章主详细讲解了如何基于目标点的经纬度计算日出日落时间,提供了在线API和Java库两种计算方法,并通过实际案例展示了其应用,需要的朋友... 目录前言一、应用示例1、天安门升旗时间2、湖南省日出日落信息二、Java日出日落计算1、在线API2

如何使用 Bash 脚本中的time命令来统计命令执行时间(中英双语)

《如何使用Bash脚本中的time命令来统计命令执行时间(中英双语)》本文介绍了如何在Bash脚本中使用`time`命令来测量命令执行时间,包括`real`、`user`和`sys`三个时间指标,... 使用 Bash 脚本中的 time 命令来统计命令执行时间在日常的开发和运维过程中,性能监控和优化是不

python中的与时间相关的模块应用场景分析

《python中的与时间相关的模块应用场景分析》本文介绍了Python中与时间相关的几个重要模块:`time`、`datetime`、`calendar`、`timeit`、`pytz`和`dateu... 目录1. time 模块2. datetime 模块3. calendar 模块4. timeit

Java将时间戳转换为Date对象的方法小结

《Java将时间戳转换为Date对象的方法小结》在Java编程中,处理日期和时间是一个常见需求,特别是在处理网络通信或者数据库操作时,本文主要为大家整理了Java中将时间戳转换为Date对象的方法... 目录1. 理解时间戳2. Date 类的构造函数3. 转换示例4. 处理可能的异常5. 考虑时区问题6.

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

【VUE】跨域问题的概念,以及解决方法。

目录 1.跨域概念 2.解决方法 2.1 配置网络请求代理 2.2 使用@CrossOrigin 注解 2.3 通过配置文件实现跨域 2.4 添加 CorsWebFilter 来解决跨域问题 1.跨域概念 跨域问题是由于浏览器实施了同源策略,该策略要求请求的域名、协议和端口必须与提供资源的服务相同。如果不相同,则需要服务器显式地允许这种跨域请求。一般在springbo

MiniGPT-3D, 首个高效的3D点云大语言模型,仅需一张RTX3090显卡,训练一天时间,已开源

项目主页:https://tangyuan96.github.io/minigpt_3d_project_page/ 代码:https://github.com/TangYuan96/MiniGPT-3D 论文:https://arxiv.org/pdf/2405.01413 MiniGPT-3D在多个任务上取得了SoTA,被ACM MM2024接收,只拥有47.8M的可训练参数,在一张RTX

批处理以当前时间为文件名创建文件

批处理以当前时间为文件名创建文件 批处理创建空文件 有时候,需要创建以当前时间命名的文件,手动输入当然可以,但是有更省心的方法吗? 假设我是 windows 操作系统,打开命令行。 输入以下命令试试: echo %date:~0,4%_%date:~5,2%_%date:~8,2%_%time:~0,2%_%time:~3,2%_%time:~6,2% 输出类似: 2019_06

【MRI基础】TR 和 TE 时间概念

重复时间 (TR) 磁共振成像 (MRI) 中的 TR(重复时间,repetition time)是施加于同一切片的连续脉冲序列之间的时间间隔。具体而言,TR 是施加一个 RF(射频)脉冲与施加下一个 RF 脉冲之间的持续时间。TR 以毫秒 (ms) 为单位,主要控制后续脉冲之前的纵向弛豫程度(T1 弛豫),使其成为显著影响 MRI 中的图像对比度和信号特性的重要参数。 回声时间 (TE)

理解分类器(linear)为什么可以做语义方向的指导?(解纠缠)

Attribute Manipulation(属性编辑)、disentanglement(解纠缠)常用的两种做法:线性探针和PCA_disentanglement和alignment-CSDN博客 在解纠缠的过程中,有一种非常简单的方法来引导G向某个方向进行生成,然后我们通过向不同的方向进行行走,那么就会得到这个属性上的图像。那么你利用多个方向进行生成,便得到了各种方向的图像,每个方向对应了很多