Flink Window中典型的增量聚合(ReduceFunction / AggregateFunction)

本文主要是介绍Flink Window中典型的增量聚合(ReduceFunction / AggregateFunction),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、什么是增量聚合函数

在Flink Window中定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪,这也就是窗口函数所需要做的事情。所以在窗口分配器之后,我们还要再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。
窗口可以将数据收集起来,最基本的处理操作当然就是基于窗口内的数据进行聚合。
我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。
典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction。
在这里插入图片描述

二、ReduceFunction

源码解析

@FunctionalInterface
@Public
public interface ReduceFunction<T> extends Function, Serializable {T reduce(T var1, T var2) throws Exception;
}

实际案例
在Flink中,使用socket模拟实时的数据流DataStream,通过定义一个滚动窗口,窗口的大小为10s,按照id分区,使用reduce聚合函数实现value的累加统计

package com.flink.DataStream.WindowFunctions;import com.flink.POJOs.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class FlinkWindowReduceFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);DataStreamSource<String> streamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);// 注意这里为什么返回的是KeyedStream(建控流/分区流),而不是DataStreamKeyedStream<WaterSensor, String> keyedStream = streamSource// 使用map函数将输入的string转为一个WaterSensor类.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {// 这里写的很详细,如何把string转为的WaterSensor类String[] strings = s.split(",");String id = strings[0];Long ts = Long.valueOf(strings[1]);Integer vc = Integer.valueOf(strings[2]);WaterSensor waterSensor = new WaterSensor();waterSensor.setId(id);waterSensor.setTs(ts);waterSensor.setVc(vc);return waterSensor;//return new WaterSensor(strings[0],Long.valueOf(strings[1]),Integer.valueOf(strings[2])}})// 按照id做keyBy分区(提问:KeyBy是如何实现分区的?).keyBy(new KeySelector<WaterSensor, String>() {// 也可以直接使用lamda表达式更简单@Overridepublic String getKey(WaterSensor waterSensor) throws Exception {// getId()方法就是return的waterSensor.idreturn waterSensor.getId();}});/*** 窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(WindowFunctions)* .window()方法需要传入一个窗口分配器,它指明了窗口的类型* */SingleOutputStreamOperator<WaterSensor> outputStreamOperator = keyedStream// 设置滚动窗口的大小(10秒).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))// 使用匿名函数实现增量聚合函数ReduceFunction.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor waterSensor1, WaterSensor waterSensor2) throws Exception {System.out.println("调用reduce方法,之前的结果:" + waterSensor1 + ",现在来的数据:" + waterSensor2);return new WaterSensor(waterSensor1.getId(), System.currentTimeMillis(), waterSensor1.getVc() + waterSensor2.getVc());}});outputStreamOperator.print();streamExecutionEnvironment.execute();}
}

启动Flink程序,启动nc,模拟输入

nc -lk 8888
# 00-10秒输入
a,11111,1
# 11-20秒输入
a,11111,2
a,22222,3
# 21-30秒输入
a,11111,4

查看控制台打印结果

WaterSensor{id='a', ts=11111, vc=1}
调用reduce方法,之前的结果:WaterSensor{id='a', ts=11111, vc=2},现在来的数据:WaterSensor{id='a', ts=22222, vc=3}
WaterSensor{id='a', ts=1702022598011, vc=5}
WaterSensor{id='a', ts=11111, vc=4}

在这里插入图片描述

三、AggregateFunction

虽然ReduceFunction 可以解决大多数归约聚合的问题,但是我们通过上述案例可以发现:这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
Flink Window API 中的 aggregate 就突破了这个限制,可以定义更加灵活的窗口聚合操作。这个方法需要传入一个 AggregateFunction 的实现类作为参数。AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {ACC createAccumulator();ACC add(IN var1, ACC var2);OUT getResult(ACC var1);ACC merge(ACC var1, ACC var2);
}

接口中有四个方法:
1.createAccumulator()
创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
2.add()
将输入的元素添加到累加器中。
3.getResult()
从累加器中提取聚合的输出结果。
4.merge()
合并两个累加器,并将合并后的状态作为一个累加器返回。
所以可以看到,AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

