flink 设置空闲等待推进水位线,避免子任务上游最小的水位线迟迟未达到触发时间

本文主要是介绍flink 设置空闲等待推进水位线,避免子任务上游最小的水位线迟迟未达到触发时间,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 1. 空闲等待
      • 1.1 空闲等待
      • 1.2 withIdleness
      • 1.3 源码


1. 空闲等待

1.1 空闲等待

  多并行度的flink作业,watermark水位线的传递遵循接收到上游多个水位线时取最小往下游多个子任务发送水位线时进行广播。此时,如果有其中一个子任务没有数据,导致当前Task的水位线无法推进,窗口无法触发,需要等待上游最小的水位线达到触发时间。于是,flink添加了空闲等待的设置

1.2 withIdleness

  在设置WatermarkStrategy时,添加.withIdleness(Duration.ofSeconds(5))

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy//升序的watermark,没有等待时间.<WaterSensor>forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {@Overridepublic WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {return new MyWatermark<>(Duration.ofSeconds(3));}})//指定时间戳分配器,从数据中提取时间戳.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);//返回的数据为毫秒return element.getTs() * 1000;}}).withIdleness(Duration.ofSeconds(5));

1.3 源码

  其核心逻辑为:

@Public
public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {private final WatermarkGenerator<T> watermarks;private final IdlenessTimer idlenessTimer;private boolean isIdleNow = false;/*** Creates a new WatermarksWithIdleness generator to the given generator idleness detection with* the given timeout.** @param watermarks The original watermark generator.* @param idleTimeout The timeout for the idleness detection.*/public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {this(watermarks, idleTimeout, SystemClock.getInstance());}@VisibleForTestingWatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {checkNotNull(idleTimeout, "idleTimeout");checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()),"idleTimeout must be greater than zero");this.watermarks = checkNotNull(watermarks, "watermarks");this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);}@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {watermarks.onEvent(event, eventTimestamp, output);idlenessTimer.activity();isIdleNow = false;}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {if (idlenessTimer.checkIfIdle()) {if (!isIdleNow) {output.markIdle();isIdleNow = true;}} else {watermarks.onPeriodicEmit(output);}}// ------------------------------------------------------------------------@VisibleForTestingstatic final class IdlenessTimer {/** The clock used to measure elapsed time. */private final Clock clock;/** Counter to detect change. No problem if it overflows. */private long counter;/** The value of the counter at the last activity check. */private long lastCounter;/*** The first time (relative to {@link Clock#relativeTimeNanos()}) when the activity check* found that no activity happened since the last check. Special value: 0 = no timer.*/private long startOfInactivityNanos;/** The duration before the output is marked as idle. */private final long maxIdleTimeNanos;IdlenessTimer(Clock clock, Duration idleTimeout) {this.clock = clock;long idleNanos;try {idleNanos = idleTimeout.toNanos();} catch (ArithmeticException ignored) {// long integer overflowidleNanos = Long.MAX_VALUE;}this.maxIdleTimeNanos = idleNanos;}public void activity() {counter++;}public boolean checkIfIdle() {if (counter != lastCounter) {// activity since the last check. we reset the timerlastCounter = counter;startOfInactivityNanos = 0L;return false;} else // timer started but has not yet reached idle timeoutif (startOfInactivityNanos == 0L) {// first time that we see no activity since the last periodic probe// begin the timerstartOfInactivityNanos = clock.relativeTimeNanos();return false;} else {return clock.relativeTimeNanos() - startOfInactivityNanos > maxIdleTimeNanos;}}}
}

checkIfIdle()方法用于判断是否触发水位线推进


这篇关于flink 设置空闲等待推进水位线,避免子任务上游最小的水位线迟迟未达到触发时间的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/769885

相关文章

Linux中chmod权限设置方式

《Linux中chmod权限设置方式》本文介绍了Linux系统中文件和目录权限的设置方法,包括chmod、chown和chgrp命令的使用,以及权限模式和符号模式的详细说明,通过这些命令,用户可以灵活... 目录设置基本权限命令:chmod1、权限介绍2、chmod命令常见用法和示例3、文件权限详解4、ch

什么是cron? Linux系统下Cron定时任务使用指南

《什么是cron?Linux系统下Cron定时任务使用指南》在日常的Linux系统管理和维护中,定时执行任务是非常常见的需求,你可能需要每天执行备份任务、清理系统日志或运行特定的脚本,而不想每天... 在管理 linux 服务器的过程中,总有一些任务需要我们定期或重复执行。就比如备份任务,通常会选在服务器资

SpringBoot项目引入token设置方式

《SpringBoot项目引入token设置方式》本文详细介绍了JWT(JSONWebToken)的基本概念、结构、应用场景以及工作原理,通过动手实践,展示了如何在SpringBoot项目中实现JWT... 目录一. 先了解熟悉JWT(jsON Web Token)1. JSON Web Token是什么鬼

如何用Java结合经纬度位置计算目标点的日出日落时间详解

《如何用Java结合经纬度位置计算目标点的日出日落时间详解》这篇文章主详细讲解了如何基于目标点的经纬度计算日出日落时间,提供了在线API和Java库两种计算方法,并通过实际案例展示了其应用,需要的朋友... 目录前言一、应用示例1、天安门升旗时间2、湖南省日出日落信息二、Java日出日落计算1、在线API2

使用Spring Cache时设置缓存键的注意事项详解

《使用SpringCache时设置缓存键的注意事项详解》在现代的Web应用中,缓存是提高系统性能和响应速度的重要手段之一,Spring框架提供了强大的缓存支持,通过​​@Cacheable​​、​​... 目录引言1. 缓存键的基本概念2. 默认缓存键生成器3. 自定义缓存键3.1 使用​​@Cacheab

java如何调用kettle设置变量和参数

《java如何调用kettle设置变量和参数》文章简要介绍了如何在Java中调用Kettle,并重点讨论了变量和参数的区别,以及在Java代码中如何正确设置和使用这些变量,避免覆盖Kettle中已设置... 目录Java调用kettle设置变量和参数java代码中变量会覆盖kettle里面设置的变量总结ja

如何使用 Bash 脚本中的time命令来统计命令执行时间(中英双语)

《如何使用Bash脚本中的time命令来统计命令执行时间(中英双语)》本文介绍了如何在Bash脚本中使用`time`命令来测量命令执行时间,包括`real`、`user`和`sys`三个时间指标,... 使用 Bash 脚本中的 time 命令来统计命令执行时间在日常的开发和运维过程中,性能监控和优化是不

python中的与时间相关的模块应用场景分析

《python中的与时间相关的模块应用场景分析》本文介绍了Python中与时间相关的几个重要模块:`time`、`datetime`、`calendar`、`timeit`、`pytz`和`dateu... 目录1. time 模块2. datetime 模块3. calendar 模块4. timeit

Java将时间戳转换为Date对象的方法小结

《Java将时间戳转换为Date对象的方法小结》在Java编程中,处理日期和时间是一个常见需求,特别是在处理网络通信或者数据库操作时,本文主要为大家整理了Java中将时间戳转换为Date对象的方法... 目录1. 理解时间戳2. Date 类的构造函数3. 转换示例4. 处理可能的异常5. 考虑时区问题6.

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r