0基础学习PyFlink——用户自定义函数之UDAF

2023-10-28 09:28

本文主要是介绍0基础学习PyFlink——用户自定义函数之UDAF,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大纲

  • UDAF
    • 入参并非表中一行(Row)的集合
      • 计算每个人考了几门课
      • 计算每门课有几个人考试
      • 计算每个人的平均分
      • 计算每课的平均分
      • 计算每个人的最高分和最低分
    • 入参是表中一行(Row)的集合
      • 计算每个人的最高分、最低分以及所属的课程
      • 计算每课的最高分数、最低分数以及所属人
  • 完整代码
    • 入参并非表中一行(Row)的集合
    • 入参是表中一行(Row)的集合

在前面几篇文章中,我们学习了非聚合类的用户自定义函数。这节我们将介绍最简单的聚合函数UDAF。
在这里插入图片描述

UDAF

我们对比下UDAF和UDF的定义

def udaf(f: Union[Callable, AggregateFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None, accumulator_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None,func_type: str = "general") -> Union[UserDefinedAggregateFunctionWrapper, Callable]:
def udf(f: Union[Callable, ScalarFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None, func_type: str = "general",udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:

可以发现:

  • udaf比udf多了一个参数accumulator_type
  • udaf比udf少了一个参数udf_type

accumulator中文是“累加器”。我们可以将其看成聚合过后(比如GroupBy)的成批数据,每批都要走一次函数。
举一个例子:我们对图中左侧的成绩单,使用人名(name)进行聚类,然后计算出最高分数。即算出每个人考出的最高分数是多少。
在这里插入图片描述
如图所示,聚合后的数据每个都会经过accumulator计算。计算出来的值的类型就是accumulator_type。这个类型的数据是中间态,它并不是最终UDAF返回的数据类型——result_type。具体这块的知识我们会在后面讲解。
为了方便讲解,我们就以上面例子来讲解其使用。先贴出准备的代码:

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctiondef calc():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('score', DataTypes.FLOAT()), DataTypes.FIELD('class', DataTypes.STRING())])students_score = [("张三", 80.0, "English"),("李四", 75.0, "English"),("王五", 90.0, "English"),("赵六", 85.0, "English"),("张三", 60.0, "Math"),("李四", 95.0, "Math"),("王五", 90.0, "Math"),("赵六", 70.0, "Math"),("孙七", 60.0, "Math"),]tab_source = t_env.from_elements(students_score, row_type_tab_source )

我们在tab_source表中录入了学生的成绩信息,其中包括姓名(name)、成绩(score)和科目(class)。

入参并非表中一行(Row)的集合

计算每个人考了几门课

  1. 按姓名(name)聚类
  2. UDTF统计聚类后集合的个数并返回
  3. 别名UDTF返回的列名
  4. select出数据
@udaf(result_type=DataTypes.ROW([DataTypes.FIELD("count", DataTypes.BIGINT())]), func_type="pandas")def exam_count(pandas_df: pd.DataFrame):return Row(pandas_df.count())tab_student_exam_count = tab_source.group_by(col('name')) \.aggregate(exam_count(col('name')).alias("count")) \.select(col('name'), col('count')) tab_student_exam_count.execute().print()
+--------------------------------+----------------------+
|                           name |                count |
+--------------------------------+----------------------+
|                           孙七 |                    1 |
|                           张三 |                    2 |
|                           李四 |                    2 |
|                           王五 |                    2 |
|                           赵六 |                    2 |
+--------------------------------+----------------------+
5 rows in set

计算每门课有几个人考试

  1. 按姓名(class)聚类
  2. UDTF统计聚类后集合的个数并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("count", DataTypes.BIGINT())]), func_type="pandas")def exam_count(pandas_df: pd.DataFrame):return Row(pandas_df.count())tab_class_exam_count = tab_source.group_by(col('class')) \.aggregate(exam_count(col('class')).alias("count")) \.select(col('class'), col('count')) tab_class_exam_count.execute().print()
+--------------------------------+----------------------+
|                          class |                count |
+--------------------------------+----------------------+
|                        English |                    4 |
|                           Math |                    5 |
+--------------------------------+----------------------+
2 rows in set

计算每个人的平均分

  1. 按姓名(name)聚类
  2. UDTF统计聚类后集合的均值并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("avg", DataTypes.FLOAT())]), func_type="pandas")def avg_score(pandas_df: pd.DataFrame):return Row(pandas_df.mean())tab_student_avg_score = tab_source.group_by(col('name')) \.aggregate(avg_score(col('score')).alias("avg")) \.select(col('name'), col('avg')) tab_student_avg_score.execute().print()
