Hive UDAF开发详解

2024-01-05 02:58
文章标签 详解 开发 hive udaf

本文主要是介绍Hive UDAF开发详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

说明

这篇文章是来自 Hadoop Hive UDAF Tutorial - Extending Hive with Aggregation Functions:的不严格翻译,因为翻译的文章示例写得比较通俗易懂,此外,我把自己对于Hive的UDAF理解穿插到文章里面。

udfa是hive中用户自定义的聚集函数,hive内置UDAF函数包括有sum()与count(),UDAF实现有简单与通用两种方式,简单UDAF因为使用Java反射导致性能损失,而且有些特性不能使用,已经被弃用了;在这篇博文中我们将关注Hive中自定义聚类函数-GenericUDAF,UDAF开发主要涉及到以下两个抽象类:

[java]  view plain  copy
  1. org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver  
  2. org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator  

源码链接

博文中的所有的代码和数据可以在以下链接找到: hive examples

示例数据准备

首先先创建一张包含示例数据的表:people,该表只有name一列,该列中包含了一个或多个名字,该表数据保存在people.txt文件中。

[plain]  view plain  copy
  1. ~$ cat ./people.txt  
  2.   
  3. John Smith  
  4. John and Ann White  
  5. Ted Green  
  6. Dorothy  

把该文件上载到hdfs目录/user/matthew/people中:

[plain]  view plain  copy
  1. hadoop fs -mkdir people  
  2. hadoop fs -put ./people.txt people  

下面要创建hive外部表,在hive shell中执行

[sql]  view plain  copy
  1. CREATE EXTERNAL TABLE people (name string)  
  2. ROW FORMAT DELIMITED FIELDS   
  3.     TERMINATED BY '\t'   
  4.     ESCAPED BY ''   
  5.     LINES TERMINATED BY '\n'  
  6. STORED AS TEXTFILE   
  7. LOCATION '/user/matthew/people';  

相关抽象类介绍

创建一个GenericUDAF必须先了解以下两个抽象类:
[java]  view plain  copy
  1. org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver   
[java]  view plain  copy
  1. org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator  

为了更好理解上述抽象类的API,要记住hive只是mapreduce函数,只不过hive已经帮助我们写好并隐藏mapreduce,向上提供简洁的sql函数,所以我们要结合Mapper、Combiner与Reducer来帮助我们理解这个函数。要记住在hadoop集群中有若干台机器,在不同的机器上Mapper与Reducer任务独立运行。

所以大体上来说,这个UDAF函数读取数据(mapper),聚集一堆mapper输出到部分聚集结果(combiner),并且最终创建一个最终的聚集结果(reducer)。因为我们跨域多个combiner进行聚集,所以我们需要保存部分聚集结果。

AbstractGenericUDAFResolver

Resolver很简单,要覆盖实现下面方法,该方法会根据sql传人的参数数据格式指定调用哪个Evaluator进行处理。

[java]  view plain  copy
  1. <span style="background-color: rgb(255, 255, 255);"><span style="font-size:14px;">public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;</span></span>  

GenericUDAFEvaluator

UDAF逻辑处理主要发生在Evaluator中,要实现该抽象类的几个方法。

在理解Evaluator之前,必须先理解objectInspector接口与GenericUDAFEvaluator中的内部类Model。

ObjectInspector

作用主要是解耦数据使用与数据格式,使得数据流在输入输出端切换不同的输入输出格式,不同的Operator上使用不同的格式。可以参考这两篇文章:first post on Hive UDFs、Hive中ObjectInspector的作用,里面有关于objectinspector的介绍。
Model

Model代表了UDAF在mapreduce的各个阶段。

[java]  view plain  copy
  1. public static enum Mode {  
  2.     /** 
  3.      * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合 
  4.      * 将会调用iterate()和terminatePartial() 
  5.      */  
  6.     PARTIAL1,  
  7.         /** 
  8.      * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合: 
  9.      * 将会调用merge() 和 terminatePartial()  
  10.      */  
  11.     PARTIAL2,  
  12.         /** 
  13.      * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合  
  14.      * 将会调用merge()和terminate() 
  15.      */  
  16.     FINAL,  
  17.         /** 
  18.      * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合 
  19.       * 将会调用 iterate()和terminate() 
  20.      */  
  21.     COMPLETE  
  22.   };  

