0基础学习PyFlink——用户自定义函数之UDAF

2023-10-28 09:28

本文主要是介绍0基础学习PyFlink——用户自定义函数之UDAF,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大纲

  • UDAF
    • 入参并非表中一行(Row)的集合
      • 计算每个人考了几门课
      • 计算每门课有几个人考试
      • 计算每个人的平均分
      • 计算每课的平均分
      • 计算每个人的最高分和最低分
    • 入参是表中一行(Row)的集合
      • 计算每个人的最高分、最低分以及所属的课程
      • 计算每课的最高分数、最低分数以及所属人
  • 完整代码
    • 入参并非表中一行(Row)的集合
    • 入参是表中一行(Row)的集合

在前面几篇文章中,我们学习了非聚合类的用户自定义函数。这节我们将介绍最简单的聚合函数UDAF。
在这里插入图片描述

UDAF

我们对比下UDAF和UDF的定义

def udaf(f: Union[Callable, AggregateFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None, accumulator_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None,func_type: str = "general") -> Union[UserDefinedAggregateFunctionWrapper, Callable]:
def udf(f: Union[Callable, ScalarFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None, func_type: str = "general",udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:

可以发现:

  • udaf比udf多了一个参数accumulator_type
  • udaf比udf少了一个参数udf_type

accumulator中文是“累加器”。我们可以将其看成聚合过后(比如GroupBy)的成批数据,每批都要走一次函数。
举一个例子:我们对图中左侧的成绩单,使用人名(name)进行聚类,然后计算出最高分数。即算出每个人考出的最高分数是多少。
在这里插入图片描述
如图所示,聚合后的数据每个都会经过accumulator计算。计算出来的值的类型就是accumulator_type。这个类型的数据是中间态,它并不是最终UDAF返回的数据类型——result_type。具体这块的知识我们会在后面讲解。
为了方便讲解,我们就以上面例子来讲解其使用。先贴出准备的代码:

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctiondef calc():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('score', DataTypes.FLOAT()), DataTypes.FIELD('class', DataTypes.STRING())])students_score = [("张三", 80.0, "English"),("李四", 75.0, "English"),("王五", 90.0, "English"),("赵六", 85.0, "English"),("张三", 60.0, "Math"),("李四", 95.0, "Math"),("王五", 90.0, "Math"),("赵六", 70.0, "Math"),("孙七", 60.0, "Math"),]tab_source = t_env.from_elements(students_score, row_type_tab_source )

我们在tab_source表中录入了学生的成绩信息,其中包括姓名(name)、成绩(score)和科目(class)。

入参并非表中一行(Row)的集合

计算每个人考了几门课

  1. 按姓名(name)聚类
  2. UDTF统计聚类后集合的个数并返回
  3. 别名UDTF返回的列名
  4. select出数据
@udaf(result_type=DataTypes.ROW([DataTypes.FIELD("count", DataTypes.BIGINT())]), func_type="pandas")def exam_count(pandas_df: pd.DataFrame):return Row(pandas_df.count())tab_student_exam_count = tab_source.group_by(col('name')) \.aggregate(exam_count(col('name')).alias("count")) \.select(col('name'), col('count')) tab_student_exam_count.execute().print()
+--------------------------------+----------------------+
|                           name |                count |
+--------------------------------+----------------------+
|                           孙七 |                    1 |
|                           张三 |                    2 |
|                           李四 |                    2 |
|                           王五 |                    2 |
|                           赵六 |                    2 |
+--------------------------------+----------------------+
5 rows in set

计算每门课有几个人考试

  1. 按姓名(class)聚类
  2. UDTF统计聚类后集合的个数并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("count", DataTypes.BIGINT())]), func_type="pandas")def exam_count(pandas_df: pd.DataFrame):return Row(pandas_df.count())tab_class_exam_count = tab_source.group_by(col('class')) \.aggregate(exam_count(col('class')).alias("count")) \.select(col('class'), col('count')) tab_class_exam_count.execute().print()
+--------------------------------+----------------------+
|                          class |                count |
+--------------------------------+----------------------+
|                        English |                    4 |
|                           Math |                    5 |
+--------------------------------+----------------------+
2 rows in set

计算每个人的平均分

  1. 按姓名(name)聚类
  2. UDTF统计聚类后集合的均值并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("avg", DataTypes.FLOAT())]), func_type="pandas")def avg_score(pandas_df: pd.DataFrame):return Row(pandas_df.mean())tab_student_avg_score = tab_source.group_by(col('name')) \.aggregate(avg_score(col('score')).alias("avg")) \.select(col('name'), col('avg')) tab_student_avg_score.execute().print()
+--------------------------------+--------------------------------+
|                           name |                            avg |
+--------------------------------+--------------------------------+
|                           孙七 |                           60.0 |
|                           张三 |                           70.0 |
|                           李四 |                           85.0 |
|                           王五 |                           90.0 |
|                           赵六 |                           77.5 |
+--------------------------------+--------------------------------+
5 rows in set

