FlinkSQL窗口实例分析

2023-12-29 06:36
文章标签 分析 实例 窗口 flinksql

本文主要是介绍FlinkSQL窗口实例分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Windowing TVFs

Windowing table-valued functions (Windowing TVFs),即窗口表值函数
注意:窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区,即存在:group by window_start,window_end

  • TUMBLE函数采用三个必需参数,一个可选参数:

    TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])

    data:是一个表参数,可以是与时间属性列的任何关系。
    timecol:是一个列描述符,指示数据的哪些时间属性列应映射到滚动窗口。
    size:是指定翻滚窗口宽度的持续时间。
    offset: 是一个可选参数,用于指定窗口开始移动的偏移量。

  • HOP采用 4 个必需参数和 1 个可选参数:

    HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

    data:是一个表参数,可以是与时间属性列的任何关系。
    timecol:是一个列描述符,指示数据的哪些时间属性列应映射到跳跃窗口。
    slide:是指定连续跳跃窗口开始之间的持续时间的持续时间
    size:是指定跳跃窗口宽度的持续时间。
    offset: 是一个可选参数,用于指定窗口开始移动的偏移量。

  • CUMULATE采用 4 个必需参数和 1 个可选参数:

    CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

    data:是一个表参数,可以是与时间属性列的任何关系。
    timecol:是一个列描述符,指示数据的哪些时间属性列应映射到累积窗口。
    step:是指定连续累积窗口末尾之间增加的窗口大小的持续时间。
    size:是指定累积窗口最大宽度的持续时间。size必须是 的整数倍step。
    offset: 是一个可选参数,用于指定窗口开始移动的偏移量。

滚动窗口

CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't0','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);create view tmp as
selectCOALESCE(cur['group_name'], src['group_name']) group_name,COALESCE(cur['batch_number'], src['batch_number']) batch_number,event_time
from kafka_table
where UPPER(opt) <> 'DELETE';
--注意:窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name

滑动窗口

CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't0','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);create view tmp as
selectCOALESCE(cur['group_name'], src['group_name']) group_name,COALESCE(cur['batch_number'], src['batch_number']) batch_number,event_time
from kafka_table
where UPPER(opt) <> 'DELETE';
--注意:窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(HOP(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '60' SECOND,INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name

累计窗口

CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't0','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);create view tmp as
selectCOALESCE(cur['group_name'], src['group_name']) group_name,COALESCE(cur['batch_number'], src['batch_number']) batch_number,event_time
from kafka_table
where UPPER(opt) <> 'DELETE';
--注意:窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(CUMULATE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '1' HOUR,INTERVAL '24' HOURS)) --从零点开始累计
TABLE(CUMULATE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '60' SECOND,INTERVAL '10' MINUTES))
TABLE(CUMULATE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '1' MINUTE,INTERVAL '1' HOURS))
group by window_start,window_end,window_time,group_name

窗口聚合-多维分析

CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't0','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);create view tmp as
selectCOALESCE(cur['group_name'], src['group_name']) group_name,COALESCE(cur['batch_number'], src['batch_number']) batch_number,event_time
from kafka_table
where UPPER(opt) <> 'DELETE';
--注意:窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区--实例1:整体聚合
select window_start,window_end,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end--实例2:根据字段聚合,n个维度
select window_start,window_end,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,group_name--实例3:多维分析GROUPING SETS
select window_start,window_end,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,GROUPING SETS((group_name)) --等同于 实例2
group by window_start,window_end,GROUPING SETS((group_name), ()) --等同于 实例1 union all 实例2--实例4:多维分析GROUPING SETS,多个字段
select window_start,window_end,group_name,batch_number,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,GROUPING SETS((group_name,batch_number),(group_name),(batch_number),())--实例5:多维分析CUBE 2^n个维度
select window_start,window_end,group_name,batch_number,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,CUBE(group_name) --等同于group by window_start,window_end,GROUPING SETS((group_name), ())
group by window_start,window_end,CUBE(group_name,batch_number) --等同于实例4--实例6:多维分析ROLLUP  n+1个维度
select window_start,window_end,group_name,batch_number,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,ROLLUP(group_name) --等同于 实例1 union all 实例2
group by window_start,window_end,ROLLUP(group_name,batch_number) --等同于GROUPING SETS((group_name,batch_number),(group_name),())

窗口topN