这篇关于Flink Window中典型的增量聚合(ReduceFunction / AggregateFunction)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/472877

相关文章

poj2505(典型博弈)

题意:n = 1,输入一个k,每一次n可以乘以[2,9]中的任何一个数字,两个玩家轮流操作,谁先使得n >= k就胜出 这道题目感觉还不错,自己做了好久都没做出来,然后看了解题才理解的。 解题思路:能进入必败态的状态时必胜态,只能到达胜态的状态为必败态,当n >= K是必败态,[ceil(k/9.0),k-1]是必胜态, [ceil(ceil(k/9.0)/2.0),ceil(k/9.

Jenkins构建Maven聚合工程,指定构建子模块

一、设置单独编译构建子模块 配置: 1、Root POM指向父pom.xml 2、Goals and options指定构建模块的参数: mvn -pl project1/project1-son -am clean package 单独构建project1-son项目以及它所依赖的其它项目。 说明: mvn clean package -pl 父级模块名/子模块名 -am参数

js window.addEventListener 是什么?

window.addEventListener 是 JavaScript 中的一个方法,用于向指定对象(在这个情况下是 window 对象,代表浏览器窗口)添加事件监听器,以便在该对象上发生特定事件时执行相应的函数(称为事件处理函数或事件监听器)。 这个方法接受三个参数: 事件类型(type):一个字符串,表示要监听的事件类型。例如,"click" 表示鼠标点击事件,"load" 表示页面加

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

ElasticSearch的DSL查询⑤(ES数据聚合、DSL语法数据聚合、RestClient数据聚合)

目录 一、数据聚合 1.1 DSL实现聚合 1.1.1 Bucket聚合  1.1.2 带条件聚合 1.1.3 Metric聚合 1.1.4 总结 2.1 RestClient实现聚合 2.1.1 Bucket聚合 2.1.2 带条件聚合 2.2.3 Metric聚合 一、数据聚合 聚合(aggregations)可以让我们极其方便的实现对数据的统计、分析、运算。例如:

Qt中window frame的影响

window frame 在创建图形化界面的时候,会创建窗口主体,上面会多出一条,周围多次一圈细边,这就叫window frame窗口框架,这是操作系统自带的。 这个对geometry的一些属性有一定影响,主要体现在Qt坐标系体系: 窗口当中包含一个按钮,这个按钮的坐标系是以父元素为参考,那么这个参考是widget本体作为参考,还是window frame作为参考,这两种参考体系都存在

深度剖析AI情感陪伴类产品及典型应用 Character.ai

前段时间AI圈内C.AI的受够风波可谓是让大家都丈二摸不着头脑,连C.AI这种行业top应用都要找谋生方法了!投资人摸不着头脑,用户们更摸不着头脑。在这之前断断续续玩了一下这款产品,这次也是乘着这个风波,除了了解一下为什么这么厉害的创始人 Noam Shazeer 也要另寻他路,以及产品本身的发展阶段和情况! 什么是Character.ai? Character.ai官网:https://

Caused by: android.view.WindowManager$BadTokenException: Unable to add window -- token android.os.B

一个bug日志 FATAL EXCEPTION: main03-25 14:24:07.724: E/AndroidRuntime(4135): java.lang.RuntimeException: Unable to start activity ComponentInfo{com.syyx.jingubang.ky/com.anguotech.android.activity.Init

最初的window

不知你是否也是一个常年在MFC下编程的程序员,有的时候是否忘记了在MFC之前是如何写画窗口的了呢,或者你从来都只是机械的在MFC下面写代码,已经麻木了。其实有一个很简单的方法,或许能够帮你更清楚的了解WINDOW是怎么产生的。 随便用什么版本的VS,在创建win32工程的时候,直接创建WINDOW类型的就OK了。然后,来研究下产生的源代码吧。 // Global Variables:H

七、Maven继承和聚合关系、及Maven的仓库及查找顺序

1.继承   2.聚合   3.Maven的仓库及查找顺序