计算每课的平均分

  1. 按姓名(class)聚类
  2. UDTF统计聚类后集合的均值并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("avg", DataTypes.FLOAT())]), func_type="pandas")def avg_score(pandas_df: pd.DataFrame):return Row(pandas_df.mean())tab_class_avg_score = tab_source.group_by(col('class')) \.aggregate(avg_score(col('score')).alias("avg")) \.select(col('class'), col('avg')) tab_class_avg_score.execute().print()
+--------------------------------+--------------------------------+
|                          class |                            avg |
+--------------------------------+--------------------------------+
|                        English |                           82.5 |
|                           Math |                           75.0 |
+--------------------------------+--------------------------------+
2 rows in set

计算每个人的最高分和最低分

  1. 按姓名(name)聚类
  2. UDTF统计聚类后集合的最大值和最小值,并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("min", DataTypes.FLOAT())]), func_type="pandas")def max_min_score(pandas_df: pd.DataFrame):return Row(pandas_df.max(), pandas_df.min())tab_student_max_min_score = tab_source.group_by(col('name')) \.aggregate(max_min_score(col('score')).alias("max", "min")) \.select(col('name'), col('max'), col('min')) tab_student_max_min_score.execute().print()
+--------------------------------+--------------------------------+--------------------------------+
|                           name |                            max |                            min |
+--------------------------------+--------------------------------+--------------------------------+
|                           孙七 |                           60.0 |                           60.0 |
|                           张三 |                           80.0 |                           60.0 |
|                           李四 |                           95.0 |                           75.0 |
|                           王五 |                           90.0 |                           90.0 |
|                           赵六 |                           85.0 |                           70.0 |
+--------------------------------+--------------------------------+--------------------------------+
5 rows in set

入参是表中一行(Row)的集合

计算每个人的最高分、最低分以及所属的课程

  1. 按姓名(name)聚类
  2. UDTF统计聚类后集合中分数最大值、最小值;分数最大值所在行的课程名,和分数最小值所在行的课程名,并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("max tag", DataTypes.STRING()), DataTypes.FIELD("min", DataTypes.FLOAT()), DataTypes.FIELD("min tag", DataTypes.STRING())]), func_type="pandas")def max_min_score_with_class(pandas_df: pd.DataFrame):return Row(pandas_df["score"].max(), pandas_df.loc[pandas_df["score"].idxmax(), "class"], pandas_df["score"].min(), pandas_df.loc[pandas_df["score"].idxmin(), "class"])tab_student_max_min_score = tab_source.group_by(col('name')) \.aggregate(max_min_score_with_class.alias("max", "class(max)", "min", "class(min)")) \.select(col('name'), col('max'), col('class(max)'), col('min'), col('class(min)')) tab_student_max_min_score.execute().print()
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|                           name |                            max |                     class(max) |                            min |                     class(min) |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|                           孙七 |                           60.0 |                           Math |                           60.0 |                           Math |
|                           张三 |                           80.0 |                        English |                           60.0 |                           Math |
|                           李四 |                           95.0 |                           Math |                           75.0 |                        English |
|                           王五 |                           90.0 |                        English |                           90.0 |                        English |
|                           赵六 |                           85.0 |                        English |                           70.0 |                           Math |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
5 rows in set

计算每课的最高分数、最低分数以及所属人

  1. 按姓名(class)聚类
  2. UDTF统计聚类后集合中分数最大值、最小值;分数最大值所在行的人名,和分数最小值所在行的人名,并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("max tag", DataTypes.STRING()), DataTypes.FIELD("min", DataTypes.FLOAT()), DataTypes.FIELD("min tag", DataTypes.STRING())]), func_type="pandas")def max_min_score_with_name(pandas_df: pd.DataFrame):return Row(pandas_df["score"].max(), pandas_df.loc[pandas_df["score"].idxmax(), "name"], pandas_df["score"].min(), pandas_df.loc[pandas_df["score"].idxmin(), "name"])tab_class_max_min_score = tab_source.group_by(col('class')) \.aggregate(max_min_score_with_name.alias("max", "name(max)", "min", "name(min)")) \.select(col('class'), col('max'), col('name(max)'), col('min'), col('name(min)')) tab_class_max_min_score.execute().print()
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|                          class |                            max |                      name(max) |                            min |                      name(min) |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|                        English |                           90.0 |                           王五 |                           75.0 |                           李四 |
|                           Math |                           95.0 |                           李四 |                           60.0 |                           张三 |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
2 rows in set

完整代码