一般情况下,完整的UDAF逻辑是一个mapreduce过程,如果有mapper和reducer,就会经历PARTIAL1(mapper),FINAL(reducer),如果还有combiner,那就会经历PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。

而有一些情况下的mapreduce,只有mapper,而没有reducer,所以就会只有COMPLETE阶段,这个阶段直接输入原始数据,出结果。

GenericUDAFEvaluator的方法
[java]  view plain  copy
  1. // 确定各个阶段输入输出参数的数据格式ObjectInspectors  
  2. public  ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;  
  3.   
  4. // 保存数据聚集结果的类  
  5. abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;  
  6.   
  7. // 重置聚集结果  
  8. public void reset(AggregationBuffer agg) throws HiveException;  
  9.   
  10. // map阶段,迭代处理输入sql传过来的列数据  
  11. public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;  
  12.   
  13. // map与combiner结束返回结果,得到部分数据聚集结果  
  14. public Object terminatePartial(AggregationBuffer agg) throws HiveException;  
  15.   
  16. // combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。  
  17. public void merge(AggregationBuffer agg, Object partial) throws HiveException;  
  18.   
  19. // reducer阶段,输出最终结果  
  20. public Object terminate(AggregationBuffer agg) throws HiveException;  
图解Model与Evaluator关系

Model各阶段对应Evaluator方法调用



Evaluator各个阶段下处理mapreduce流程

实例

下面将讲述一个聚集函数UDAF的实例,我们将计算people这张表中的name列字母的个数。

下面的函数代码是计算指定列中字符的总数(包括空格)

代码

[java]  view plain  copy
  1. @Description(name = "letters", value = "_FUNC_(expr) - 返回该列中所有字符串的字符总数")  
  2. public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {  
  3.   
  4.     @Override  
  5.     public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)  
  6.             throws SemanticException {  
  7.         if (parameters.length != 1) {  
  8.             throw new UDFArgumentTypeException(parameters.length - 1,  
  9.                     "Exactly one argument is expected.");  
  10.         }  
  11.           
  12.         ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);  
  13.           
  14.         if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE){  
  15.             throw new UDFArgumentTypeException(0,  
  16.                             "Argument must be PRIMITIVE, but "  
  17.                             + oi.getCategory().name()  
  18.                             + " was passed.");  
  19.         }  
  20.           
  21.         PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;  
  22.           
  23.         if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING){  
  24.             throw new UDFArgumentTypeException(0,  
  25.                             "Argument must be String, but "  
  26.                             + inputOI.getPrimitiveCategory().name()  
  27.                             + " was passed.");  
  28.         }  
  29.           
  30.         return new TotalNumOfLettersEvaluator();  
  31.     }  
  32.   
  33.     public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {  
  34.   
  35.         PrimitiveObjectInspector inputOI;  
  36.         ObjectInspector outputOI;  
  37.         PrimitiveObjectInspector integerOI;  
  38.           
  39.         int total = 0;  
  40.   
  41.         @Override  
  42.         public ObjectInspector init(Mode m, ObjectInspector[] parameters)  
  43.                 throws HiveException {  
  44.               
  45.             assert (parameters.length == 1);  
  46.             super.init(m, parameters);  
  47.              
  48.              //map阶段读取sql列,输入为String基础数据格式  
  49.             if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {  
  50.                 inputOI = (PrimitiveObjectInspector) parameters[0];  
  51.             } else {  
  52.             //其余阶段,输入为Integer基础数据格式  
  53.                 integerOI = (PrimitiveObjectInspector) parameters[0];  
  54.             }  
  55.   
  56.              // 指定各个阶段输出数据格式都为Integer类型  
  57.             outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,  
  58.                     ObjectInspectorOptions.JAVA);  
  59.             return outputOI;  
  60.   
  61.         }  
  62.   
  63.         /** 
  64.          * 存储当前字符总数的类 
  65.          */  
  66.         static class LetterSumAgg implements AggregationBuffer {  
  67.             int sum = 0;  
  68.             void add(int num){  
  69.                 sum += num;  
  70.             }  
  71.         }  
  72.   
  73.         @Override  
  74.         public AggregationBuffer getNewAggregationBuffer() throws HiveException {  
  75.             LetterSumAgg result = new LetterSumAgg();  
  76.             return result;  
  77.         }  
  78.   
  79.         @Override  
  80.         public void reset(AggregationBuffer agg) throws HiveException {  
  81.             LetterSumAgg myagg = new LetterSumAgg();  
  82.         }  
  83.           
  84.         private boolean warned = false;  
  85.   
  86.         @Override  
  87.         public void iterate(AggregationBuffer agg, Object[] parameters)  
  88.                 throws HiveException {  
  89.             assert (parameters.length == 1);  
  90.             if (parameters[0] != null) {  
  91.                 LetterSumAgg myagg = (LetterSumAgg) agg;  
  92.                 Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);  
  93.                 myagg.add(String.valueOf(p1).length());  
  94.             }  
  95.         }  
  96.   
  97.         @Override  
  98.         public Object terminatePartial(AggregationBuffer agg) throws HiveException {  
  99.             LetterSumAgg myagg = (LetterSumAgg) agg;  
  100.             total += myagg.sum;  
  101.             return total;  
  102.         }  
  103.   
  104.         @Override  
  105.         public void merge(AggregationBuffer agg, Object partial)  
  106.                 throws HiveException {  
  107.             if (partial != null) {  
  108.                   
  109.                 LetterSumAgg myagg1 = (LetterSumAgg) agg;  
  110.                   
  111.                 Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);  
  112.                   
  113.                 LetterSumAgg myagg2 = new LetterSumAgg();  
  114.                   
  115.                 myagg2.add(partialSum);  
  116.                 myagg1.add(myagg2.sum);  
  117.             }  
  118.         }  
  119.   
  120.         @Override  
  121.         public Object terminate(AggregationBuffer agg) throws HiveException {  
  122.             LetterSumAgg myagg = (LetterSumAgg) agg;  
  123.             total = myagg.sum;  
  124.             return myagg.sum;  
  125.         }  
  126.   
  127.     }  
  128. }  