+--------------------------------+--------------------------------+
|                           name |                            avg |
+--------------------------------+--------------------------------+
|                           孙七 |                           60.0 |
|                           张三 |                           70.0 |
|                           李四 |                           85.0 |
|                           王五 |                           90.0 |
|                           赵六 |                           77.5 |
+--------------------------------+--------------------------------+
5 rows in set

计算每课的平均分

  1. 按姓名(class)聚类
  2. UDTF统计聚类后集合的均值并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("avg", DataTypes.FLOAT())]), func_type="pandas")def avg_score(pandas_df: pd.DataFrame):return Row(pandas_df.mean())tab_class_avg_score = tab_source.group_by(col('class')) \.aggregate(avg_score(col('score')).alias("avg")) \.select(col('class'), col('avg')) tab_class_avg_score.execute().print()
+--------------------------------+--------------------------------+
|                          class |                            avg |
+--------------------------------+--------------------------------+
|                        English |                           82.5 |
|                           Math |                           75.0 |
+--------------------------------+--------------------------------+
2 rows in set

计算每个人的最高分和最低分

  1. 按姓名(name)聚类
  2. UDTF统计聚类后集合的最大值和最小值,并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("min", DataTypes.FLOAT())]), func_type="pandas")def max_min_score(pandas_df: pd.DataFrame):return Row(pandas_df.max(), pandas_df.min())tab_student_max_min_score = tab_source.group_by(col('name')) \.aggregate(max_min_score(col('score')).alias("max", "min")) \.select(col('name'), col('max'), col('min')) tab_student_max_min_score.execute().print()
+--------------------------------+--------------------------------+--------------------------------+
|                           name |                            max |                            min |
+--------------------------------+--------------------------------+--------------------------------+
|                           孙七 |                           60.0 |                           60.0 |
|                           张三 |                           80.0 |                           60.0 |
|                           李四 |                           95.0 |                           75.0 |
|                           王五 |                           90.0 |                           90.0 |
|                           赵六 |                           85.0 |                           70.0 |
+--------------------------------+--------------------------------+--------------------------------+
5 rows in set

入参是表中一行(Row)的集合

计算每个人的最高分、最低分以及所属的课程

  1. 按姓名(name)聚类
  2. UDTF统计聚类后集合中分数最大值、最小值;分数最大值所在行的课程名,和分数最小值所在行的课程名,并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("max tag", DataTypes.STRING()), DataTypes.FIELD("min", DataTypes.FLOAT()), DataTypes.FIELD("min tag", DataTypes.STRING())]), func_type="pandas")def max_min_score_with_class(pandas_df: pd.DataFrame):return Row(pandas_df["score"].max(), pandas_df.loc[pandas_df["score"].idxmax(), "class"], pandas_df["score"].min(), pandas_df.loc[pandas_df["score"].idxmin(), "class"])tab_student_max_min_score = tab_source.group_by(col('name')) \.aggregate(max_min_score_with_class.alias("max", "class(max)", "min", "class(min)")) \.select(col('name'), col('max'), col('class(max)'), col('min'), col('class(min)')) tab_student_max_min_score.execute().print()
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|                           name |                            max |                     class(max) |                            min |                     class(min) |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|                           孙七 |                           60.0 |                           Math |                           60.0 |                           Math |
|                           张三 |                           80.0 |                        English |                           60.0 |                           Math |
|                           李四 |                           95.0 |                           Math |                           75.0 |                        English |
|                           王五 |                           90.0 |                        English |                           90.0 |                        English |
|                           赵六 |                           85.0 |                        English |                           70.0 |                           Math |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
5 rows in set

计算每课的最高分数、最低分数以及所属人

  1. 按姓名(class)聚类
  2. UDTF统计聚类后集合中分数最大值、最小值;分数最大值所在行的人名,和分数最小值所在行的人名,并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("max tag", DataTypes.STRING()), DataTypes.FIELD("min", DataTypes.FLOAT()), DataTypes.FIELD("min tag", DataTypes.STRING())]), func_type="pandas")def max_min_score_with_name(pandas_df: pd.DataFrame):return Row(pandas_df["score"].max(), pandas_df.loc[pandas_df["score"].idxmax(), "name"], pandas_df["score"].min(), pandas_df.loc[pandas_df["score"].idxmin(), "name"])tab_class_max_min_score = tab_source.group_by(col('class')) \.aggregate(max_min_score_with_name.alias("max", "name(max)", "min", "name(min)")) \.select(col('class'), col('max'), col('name(max)'), col('min'), col('name(min)')) tab_class_max_min_score.execute().print()
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|                          class |                            max |                      name(max) |                            min |                      name(min) |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|                        English |                           90.0 |                           王五 |                           75.0 |                           李四 |
|                           Math |                           95.0 |                           李四 |                           60.0 |                           张三 |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
2 rows in set