入参并非表中一行(Row)的集合

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctiondef calc():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('score', DataTypes.FLOAT()), DataTypes.FIELD('class', DataTypes.STRING())])students_score = [("张三", 80.0, "English"),("李四", 75.0, "English"),("王五", 90.0, "English"),("赵六", 85.0, "English"),("张三", 60.0, "Math"),("李四", 95.0, "Math"),("王五", 90.0, "Math"),("赵六", 70.0, "Math"),("孙七", 60.0, "Math"),]tab_source = t_env.from_elements(students_score, row_type_tab_source )@udaf(result_type=DataTypes.ROW([DataTypes.FIELD("count", DataTypes.BIGINT())]), func_type="pandas")def exam_count(pandas_df: pd.DataFrame):return Row(pandas_df.count())tab_student_exam_count = tab_source.group_by(col('name')) \.aggregate(exam_count(col('name')).alias("count")) \.select(col('name'), col('count')) tab_student_exam_count.execute().print()tab_class_exam_count = tab_source.group_by(col('class')) \.aggregate(exam_count(col('class')).alias("count")) \.select(col('class'), col('count')) tab_class_exam_count.execute().print()@udaf(result_type=DataTypes.ROW([DataTypes.FIELD("avg", DataTypes.FLOAT())]), func_type="pandas")def avg_score(pandas_df: pd.DataFrame):return Row(pandas_df.mean())tab_student_avg_score = tab_source.group_by(col('name')) \.aggregate(avg_score(col('score')).alias("avg")) \.select(col('name'), col('avg')) tab_student_avg_score.execute().print()tab_class_avg_score = tab_source.group_by(col('class')) \.aggregate(avg_score(col('score')).alias("avg")) \.select(col('class'), col('avg')) tab_class_avg_score.execute().print()@udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("min", DataTypes.FLOAT())]), func_type="pandas")def max_min_score(pandas_df: pd.DataFrame):return Row(pandas_df.max(), pandas_df.min())tab_student_max_min_score = tab_source.group_by(col('name')) \.aggregate(max_min_score(col('score')).alias("max", "min")) \.select(col('name'), col('max'), col('min')) tab_student_max_min_score.execute().print()if __name__ == '__main__':calc()

入参是表中一行(Row)的集合

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctiondef calc():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('score', DataTypes.FLOAT()), DataTypes.FIELD('class', DataTypes.STRING())])students_score = [("张三", 80.0, "English"),("李四", 75.0, "English"),("王五", 90.0, "English"),("赵六", 85.0, "English"),("张三", 60.0, "Math"),("李四", 95.0, "Math"),("王五", 90.0, "Math"),("赵六", 70.0, "Math"),("孙七", 60.0, "Math"),]tab_source = t_env.from_elements(students_score, row_type_tab_source )@udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("max tag", DataTypes.STRING()), DataTypes.FIELD("min", DataTypes.FLOAT()), DataTypes.FIELD("min tag", DataTypes.STRING())]), func_type="pandas")def max_min_score_with_class(pandas_df: pd.DataFrame):return Row(pandas_df["score"].max(), pandas_df.loc[pandas_df["score"].idxmax(), "class"], pandas_df["score"].min(), pandas_df.loc[pandas_df["score"].idxmin(), "class"])tab_student_max_min_score = tab_source.group_by(col('name')) \.aggregate(max_min_score_with_class.alias("max", "class(max)", "min", "class(min)")) \.select(col('name'), col('max'), col('class(max)'), col('min'), col('class(min)')) tab_student_max_min_score.execute().print()@udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("max tag", DataTypes.STRING()), DataTypes.FIELD("min", DataTypes.FLOAT()), DataTypes.FIELD("min tag", DataTypes.STRING())]), func_type="pandas")def max_min_score_with_name(pandas_df: pd.DataFrame):return Row(pandas_df["score"].max(), pandas_df.loc[pandas_df["score"].idxmax(), "name"], pandas_df["score"].min(), pandas_df.loc[pandas_df["score"].idxmin(), "name"])tab_class_max_min_score = tab_source.group_by(col('class')) \.aggregate(max_min_score_with_name.alias("max", "name(max)", "min", "name(min)")) \.select(col('class'), col('max'), col('name(max)'), col('min'), col('name(min)')) tab_class_max_min_score.execute().print()if __name__ == '__main__':calc()

这篇关于0基础学习PyFlink——用户自定义函数之UDAF的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/292271

相关文章

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

学习hash总结

2014/1/29/   最近刚开始学hash,名字很陌生,但是hash的思想却很熟悉,以前早就做过此类的题,但是不知道这就是hash思想而已,说白了hash就是一个映射,往往灵活利用数组的下标来实现算法,hash的作用:1、判重;2、统计次数;

hdu1171(母函数或多重背包)

题意:把物品分成两份,使得价值最接近 可以用背包,或者是母函数来解,母函数(1 + x^v+x^2v+.....+x^num*v)(1 + x^v+x^2v+.....+x^num*v)(1 + x^v+x^2v+.....+x^num*v) 其中指数为价值,每一项的数目为(该物品数+1)个 代码如下: #include<iostream>#include<algorithm>

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习:参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归(SAheart.csv) 【学习笔记】 陈强-机器学习-Python-Ch6 多项逻辑回归 【学习笔记 及 课后题练习】 陈强-机器学习-Python-Ch7 判别分析 【学

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

【Linux 从基础到进阶】Ansible自动化运维工具使用

Ansible自动化运维工具使用 Ansible 是一款开源的自动化运维工具,采用无代理架构(agentless),基于 SSH 连接进行管理,具有简单易用、灵活强大、可扩展性高等特点。它广泛用于服务器管理、应用部署、配置管理等任务。本文将介绍 Ansible 的安装、基本使用方法及一些实际运维场景中的应用,旨在帮助运维人员快速上手并熟练运用 Ansible。 1. Ansible的核心概念