flink 设置空闲等待推进水位线,避免子任务上游最小的水位线迟迟未达到触发时间

本文主要是介绍flink 设置空闲等待推进水位线,避免子任务上游最小的水位线迟迟未达到触发时间,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 1. 空闲等待
      • 1.1 空闲等待
      • 1.2 withIdleness
      • 1.3 源码


1. 空闲等待

1.1 空闲等待

  多并行度的flink作业,watermark水位线的传递遵循接收到上游多个水位线时取最小往下游多个子任务发送水位线时进行广播。此时,如果有其中一个子任务没有数据,导致当前Task的水位线无法推进,窗口无法触发,需要等待上游最小的水位线达到触发时间。于是,flink添加了空闲等待的设置

1.2 withIdleness

  在设置WatermarkStrategy时,添加.withIdleness(Duration.ofSeconds(5))

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy//升序的watermark,没有等待时间.<WaterSensor>forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {@Overridepublic WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {return new MyWatermark<>(Duration.ofSeconds(3));}})//指定时间戳分配器,从数据中提取时间戳.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);//返回的数据为毫秒return element.getTs() * 1000;}}).withIdleness(Duration.ofSeconds(5));

1.3 源码

  其核心逻辑为:

@Public
public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {private final WatermarkGenerator<T> watermarks;private final IdlenessTimer idlenessTimer;private boolean isIdleNow = false;/*** Creates a new WatermarksWithIdleness generator to the given generator idleness detection with* the given timeout.** @param watermarks The original watermark generator.* @param idleTimeout The timeout for the idleness detection.*/public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {this(watermarks, idleTimeout, SystemClock.getInstance());}@VisibleForTestingWatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {checkNotNull(idleTimeout, "idleTimeout");checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()),"idleTimeout must be greater than zero");this.watermarks = checkNotNull(watermarks, "watermarks");this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);}@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {watermarks.onEvent(event, eventTimestamp, output);idlenessTimer.activity();isIdleNow = false;}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {if (idlenessTimer.checkIfIdle()) {if (!isIdleNow) {output.markIdle();isIdleNow = true;}} else {watermarks.onPeriodicEmit(output);}}// ------------------------------------------------------------------------@VisibleForTestingstatic final class IdlenessTimer {/** The clock used to measure elapsed time. */private final Clock clock;/** Counter to detect change. No problem if it overflows. */private long counter;/** The value of the counter at the last activity check. */private long lastCounter;/*** The first time (relative to {@link Clock#relativeTimeNanos()}) when the activity check* found that no activity happened since the last check. Special value: 0 = no timer.*/private long startOfInactivityNanos;/** The duration before the output is marked as idle. */private final long maxIdleTimeNanos;IdlenessTimer(Clock clock, Duration idleTimeout) {this.clock = clock;long idleNanos;try {idleNanos = idleTimeout.toNanos();} catch (ArithmeticException ignored) {// long integer overflowidleNanos = Long.MAX_VALUE;}this.maxIdleTimeNanos = idleNanos;}public void activity() {counter++;}public boolean checkIfIdle() {if (counter != lastCounter) {// activity since the last check. we reset the timerlastCounter = counter;startOfInactivityNanos = 0L;return false;} else // timer started but has not yet reached idle timeoutif (startOfInactivityNanos == 0L) {// first time that we see no activity since the last periodic probe// begin the timerstartOfInactivityNanos = clock.relativeTimeNanos();return false;} else {return clock.relativeTimeNanos() - startOfInactivityNanos > maxIdleTimeNanos;}}}
}

checkIfIdle()方法用于判断是否触发水位线推进


这篇关于flink 设置空闲等待推进水位线,避免子任务上游最小的水位线迟迟未达到触发时间的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/769885

相关文章

linux hostname设置全过程

《linuxhostname设置全过程》:本文主要介绍linuxhostname设置全过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录查询hostname设置步骤其它相关点hostid/etc/hostsEDChina编程A工具license破解注意事项总结以RHE

go中的时间处理过程

《go中的时间处理过程》:本文主要介绍go中的时间处理过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1 获取当前时间2 获取当前时间戳3 获取当前时间的字符串格式4 相互转化4.1 时间戳转时间字符串 (int64 > string)4.2 时间字符串转时间

Python设置Cookie永不超时的详细指南

《Python设置Cookie永不超时的详细指南》Cookie是一种存储在用户浏览器中的小型数据片段,用于记录用户的登录状态、偏好设置等信息,下面小编就来和大家详细讲讲Python如何设置Cookie... 目录一、Cookie的作用与重要性二、Cookie过期的原因三、实现Cookie永不超时的方法(一)

Golang如何对cron进行二次封装实现指定时间执行定时任务

《Golang如何对cron进行二次封装实现指定时间执行定时任务》:本文主要介绍Golang如何对cron进行二次封装实现指定时间执行定时任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录背景cron库下载代码示例【1】结构体定义【2】定时任务开启【3】使用示例【4】控制台输出总结背景

在Golang中实现定时任务的几种高效方法

《在Golang中实现定时任务的几种高效方法》本文将详细介绍在Golang中实现定时任务的几种高效方法,包括time包中的Ticker和Timer、第三方库cron的使用,以及基于channel和go... 目录背景介绍目的和范围预期读者文档结构概述术语表核心概念与联系故事引入核心概念解释核心概念之间的关系

springboot如何通过http动态操作xxl-job任务

《springboot如何通过http动态操作xxl-job任务》:本文主要介绍springboot如何通过http动态操作xxl-job任务的问题,具有很好的参考价值,希望对大家有所帮助,如有错... 目录springboot通过http动态操作xxl-job任务一、maven依赖二、配置文件三、xxl-

Qt 设置软件版本信息的实现

《Qt设置软件版本信息的实现》本文介绍了Qt项目中设置版本信息的三种常用方法,包括.pro文件和version.rc配置、CMakeLists.txt与version.h.in结合,具有一定的参考... 目录在运行程序期间设置版本信息可以参考VS在 QT 中设置软件版本信息的几种方法方法一:通过 .pro

PostgreSQL 默认隔离级别的设置

《PostgreSQL默认隔离级别的设置》PostgreSQL的默认事务隔离级别是读已提交,这是其事务处理系统的基础行为模式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录一 默认隔离级别概述1.1 默认设置1.2 各版本一致性二 读已提交的特性2.1 行为特征2.2

一文详解MySQL如何设置自动备份任务

《一文详解MySQL如何设置自动备份任务》设置自动备份任务可以确保你的数据库定期备份,防止数据丢失,下面我们就来详细介绍一下如何使用Bash脚本和Cron任务在Linux系统上设置MySQL数据库的自... 目录1. 编写备份脚本1.1 创建并编辑备份脚本1.2 给予脚本执行权限2. 设置 Cron 任务2

mtu设置多少网速最快? 路由器MTU设置最佳网速的技巧

《mtu设置多少网速最快?路由器MTU设置最佳网速的技巧》mtu设置多少网速最快?想要通过设置路由器mtu获得最佳网速,该怎么设置呢?下面我们就来看看路由器MTU设置最佳网速的技巧... 答:1500 MTU值指的是在网络传输中数据包的最大值,合理的设置MTU 值可以让网络更快!mtu设置可以优化不同的网