完整代码

入参并非表中一行(Row)的集合

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctiondef calc():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('score', DataTypes.FLOAT()), DataTypes.FIELD('class', DataTypes.STRING())])students_score = [("张三", 80.0, "English"),("李四", 75.0, "English"),("王五", 90.0, "English"),("赵六", 85.0, "English"),("张三", 60.0, "Math"),("李四", 95.0, "Math"),("王五", 90.0, "Math"),("赵六", 70.0, "Math"),("孙七", 60.0, "Math"),]tab_source = t_env.from_elements(students_score, row_type_tab_source )@udaf(result_type=DataTypes.ROW([DataTypes.FIELD("count", DataTypes.BIGINT())]), func_type="pandas")def exam_count(pandas_df: pd.DataFrame):return Row(pandas_df.count())tab_student_exam_count = tab_source.group_by(col('name')) \.aggregate(exam_count(col('name')).alias("count")) \.select(col('name'), col('count')) tab_student_exam_count.execute().print()tab_class_exam_count = tab_source.group_by(col('class')) \.aggregate(exam_count(col('class')).alias("count")) \.select(col('class'), col('count')) tab_class_exam_count.execute().print()@udaf(result_type=DataTypes.ROW([DataTypes.FIELD("avg", DataTypes.FLOAT())]), func_type="pandas")def avg_score(pandas_df: pd.DataFrame):return Row(pandas_df.mean())tab_student_avg_score = tab_source.group_by(col('name')) \.aggregate(avg_score(col('score')).alias("avg")) \.select(col('name'), col('avg')) tab_student_avg_score.execute().print()tab_class_avg_score = tab_source.group_by(col('class')) \.aggregate(avg_score(col('score')).alias("avg")) \.select(col('class'), col('avg')) tab_class_avg_score.execute().print()@udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("min", DataTypes.FLOAT())]), func_type="pandas")def max_min_score(pandas_df: pd.DataFrame):return Row(pandas_df.max(), pandas_df.min())tab_student_max_min_score = tab_source.group_by(col('name')) \.aggregate(max_min_score(col('score')).alias("max", "min")) \.select(col('name'), col('max'), col('min')) tab_student_max_min_score.execute().print()if __name__ == '__main__':calc()

入参是表中一行(Row)的集合

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctiondef calc():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('score', DataTypes.FLOAT()), DataTypes.FIELD('class', DataTypes.STRING())])students_score = [("张三", 80.0, "English"),("李四", 75.0, "English"),("王五", 90.0, "English"),("赵六", 85.0, "English"),("张三", 60.0, "Math"),("李四", 95.0, "Math"),("王五", 90.0, "Math"),("赵六", 70.0, "Math"),("孙七", 60.0, "Math"),]tab_source = t_env.from_elements(students_score, row_type_tab_source )@udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("max tag", DataTypes.STRING()), DataTypes.FIELD("min", DataTypes.FLOAT()), DataTypes.FIELD("min tag", DataTypes.STRING())]), func_type="pandas")def max_min_score_with_class(pandas_df: pd.DataFrame):return Row(pandas_df["score"].max(), pandas_df.loc[pandas_df["score"].idxmax(), "class"], pandas_df["score"].min(), pandas_df.loc[pandas_df["score"].idxmin(), "class"])tab_student_max_min_score = tab_source.group_by(col('name')) \.aggregate(max_min_score_with_class.alias("max", "class(max)", "min", "class(min)")) \.select(col('name'), col('max'), col('class(max)'), col('min'), col('class(min)')) tab_student_max_min_score.execute().print()@udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("max tag", DataTypes.STRING()), DataTypes.FIELD("min", DataTypes.FLOAT()), DataTypes.FIELD("min tag", DataTypes.STRING())]), func_type="pandas")def max_min_score_with_name(pandas_df: pd.DataFrame):return Row(pandas_df["score"].max(), pandas_df.loc[pandas_df["score"].idxmax(), "name"], pandas_df["score"].min(), pandas_df.loc[pandas_df["score"].idxmin(), "name"])tab_class_max_min_score = tab_source.group_by(col('class')) \.aggregate(max_min_score_with_name.alias("max", "name(max)", "min", "name(min)")) \.select(col('class'), col('max'), col('name(max)'), col('min'), col('name(min)')) tab_class_max_min_score.execute().print()if __name__ == '__main__':calc()

这篇关于0基础学习PyFlink——用户自定义函数之UDAF的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/292271

相关文章

SpringBoot全局异常拦截与自定义错误页面实现过程解读

《SpringBoot全局异常拦截与自定义错误页面实现过程解读》本文介绍了SpringBoot中全局异常拦截与自定义错误页面的实现方法,包括异常的分类、SpringBoot默认异常处理机制、全局异常拦... 目录一、引言二、Spring Boot异常处理基础2.1 异常的分类2.2 Spring Boot默

Mybatis对MySQL if 函数的不支持问题解读

《Mybatis对MySQLif函数的不支持问题解读》接手项目后,为了实现多租户功能,引入了Mybatis-plus,发现之前运行正常的SQL语句报错,原因是Mybatis不支持MySQL的if函... 目录MyBATis对mysql if 函数的不支持问题描述经过查询网上搜索资料找到原因解决方案总结Myb

自定义注解SpringBoot防重复提交AOP方法详解

《自定义注解SpringBoot防重复提交AOP方法详解》该文章描述了一个防止重复提交的流程,通过HttpServletRequest对象获取请求信息,生成唯一标识,使用Redis分布式锁判断请求是否... 目录防重复提交流程引入依赖properties配置自定义注解切面Redis工具类controller

Python容器转换与共有函数举例详解

《Python容器转换与共有函数举例详解》Python容器是Python编程语言中非常基础且重要的概念,它们提供了数据的存储和组织方式,下面:本文主要介绍Python容器转换与共有函数的相关资料,... 目录python容器转换与共有函数详解一、容器类型概览二、容器类型转换1. 基本容器转换2. 高级转换示

Java利用Spire.Doc for Java实现在模板的基础上创建Word文档

《Java利用Spire.DocforJava实现在模板的基础上创建Word文档》在日常开发中,我们经常需要根据特定数据动态生成Word文档,本文将深入探讨如何利用强大的Java库Spire.Do... 目录1. Spire.Doc for Java 库介绍与安装特点与优势Maven 依赖配置2. 通过替换

pandas使用apply函数给表格同时添加多列

《pandas使用apply函数给表格同时添加多列》本文介绍了利用Pandas的apply函数在DataFrame中同时添加多列,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习... 目录一、Pandas使用apply函数给表格同时添加多列二、应用示例一、Pandas使用apply函

Python中Namespace()函数详解

《Python中Namespace()函数详解》Namespace是argparse模块提供的一个类,用于创建命名空间对象,它允许通过点操作符访问数据,比字典更易读,在深度学习项目中常用于加载配置、命... 目录1. 为什么使用 Namespace?2. Namespace 的本质是什么?3. Namesp

kafka自定义分区器使用详解

《kafka自定义分区器使用详解》本文介绍了如何根据企业需求自定义Kafka分区器,只需实现Partitioner接口并重写partition()方法,示例中,包含cuihaida的数据发送到0号分区... 目录kafka自定义分区器假设现在有一个需求使用分区器的方法总结kafka自定义分区器根据企业需求

JavaScript装饰器从基础到实战教程

《JavaScript装饰器从基础到实战教程》装饰器是js中一种声明式语法特性,用于在不修改原始代码的情况下,动态扩展类、方法、属性或参数的行为,本文将从基础概念入手,逐步讲解装饰器的类型、用法、进阶... 目录一、装饰器基础概念1.1 什么是装饰器?1.2 装饰器的语法1.3 装饰器的执行时机二、装饰器的

Java JAR 启动内存参数配置指南(从基础设置到性能优化)

《JavaJAR启动内存参数配置指南(从基础设置到性能优化)》在启动Java可执行JAR文件时,合理配置JVM内存参数是保障应用稳定性和性能的关键,本文将系统讲解如何通过命令行参数、环境变量等方式... 目录一、核心内存参数详解1.1 堆内存配置1.2 元空间配置(MetASPace)1.3 线程栈配置1.