Flink-1.12 - 之如何构建一个简单的TopN应用

2023-12-10 13:33

本文主要是介绍Flink-1.12 - 之如何构建一个简单的TopN应用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink-1.12 - 之如何构建一个简单的TopN应用

本文主要介绍通过Flink-1.12如何构建一个简单的TopN应用,这里介绍

  • DataStream API构建
  • Flink SQL构建

1 maven依赖如下

    <!--当前版本的控制~~--><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.system.version>1.12.2</flink.system.version><scala.version>2.12</scala.version></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.system.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-blink --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.version}</artifactId><version>${flink.system.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.version}</artifactId><version>${flink.system.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.version}</artifactId><version>${flink.system.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.version}</artifactId><version>${flink.system.version}</version></dependency>

2 使用DataStream API构建

package com.shufang.stream;import com.shufang.bean.Orders;
import com.shufang.bean.WindowOrderCount;
import com.shufang.func.MyOrderSourceFunction;
import com.shufang.util.MyUtil;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.Map;public class WindowAggrFunction_TopN_Optimize {public static void main(String[] args) throws Exception {//1 获取执行环境,1.12.0之后默认时间语义是EventTime,但是可以在EventTime mode下明确指定使用processingTimeStreamExecutionEnvironment env = MyUtil.getStreamEnv();env.setParallelism(10);// TODO 正则匹配,不以.css|.js|.png|.ico结束,通常可以用来过滤String regexpPattern = "^((?!\\.(css|js|png|ico)$).)*&";//2 从数据源获取数据,SingleOutputStreamOperator<Orders> orderDtlStream = env.addSource(new MyOrderSourceFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<Orders>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((order, timestamp) -> order.getTimestamp()));orderDtlStream.print("detail");//3 主要是统计最近10s钟内不同货币的交易次数,每5s钟更新一次结果输出,找出热门的交易货币,以及排名SingleOutputStreamOperator<WindowOrderCount> aggregateStream = orderDtlStream.keyBy(order -> order.getCurrency()).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).allowedLateness(Time.minutes(1))  // 当到了窗口的endTime,窗口会输出一个计算结果,但是窗口不会关闭,迟到的数据在一分钟进来都会参与计算并更新结果状态.aggregate(new MyOrderAggr(), new MyAllWindowFunction());orderDtlStream.print("agg");//3.1 要求出每个时间窗口的TopN,我们需要按照窗口分组,按照counts进行排序//windowEnd,key,countSingleOutputStreamOperator<String> top5Stream = aggregateStream.keyBy(wc -> wc.getWindowEnd()).process(new MyHotTopN(5));//4 进行输出top5Stream.print();env.execute("should specify a name");}/*** 定义一个processFunction,每来一次数据就存储State中,最终等到ontimer()的时候触发排序计算操作*/static class MyHotTopN extends KeyedProcessFunction<Long, WindowOrderCount, String> {// 定义一个控制TopN 的N的属性private Integer topSize;private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 定义一个MapState,用来保存每个窗口中的所有的<currency,counts>,最终使用onTimer()触发输出topNMapState<String, Long> mapState;public MyHotTopN(Integer topSize) {this.topSize = topSize;}// 初始化mapState状态@Overridepublic void open(Configuration parameters) throws Exception {mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("mapState", String.class, Long.class));}// 每来一条数据我们处理一次@Overridepublic void processElement(WindowOrderCount value, Context ctx, Collector<String> out) throws Exception {//1 将信息放入到mapState中mapState.put(value.getCurrency(), value.getCounts());//2 注册定时器1,等到每个窗口的endTime + 1,触发窗口的输出操作ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);//3 注册一个定时器,在窗口关闭之后清空该窗口的mapStatectx.timerService().registerEventTimeTimer(value.getWindowEnd() + 60 * 1000);}// 定时器内管理的生命周期的操作@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {//1 在process的时候为每个窗口注册了2个定时器,此时先判断是清空状态的定时器,还是输出窗口TopN的定时器if (timestamp == ctx.getCurrentKey() + 60 * 1000) {// 如果走进来,此时应该触发的定时器是清空的定时器,那么清空窗口的状态,并退出mapState.clear();return;}//2 如果走到这里,说明是输出结果的定时器,那么就进行topN的排序并输出结果String windowEndString = sdf.format(new Date(timestamp - 1));//3 拿到map中的所有的数据,进行排序ArrayList<Map.Entry<String, Long>> topNs = Lists.newArrayList(mapState.iterator());topNs.sort(new Comparator<Map.Entry<String, Long>>() {@Overridepublic int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {if (o1.getValue() < o2.getValue())return 1;else if (o1.getValue() > o2.getValue())return -1;elsereturn 0;}});//4 最终按照@topsize取topN,为了方便打印好看,以String类型遍历输出StringBuilder sb = new StringBuilder("=====================================");sb.append("窗口结束时间为:").append(windowEndString).append("\n");for (int i = 0; i < Math.min(topNs.size(), topSize); i++) {sb.append("当前的货币为:").append(topNs.get(i).getKey()).append(" || ");sb.append("当前的货币的在该时间段的交易次数为:").append(topNs.get(i).getValue()).append(" || ");sb.append("当前的交以次数排名为:").append(i + 1).append("\n");}sb.append("=====================================");out.collect(sb.toString());}}/*** 实现一个增量聚合的窗口函数 agg function ,该类型的窗口函数可以改变输出的类型* reduce不能改变输出的类型,输入输出的类型必须保持一致* Type parameters:* <IN> – 输出的event类型* <ACC> – 累加器的类型,每来一条数据更新一次累加器的状态 ,The type of the accumulator (intermediate aggregate state).* <OUT> – 最终聚合的结果类型 ,The type of the aggregated result*/static class MyOrderAggr implements AggregateFunction<Orders, Long, Long> {// 初始化累加器@Overridepublic Long createAccumulator() {return 0L;}// 累加器的计算逻辑,来一个event => + 1@Overridepublic Long add(Orders orders, Long acc) {return acc + 1;}// 获取累加器的值@Overridepublic Long getResult(Long acc) {return acc;}// 不同的累加器的merge操作@Overridepublic Long merge(Long aLong, Long acc1) {return aLong + acc1;}}/*** 定义一个全窗口函数,用来接收agg function的输出的 value类型,* Type parameters:* <IN> – 从AggFunction的输出类型作为输入类型 The type of the input value.* <OUT> – 最终的输出类型,可以随意定义 The type of the output value.* <KEY> – keyedStream的key的类型 ,The type of the key.* <W> – 这个应用所在的窗口的类型 ,The type of Window that this window function can be applied on.*/static class MyAllWindowFunction implements WindowFunction<Long, WindowOrderCount, String, TimeWindow> {@Overridepublic void apply(String key, TimeWindow window, Iterable<Long> input, Collector<WindowOrderCount> out) throws Exception {Long count = input.iterator().next(); //从累加器获取的统计累加值long windowEnd = window.getEnd(); //窗口的标识:这里是窗口的endTime//最终返回我们需要的类型WindowOrderCount(windowEnd,currency,counts)out.collect(new WindowOrderCount(windowEnd, key, count));}}
}

3 通过Flink SQL构建

package com.shufang.stream;import com.shufang.bean.Orders;
import com.shufang.func.MyOrderSourceFunction;
import com.shufang.util.MyUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;public class TableSQLAPi_TopN {public static void main(String[] args) throws Exception {//1 获取执行环境,1.12.0之后默认时间语义是EventTime,但是可以在EventTime mode下明确指定使用processingTimeStreamExecutionEnvironment env = MyUtil.getStreamEnv();StreamTableEnvironment tableEnv = MyUtil.getBlinkStreamTableEnv();env.setParallelism(1);/* 这是order中的字段,这是一个pojo类* public Long timestamp;* public Long amount;* public String currency;*///2 从数据源获取数据:StreamSingleOutputStreamOperator<Orders> orderDtlStream = env.addSource(new MyOrderSourceFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<Orders>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((order, timestamp) -> order.getTimestamp()));//3 由于没有外部的数据源,我们假装从stream中获取数据,这个Expression虽然好看,但是难用啊Table orders = tableEnv.fromDataStream(orderDtlStream,$("currency"), $("amount"), $("timestamp").rowtime().as("ts"));orders.printSchema();//4 获取到我们想要的统计表Table windowOrderCounts = orders.window(Slide.over(lit(10).seconds()).every(lit(5).seconds()).on($("ts")).as("sw")).groupBy($("sw"), $("currency")).select($("currency"),$("sw").end().as("windowEnd"),$("amount").count().as("counts"));//5 根据counts排序,TableAPi不支持rank、row_number以及dense_rank 排序,所以还需要使用SQL来处理//createTemporaryView("windowOrderCounts",windowOrderCounts)并不是StreamTableEnv的方法//所以我们需要调用:tableEnv.createTemporaryView("windowOrderCounts",stream)来注册表// TODO can't use this !!! tableEnv.createTemporaryView("windowOrderCounts",windowOrderCounts);//6 将windowOrderCounts转成流,并进行表的注册:windowOrderCountsDataStream<Row> stream = tableEnv.toAppendStream(windowOrderCounts, Row.class);tableEnv.createTemporaryView("windowOrderCounts", stream);/*** root*  |-- currency: STRING*  |-- windowEnd: TIMESTAMP(3)*  |-- counts: BIGINT*///7 使用Over窗口实现排序取TopNString sql = "" +"SELECT  \n" +"   windowEnd, \n" +"   currency, \n" +"   counts,  \n" +"   rn  \n" +"FROM ( \n" +"   SELECT  \n" +"\t  *, \n" +"\t  ROW_NUMBER() OVER w AS rn   \n" +"   FROM windowOrderCounts  \n" +"   WINDOW w AS (PARTITION BY windowEnd ORDER BY counts DESC)  \n" +") tmp_table  \n" +"WHERE rn <= 5";Table topN = tableEnv.sqlQuery(sql);DataStream<Tuple2<Boolean, Row>> topNStream = tableEnv.toRetractStream(topN, Row.class);topNStream.print("final top5");env.execute("sql top5");}
}

SQL代码比DataStream代码要整洁很多,内部帮我们构建了状态。

这篇关于Flink-1.12 - 之如何构建一个简单的TopN应用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/477245

相关文章

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

hdu2289(简单二分)

虽说是简单二分,但是我还是wa死了  题意:已知圆台的体积,求高度 首先要知道圆台体积怎么求:设上下底的半径分别为r1,r2,高为h,V = PI*(r1*r1+r1*r2+r2*r2)*h/3 然后以h进行二分 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#includ

hdu1394(线段树点更新的应用)

题意:求一个序列经过一定的操作得到的序列的最小逆序数 这题会用到逆序数的一个性质,在0到n-1这些数字组成的乱序排列,将第一个数字A移到最后一位,得到的逆序数为res-a+(n-a-1) 知道上面的知识点后,可以用暴力来解 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#in

嵌入式QT开发:构建高效智能的嵌入式系统

摘要: 本文深入探讨了嵌入式 QT 相关的各个方面。从 QT 框架的基础架构和核心概念出发,详细阐述了其在嵌入式环境中的优势与特点。文中分析了嵌入式 QT 的开发环境搭建过程,包括交叉编译工具链的配置等关键步骤。进一步探讨了嵌入式 QT 的界面设计与开发,涵盖了从基本控件的使用到复杂界面布局的构建。同时也深入研究了信号与槽机制在嵌入式系统中的应用,以及嵌入式 QT 与硬件设备的交互,包括输入输出设

zoj3820(树的直径的应用)

题意:在一颗树上找两个点,使得所有点到选择与其更近的一个点的距离的最大值最小。 思路:如果是选择一个点的话,那么点就是直径的中点。现在考虑两个点的情况,先求树的直径,再把直径最中间的边去掉,再求剩下的两个子树中直径的中点。 代码如下: #include <stdio.h>#include <string.h>#include <algorithm>#include <map>#

usaco 1.3 Prime Cryptarithm(简单哈希表暴搜剪枝)

思路: 1. 用一个 hash[ ] 数组存放输入的数字,令 hash[ tmp ]=1 。 2. 一个自定义函数 check( ) ,检查各位是否为输入的数字。 3. 暴搜。第一行数从 100到999,第二行数从 10到99。 4. 剪枝。 代码: /*ID: who jayLANG: C++TASK: crypt1*/#include<stdio.h>bool h

Retrieval-based-Voice-Conversion-WebUI模型构建指南

一、模型介绍 Retrieval-based-Voice-Conversion-WebUI(简称 RVC)模型是一个基于 VITS(Variational Inference with adversarial learning for end-to-end Text-to-Speech)的简单易用的语音转换框架。 具有以下特点 简单易用:RVC 模型通过简单易用的网页界面,使得用户无需深入了

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、