Flink 1.14.* Flink窗口创建和窗口计算源码

2024-08-30 05:12

本文主要是介绍Flink 1.14.* Flink窗口创建和窗口计算源码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

解析Flink如何创建的窗口,和以聚合函数为例,窗口如何计算聚合函数

  • 一、构建不同窗口的build类
    • 1、全局窗口
    • 2、创建按键分流后的窗口
  • 二、在使用窗口处理数据流时,不同窗口创建的都是窗口算子WindowOperator
    • 1、聚合函数实现
    • 2、创建全局窗口(入参传的是NullByteKeySelector)
    • 3、创建按键分流后的窗口(入参传的是KeyedStream的KeySelector)
    • 3、WindowOperator

一、构建不同窗口的build类

这个是示例,

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> input = env.fromElements(Tuple2.of("key1", 1),Tuple2.of("key1", 3),Tuple2.of("key2", 2),Tuple2.of("key2", 4));

1、全局窗口

下面是创建全局窗口的代码

AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowed = input.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));
@PublicEvolving
public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {return new AllWindowedStream(this, assigner);}

@Public
public class AllWindowedStream<T, W extends Window> {private final KeyedStream<T, Byte> input;private final WindowAssigner<? super T, W> windowAssigner;private Trigger<? super T, ? super W> trigger;private Evictor<? super T, ? super W> evictor;private long allowedLateness = 0L;private OutputTag<T> lateDataOutputTag;@PublicEvolvingpublic AllWindowedStream(DataStream<T> input, WindowAssigner<? super T, W> windowAssigner) {//这里设置input的KeySelector为null的对象this.input = input.keyBy(new NullByteKeySelector());this.windowAssigner = windowAssigner;this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());}
}

AllWindowedStream 是对整个数据流应用窗口操作的抽象,而不进行键分组。换句话说,AllWindowedStream 是对全局数据流进行窗口操作。

使用场景:

  1. 当你不需要对数据流进行键分组,而是希望对整个数据流应用窗口操作时,使用 AllWindowedStream
  2. 适用于全局统计、全局聚合等场景。

2、创建按键分流后的窗口

下面是根据第一位字段当键分流,针对键分的流数据,分别创建窗口

KeyedStream<Tuple2<String, Integer>, String> keyed = input.keyBy(value -> value.f0);
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed = keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
 @PublicEvolvingpublic <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {return new WindowedStream(this, assigner);}
@Public
public class WindowedStream<T, K, W extends Window> {private final KeyedStream<T, K> input;//WindowOperatorBuilder 是 Flink 内部用于构建窗口操作符的工具类。它主要用于在内部构建和配置窗口操作符(WindowOperator),并不直接用于用户代码中。WindowOperatorBuilder 提供了一种灵活的方式来配置窗口操作符的各种细节,包括窗口分配器、窗口触发器、窗口合并器等。private final WindowOperatorBuilder<T, K, W> builder;@PublicEvolvingpublic WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {//这里只需要设置input,input的keyBy已经在前面设置了this.input = input;//通过input.getKeySelector()获取KeyedStream设置的函数this.builder = new WindowOperatorBuilder(windowAssigner, windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()), input.getExecutionConfig(), input.getType(), input.getKeySelector(), input.getKeyType());}//调用WindowedStream的trigger实际上调用的是WindowOperatorBuilder的trigger方法@PublicEvolvingpublic WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {this.builder.trigger(trigger);return this;}}   
public class WindowOperatorBuilder<T, K, W extends Window> {private static final String WINDOW_STATE_NAME = "window-contents";private final ExecutionConfig config;private final WindowAssigner<? super T, W> windowAssigner;private final TypeInformation<T> inputType;private final KeySelector<T, K> keySelector;private final TypeInformation<K> keyType;private Trigger<? super T, ? super W> trigger;@Nullableprivate Evictor<? super T, ? super W> evictor;private long allowedLateness = 0L;@Nullableprivate OutputTag<T> lateDataOutputTag;public WindowOperatorBuilder(WindowAssigner<? super T, W> windowAssigner, Trigger<? super T, ? super W> trigger, ExecutionConfig config, TypeInformation<T> inputType, KeySelector<T, K> keySelector, TypeInformation<K> keyType) {this.windowAssigner = windowAssigner;this.config = config;this.inputType = inputType;//把KeyedStream中的keySelector赋值到WindowOperatorBuilder的keySelectorthis.keySelector = keySelector;this.keyType = keyType;this.trigger = trigger;}
}

WindowedStream 是在对数据流进行键分组后,对每个键的子流应用窗口操作的抽象。也就是说,WindowedStream 是对每个键进行独立的窗口操作。

使用场景:

