Hive Programming Series, Part 10: Generic Functions


Topics covered in this article:

1 - Why generic functions (Generic Function) are necessary
2 - A simple generic function example
3 - Global functions

1 - Why generic functions (Generic Function) are necessary

A generic function (GenericUDF) exists to handle arguments whose types vary at run time, something a standard UDF cannot match one signature at a time. Take the classic case of checking whether a value is NULL and substituting a default: a standard function would need a separate method for every possible argument type, which means rewriting the same logic over and over, whereas a generic function solves the problem once for all types.
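For contrast, here is roughly what a non-generic version looks like with the old org.apache.hadoop.hive.ql.exec.UDF base class, which resolves overloaded evaluate() methods by argument type. This is only an illustrative sketch (PlainNvl is a hypothetical name, not part of this article's code); the point is that every supported type needs its own overload:

package hive.function.generic;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Hypothetical non-generic nvl: one evaluate() overload per supported type.
public class PlainNvl extends UDF {

    // string arguments
    public Text evaluate(Text value, Text defaultValue) {
        return value == null ? defaultValue : value;
    }

    // int arguments
    public IntWritable evaluate(IntWritable value, IntWritable defaultValue) {
        return value == null ? defaultValue : value;
    }

    // ... and yet another overload for double, boolean, array, map, and so on.
}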

2 - A simple generic function example

package hive.function.generic;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

@Description(name = "nvl",
        value = "_FUNC_(value,default_value) - Returns default value "
                + "if value is null else returns value",
        extended = "Example: \n"
                + "> SELECT _FUNC_(null,'bla') FROM src LIMIT 1;\n")
public class genericNvl extends GenericUDF {

    private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
    private ObjectInspector[] argumentOIs;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        argumentOIs = arguments;
        // NVL takes exactly two arguments
        if (arguments.length != 2) {
            throw new UDFArgumentLengthException("The operator 'NVL' accepts 2 arguments.");
        }
        // both arguments must resolve to a common return type
        returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
        if (!(returnOIResolver.update(arguments[0]) && returnOIResolver.update(arguments[1]))) {
            throw new UDFArgumentTypeException(2,
                    "The 1st and 2nd args of function NVL should have the same type,"
                    + " but they are different: \"" + arguments[0].getTypeName()
                    + "\" and \"" + arguments[1].getTypeName() + "\"");
        }
        return returnOIResolver.get();
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // return the first argument unless it is null, otherwise the second
        Object retVal = returnOIResolver.convertIfNecessary(arguments[0].get(), argumentOIs[0]);
        if (retVal == null) {
            retVal = returnOIResolver.convertIfNecessary(arguments[1].get(), argumentOIs[1]);
        }
        return retVal;
    }

    @Override
    public String getDisplayString(String[] children) {
        StringBuilder sb = new StringBuilder();
        sb.append("if ");
        sb.append(children[0]);
        sb.append(" is null ");
        sb.append(" returns ");
        sb.append(children[1]);
        return sb.toString();
    }
}

returnOIResolver.update determines whether the two argument types can be reconciled into a common return type:

/**
 * Update returnObjectInspector and valueInspectorsAreTheSame based on the
 * ObjectInspector seen.
 *
 * @return false if there is a type mismatch
 */
private boolean update(ObjectInspector oi, boolean isUnionAll) throws UDFArgumentTypeException {
    if (oi instanceof VoidObjectInspector) {
        return true;
    }
    if (returnObjectInspector == null) {
        // The first argument, just set the return to be the standard
        // writable version of this OI.
        returnObjectInspector = ObjectInspectorUtils.getStandardObjectInspector(oi,
                ObjectInspectorCopyOption.WRITABLE);
        return true;
    }
    if (returnObjectInspector == oi) {
        // The new ObjectInspector is the same as the old one, directly return true
        return true;
    }
    TypeInfo oiTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(oi);
    TypeInfo rTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(returnObjectInspector);
    if (oiTypeInfo == rTypeInfo) {
        // Convert everything to writable, if types of arguments are the same,
        // but ObjectInspectors are different.
        returnObjectInspector = ObjectInspectorUtils.getStandardObjectInspector(returnObjectInspector,
                ObjectInspectorCopyOption.WRITABLE);
        return true;
    }
    if (!allowTypeConversion) {
        return false;
    }
    // Types are different, we need to check whether we can convert them to
    // a common base class or not.
    TypeInfo commonTypeInfo = null;
    if (isUnionAll) {
        commonTypeInfo = FunctionRegistry.getCommonClassForUnionAll(rTypeInfo, oiTypeInfo);
    } else {
        commonTypeInfo = FunctionRegistry.getCommonClass(oiTypeInfo, rTypeInfo);
    }
    if (commonTypeInfo == null) {
        return false;
    }
    commonTypeInfo = updateCommonTypeForDecimal(commonTypeInfo, oiTypeInfo, rTypeInfo);
    returnObjectInspector = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(commonTypeInfo);
    return true;
}

Besides initialize, a GenericUDF subclass must override two more methods: evaluate and getDisplayString.
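Before packaging the class into a jar, it can be sanity-checked with a small local driver that exercises initialize and evaluate the same way Hive does. This is a minimal sketch (GenericNvlDriver is a hypothetical name; it assumes hive-exec and its dependencies are on the classpath):

package hive.function.generic;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Hypothetical local driver for genericNvl: initialize with two string
// ObjectInspectors, then evaluate a null and a non-null first argument.
public class GenericNvlDriver {
    public static void main(String[] args) throws Exception {
        genericNvl udf = new genericNvl();

        ObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        ObjectInspector resultOI = udf.initialize(new ObjectInspector[]{stringOI, stringOI});
        System.out.println("return type: " + resultOI.getTypeName());   // string

        Object whenNull = udf.evaluate(new GenericUDF.DeferredObject[]{
                new GenericUDF.DeferredJavaObject(null),
                new GenericUDF.DeferredJavaObject("default")});
        Object whenSet = udf.evaluate(new GenericUDF.DeferredObject[]{
                new GenericUDF.DeferredJavaObject("value"),
                new GenericUDF.DeferredJavaObject("default")});

        System.out.println(whenNull);   // default
        System.out.println(whenSet);    // value
    }
}

Once the output looks right, the compiled class can be packaged into the Nvl.jar used in the next section.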

3 - Global functions

When creating a temporary custom function, reference the fully qualified name of the class defined inside the jar, not the jar file's own name, as the failed attempt below shows:

hive> add jar /home/SparkAdmin/HiveFunctions/Nvl.jar
    > ;
Added [/home/SparkAdmin/HiveFunctions/Nvl.jar] to class path
Added resources: [/home/SparkAdmin/HiveFunctions/Nvl.jar]
hive> create temporary function NullReplace as 'hive.function.generic.Nvl' ;
FAILED: Class hive.function.generic.Nvl not found
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.FunctionTask
hive> create temporary function NullReplace as 'hive.function.generic.genericNvl' ;
OK
3.1 - Using the generic function:

Insert some test rows containing NULL values:

hive> insert into default.employee(name,salary,subordinates,deductions,address)
    > select null,null,subordinates,deductions,address from default.employee
    > limit 10 ;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID = SparkAdmin_20181124142056_7af103f3-95de-4d42-9b64-77337ad06734
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2018-11-24 14:20:59,351 Stage-1 map = 100%,  reduce = 0%
2018-11-24 14:21:00,368 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_local362424371_0001
Loading data to table default.employee
MapReduce Jobs Launched: 
Stage-Stage-1:  HDFS Read: 50910 HDFS Write: 298 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 4.982 seconds
hive> select * from default.employee ;
OK
ali	320.0	["ali","acai","ayun"]	{"ali":1,"acai":2,"ayun":7}	{"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
liton	345.0	["liton","acai","ayun"]	{"liton":1,"acai":2,"ayun":7}	{"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
tencent	543.0	["tencent","acai","ayun"]	{"tencent":1,"acai":2,"ayun":7}	{"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
NULL	NULL	["tencent","acai","ayun"]	{"tencent":1,"acai":2,"ayun":7}	{"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
NULL	NULL	["liton","acai","ayun"]	{"liton":1,"acai":2,"ayun":7}	{"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
NULL	NULL	["ali","acai","ayun"]	{"ali":1,"acai":2,"ayun":7}	{"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
Time taken: 0.115 seconds, Fetched: 6 row(s)
hive>

Replacing the NULLs:

hive> select nullreplace(salary,0) as salary from default.employee ;
OK
320.0
345.0
543.0
0.0
0.0
0.0
Time taken: 0.109 seconds, Fetched: 6 row(s)

Even when the two arguments are not nominally the same type, they are still converted to a common type:

hive> select nullreplace(salary,"end") as salary from default.employee ;
OK
320.0
345.0
543.0
end
end
end
Time taken: 0.1 seconds, Fetched: 6 row(s)
hive>

But when there is no implicit conversion, as there is between numbers and strings, the call fails:

hive> select nullreplace(salary,array("em","bm","fm")) as salary from default.employee ;
FAILED: NullPointerException null
3.2 - Making the function available in every session

A temporary custom function only lives for the current session. Once the session is closed or a new one is opened, the function can no longer be called:

hive> select nullreplace(name,"end") as name from default.name ;
FAILED: SemanticException [Error 10011]: Invalid function nullreplace

The simplest way to make a custom function callable in every session is to configure the ~/.hiverc (runtime configuration) file, so the function is defined as soon as a session starts.

Edit the ~/.hiverc file:
[SparkAdmin@centos00 bin]$ vi ~/.hiverc
add jar /home/SparkAdmin/HiveFunctions/Nvl.jar;
create temporary function NullReplace as 'hive.function.generic.genericNvl';
~
Creating a global function with CREATE FUNCTION

The .hiverc approach becomes unwieldy in large projects, so newer versions of Hive let you use CREATE FUNCTION to give a custom function a global lifetime; the definition is essentially stored in the metastore.

hive> create function nullreplace2 as 'hive.function.generic.genericNvl' using jar '/home/SparkAdmin/HiveFunctions/Nvl.jar' ;
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.FunctionTask. Hive warehouse is non-local, but /home/SparkAdmin/HiveFunctions/Nvl.jar specifies file on local filesystem. Resources on non-local warehouse should specify a non-local scheme/path
hive>

The fix is to copy the jar onto HDFS first:

[SparkAdmin@centos00 conf]$ hdfs dfs -copyFromLocal /home/SparkAdmin/HiveFunctions/Nvl.jar /user/hive/warehouse
[SparkAdmin@centos00 conf]$ hdfs dfs -ls /user/hive/warehouse
Found 5 items
-rw-r--r--   3 SparkAdmin supergroup       1798 2018-11-24 20:41 /user/hive/warehouse/Nvl.jar
drwxr-xr-x   - SparkAdmin supergroup          0 2018-11-05 22:04 /user/hive/warehouse/account
drwxr-xr-x   - SparkAdmin supergroup          0 2018-11-09 23:03 /user/hive/warehouse/crm.db
drwxr-xr-x   - SparkAdmin supergroup          0 2018-11-24 14:21 /user/hive/warehouse/employee
drwxr-xr-x   - SparkAdmin supergroup          0 2018-10-31 16:17 /user/hive/warehouse/student
[SparkAdmin@centos00 conf]$

Then create the function again:

hive> create function nullreplace2 as 'hive.function.generic.genericNvl' using jar 'hdfs:///user/hive/warehouse/Nvl.jar' ;
Added [/tmp/06ebd574-bcbc-4146-bc39-f195b8d0c9c2_resources/Nvl.jar] to class path
Added resources: [hdfs:///user/hive/warehouse/Nvl.jar]
OK
Time taken: 0.814 seconds
hive> select nullreplace2(name,"end") as name from default.employee ;
OK
ali
liton
tencent
end
end
end
Time taken: 1.93 seconds, Fetched: 6 row(s)
hive>

If some developers on the team use the hive command line while others work in Oracle SQL Developer, how can a custom function be shared across the whole team?

The answer is to create a global function.

Just as we did above with the jar on HDFS, create a global function from Oracle SQL Developer:

create function nullReplace_osd as 'hive.function.generic.genericNvl' using jar 'hdfs:///user/hive/warehouse/Nvl.jar'

Open the Hive command line and call nullReplace_osd, the function created in Oracle SQL Developer:

hive> select default.nullReplace_osd(name,"end") as name from default.employee ;
Added [/tmp/8526a964-ef87-4924-a331-73013b31f553_resources/Nvl.jar] to class path
Added resources: [hdfs:///user/hive/warehouse/Nvl.jar]
OK
ali
liton
tencent
end
end
end
Time taken: 1.747 seconds, Fetched: 6 row(s)
hive>

Likewise, a global custom function created from the Hive command line can be called from Oracle SQL Developer:

hive> create function NullReplace_hcmd  as 'hive.function.generic.genericNvl' using jar 'hdfs:///user/hive/warehouse/Nvl.jar' ;
Added [/tmp/8526a964-ef87-4924-a331-73013b31f553_resources/Nvl.jar] to class path
Added resources: [hdfs:///user/hive/warehouse/Nvl.jar]
OK
Time taken: 0.047 seconds
hive> select NullReplace_hcmd(name,"end") as name from default.employee;
OK
ali
liton
tencent
end
end
end
Time taken: 0.146 seconds, Fetched: 6 row(s)
hive>

If Oracle SQL Developer was already open, restart it, then call the global custom function created from the Hive command line:

Executing the call:

select default.NullReplace_hcmd2(name,"end") as name from default.employee;

Error starting at line 6 in command -
select default.NullReplace_hcmd2(name,"end") as name from default.employee
Error at Command Line: 6 Column: 1
Error report -
SQL Error: [Cloudera][HiveJDBCDriver](500051) ERROR processing query/statement. Error Code: 10011, SQL state: TStatus(statusCode:ERROR_STATUS, infoMessages:[*org.apache.hive.service.cli.HiveSQLException:Error while compiling statement: FAILED: SemanticException [Error 10011]: Invalid function default.NullReplace_hcmd2:17:16, org.apache.hive.service.cli.operation.Operation:toSQLException:Operation.java:380, org.apache.hive.service.cli.operation.SQLOperation:prepare:SQLOperation.java:206, org.apache.hive.service.cli.operation.SQLOperation:runInternal:SQLOperation.java:290, org.apache.hive.service.cli.operation.Operation:run:Operation.java:320, org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementInternal:HiveSessionImpl.java:530, org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementAsync:HiveSessionImpl.java:517, org.apache.hive.service.cli.CLIService:executeStatementAsync:CLIService.java:310, org.apache.hive.service.cli.thrift.ThriftCLIService:ExecuteStatement:ThriftCLIService.java:530, org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1437, org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1422, org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39, org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39, org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddressProcessor.java:56, org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:286, java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1142,
java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:617, java.lang.Thread:run:Thread.java:745, *org.apache.hadoop.hive.ql.parse.SemanticException:Invalid function default.NullReplace_hcmd2:28:12, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1GetAllAggregations:SemanticAnalyzer.java:636, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1GetAggregationsFromSelect:SemanticAnalyzer.java:558, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1:SemanticAnalyzer.java:1464, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1:SemanticAnalyzer.java:1768, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1:SemanticAnalyzer.java:1768, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:genResolvedParseTree:SemanticAnalyzer.java:11072, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:analyzeInternal:SemanticAnalyzer.java:11133, org.apache.hadoop.hive.ql.parse.CalcitePlanner:analyzeInternal:CalcitePlanner.java:286, org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer:analyze:BaseSemanticAnalyzer.java:258, org.apache.hadoop.hive.ql.Driver:compile:Driver.java:512, org.apache.hadoop.hive.ql.Driver:compileInternal:Driver.java:1317, org.apache.hadoop.hive.ql.Driver:compileAndRespond:Driver.java:1295, org.apache.hive.service.cli.operation.SQLOperation:prepare:SQLOperation.java:204], sqlState:42000, errorCode:10011, errorMessage:Error while compiling statement: FAILED: SemanticException [Error 10011]: Invalid function default.NullReplace_hcmd2), Query: select default.NullReplace_hcmd2(name,"end") as name from default.employee.

Querying the metastore database shows that the functions were all created successfully; the failure comes from a permissions problem that isolates access between users:

SELECT TOP (1000) [FUNC_ID], [CLASS_NAME], [CREATE_TIME], [DB_ID],
       [FUNC_NAME], [FUNC_TYPE], [OWNER_NAME], [OWNER_TYPE]
FROM [metadata].[dbo].[FUNCS]

[Screenshot: rows of the metastore FUNCS table listing the newly created functions]

Hive's permission model will be covered in a separate article.

Recompiling Hive

When a custom function is solid enough and genuinely worth reusing, contributing it and recompiling Hive is the traditional way to make it available everywhere, immediately. The drawback is just as obvious: it can easily destabilize the system. That is why the Hive developers added CREATE FUNCTION as the lighter-weight way to make a function globally usable.