代码说明

 

AggregationBuffer 允许我们保存中间结果,通过定义我们的buffer,我们可以处理任何格式的数据,在代码例子中字符总数保存在AggregationBuffer 。

[java]  view plain  copy
  1. /** 
  2. * 保存当前字符总数的类 
  3. */  
  4. static class LetterSumAgg implements AggregationBuffer {  
  5.     int sum = 0;  
  6.     void add(int num){  
  7.         sum += num;  
  8.     }  
  9. }  

这意味着UDAF在不同的mapreduce阶段会接收到不同的输入。Iterate读取我们表中的一行(或者准确来说是表),然后输出其他数据格式的聚集结果。

artialAggregation合并这些聚集结果到另外相同格式的新的聚集结果,然后最终的reducer取得这些聚集结果然后输出最终结果(该结果或许与接收数据的格式不一致)。

在init()方法中我们指定输入为string,结果输出格式为integer,还有,部分聚集结果输出格式为integer(保存在aggregation buffer中);terminate()terminatePartial()两者输出一个integer

[java]  view plain  copy
  1. // init方法中根据不同的mode指定输出数据的格式objectinspector  
  2. if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {  
  3.     inputOI = (PrimitiveObjectInspector) parameters[0];  
  4. else {  
  5.     integerOI = (PrimitiveObjectInspector) parameters[0];  
  6. }  
  7.   
  8. // 不同model阶段的输出数据格式  
  9. outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,  
  10.                     ObjectInspectorOptions.JAVA);  

iterate()函数读取到每行中列的字符串,计算与保存该字符串的长度

[java]  view plain  copy
  1. public void iterate(AggregationBuffer agg, Object[] parameters)  
  2.     throws HiveException {  
  3.     ...  
  4.     Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);  
  5.     myagg.add(String.valueOf(p1).length());  
  6.     }  
  7. }  

Merge函数增加部分聚集总数到AggregationBuffer