  1. 当你需要对数据流按键分组,并对每个键的子流应用窗口操作时,使用 WindowedStream
  2. 适用于需要对不同键进行独立统计和聚合的场景。

二、在使用窗口处理数据流时,不同窗口创建的都是窗口算子WindowOperator

这里以聚合函数为例,看不同的窗口类型创建的算子是什么。

1、聚合函数实现

 // 定义聚合函数AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> aggregateFunction =new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> createAccumulator() {return new Tuple2<>("", 0);}@Overridepublic Tuple2<String, Integer> add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {return new Tuple2<>(value.f0, value.f1 + accumulator.f1);}@Overridepublic Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {return accumulator;}@Overridepublic Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {return new Tuple2<>(a.f0, a.f1 + b.f1);}};//聚合函数接口 public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {ACC createAccumulator();ACC add(IN var1, ACC var2);OUT getResult(ACC var1);ACC merge(ACC var1, ACC var2);
}

2、创建全局窗口(入参传的是NullByteKeySelector)

根据上面知道,此时

@Public
public class AllWindowedStream<T, W extends Window> {@PublicEvolvingpublic AllWindowedStream(DataStream<T> input, WindowAssigner<? super T, W> windowAssigner) {//这里设置input的KeySelector为null的对象this.input = input.keyBy(new NullByteKeySelector());this.windowAssigner = windowAssigner;this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());}@PublicEvolvingpublic <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggregateFunction, AllWindowFunction<V, R, W> windowFunction, TypeInformation<ACC> accumulatorType, TypeInformation<R> resultType) {//根据AllWindowedStream的构造函数,知道此时this.input.getKeySelector()=new NullByteKeySelectorKeySelector<T, Byte> keySel = this.input.getKeySelector();//省略干扰代码AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor("window-contents", aggregateFunction, accumulatorType.createSerializer(this.getExecutionEnvironment().getConfig()));operator = new WindowOperator(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.getExecutionEnvironment().getConfig()), keySel, this.input.getKeyType().createSerializer(this.getExecutionEnvironment().getConfig()), stateDesc, new InternalSingleValueAllWindowFunction(windowFunction), this.trigger, this.allowedLateness, this.lateDataOutputTag);//省略干扰代码   return this.input.transform(opName, resultType, (OneInputStreamOperator)operator).forceNonParallel();}}
@PublicEvolving
public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AggregatingState<IN, OUT>, ACC> {private final AggregateFunction<IN, ACC, OUT> aggFunction;public AggregatingStateDescriptor(String name, AggregateFunction<IN, ACC, OUT> aggFunction, TypeSerializer<ACC> typeSerializer) {super(name, typeSerializer, (Object)null);this.aggFunction = (AggregateFunction)Preconditions.checkNotNull(aggFunction);}
}

3、创建按键分流后的窗口(入参传的是KeyedStream的KeySelector)

public class WindowedStream<T, K, W extends Window> {@PublicEvolvingpublic WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;this.builder = new WindowOperatorBuilder(windowAssigner, windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()), input.getExecutionConfig(), input.getType(), input.getKeySelector(), input.getKeyType());}public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggregateFunction, WindowFunction<V, R, K, W> windowFunction, TypeInformation<ACC> accumulatorType, TypeInformation<R> resultType) {//删除干扰代码aggregateFunction = (AggregateFunction)this.input.getExecutionEnvironment().clean(aggregateFunction);String opName = this.builder.generateOperatorName(aggregateFunction, windowFunction);OneInputStreamOperator<T, R> operator = this.builder.aggregate(aggregateFunction, windowFunction, accumulatorType);return this.input.transform(opName, resultType, operator);}}

通过上面我们知道builder指的是WindowOperatorBuilder,并且构造函数入参中的keySelector实际上是keyedStreamkeySelector

public class WindowOperatorBuilder<T, K, W extends Window> {public WindowOperatorBuilder(WindowAssigner<? super T, W> windowAssigner, Trigger<? super T, ? super W> trigger, ExecutionConfig config, TypeInformation<T> inputType, KeySelector<T, K> keySelector, TypeInformation<K> keyType) {this.windowAssigner = windowAssigner;this.config = config;this.inputType = inputType;//这个keySelector = keyedStream的keySelectorthis.keySelector = keySelector;this.keyType = keyType;this.trigger = trigger;}public <ACC, V, R> WindowOperator<K, T, ?, R, W> aggregate(AggregateFunction<T, ACC, V> aggregateFunction, WindowFunction<V, R, K, W> windowFunction, TypeInformation<ACC> accumulatorType) {//删除干扰代码AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor("window-contents", aggregateFunction, accumulatorType.createSerializer(this.config));return this.buildWindowOperator(stateDesc, new InternalSingleValueWindowFunction(windowFunction));}private <ACC, R> WindowOperator<K, T, ACC, R, W> buildWindowOperator(StateDescriptor<? extends AppendingState<T, ACC>, ?> stateDesc, InternalWindowFunction<ACC, R, K, W> function) {return new WindowOperator(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.config), this.keySelector, this.keyType.createSerializer(this.config), stateDesc, function, this.trigger, this.allowedLateness, this.lateDataOutputTag);}}  
}    

两种窗口最后都是构建WindowOperator,只是传的参数不一样,其中全局窗口的keySelectornull对象,按键建窗口的keySelector是取的KeyedStream

3、WindowOperator

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window> extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {private final KeySelector<IN, K> keySelector;private transient InternalAppendingState<K, W, IN, ACC, ACC> windowState;private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;public WindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor, InternalWindowFunction<ACC, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger, long allowedLateness, OutputTag<IN> lateDataOutputTag) {//删除干扰代码    this.windowStateDescriptor = windowStateDescriptor;this.keySelector = (KeySelector)Preconditions.checkNotNull(keySelector);}public void open() throws Exception {if (this.windowStateDescriptor != null) {this.windowState = (InternalAppendingState)this.getOrCreateKeyedState(this.windowSerializer, this.windowStateDescriptor);}}//数据到的执行方法public void processElement(StreamRecord<IN> element) throws Exception {//它遍历名为elementWindows的迭代器Collection<W> elementWindows = this.windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), this.windowAssignerContext);//这里有判断窗口是否是像会话窗口那种需要动态合并窗口的逻辑,为了不干扰理解,这里删除了那一块代码逻辑,有兴趣的可以专门去看一下//删除干扰代码Iterator var12 = elementWindows.iterator();label59:while(true) {Window window;TriggerResult triggerResult;while(true) {//在每次迭代中,它会检查窗口是否已经过期(isWindowLate方法)do {if (!var12.hasNext()) {break label59;}window = (Window)var12.next();} while(this.isWindowLate(window));//更新窗口的状态,将元素值添加到窗口状态中,并在触发器上下文中设置键和窗口isSkippedElement = false;this.windowState.setCurrentNamespace(window);//add方法this.windowState.add(element.getValue());this.triggerContext.key = key;this.triggerContext.window = window;//调用onElement方法对元素进行处理并检查触发器结果triggerResult = this.triggerContext.onElement(element);if (!triggerResult.isFire()) {//如果触发结果不需要触发(isFire() 返回 false),则跳出内部循环。break;}//如果窗口内容不为空,它将发出窗口内容并终止内部循环ACC contents = this.windowState.get();if (contents != null) {this.emitWindowContents(window, contents);break;}}//如果触发器结果要求清除窗口(isPurge()返回true),则会清除窗口状态if (triggerResult.isPurge()) {this.windowState.clear();}this.registerCleanupTimer(window);}}//水位线判断逻辑protected boolean isWindowLate(W window) {return this.windowAssigner.isEventTime() && this.cleanupTime(window) <= this.internalTimerService.currentWatermark();}}    

这里又发现了熟悉的接口,OneInputStreamOperator<IN, OUT>processElement方法实际上是父类接口Input<IN>的processElement方法

下面是WindowOperator的类关系图,和Flink 1.14.*中flatMap,filter等基本转换函数源码中RichFlatMapFunctionRichFilterFunction一样的父类AbstractUdfStreamOperator ,接口新增了特性
在这里插入图片描述
通过这些,大家心里应该有数了,不管是FlatMap还是Filter还是窗口,都是基于这个类关系图扩展下来的

这篇关于Flink 1.14.* Flink窗口创建和窗口计算源码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1119893

相关文章

JAVA中整型数组、字符串数组、整型数和字符串 的创建与转换的方法

《JAVA中整型数组、字符串数组、整型数和字符串的创建与转换的方法》本文介绍了Java中字符串、字符数组和整型数组的创建方法,以及它们之间的转换方法,还详细讲解了字符串中的一些常用方法,如index... 目录一、字符串、字符数组和整型数组的创建1、字符串的创建方法1.1 通过引用字符数组来创建字符串1.2

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

手把手教你idea中创建一个javaweb(webapp)项目详细图文教程

《手把手教你idea中创建一个javaweb(webapp)项目详细图文教程》:本文主要介绍如何使用IntelliJIDEA创建一个Maven项目,并配置Tomcat服务器进行运行,过程包括创建... 1.启动idea2.创建项目模板点击项目-新建项目-选择maven,显示如下页面输入项目名称,选择

如何用Java结合经纬度位置计算目标点的日出日落时间详解

《如何用Java结合经纬度位置计算目标点的日出日落时间详解》这篇文章主详细讲解了如何基于目标点的经纬度计算日出日落时间,提供了在线API和Java库两种计算方法,并通过实际案例展示了其应用,需要的朋友... 目录前言一、应用示例1、天安门升旗时间2、湖南省日出日落信息二、Java日出日落计算1、在线API2

bat脚本启动git bash窗口,并执行命令方式

《bat脚本启动gitbash窗口,并执行命令方式》本文介绍了如何在Windows服务器上使用cmd启动jar包时出现乱码的问题,并提供了解决方法——使用GitBash窗口启动并设置编码,通过编写s... 目录一、简介二、使用说明2.1 start.BAT脚本2.2 参数说明2.3 效果总结一、简介某些情

基于Redis有序集合实现滑动窗口限流的步骤

《基于Redis有序集合实现滑动窗口限流的步骤》滑动窗口算法是一种基于时间窗口的限流算法,通过动态地滑动窗口,可以动态调整限流的速率,Redis有序集合可以用来实现滑动窗口限流,本文介绍基于Redis... 滑动窗口算法是一种基于时间窗口的限流算法,它将时间划分为若干个固定大小的窗口,每个窗口内记录了该时间

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

在cscode中通过maven创建java项目

在cscode中创建java项目 可以通过博客完成maven的导入 建立maven项目 使用快捷键 Ctrl + Shift + P 建立一个 Maven 项目 1 Ctrl + Shift + P 打开输入框2 输入 "> java create"3 选择 maven4 选择 No Archetype5 输入 域名6 输入项目名称7 建立一个文件目录存放项目,文件名一般为项目名8 确定

poj 1113 凸包+简单几何计算

题意: 给N个平面上的点,现在要在离点外L米处建城墙,使得城墙把所有点都包含进去且城墙的长度最短。 解析: 韬哥出的某次训练赛上A出的第一道计算几何,算是大水题吧。 用convexhull算法把凸包求出来,然后加加减减就A了。 计算见下图: 好久没玩画图了啊好开心。 代码: #include <iostream>#include <cstdio>#inclu