pyflink的窗口

2024-09-06 20:44
文章标签 窗口 pyflink

本文主要是介绍pyflink的窗口,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

PyFlink 中的窗口操作教程

在流处理应用中,窗口(Window)是一个非常重要的概念,它用于对无界的数据流进行切分,使得我们可以对流中的数据执行聚合、计数、排序等操作。PyFlink 提供了丰富的窗口类型和操作,可以对流数据进行时间和计数等维度的切片,进行实时的数据处理。

在本教程中,我们将介绍 PyFlink 中的几种常见窗口类型,并展示如何使用窗口进行数据处理。

1. 安装 PyFlink

在开始之前,确保你已经安装了 PyFlink:

pip install apache-flink

2. 什么是窗口?

窗口(Window)是 Flink 处理无界数据流的核心技术,它将无限的数据流划分为有限的块,这样可以对这些块进行聚合、计数等操作。常见的窗口类型包括:

  • 滚动窗口(Tumbling Window):将数据流划分为不重叠的固定长度时间段。
  • 滑动窗口(Sliding Window):将数据流划分为固定长度的时间段,这些时间段可以相互重叠。
  • 会话窗口(Session Window):基于数据的活动时间来划分数据流,窗口之间有间隔(即活动的间歇)。
  • 计数窗口(Count Window):基于事件的数量而非时间划分窗口。

3. PyFlink 中的窗口操作

在 PyFlink 中,窗口通常和时间、事件一起使用,通过对数据流应用窗口函数来执行聚合操作。以下是几种常见的窗口操作。

4. 流环境设置

在 PyFlink 中,窗口操作通常在流模式下进行。首先,我们需要设置流环境并定义一些基础数据流。

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings# 创建流执行环境
env = StreamExecutionEnvironment.get_execution_environment()# 创建 Table 环境
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

5. 时间特性设置

时间特性分为两种类型:事件时间(Event Time)和 处理时间(Processing Time)。事件时间基于事件生成时的时间,而处理时间基于 Flink 系统处理事件的时间。

设置事件时间(Event Time)

事件时间需要通过在数据流中添加时间戳和水印(Watermark)来支持。

# 设置事件时间属性
t_env.get_config().set_local_timezone('UTC')  # 使用 UTC 时区

6. 创建窗口

6.1 滚动窗口(Tumbling Window)

滚动窗口会将数据流划分为固定长度的时间段,并且这些时间段互不重叠。

from pyflink.table.window import Tumble
from pyflink.table import expressions as expr# 创建示例表
t_env.execute_sql("""CREATE TEMPORARY TABLE source_table (user_id STRING,item STRING,amount DOUBLE,event_time TIMESTAMP(3),WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND) WITH ('connector' = 'datagen')
""")# 定义滚动窗口,窗口大小为10分钟
result_table = t_env.from_path("source_table") \.window(Tumble.over(expr.lit(10).minutes).on(expr.col("event_time")).alias("w")) \.group_by(expr.col("w"), expr.col("user_id")) \.select(expr.col("user_id"), expr.col("amount").sum.alias("total_spent"), expr.col("w").end.alias("window_end"))# 输出查询结果
result_table.execute().print()
6.2 滑动窗口(Sliding Window)

滑动窗口将数据划分为固定长度的时间段,这些时间段可以相互重叠。窗口的滑动步长定义了相邻窗口的开始时间。

from pyflink.table.window import Slide# 定义滑动窗口,窗口大小为10分钟,滑动步长为5分钟
result_table = t_env.from_path("source_table") \.window(Slide.over(expr.lit(10).minutes).every(expr.lit(5).minutes).on(expr.col("event_time")).alias("w")) \.group_by(expr.col("w"), expr.col("user_id")) \.select(expr.col("user_id"), expr.col("amount").sum.alias("total_spent"), expr.col("w").end.alias("window_end"))# 输出查询结果
result_table.execute().print()
6.3 会话窗口(Session Window)

会话窗口基于数据的活动时间和不活动时间来划分数据流。如果一段时间内没有新的事件到达,窗口会结束。

from pyflink.table.window import Session# 定义会话窗口,不活动间隔为30分钟
result_table = t_env.from_path("source_table") \.window(Session.with_gap(expr.lit(30).minutes).on(expr.col("event_time")).alias("w")) \.group_by(expr.col("w"), expr.col("user_id")) \.select(expr.col("user_id"), expr.col("amount").sum.alias("total_spent"), expr.col("w").end.alias("window_end"))# 输出查询结果
result_table.execute().print()
6.4 计数窗口(Count Window)