[java]  view plain  copy
  1. public void merge(AggregationBuffer agg, Object partial)  
  2.         throws HiveException {  
  3.     if (partial != null) {  
  4.                   
  5.         LetterSumAgg myagg1 = (LetterSumAgg) agg;  
  6.                   
  7.         Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);  
  8.                   
  9.         LetterSumAgg myagg2 = new LetterSumAgg();  
  10.                   
  11.         myagg2.add(partialSum);  
  12.         myagg1.add(myagg2.sum);  
  13.     }  
  14. }  

Terminate()函数返回AggregationBuffer中的内容,这里产生了最终结果。

[java]  view plain  copy
  1. public Object terminate(AggregationBuffer agg) throws HiveException {  
  2.     LetterSumAgg myagg = (LetterSumAgg) agg;  
  3.     total = myagg.sum;  
  4.     return myagg.sum;  
  5. }  

使用自定义函数

[plain]  view plain  copy
  1. ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;  
  2. CREATE TEMPORARY FUNCTION letters as 'com.matthewrathbone.example.TotalNumOfLettersGenericUDAF';  
  3.   
  4. SELECT letters(name) FROM people;  
  5. OK  
  6. 44  
  7. Time taken: 20.688 seconds  

资料参考

http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html

http://blog.csdn.net/duguduchong/article/details/8684963

这篇关于Hive UDAF开发详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/571448

相关文章

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

嵌入式QT开发:构建高效智能的嵌入式系统

摘要: 本文深入探讨了嵌入式 QT 相关的各个方面。从 QT 框架的基础架构和核心概念出发,详细阐述了其在嵌入式环境中的优势与特点。文中分析了嵌入式 QT 的开发环境搭建过程,包括交叉编译工具链的配置等关键步骤。进一步探讨了嵌入式 QT 的界面设计与开发,涵盖了从基本控件的使用到复杂界面布局的构建。同时也深入研究了信号与槽机制在嵌入式系统中的应用,以及嵌入式 QT 与硬件设备的交互,包括输入输出设

OpenHarmony鸿蒙开发( Beta5.0)无感配网详解

1、简介 无感配网是指在设备联网过程中无需输入热点相关账号信息,即可快速实现设备配网,是一种兼顾高效性、可靠性和安全性的配网方式。 2、配网原理 2.1 通信原理 手机和智能设备之间的信息传递,利用特有的NAN协议实现。利用手机和智能设备之间的WiFi 感知订阅、发布能力,实现了数字管家应用和设备之间的发现。在完成设备间的认证和响应后,即可发送相关配网数据。同时还支持与常规Sof

活用c4d官方开发文档查询代码

当你问AI助手比如豆包,如何用python禁止掉xpresso标签时候,它会提示到 这时候要用到两个东西。https://developers.maxon.net/论坛搜索和开发文档 比如这里我就在官方找到正确的id描述 然后我就把参数标签换过来

6.1.数据结构-c/c++堆详解下篇(堆排序,TopK问题)

上篇:6.1.数据结构-c/c++模拟实现堆上篇(向下,上调整算法,建堆,增删数据)-CSDN博客 本章重点 1.使用堆来完成堆排序 2.使用堆解决TopK问题 目录 一.堆排序 1.1 思路 1.2 代码 1.3 简单测试 二.TopK问题 2.1 思路(求最小): 2.2 C语言代码(手写堆) 2.3 C++代码(使用优先级队列 priority_queue)

Linux_kernel驱动开发11

一、改回nfs方式挂载根文件系统         在产品将要上线之前,需要制作不同类型格式的根文件系统         在产品研发阶段,我们还是需要使用nfs的方式挂载根文件系统         优点:可以直接在上位机中修改文件系统内容,延长EMMC的寿命         【1】重启上位机nfs服务         sudo service nfs-kernel-server resta

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。

Vue3项目开发——新闻发布管理系统(六)

文章目录 八、首页设计开发1、页面设计2、登录访问拦截实现3、用户基本信息显示①封装用户基本信息获取接口②用户基本信息存储③用户基本信息调用④用户基本信息动态渲染 4、退出功能实现①注册点击事件②添加退出功能③数据清理 5、代码下载 八、首页设计开发 登录成功后,系统就进入了首页。接下来,也就进行首页的开发了。 1、页面设计 系统页面主要分为三部分,左侧为系统的菜单栏,右侧