Window Top-N 语句的语法:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name) -- relation applied windowing TVF
WHERE rownum <= N [AND conditions]
CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't0','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);create view tmp as
selectCOALESCE(cur['group_name'], src['group_name']) group_name,COALESCE(cur['batch_number'], src['batch_number']) batch_number,event_time
from kafka_table
where UPPER(opt) <> 'DELETE';
--注意:窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区--方式1:窗口 Top-N 紧随窗口聚合之后
create view tmp_window as
select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '24' HOURS))
group by window_start,window_end,window_time,group_name;--计算每个翻滚 24小时窗口内pv最高的前 3 名机构(即每天PV最高的前三名)
select * from(select * ,ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY cnt DESC) as rnfrom tmp_window) t
where rn <=3--计算每个机构pv最高的前 3天
select * from(select * ,ROW_NUMBER() OVER (PARTITION BY group_name ORDER BY cnt DESC) as rnfrom tmp_window) t
where rn <=3--方式2:窗口 Top-N 紧随窗口 TVF 之后
select *
from(selectwindow_start,window_end,window_time,group_name,ts,ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY ts DESC) AS rnfrom TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '24' HOURS)))
where rn <=3

窗口去重

Flink使用去重的方式,就像Window Top-N查询ROW_NUMBER()的方式一样。理论上,
窗口重复数据删除是窗口 Top-N 的一种特殊情况,其中 N 为 1,并且按处理时间或事件时间排序
Window Deduplication 语句的语法:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]ORDER BY time_attr [asc|desc]) AS rownumFROM table_name) -- relation applied windowing TVF
WHERE (rownum = 1 | rownum <=1 | rownum < 2) [AND conditions]
CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,group_name as COALESCE(cur['group_name'], src['group_name']),batch_number as COALESCE(cur['batch_number'], src['batch_number']),event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't0','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);select *
from(selectwindow_start,window_end,group_name,event_time,ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY event_time DESC) AS rnfrom TABLE(TUMBLE(TABLE kafka_table, DESCRIPTOR(event_time), INTERVAL '24' HOURS)))
where rn =1

这篇关于FlinkSQL窗口实例分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/548638

相关文章

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud

C++操作符重载实例(独立函数)

C++操作符重载实例,我们把坐标值CVector的加法进行重载,计算c3=c1+c2时,也就是计算x3=x1+x2,y3=y1+y2,今天我们以独立函数的方式重载操作符+(加号),以下是C++代码: c1802.cpp源代码: D:\YcjWork\CppTour>vim c1802.cpp #include <iostream>using namespace std;/*** 以独立函数

线性因子模型 - 独立分量分析(ICA)篇

序言 线性因子模型是数据分析与机器学习中的一类重要模型,它们通过引入潜变量( latent variables \text{latent variables} latent variables)来更好地表征数据。其中,独立分量分析( ICA \text{ICA} ICA)作为线性因子模型的一种,以其独特的视角和广泛的应用领域而备受关注。 ICA \text{ICA} ICA旨在将观察到的复杂信号

实例:如何统计当前主机的连接状态和连接数

统计当前主机的连接状态和连接数 在 Linux 中,可使用 ss 命令来查看主机的网络连接状态。以下是统计当前主机连接状态和连接主机数量的具体操作。 1. 统计当前主机的连接状态 使用 ss 命令结合 grep、cut、sort 和 uniq 命令来统计当前主机的 TCP 连接状态。 ss -nta | grep -v '^State' | cut -d " " -f 1 | sort |

【软考】希尔排序算法分析

目录 1. c代码2. 运行截图3. 运行解析 1. c代码 #include <stdio.h>#include <stdlib.h> void shellSort(int data[], int n){// 划分的数组,例如8个数则为[4, 2, 1]int *delta;int k;// i控制delta的轮次int i;// 临时变量,换值int temp;in

三相直流无刷电机(BLDC)控制算法实现:BLDC有感启动算法思路分析

一枚从事路径规划算法、运动控制算法、BLDC/FOC电机控制算法、工控、物联网工程师,爱吃土豆。如有需要技术交流或者需要方案帮助、需求:以下为联系方式—V 方案1:通过霍尔传感器IO中断触发换相 1.1 整体执行思路 霍尔传感器U、V、W三相通过IO+EXIT中断的方式进行霍尔传感器数据的读取。将IO口配置为上升沿+下降沿中断触发的方式。当霍尔传感器信号发生发生信号的变化就会触发中断在中断