【大数据】Flink SQL 语法篇(九):Window TopN、Deduplication

2024-02-29 11:28

本文主要是介绍【大数据】Flink SQL 语法篇(九):Window TopN、Deduplication,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink SQL 语法篇(九):Window TopN、Deduplication

  • 1.Window TopN
  • 2.Deduplication
    • 2.1 案例 1(事件时间)
    • 2.2 案例 2(处理时间)

1.Window TopN

Window TopN 定义(支持 Streaming):Window TopN 是一种特殊的 TopN,它的返回结果是每一个窗口内的 N 个最小值或者最大值。

应用场景:小伙伴萌会问了,我有了 TopN 为啥还需要 Window TopN 呢?还记得上一篇博客介绍 TopN 说道的 TopN 时会出现中间结果,从而出现回撤数据的嘛?Window TopN 不会出现回撤数据,因为 Window TopN 实现是在窗口结束时输出最终结果,不会产生中间结果。而且注意,因为是窗口上面的操作,Window TopN 在窗口结束时,会自动把 State 给清除。

SQL 语法标准:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name) -- windowing TVF
WHERE rownum <= N [AND conditions]

实际案例:取当前这一分钟的搜索关键词下的搜索热度前 10 名的词条数据。

-- 输入表字段:
-- 字段名         备注
-- key              搜索关键词
-- name             搜索热度名称
-- search_cnt       热搜消费热度(比如 3000)
-- timestamp        消费词条时间戳CREATE TABLE source_table (name BIGINT NOT NULL,search_cnt BIGINT NOT NULL,key BIGINT NOT NULL,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH (...
);-- 输出表字段:
-- 字段名         备注
-- key              搜索关键词
-- name             搜索热度名称
-- search_cnt       热搜消费热度(比如 3000)
-- window_start     窗口开始时间戳
-- window_end       窗口结束时间戳CREATE TABLE sink_table (key BIGINT,name BIGINT,search_cnt BIGINT,window_start TIMESTAMP(3),window_end TIMESTAMP(3)
) WITH (...
);-- 处理 sql:INSERT INTO sink_table
SELECT key, name, search_cnt, window_start, window_end
FROM (SELECT key, name, search_cnt, window_start, window_end, ROW_NUMBER() OVER (PARTITION BY window_start, window_end, keyORDER BY search_cnt desc) AS rownumFROM (SELECT window_start, window_end, key, name, max(search_cnt) as search_cnt-- window tvf 写法FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '1' MINUTES))GROUP BY window_start, window_end, key, name)
)
WHERE rownum <= 100

输出结果:

+I[关键词1, 词条1, 8670, 2021-1-28T22:34, 2021-1-28T22:35]
+I[关键词1, 词条2, 6928, 2021-1-28T22:34, 2021-1-28T22:35]
+I[关键词1, 词条3, 1735, 2021-1-28T22:34, 2021-1-28T22:35]
+I[关键词1, 词条4, 7287, 2021-1-28T22:34, 2021-1-28T22:35]
...

SQL 语义:

  • 数据源:数据源即最新的词条下面的搜索词的搜索热度数据,消费到 Kafka 中数据后,将数据按照窗口聚合的 Key 通过 Hash 分发策略发送到下游窗口聚合算子。
  • 窗口聚合算子:进行窗口聚合计算,随着时间的推进,将窗口聚合结果计算完成发往下游窗口排序算子。
  • 窗口排序算子:这个算子其实也是一个窗口算子,只不过这个窗口算子为每个 Key 维护了一个 TopN 的榜单数据,接受到上游发送的窗口结果数据进行排序,随着时间的推进,窗口的结束,将排序的结果输出到下游数据汇算子。
  • 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。

2.Deduplication

Deduplication 定义(支持 Batch / Streaming):Deduplication 其实就是去重,也即上文介绍到的 TopN 中 row_number = 1 的场景,但是这里有一点不一样在于其 排序字段 一定是 时间属性列,不能是其他非时间属性的普通列。在 row_number = 1 时,如果排序字段是普通列 Planner 会翻译成 TopN 算子,如果是时间属性列 Planner 会翻译成 Deduplication,这两者最终的执行算子是不一样的,Deduplication 相比 TopN 算子专门做了对应的优化,性能会有很大提升。

应用场景:比如上游数据发重了,或者计算 DAU 明细数据等场景,都可以使用 Deduplication 语法去做去重。

SQL 语法标准:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]ORDER BY time_attr [asc|desc]) AS rownumFROM table_name)
WHERE rownum = 1
  • ROW_NUMBER():标识当前数据的排序值。
  • PARTITION BY col1[, col2...]:标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序。
  • ORDER BY time_attr [asc|desc]:标识排序规则,必须为时间戳列,当前 Flink SQL 支持处理时间、事件时间,ASC 代表保留第一行,DESC 代表保留最后一行。
  • WHERE rownum = 1:这个子句是一定需要的,而且必须为 rownum = 1