计数窗口是基于记录的数量来划分窗口,而不是基于时间。例如,每 1000 条记录形成一个窗口。

from pyflink.table.window import Tumble# 定义计数窗口,每 1000 条记录形成一个窗口
result_table = t_env.from_path("source_table") \.window(Tumble.over(expr.lit(1000).rows).on(expr.col("event_time")).alias("w")) \.group_by(expr.col("w"), expr.col("user_id")) \.select(expr.col("user_id"), expr.col("amount").sum.alias("total_spent"), expr.col("w").end.alias("window_end"))# 输出查询结果
result_table.execute().print()

7. 自定义窗口聚合函数

除了使用内置的窗口聚合函数(如 SUM, COUNT 等),你还可以自定义窗口聚合逻辑。

自定义聚合函数
from pyflink.table.udf import AggregateFunction, udafclass AvgAggregateFunction(AggregateFunction):def get_value(self, accumulator):return accumulator[0] / accumulator[1] if accumulator[1] > 0 else 0def create_accumulator(self):return [0, 0]  # sum, countdef accumulate(self, accumulator, value):accumulator[0] += valueaccumulator[1] += 1# 注册自定义聚合函数
avg_udaf = udaf(AvgAggregateFunction(), result_type='DOUBLE')# 使用自定义聚合函数计算窗口内平均值
result_table = t_env.from_path("source_table") \.window(Tumble.over(expr.lit(10).minutes).on(expr.col("event_time")).alias("w")) \.group_by(expr.col("w"), expr.col("user_id")) \.select(expr.col("user_id"), avg_udaf(expr.col("amount")).alias("average_spent"), expr.col("w").end.alias("window_end"))# 输出查询结果
result_table.execute().print()

8. 完整示例

以下是一个包含窗口操作的完整 PyFlink 示例:

from pyflink.table.window import Tumble
from pyflink.table import expressions as expr
from pyflink.table.udf import AggregateFunction, udaf
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings# 设置流执行环境
env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)# 创建示例表
t_env.execute_sql("""CREATE TEMPORARY TABLE source_table (user_id STRING,item STRING,amount DOUBLE,event_time TIMESTAMP(3),WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND) WITH ('connector' = 'datagen')
""")# 定义滚动窗口和自定义聚合函数
class AvgAggregateFunction(AggregateFunction):def get_value(self, accumulator):return accumulator[0] / accumulator[1] if accumulator[1] > 0 else 0def create_accumulator(self):return [0, 0]def accumulate(self, accumulator, value):accumulator[0] += valueaccumulator[1] += 1avg_udaf = udaf(AvgAggregateFunction(), result_type='DOUBLE')# 使用滚动窗口和自定义聚合函数
result_table = t_env.from_path("source_table") \.window(Tumble.over(expr.lit(10).minutes).on(expr.col("event_time")).alias("w")) \.group_by(expr.col("w"), expr.col("user_id")) \.select(expr.col("user_id"), avg_udaf(expr.col("amount")).alias("average_spent"), expr.col("w").end.alias("window_end"))# 输出结果
result_table.execute().print()

9. 总结

在 PyFlink 中,窗口是流处理的核心概念之一,允许你对无界数据流进行聚合、计算和操作。Flink 提供了丰富的窗口类型,包括滚动窗口、滑动窗口、会话窗口和计数窗口,以满足不同场景下的需求。通过本教程,你可以学习如何在 PyFlink 中使用窗口对流数据进行处理,并通过自定义函数来实现更复杂的计算逻辑。

这篇关于pyflink的窗口的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143072

相关文章

bat脚本启动git bash窗口,并执行命令方式

《bat脚本启动gitbash窗口,并执行命令方式》本文介绍了如何在Windows服务器上使用cmd启动jar包时出现乱码的问题,并提供了解决方法——使用GitBash窗口启动并设置编码,通过编写s... 目录一、简介二、使用说明2.1 start.BAT脚本2.2 参数说明2.3 效果总结一、简介某些情

基于Redis有序集合实现滑动窗口限流的步骤

