[尚硅谷flink学习笔记] 实战案例TopN 问题

2024-04-10 07:28

本文主要是介绍[尚硅谷flink学习笔记] 实战案例TopN 问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  • 实时统计一段时间内的出现次数最多的水位。* 例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。* 我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。

文章目录

        • 全窗口
        • 优化

全窗口
package org.example.process;...
public class TopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*** 实时统计一段时间内的出现次数最多的水位。* 例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。* 我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。*/SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor waterSensor, long l) {return waterSensor.getTs() * 1000;}}));/*** 最近10s=窗口长度 每5s输出=滑动步长* 思路一:使用hashmap存储数据*/sensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new ProcessAllWindowFunction<WaterSensor, String, TimeWindow>() {@Overridepublic void process(ProcessAllWindowFunction<WaterSensor, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> collector) throws Exception {HashMap<Integer, Integer> vcCounter = new HashMap<>();elements.forEach(r->{Integer vc = r.getVc();vcCounter.put(vc, vcCounter.getOrDefault(vc, 0) +1);});// 使用 list 进行排序ArrayList<Tuple2<Integer, Integer>> data = new ArrayList<>();vcCounter.forEach((k,v)->data.add(Tuple2.of(k,v)));// 降序data.sort(((o1, o2) -> o2.f1 - o1.f1));// 输出StringBuilder stringBuilder = new StringBuilder();for (int i = 0; i < Math.min(2, data.size()); i++) {Tuple2<Integer, Integer> tuple2 = data.get(i);stringBuilder.append("TOP").append(i+1).append(",").append("vc=").append(tuple2.f0).append(",count=").append(tuple2.f1).append("\r\n");}stringBuilder.append("窗口结束时间=").append(DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd hh:mm:ss.SSS")).append("================").append("\r\n");collector.collect(stringBuilder.toString());}}).print();env.execute();}}
优化
在上一小节的实现过程中,我们没有进行按键分区,直接将所有数据放在一个分区上进行了开窗操作。这相当于将并行度强行设置为1,在实际应用中是要尽量避免的,所以Flink官方也并不推荐使用AllWindowedStream进行处理。另外,我们在全窗口函数中定义了HashMap来统计vc的出现次数,计算过程是要先收集齐所有数据、然后再逐一遍历更新HashMap,这显然不够高效。

基于这样的想法,我们可以从两个方面去做优化:一是对数据进行按键分区,分别统计vc的出现次数;二是进行增量聚合,得到结果最后再做排序输出。所以,我们可以使用增量聚合函数AggregateFunction进行浏览量的统计,然后结合ProcessWindowFunction排序输出来实现Top N的需求。
具体实现可以分成两步:先对每个vc统计出现次数,然后再将统计结果收集起来,排序输出最终结果。由于最后的排序还是基于每个时间窗口的,输出的统计结果中要包含窗口信息,我们可以输出包含了vc、出现次数(count)以及窗口结束时间的Tuple3。之后先按窗口结束时间分区,然后用KeyedProcessFunction来实现。
用KeyedProcessFunction来收集数据做排序,这时面对的是窗口聚合之后的数据流,而窗口已经不存在了;我们需要确保能够收集齐所有数据,所以应该在窗口结束时间基础上再“多等一会儿”。具体实现上,可以采用一个延迟触发的事件时间定时器。基于窗口的结束时间来设定延迟,其实并不需要等太久——因为我们是靠水位线的推进来触发定时器,而水位线的含义就是“之前的数据都到齐了”。所以我们只需要设置1毫秒的延迟,就一定可以保证这一点。
而在等待过程中,之前已经到达的数据应该缓存起来,我们这里用一个自定义的HashMap来进行存储,key为窗口的标记,value为List。之后每来一条数据,就把它添加到当前的HashMap中,并注册一个触发时间为窗口结束时间加1毫秒(windowEnd + 1)的定时器。待到水位线到达这个时间,定时器触发,我们可以保证当前窗口所有vc的统计结果Tuple3都到齐了;于是从HashMap中取出进行排序输出。

package org.example.process;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.example.bean.WaterSensor;
import org.example.utils.WaterSensorMapFunction;import java.time.Duration;
import java.util.*;public class KeyedProcessFunctionTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));// 最近10秒= 窗口长度, 每5秒输出 = 滑动步长/*** TODO 思路二: 使用 KeyedProcessFunction实现* 1、按照vc做keyby,开窗,分别count*    ==》 增量聚合,计算 count*    ==》 全窗口,对计算结果 count值封装 ,  带上 窗口结束时间的 标签*          ==》 为了让同一个窗口时间范围的计算结果到一起去** 2、对同一个窗口范围的count值进行处理: 排序、取前N个*    =》 按照 windowEnd做keyby*    =》 使用process, 来一条调用一次,需要先存,分开存,用HashMap,key=windowEnd,value=List*      =》 使用定时器,对 存起来的结果 进行 排序、取前N个*/// 1. 按照 vc 分组、开窗、聚合(增量计算+全量打标签)//  开窗聚合后,就是普通的流,没有了窗口信息,需要自己打上窗口的标记 windowEndSingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg = sensorDS.keyBy(WaterSensor::getVc).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(new VcCountAgg(),new WindowResult());// 2. 按照窗口标签(窗口结束时间)keyby,保证同一个窗口时间范围的结果,到一起去。排序、取TopNwindowAgg.keyBy(r -> r.f2).process(new TopN(2)).print();env.execute();}public static class VcCountAgg implements AggregateFunction<WaterSensor, Integer, Integer> {@Overridepublic Integer createAccumulator() {return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {return accumulator + 1;}@Overridepublic Integer getResult(Integer accumulator) {return accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {return null;}}/*** 泛型如下:* 第一个:输入类型 = 增量函数的输出  count值,Integer* 第二个:输出类型 = Tuple3(vc,count,windowEnd) ,带上 窗口结束时间 的标签* 第三个:key类型 , vc,Integer* 第四个:窗口类型*/public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {@Overridepublic void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {// 迭代器里面只有一条数据,next一次即可Integer count = elements.iterator().next();long windowEnd = context.window().getEnd();out.collect(Tuple3.of(key, count, windowEnd));}}public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {// 存不同窗口的 统计结果,key=windowEnd,value=list数据private Map<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;// 要取的Top数量private int threshold;public TopN(int threshold) {this.threshold = threshold;dataListMap = new HashMap<>();}@Overridepublic void processElement(Tuple3<Integer, Integer, Long> value, Context ctx, Collector<String> out) throws Exception {// 进入这个方法,只是一条数据,要排序,得到齐才行 ===》 存起来,不同窗口分开存// 1. 存到HashMap中Long windowEnd = value.f2;if (dataListMap.containsKey(windowEnd)) {// 1.1 包含vc,不是该vc的第一条,直接添加到List中List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);dataList.add(value);} else {// 1.1 不包含vc,是该vc的第一条,需要初始化listList<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>();dataList.add(value);dataListMap.put(windowEnd, dataList);}// 2. 注册一个定时器, windowEnd+1ms即可(// 同一个窗口范围,应该同时输出,只不过是一条一条调用processElement方法,只需要延迟1ms即可ctx.timerService().registerEventTimeTimer(windowEnd + 1);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);// 定时器触发,同一个窗口范围的计算结果攒齐了,开始 排序、取TopNLong windowEnd = ctx.getCurrentKey();// 1. 排序List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);dataList.sort(new Comparator<Tuple3<Integer, Integer, Long>>() {@Overridepublic int compare(Tuple3<Integer, Integer, Long> o1, Tuple3<Integer, Integer, Long> o2) {// 降序, 后 减 前return o2.f1 - o1.f1;}});// 2. 取TopNStringBuilder outStr = new StringBuilder();outStr.append("================================\n");// 遍历 排序后的 List,取出前 threshold 个, 考虑可能List不够2个的情况  ==》 List中元素的个数 和 2 取最小值for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);outStr.append("Top" + (i + 1) + "\n");outStr.append("vc=" + vcCount.f0 + "\n");outStr.append("count=" + vcCount.f1 + "\n");outStr.append("窗口结束时间=" + vcCount.f2 + "\n");outStr.append("================================\n");}// 用完的List,及时清理,节省资源dataList.clear();out.collect(outStr.toString());}}
}

这篇关于[尚硅谷flink学习笔记] 实战案例TopN 问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/890419

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

学习hash总结

2014/1/29/   最近刚开始学hash,名字很陌生,但是hash的思想却很熟悉,以前早就做过此类的题,但是不知道这就是hash思想而已,说白了hash就是一个映射,往往灵活利用数组的下标来实现算法,hash的作用:1、判重;2、统计次数;

好题——hdu2522(小数问题:求1/n的第一个循环节)

好喜欢这题,第一次做小数问题,一开始真心没思路,然后参考了网上的一些资料。 知识点***********************************无限不循环小数即无理数,不能写作两整数之比*****************************(一开始没想到,小学没学好) 此题1/n肯定是一个有限循环小数,了解这些后就能做此题了。 按照除法的机制,用一个函数表示出来就可以了,代码如下

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于