2.1 案例 1(事件时间)

某一游戏用户等级的场景,每一个用户都有一个用户等级,需要求出当前用户等级在 星星⭐,月亮🌙,太阳🌞 的用户数分别有多少。

-- 数据源:当每一个用户的等级初始化及后续变化的时候的数据,即用户等级变化明细数据。
CREATE TABLE source_table (user_id BIGINT COMMENT '用户 id',level STRING COMMENT '用户等级',row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)) COMMENT '事件时间戳',WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.level.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '1000000'
);-- 数据汇:输出即每一个等级的用户数
CREATE TABLE sink_table (level STRING COMMENT '等级',uv BIGINT COMMENT '当前等级用户数',row_time timestamp(3) COMMENT '时间戳'
) WITH ('connector' = 'print'
);-- 处理逻辑:
INSERT INTO sink_table
select level, count(1) as uv, max(row_time) as row_time
from (SELECTuser_id,level,row_time,row_number() over(partition by user_id order by row_time) as rnFROM source_table
)
where rn = 1
group by level

输出结果:

+I[等级 1, 6928, 2021-1-28T22:34]
-I[等级 1, 6928, 2021-1-28T22:34]
+I[等级 1, 8670, 2021-1-28T22:34]
-I[等级 1, 8670, 2021-1-28T22:34]
+I[等级 1, 77287, 2021-1-28T22:34]
...

可以看到其有回撤数据。

其对应的 SQL 语义如下:

  • 数据源:消费到 Kafka 中数据后,将数据按照 partition by 的 Key 通过 Hash 分发策略发送到下游去重算子。
  • Deduplication 去重算子:接受到上游数据之后,根据 order by 中的条件判断当前的这条数据和之前数据时间戳大小,以上面案例来说,如果当前数据时间戳大于之前数据时间戳,则撤回之前向下游发的中间结果,然后将最新的结果发向下游(发送策略也为 Hash,具体的 Hash 策略为按照 group by 中 Key 进行发送),如果当前数据时间戳小于之前数据时间戳,则不做操作。此算子产出的结果就是每一个用户的对应的最新等级信息。
  • Group by 聚合算子:接受到上游数据之后,根据 Group by 聚合粒度对数据进行聚合计算结果(每一个等级的用户数),发往下游数据汇算子。
  • 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。

2.2 案例 2(处理时间)

最原始的日志是明细数据,需要我们根据用户 id 筛选出这个用户当天的第一条数据,发往下游,下游可以据此计算分各种维度的 DAU。

-- 数据源:原始日志明细数据
CREATE TABLE source_table (user_id BIGINT COMMENT '用户 id',name STRING COMMENT '用户姓名',server_timestamp BIGINT COMMENT '用户访问时间戳',proctime AS PROCTIME()
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.name.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '10','fields.server_timestamp.min' = '1','fields.server_timestamp.max' = '100000'
);-- 数据汇:根据 user_id 去重的第一条数据
CREATE TABLE sink_table (user_id BIGINT,name STRING,server_timestamp BIGINT
) WITH ('connector' = 'print'
);-- 处理逻辑:
INSERT INTO sink_table
select user_id,name,server_timestamp
from (SELECTuser_id,name,server_timestamp,row_number() over(partition by user_id order by proctime) as rnFROM source_table
)
where rn = 1

输出结果:

+I[1, 用户 1, 2021-1-28T22:34]
+I[2, 用户 2, 2021-1-28T22:34]
+I[3, 用户 3, 2021-1-28T22:34]
...

可以看到这个处理逻辑是没有回撤数据的。其对应的 SQL 语义如下:

  • 数据源:消费到 Kafka 中数据后,将数据按照 partition by 的 Key 通过 Hash 分发策略发送到下游去重算子。
  • Deduplication 去重算子:处理时间语义下,如果是当前 Key 的第一条数据,则直接发往下游,如果判断(根据 State 中是否存储过该 Key)不是第一条,则直接丢弃。
  • 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。

⭐ 在 Deduplication 关于是否会出现回撤流,博主总结如下:

  • Order by 事件时间 DESC:会出现回撤流,因为当前 Key 下 可能会有 比当前事件时间还大的数据。
  • Order by 事件时间 ASC:会出现回撤流,因为当前 Key 下 可能会有 比当前事件时间还小的数据。
  • Order by 处理时间 DESC:会出现回撤流,因为当前 Key 下 可能会有 比当前处理时间还大的数据。
  • Order by 处理时间 ASC:不会出现回撤流,因为当前 Key 下 不可能会有 比当前处理时间还小的数据。