《基于Redis有序集合实现滑动窗口限流的步骤》滑动窗口算法是一种基于时间窗口的限流算法,通过动态地滑动窗口,可以动态调整限流的速率,Redis有序集合可以用来实现滑动窗口限流,本文介绍基于Redis... 滑动窗口算法是一种基于时间窗口的限流算法,它将时间划分为若干个固定大小的窗口,每个窗口内记录了该时间

使用JS/Jquery获得父窗口的几个方法(笔记)

<pre name="code" class="javascript">取父窗口的元素方法:$(selector, window.parent.document);那么你取父窗口的父窗口的元素就可以用:$(selector, window.parent.parent.document);如题: $(selector, window.top.document);//获得顶级窗口里面的元素 $(

专题二_滑动窗口_算法专题详细总结

目录 滑动窗口,引入: 滑动窗口,本质:就是同向双指针; 1.⻓度最⼩的⼦数组(medium) 1.解析:给我们一个数组nums,要我们找出最小子数组的和==target,首先想到的就是暴力解法 1)暴力: 2)优化,滑动窗口: 1.进窗口 2.出窗口 3.更新值 2.⽆重复字符的最⻓⼦串(medium) 1)仍然是暴力解法: 2)优化: 进窗口:hash[s[rig

hot100刷题第1-9题,三个专题哈希,双指针,滑动窗口

求满足条件的子数组,一般是前缀和、滑动窗口,经常结合哈希表; 区间操作元素,一般是前缀和、差分数组 数组有序,更大概率会用到二分搜索 目前已经掌握一些基本套路,重零刷起leetcode hot 100, 套路题按套路来,非套路题适当参考gpt解法。 一、梦开始的地方, 两数之和 class Solution:#注意要返回的是数组下标def twoSum(self, nums: Lis

主窗口的设计与开发(二)

主窗口的设计与开发(二) 前言         在上一集当中,我们完成了主窗口的初始化,主窗口包括了左中右三个区域。我们还完成了对左窗口的初始化,左窗口包括了用户头像、会话标签页按钮、好友标签页按钮以及好友申请标签页按钮。对于切换每个标签页,我们还做了初始化信号槽的内容。最后我们将整个MainWidget类设置为单例模式。         那么这一集我们将继续完成主窗口的设计与开发,这一集我

QtC++截图支持窗口获取

介绍 在截图工具中你会发现,接触到窗口后会自动圈出目标窗口,个别强大一点的还能进行元素识别可以自动圈出元素,那么今天简单分析一下QTc++如何获取窗口并圈出当前鼠标下的窗口。 介绍1.如何获取所有窗口2.比较函数3.实现窗口判断 结尾 1.如何获取所有窗口 1.我们需要调用windows接口EnumWindowsProc回调函数来获取所有顶级窗口,需要包含windows.

运行.bat文件,如何在Dos窗口里面得到该文件的路径

把java代码打包成.jar文件,编写一个.bat文件,执行该文件,编译.jar包;(.bat,.jar放在同一个文件夹下) 运行.bat文件,如何在Dos窗口里面得到该文件的路径,并运行.jar文件: echo 当前盘符:%~d0 echo 当前路径:%cd% echo 当前执行命令行:%0 echo 当前bat文件路径:%~dp0 echo 当前bat文件短路径:%~sdp0 nc

类codepen的实现可拖拽窗口demo

首先说下思想 flex或者其他布局方式,实现左右分割布局,主盒子宽度100%,左右布局中包含一个分割条(可在布局容器中,也可以单独定义)为分隔条绑定鼠标点击事件,为document绑定鼠标移动事件和鼠标放开事件,通过监听鼠标移动事件和上一个状态保存下来的鼠标位置作对比,判断当前鼠标移动方向(往左还是往右)然后计算当前鼠标位置和鼠标点击位置的距离,来计算左右容器的变化,然后通过dom的方式设置宽度

【leetcode详解】考试的最大困扰度(滑动窗口典例)

实战总结: sum += answerKey[right] == c; 经典操作,将判断语句转化为0, 1接收来计数//大问题分解: 对'T'还是'F'做修改, 传参为c//滑动窗口: 遍历, 维护left& right指向 及 c的个数, 更新不知从何下手写代码时:考虑先写好第一次的,然后以此为基础补充代码以适后续情况 题面: 解题感受: 思路总体好想, 实现略有挑战。 思路分析: