0基础学习PyFlink——用户自定义函数之UDTF

2023-10-27 17:20

本文主要是介绍0基础学习PyFlink——用户自定义函数之UDTF,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大纲

  • 表值函数
  • 完整代码

在《0基础学习PyFlink——用户自定义函数之UDF》中,我们讲解了UDF。本节我们将讲解表值函数——UDTF
在这里插入图片描述

表值函数

我们对比下UDF和UDTF

def udf(f: Union[Callable, ScalarFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None, func_type: str = "general",udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:
def udtf(f: Union[Callable, TableFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_types: Union[List[DataType], DataType, str, List[str]] = None,deterministic: bool = None,name: str = None) -> Union[UserDefinedTableFunctionWrapper, Callable]:

可以发现:

  • UDF比UDTF多了func_type和udf_type参数;
  • UDTF的返回类型比UDF的丰富,多了两个List类型:List[DataType]和List[str];

特别是最后一点,可以认为是UDF和UDTF在应用上的主要区别。
换种更容易理解的说法是:UDTF可以返回任意数量的行作为输出而不是像UDF那样返回单个值(行)。
举一个例子:

word_count_data = ["A", "B", "C", "a", "C"] 

我们希望统计上面这些字符的个数,以及小写后字符的个数。这样A的个数是1,a的个数是2(因为a算一个,A小写后又算一个)。C的个数是2,g的个数是2。
这就要求统计算法在遇到大写字母时,需要统计大小写两种字母;而遇到小写字母时,只需要统计小写字母。

    @udtf(result_types=[DataTypes.STRING()], input_types=row_type_tab_source)def rowFunc(row):if row[0].isupper():yield row[0]yield row[0].lower()else:yield row[0]

yield关键字返回的是generator生成器。Table API对rowFunc的调用最终会生成[“A”,“a”,“B”,“b”,“C”,“c”,“a”,“C”,“c”]。
和调用UDF不同的是,需要使用flat_map来调用UDTF。flat即为“打平”,可以生动的理解为将多维降为一维。

    tab_trans=tab_source.flat_map(rowFunc)tab_trans.execute().print()
+--------------------------------+
|                             f0 |
+--------------------------------+
|                              A |
|                              a |
|                              B |
|                              b |
|                              C |
|                              c |
|                              a |
|                              C |
|                              c |
+--------------------------------+
9 rows in set

由于我们没有指定经过处理的值所属的字段名称,于是会使用默认的f0作为字段名。我们可以使用alias来给它别名下。

    tab_trans_alias=tab_trans.alias('trans_word')tab_trans_alias.execute().print()
+--------------------------------+
|                     trans_word |
+--------------------------------+
|                              A |
|                              a |
|                              B |
|                              b |
|                              C |
|                              c |
|                              a |
|                              C |
|                              c |
+--------------------------------+
9 rows in set

最后我们就可以用这个新的表做字数统计计算

    tab_trans_alias.group_by(col('trans_word')) \.select(col('trans_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()
+I[A, 1]
+I[a, 2]
+I[B, 1]
+I[b, 1]
+I[C, 2]
+I[c, 2]

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "a", "C"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)@udtf(result_types=[DataTypes.STRING()], input_types=row_type_tab_source)def rowFunc(row):if row[0].isupper():yield row[0]yield row[0].lower()else:yield row[0]tab_trans=tab_source.flat_map(rowFunc)tab_trans.execute().print()tab_trans_alias=tab_trans.alias('trans_word')tab_trans_alias.execute().print()tab_trans_alias.group_by(col('trans_word')) \.select(col('trans_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()if __name__ == '__main__':word_count()

这篇关于0基础学习PyFlink——用户自定义函数之UDTF的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/287169

相关文章

0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型的操作流程

《0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeekR1模型的操作流程》DeepSeekR1模型凭借其强大的自然语言处理能力,在未来具有广阔的应用前景,有望在多个领域发... 目录0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型,3步搞定一个应

Python itertools中accumulate函数用法及使用运用详细讲解

《Pythonitertools中accumulate函数用法及使用运用详细讲解》:本文主要介绍Python的itertools库中的accumulate函数,该函数可以计算累积和或通过指定函数... 目录1.1前言:1.2定义:1.3衍生用法:1.3Leetcode的实际运用:总结 1.1前言:本文将详

TP-Link PDDNS服将于务6月30日正式停运:用户需转向第三方DDNS服务

《TP-LinkPDDNS服将于务6月30日正式停运:用户需转向第三方DDNS服务》近期,路由器制造巨头普联(TP-Link)在用户群体中引发了一系列重要变动,上个月,公司发出了一则通知,明确要求所... 路由器厂商普联(TP-Link)上个月发布公告要求所有用户必须完成实名认证后才能继续使用普联提供的 D

Java深度学习库DJL实现Python的NumPy方式

《Java深度学习库DJL实现Python的NumPy方式》本文介绍了DJL库的背景和基本功能,包括NDArray的创建、数学运算、数据获取和设置等,同时,还展示了如何使用NDArray进行数据预处理... 目录1 NDArray 的背景介绍1.1 架构2 JavaDJL使用2.1 安装DJL2.2 基本操

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

MySQL数据库函数之JSON_EXTRACT示例代码

《MySQL数据库函数之JSON_EXTRACT示例代码》:本文主要介绍MySQL数据库函数之JSON_EXTRACT的相关资料,JSON_EXTRACT()函数用于从JSON文档中提取值,支持对... 目录前言基本语法路径表达式示例示例 1: 提取简单值示例 2: 提取嵌套值示例 3: 提取数组中的值注意

CSS自定义浏览器滚动条样式完整代码

《CSS自定义浏览器滚动条样式完整代码》:本文主要介绍了如何使用CSS自定义浏览器滚动条的样式,包括隐藏滚动条的角落、设置滚动条的基本样式、轨道样式和滑块样式,并提供了完整的CSS代码示例,通过这些技巧,你可以为你的网站添加个性化的滚动条样式,从而提升用户体验,详细内容请阅读本文,希望能对你有所帮助...

Oracle数据库如何切换登录用户(system和sys)

《Oracle数据库如何切换登录用户(system和sys)》文章介绍了如何使用SQL*Plus工具登录Oracle数据库的system用户,包括打开登录入口、输入用户名和口令、以及切换到sys用户的... 目录打开登录入口登录system用户总结打开登录入口win+R打开运行对话框,输php入:sqlp

Java function函数式接口的使用方法与实例

《Javafunction函数式接口的使用方法与实例》:本文主要介绍Javafunction函数式接口的使用方法与实例,函数式接口如一支未完成的诗篇,用Lambda表达式作韵脚,将代码的机械美感... 目录引言-当代码遇见诗性一、函数式接口的生物学解构1.1 函数式接口的基因密码1.2 六大核心接口的形态学

数据库oracle用户密码过期查询及解决方案

《数据库oracle用户密码过期查询及解决方案》:本文主要介绍如何处理ORACLE数据库用户密码过期和修改密码期限的问题,包括创建用户、赋予权限、修改密码、解锁用户和设置密码期限,文中通过代码介绍... 目录前言一、创建用户、赋予权限、修改密码、解锁用户和设置期限二、查询用户密码期限和过期后的修改1.查询用