Flink中在source流中自定义timestamp和watermark

2023-12-01 22:08

本文主要是介绍Flink中在source流中自定义timestamp和watermark,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

To work with Event Time, streaming programs need to set the time characteristic accordingly.

首先配置成,Event Time

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

 

Assigning Timestamps


接着,我们需要定义如何去获取event time和如何产生Watermark?

一种方式,在source中写死,

复制代码
@Override
public void run(SourceContext<MyType> ctx) throws Exception {while (/* condition */) {MyType next = getNext();ctx.collectWithTimestamp(next, next.getEventTimestamp());if (next.hasWatermarkTime()) {ctx.emitWatermark(new Watermark(next.getWatermarkTime()));}}
}
复制代码

这种方式明显比较low,不太方便,并且这种方式是会被TimestampAssigner 覆盖掉的,

所以看看第二种方式,

Timestamp Assigners / Watermark Generators


一般在会在source后加些map,filter做些初始化或格式化

然后,在任意需要用到event time的操作之前,比如window,进行设置

给个例子,

复制代码
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));DataStream<MyEvent> withTimestampsAndWatermarks = stream.filter( event -> event.severity() == WARNING ).assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());withTimestampsAndWatermarks.keyBy( (event) -> event.getGroup() ).timeWindow(Time.seconds(10)).reduce( (a, b) -> a.add(b) ).addSink(...);
复制代码

 

那么Timestamp Assigners如何实现,比如例子中给出的MyTimestampsAndWatermarks

有3种,

复制代码
DataStream<MyEvent> stream = ...DataStream<MyEvent> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {@Overridepublic long extractAscendingTimestamp(MyEvent element) {return element.getCreationTime();}
});
复制代码

这种没人用吧,不如直接用processing time了


定期的发送,你可以通过ExecutionConfig.setAutoWatermarkInterval(...),来设置这个频率

复制代码
/*** This generator generates watermarks assuming that elements come out of order to a certain degree only.* The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest* elements for timestamp t.*/
public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {private final long maxOutOfOrderness = 3500; // 3.5 secondsprivate long currentMaxTimestamp;@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {long timestamp = element.getCreationTime(); currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);return timestamp;}@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current highest timestamp minus the out-of-orderness boundreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
}/*** This generator generates watermarks that are lagging behind processing time by a certain amount.* It assumes that elements arrive in Flink after at most a certain time.*/
public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {private final long maxTimeLag = 5000; // 5 seconds
@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getCreationTime();}@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current time minus the maximum time lag return new Watermark(System.currentTimeMillis() - maxTimeLag);}
}
复制代码

 

上面给出两个case,区别是第一种,会以event time的Max,来设置watermark

第二种,是以当前的processing time来设置watermark

 

With Punctuated Watermarks

To generate Watermarks whenever a certain event indicates that a new watermark can be generated, use theAssignerWithPunctuatedWatermarks. For this class, Flink will first call the extractTimestamp(...) method to assign the element a timestamp, and then immediately call for that element the checkAndGetNextWatermark(...)method.

The checkAndGetNextWatermark(...) method gets the timestamp that was assigned in the extractTimestamp(...) method, and can decide whether it wants to generate a Watermark. Whenever the checkAndGetNextWatermark(...) method returns a non-null Watermark, and that Watermark is larger than the latest previous Watermark, that new Watermark will be emitted.

这种即,watermark不是由时间来触发的,而是以特定的event触发的,即本到某些特殊的event或message,才触发watermark

所以它的接口叫,checkAndGetNextWatermark

需要先check

复制代码
public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent> {@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getCreationTime();}@Overridepublic Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {return element.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;}
}
复制代码

这篇关于Flink中在source流中自定义timestamp和watermark的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/442869

相关文章

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

自定义类型:结构体(续)

目录 一. 结构体的内存对齐 1.1 为什么存在内存对齐? 1.2 修改默认对齐数 二. 结构体传参 三. 结构体实现位段 一. 结构体的内存对齐 在前面的文章里我们已经讲过一部分的内存对齐的知识,并举出了两个例子,我们再举出两个例子继续说明: struct S3{double a;int b;char c;};int mian(){printf("%zd\n",s

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

Oracle type (自定义类型的使用)

oracle - type   type定义: oracle中自定义数据类型 oracle中有基本的数据类型,如number,varchar2,date,numeric,float....但有时候我们需要特殊的格式, 如将name定义为(firstname,lastname)的形式,我们想把这个作为一个表的一列看待,这时候就要我们自己定义一个数据类型 格式 :create or repla

HTML5自定义属性对象Dataset

原文转自HTML5自定义属性对象Dataset简介 一、html5 自定义属性介绍 之前翻译的“你必须知道的28个HTML5特征、窍门和技术”一文中对于HTML5中自定义合法属性data-已经做过些介绍,就是在HTML5中我们可以使用data-前缀设置我们需要的自定义属性,来进行一些数据的存放,例如我们要在一个文字按钮上存放相对应的id: <a href="javascript:" d

一步一步将PlantUML类图导出为自定义格式的XMI文件

一步一步将PlantUML类图导出为自定义格式的XMI文件 说明: 首次发表日期:2024-09-08PlantUML官网: https://plantuml.com/zh/PlantUML命令行文档: https://plantuml.com/zh/command-line#6a26f548831e6a8cPlantUML XMI文档: https://plantuml.com/zh/xmi

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

argodb自定义函数读取hdfs文件的注意点,避免FileSystem已关闭异常

一、问题描述 一位同学反馈,他写的argo存过中调用了一个自定义函数,函数会加载hdfs上的一个文件,但有些节点会报FileSystem closed异常,同时有时任务会成功,有时会失败。 二、问题分析 argodb的计算引擎是基于spark的定制化引擎,对于自定义函数的调用跟hive on spark的是一致的。udf要通过反射生成实例,然后迭代调用evaluate。通过代码分析,udf在

鸿蒙开发中实现自定义弹窗 (CustomDialog)

效果图 #思路 创建带有 @CustomDialog 修饰的组件 ,并且在组件内部定义controller: CustomDialogController 实例化CustomDialogController,加载组件,open()-> 打开对话框 , close() -> 关闭对话框 #定义弹窗 (CustomDialog)是什么? CustomDialog是自定义弹窗,可用于广告、中

mybatis框架基础以及自定义插件开发

文章目录 框架概览框架预览MyBatis框架的核心组件MyBatis框架的工作原理MyBatis框架的配置MyBatis框架的最佳实践 自定义插件开发1. 添加依赖2. 创建插件类3. 配置插件4. 启动类中注册插件5. 测试插件 参考文献 框架概览 MyBatis是一个优秀的持久层框架,它支持自定义SQL、存储过程以及高级映射,为开发者提供了极大的灵活性和便利性。以下是关于M