flink 设置空闲等待推进水位线,避免子任务上游最小的水位线迟迟未达到触发时间

本文主要是介绍flink 设置空闲等待推进水位线,避免子任务上游最小的水位线迟迟未达到触发时间,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 1. 空闲等待
      • 1.1 空闲等待
      • 1.2 withIdleness
      • 1.3 源码


1. 空闲等待

1.1 空闲等待

  多并行度的flink作业,watermark水位线的传递遵循接收到上游多个水位线时取最小往下游多个子任务发送水位线时进行广播。此时,如果有其中一个子任务没有数据,导致当前Task的水位线无法推进,窗口无法触发,需要等待上游最小的水位线达到触发时间。于是,flink添加了空闲等待的设置

1.2 withIdleness

  在设置WatermarkStrategy时,添加.withIdleness(Duration.ofSeconds(5))

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy//升序的watermark,没有等待时间.<WaterSensor>forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {@Overridepublic WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {return new MyWatermark<>(Duration.ofSeconds(3));}})//指定时间戳分配器,从数据中提取时间戳.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);//返回的数据为毫秒return element.getTs() * 1000;}}).withIdleness(Duration.ofSeconds(5));

1.3 源码

  其核心逻辑为:

@Public
public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {private final WatermarkGenerator<T> watermarks;private final IdlenessTimer idlenessTimer;private boolean isIdleNow = false;/*** Creates a new WatermarksWithIdleness generator to the given generator idleness detection with* the given timeout.** @param watermarks The original watermark generator.* @param idleTimeout The timeout for the idleness detection.*/public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {this(watermarks, idleTimeout, SystemClock.getInstance());}@VisibleForTestingWatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {checkNotNull(idleTimeout, "idleTimeout");checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()),"idleTimeout must be greater than zero");this.watermarks = checkNotNull(watermarks, "watermarks");this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);}@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {watermarks.onEvent(event, eventTimestamp, output);idlenessTimer.activity();isIdleNow = false;}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {if (idlenessTimer.checkIfIdle()) {if (!isIdleNow) {output.markIdle();isIdleNow = true;}} else {watermarks.onPeriodicEmit(output);}}// ------------------------------------------------------------------------@VisibleForTestingstatic final class IdlenessTimer {/** The clock used to measure elapsed time. */private final Clock clock;/** Counter to detect change. No problem if it overflows. */private long counter;/** The value of the counter at the last activity check. */private long lastCounter;/*** The first time (relative to {@link Clock#relativeTimeNanos()}) when the activity check* found that no activity happened since the last check. Special value: 0 = no timer.*/private long startOfInactivityNanos;/** The duration before the output is marked as idle. */private final long maxIdleTimeNanos;IdlenessTimer(Clock clock, Duration idleTimeout) {this.clock = clock;long idleNanos;try {idleNanos = idleTimeout.toNanos();} catch (ArithmeticException ignored) {// long integer overflowidleNanos = Long.MAX_VALUE;}this.maxIdleTimeNanos = idleNanos;}public void activity() {counter++;}public boolean checkIfIdle() {if (counter != lastCounter) {// activity since the last check. we reset the timerlastCounter = counter;startOfInactivityNanos = 0L;return false;} else // timer started but has not yet reached idle timeoutif (startOfInactivityNanos == 0L) {// first time that we see no activity since the last periodic probe// begin the timerstartOfInactivityNanos = clock.relativeTimeNanos();return false;} else {return clock.relativeTimeNanos() - startOfInactivityNanos > maxIdleTimeNanos;}}}
}

checkIfIdle()方法用于判断是否触发水位线推进


这篇关于flink 设置空闲等待推进水位线,避免子任务上游最小的水位线迟迟未达到触发时间的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/769885

相关文章

Spring Boot 集成 Quartz并使用Cron 表达式实现定时任务

《SpringBoot集成Quartz并使用Cron表达式实现定时任务》本篇文章介绍了如何在SpringBoot中集成Quartz进行定时任务调度,并通过Cron表达式控制任务... 目录前言1. 添加 Quartz 依赖2. 创建 Quartz 任务3. 配置 Quartz 任务调度4. 启动 Sprin

Linux之计划任务和调度命令at/cron详解

《Linux之计划任务和调度命令at/cron详解》:本文主要介绍Linux之计划任务和调度命令at/cron的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux计划任务和调度命令at/cron一、计划任务二、命令{at}介绍三、命令语法及功能 :at

C#TextBox设置提示文本方式(SetHintText)

《C#TextBox设置提示文本方式(SetHintText)》:本文主要介绍C#TextBox设置提示文本方式(SetHintText),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录C#TextBox设置提示文本效果展示核心代码总结C#TextBox设置提示文本效果展示核心代

SpringQuartz定时任务核心组件JobDetail与Trigger配置

《SpringQuartz定时任务核心组件JobDetail与Trigger配置》Spring框架与Quartz调度器的集成提供了强大而灵活的定时任务解决方案,本文主要介绍了SpringQuartz定... 目录引言一、Spring Quartz基础架构1.1 核心组件概述1.2 Spring集成优势二、J

Java中Date、LocalDate、LocalDateTime、LocalTime、时间戳之间的相互转换代码

《Java中Date、LocalDate、LocalDateTime、LocalTime、时间戳之间的相互转换代码》:本文主要介绍Java中日期时间转换的多种方法,包括将Date转换为LocalD... 目录一、Date转LocalDateTime二、Date转LocalDate三、LocalDateTim

Pyserial设置缓冲区大小失败的问题解决

《Pyserial设置缓冲区大小失败的问题解决》本文主要介绍了Pyserial设置缓冲区大小失败的问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录问题描述原因分析解决方案问题描述使用set_buffer_size()设置缓冲区大小后,buf

Java Optional避免空指针异常的实现

《JavaOptional避免空指针异常的实现》空指针异常一直是困扰开发者的常见问题之一,本文主要介绍了JavaOptional避免空指针异常的实现,帮助开发者编写更健壮、可读性更高的代码,减少因... 目录一、Optional 概述二、Optional 的创建三、Optional 的常用方法四、Optio

Redis实现延迟任务的三种方法详解

《Redis实现延迟任务的三种方法详解》延迟任务(DelayedTask)是指在未来的某个时间点,执行相应的任务,本文为大家整理了三种常见的实现方法,感兴趣的小伙伴可以参考一下... 目录1.前言2.Redis如何实现延迟任务3.代码实现3.1. 过期键通知事件实现3.2. 使用ZSet实现延迟任务3.3

golang获取当前时间、时间戳和时间字符串及它们之间的相互转换方法

《golang获取当前时间、时间戳和时间字符串及它们之间的相互转换方法》:本文主要介绍golang获取当前时间、时间戳和时间字符串及它们之间的相互转换,本文通过实例代码给大家介绍的非常详细,感兴趣... 目录1、获取当前时间2、获取当前时间戳3、获取当前时间的字符串格式4、它们之间的相互转化上篇文章给大家介

Feign Client超时时间设置不生效的解决方法

《FeignClient超时时间设置不生效的解决方法》这篇文章主要为大家详细介绍了FeignClient超时时间设置不生效的原因与解决方法,具有一定的的参考价值,希望对大家有一定的帮助... 在使用Feign Client时,可以通过两种方式来设置超时时间:1.针对整个Feign Client设置超时时间