flink 设置空闲等待推进水位线,避免子任务上游最小的水位线迟迟未达到触发时间

本文主要是介绍flink 设置空闲等待推进水位线,避免子任务上游最小的水位线迟迟未达到触发时间,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 1. 空闲等待
      • 1.1 空闲等待
      • 1.2 withIdleness
      • 1.3 源码


1. 空闲等待

1.1 空闲等待

  多并行度的flink作业,watermark水位线的传递遵循接收到上游多个水位线时取最小往下游多个子任务发送水位线时进行广播。此时,如果有其中一个子任务没有数据,导致当前Task的水位线无法推进,窗口无法触发,需要等待上游最小的水位线达到触发时间。于是,flink添加了空闲等待的设置

1.2 withIdleness

  在设置WatermarkStrategy时,添加.withIdleness(Duration.ofSeconds(5))

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy//升序的watermark,没有等待时间.<WaterSensor>forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {@Overridepublic WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {return new MyWatermark<>(Duration.ofSeconds(3));}})//指定时间戳分配器,从数据中提取时间戳.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);//返回的数据为毫秒return element.getTs() * 1000;}}).withIdleness(Duration.ofSeconds(5));

1.3 源码

  其核心逻辑为:

@Public
public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {private final WatermarkGenerator<T> watermarks;private final IdlenessTimer idlenessTimer;private boolean isIdleNow = false;/*** Creates a new WatermarksWithIdleness generator to the given generator idleness detection with* the given timeout.** @param watermarks The original watermark generator.* @param idleTimeout The timeout for the idleness detection.*/public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {this(watermarks, idleTimeout, SystemClock.getInstance());}@VisibleForTestingWatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {checkNotNull(idleTimeout, "idleTimeout");checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()),"idleTimeout must be greater than zero");this.watermarks = checkNotNull(watermarks, "watermarks");this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);}@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {watermarks.onEvent(event, eventTimestamp, output);idlenessTimer.activity();isIdleNow = false;}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {if (idlenessTimer.checkIfIdle()) {if (!isIdleNow) {output.markIdle();isIdleNow = true;}} else {watermarks.onPeriodicEmit(output);}}// ------------------------------------------------------------------------@VisibleForTestingstatic final class IdlenessTimer {/** The clock used to measure elapsed time. */private final Clock clock;/** Counter to detect change. No problem if it overflows. */private long counter;/** The value of the counter at the last activity check. */private long lastCounter;/*** The first time (relative to {@link Clock#relativeTimeNanos()}) when the activity check* found that no activity happened since the last check. Special value: 0 = no timer.*/private long startOfInactivityNanos;/** The duration before the output is marked as idle. */private final long maxIdleTimeNanos;IdlenessTimer(Clock clock, Duration idleTimeout) {this.clock = clock;long idleNanos;try {idleNanos = idleTimeout.toNanos();} catch (ArithmeticException ignored) {// long integer overflowidleNanos = Long.MAX_VALUE;}this.maxIdleTimeNanos = idleNanos;}public void activity() {counter++;}public boolean checkIfIdle() {if (counter != lastCounter) {// activity since the last check. we reset the timerlastCounter = counter;startOfInactivityNanos = 0L;return false;} else // timer started but has not yet reached idle timeoutif (startOfInactivityNanos == 0L) {// first time that we see no activity since the last periodic probe// begin the timerstartOfInactivityNanos = clock.relativeTimeNanos();return false;} else {return clock.relativeTimeNanos() - startOfInactivityNanos > maxIdleTimeNanos;}}}
}

checkIfIdle()方法用于判断是否触发水位线推进


这篇关于flink 设置空闲等待推进水位线,避免子任务上游最小的水位线迟迟未达到触发时间的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/769885

相关文章

Nginx设置连接超时并进行测试的方法步骤

《Nginx设置连接超时并进行测试的方法步骤》在高并发场景下,如果客户端与服务器的连接长时间未响应,会占用大量的系统资源,影响其他正常请求的处理效率,为了解决这个问题,可以通过设置Nginx的连接... 目录设置连接超时目的操作步骤测试连接超时测试方法:总结:设置连接超时目的设置客户端与服务器之间的连接

mybatis和mybatis-plus设置值为null不起作用问题及解决

《mybatis和mybatis-plus设置值为null不起作用问题及解决》Mybatis-Plus的FieldStrategy主要用于控制新增、更新和查询时对空值的处理策略,通过配置不同的策略类型... 目录MyBATis-plusFieldStrategy作用FieldStrategy类型每种策略的作

如何利用Java获取当天的开始和结束时间

《如何利用Java获取当天的开始和结束时间》:本文主要介绍如何使用Java8的LocalDate和LocalDateTime类获取指定日期的开始和结束时间,展示了如何通过这些类进行日期和时间的处... 目录前言1. Java日期时间API概述2. 获取当天的开始和结束时间代码解析运行结果3. 总结前言在J

CSS弹性布局常用设置方式

《CSS弹性布局常用设置方式》文章总结了CSS布局与样式的常用属性和技巧,包括视口单位、弹性盒子布局、浮动元素、背景和边框样式、文本和阴影效果、溢出隐藏、定位以及背景渐变等,通过这些技巧,可以实现复杂... 一、单位元素vm 1vm 为视口的1%vh 视口高的1%vmin 参照长边vmax 参照长边re

修改若依框架Token的过期时间问题

《修改若依框架Token的过期时间问题》本文介绍了如何修改若依框架中Token的过期时间,通过修改`application.yml`文件中的配置来实现,默认单位为分钟,希望此经验对大家有所帮助,也欢迎... 目录修改若依框架Token的过期时间修改Token的过期时间关闭Token的过期时js间总结修改若依

Go Mongox轻松实现MongoDB的时间字段自动填充

《GoMongox轻松实现MongoDB的时间字段自动填充》这篇文章主要为大家详细介绍了Go语言如何使用mongox库,在插入和更新数据时自动填充时间字段,从而提升开发效率并减少重复代码,需要的可以... 目录前言时间字段填充规则Mongox 的安装使用 Mongox 进行插入操作使用 Mongox 进行更

Windows设置nginx启动端口的方法

《Windows设置nginx启动端口的方法》在服务器配置与开发过程中,nginx作为一款高效的HTTP和反向代理服务器,被广泛应用,而在Windows系统中,合理设置nginx的启动端口,是确保其正... 目录一、为什么要设置 nginx 启动端口二、设置步骤三、常见问题及解决一、为什么要设置 nginx

C#多线程编程中导致死锁的常见陷阱和避免方法

《C#多线程编程中导致死锁的常见陷阱和避免方法》在C#多线程编程中,死锁(Deadlock)是一种常见的、令人头疼的错误,死锁通常发生在多个线程试图获取多个资源的锁时,导致相互等待对方释放资源,最终形... 目录引言1. 什么是死锁?死锁的典型条件:2. 导致死锁的常见原因2.1 锁的顺序问题错误示例:不同

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed

一文详解Java Condition的await和signal等待通知机制

《一文详解JavaCondition的await和signal等待通知机制》这篇文章主要为大家详细介绍了JavaCondition的await和signal等待通知机制的相关知识,文中的示例代码讲... 目录1. Condition的核心方法2. 使用场景与优势3. 使用流程与规范基本模板生产者-消费者示例