这篇关于【大数据】Flink SQL 语法篇(九):Window TopN、Deduplication的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/758612

相关文章

Mysql中InnoDB与MyISAM索引差异详解(最新整理)

《Mysql中InnoDB与MyISAM索引差异详解(最新整理)》InnoDB和MyISAM在索引实现和特性上有差异,包括聚集索引、非聚集索引、事务支持、并发控制、覆盖索引、主键约束、外键支持和物理存... 目录1. 索引类型与数据存储方式InnoDBMyISAM2. 事务与并发控制InnoDBMyISAM

MySQL 日期时间格式化函数 DATE_FORMAT() 的使用示例详解

《MySQL日期时间格式化函数DATE_FORMAT()的使用示例详解》`DATE_FORMAT()`是MySQL中用于格式化日期时间的函数,本文详细介绍了其语法、格式化字符串的含义以及常见日期... 目录一、DATE_FORMAT()语法二、格式化字符串详解三、常见日期时间格式组合四、业务场景五、总结一、

mysql线上查询之前要性能调优的技巧及示例

《mysql线上查询之前要性能调优的技巧及示例》文章介绍了查询优化的几种方法,包括使用索引、避免不必要的列和行、有效的JOIN策略、子查询和派生表的优化、查询提示和优化器提示等,这些方法可以帮助提高数... 目录避免不必要的列和行使用有效的JOIN策略使用子查询和派生表时要小心使用查询提示和优化器提示其他常

grom设置全局日志实现执行并打印sql语句

《grom设置全局日志实现执行并打印sql语句》本文主要介绍了grom设置全局日志实现执行并打印sql语句,包括设置日志级别、实现自定义Logger接口以及如何使用GORM的默认logger,通过这些... 目录gorm中的自定义日志gorm中日志的其他操作日志级别Debug自定义 Loggergorm中的

MySQL InnoDB引擎ibdata文件损坏/删除后使用frm和ibd文件恢复数据

《MySQLInnoDB引擎ibdata文件损坏/删除后使用frm和ibd文件恢复数据》mysql的ibdata文件被误删、被恶意修改,没有从库和备份数据的情况下的数据恢复,不能保证数据库所有表数据... 参考:mysql Innodb表空间卸载、迁移、装载的使用方法注意!此方法只适用于innodb_fi

mysql通过frm和ibd文件恢复表_mysql5.7根据.frm和.ibd文件恢复表结构和数据

《mysql通过frm和ibd文件恢复表_mysql5.7根据.frm和.ibd文件恢复表结构和数据》文章主要介绍了如何从.frm和.ibd文件恢复MySQLInnoDB表结构和数据,需要的朋友可以参... 目录一、恢复表结构二、恢复表数据补充方法一、恢复表结构(从 .frm 文件)方法 1:使用 mysq

mysql8.0无备份通过idb文件恢复数据的方法、idb文件修复和tablespace id不一致处理

《mysql8.0无备份通过idb文件恢复数据的方法、idb文件修复和tablespaceid不一致处理》文章描述了公司服务器断电后数据库故障的过程,作者通过查看错误日志、重新初始化数据目录、恢复备... 周末突然接到一位一年多没联系的妹妹打来电话,“刘哥,快来救救我”,我脑海瞬间冒出妙瓦底,电信火苲马扁.

golang获取prometheus数据(prometheus/client_golang包)

《golang获取prometheus数据(prometheus/client_golang包)》本文主要介绍了使用Go语言的prometheus/client_golang包来获取Prometheu... 目录1. 创建链接1.1 语法1.2 完整示例2. 简单查询2.1 语法2.2 完整示例3. 范围值

MySQL进阶之路索引失效的11种情况详析

《MySQL进阶之路索引失效的11种情况详析》:本文主要介绍MySQL查询优化中的11种常见情况,包括索引的使用和优化策略,通过这些策略,开发者可以显著提升查询性能,需要的朋友可以参考下... 目录前言图示1. 使用不等式操作符(!=, <, >)2. 使用 OR 连接多个条件3. 对索引字段进行计算操作4

MySQL表锁、页面锁和行锁的作用及其优缺点对比分析

《MySQL表锁、页面锁和行锁的作用及其优缺点对比分析》MySQL中的表锁、页面锁和行锁各有特点,适用于不同的场景,表锁锁定整个表,适用于批量操作和MyISAM存储引擎,页面锁锁定数据页,适用于旧版本... 目录1. 表锁(Table Lock)2. 页面锁(Page Lock)3. 行锁(Row Lock