Hive的UDF开发之向量化表达式(VectorizedExpressions)

2024-02-22 02:04

本文主要是介绍Hive的UDF开发之向量化表达式(VectorizedExpressions),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 背景

笔者的大数据平台XSailboat的SailWorks模块包含离线分析功能。离线分析的后台实现,包含调度引擎、执行引擎、计算引擎和存储引擎。计算和存储引擎由Hive提供,调度引擎和执行引擎由我们自己实现。调度引擎根据DAG图和调度计划,安排执行顺序,监控执行过程。执行引擎接收调度引擎安排的任务,向Yarn申请容器,在容器中执行具体的任务。

我们的离线分析支持编写Hive的UDF函数,打包上传,并声明使用函数。
在这里插入图片描述
我们通常会通过继承org.apache.hadoop.hive.ql.udf.generic.GenericUDF来自定义自己的UDF函数,再参考Hive实现的内置UDF函数时,经常会看到在它的类名上,有@VectorizedExpressions注解,翻译过来即“向量化表达式”。在此记录一下自己学习到的知识和理解。

官方文档《Vectorized Query Execution》
有以下应该至少知道的点:

  1. 向量化查询缺省是关闭的;
  2. 要能支持向量化查询,数据存储格式必需是ORC格式(我们主要是用CSV格式)。

通常所说的向量化计算主要是从以下几个方面提升效率:

  1. 利用CPU底册指令对向量的运算
  2. 利用多核/多线程的能力进行并发计算

而Hive的向量化执行,主要是代码逻辑聚合并充分利用上下文,减少判断次数,减少对象的访问处理和序列化次数,数据切块并行。

2. 实践

package com.cimstech.udf.date;import java.io.UnsupportedEncodingException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.metadata.HiveException;import com.cimstech.xfront.common.excep.WrapException;
import com.cimstech.xfront.common.text.XString;public class VectorUDFStringToTimstamp extends VectorExpression
{private static final long serialVersionUID = 1L;/*** 列序号*/int mColNum0 ;/*** 时间格式*/String mDateFmt ;transient SimpleDateFormat mSdf ;/*** 必需得有1个无参的构造函数.		<br />* hive会先通过无参构造函数创建一个实例,然后调用getDescriptor()方法,取得描述。* 通过描述知道有哪几列,分别是什么格式的,才知道怎么调用有参构造函数。*/public VectorUDFStringToTimstamp(){super() ;}/*** 有参构造函数的参数要和getDescriptor中取得的描述相对应。* Column类型的输入,在此用int类型列序号表示			<br />* 标量列直接是相应类型即可。						* @param aColNum0* @param aDateFmt* @param aOutputColumnNum*/public VectorUDFStringToTimstamp(int aColNum0 , String aDateFmt, int aOutputColumnNum){super(aOutputColumnNum) ;mColNum0 = aColNum0 ;mDateFmt = aDateFmt ;}@Overridepublic String vectorExpressionParameters(){return getColumnParamString(0 , mColNum0)+ " , val " + mDateFmt ;}private void setDatetime(TimestampColumnVector aTimestampColVector, byte[][] aVector, int aElementNum) throws HiveException{if(mSdf == null)mSdf = new SimpleDateFormat(mDateFmt) ;String dateStr = null ;try{dateStr = new String(aVector[aElementNum] , "UTF-8") ;aTimestampColVector.getScratchTimestamp().setTime(mSdf.parse(dateStr).getTime()) ;}catch (UnsupportedEncodingException e){WrapException.wrapThrow(e) ;return ;		// dead code}catch(ParseException e){throw new HiveException(XString.msgFmt("时间字符串[{}]无法按模式[{}]解析!" , dateStr , mDateFmt)) ;}aTimestampColVector.setFromScratchTimestamp(aElementNum);}@Overridepublic void evaluate(VectorizedRowBatch aBatch) throws HiveException{if (childExpressions != null){evaluateChildren(aBatch);}int n = aBatch.size;if (n == 0)return;BytesColumnVector inputColVector = (BytesColumnVector) aBatch.cols[mColNum0];TimestampColumnVector outputColVector = (TimestampColumnVector) aBatch.cols[outputColumnNum];boolean[] inputIsNull = inputColVector.isNull;boolean[] outputIsNull = outputColVector.isNull;byte[][] vector = inputColVector.vector;if (inputColVector.isRepeating){// 如果是重复的,那么只需要解析第1个就行if (inputColVector.noNulls || !inputIsNull[0]){outputIsNull[0] = false;setDatetime(outputColVector, vector, 0);}else{// 重复,且都是null,那么没有可解析的,如下设置即可outputIsNull[0] = true;outputColVector.noNulls = false;}outputColVector.isRepeating = true;return;}elseoutputColVector.isRepeating = false;if (inputColVector.noNulls) 	// 没有为null的{// selectedInUse为true,表示选中输入中的指定行进行处理。if (aBatch.selectedInUse){int[] sel = aBatch.selected;if (!outputColVector.noNulls)		// 全局被标为了有null值,那么各个为止都需要单独设置是否为null{for (int j = 0; j != n; j++){final int i = sel[j] ;outputIsNull[i] = false;		// 某一行,单独设置不为nullsetDatetime(outputColVector, vector, i);}}else{for (int j = 0; j != n; j++){final int i = sel[j];// 全局被标为了没有null值,那么无需一行行标注非nullsetDatetime(outputColVector, vector, i);}}}else{// 输入是全局没有null值的,输出被全局标为了有null值,那么把输出改过来,改为全局没有null值if (!outputColVector.noNulls)		{Arrays.fill(outputIsNull, false);		// 所有输出都非nulloutputColVector.noNulls = true;			// 改为全局没有null值}for (int i = 0; i != n; i++){setDatetime(outputColVector, vector, i);}}}else	// 输入数据是有null的{outputColVector.noNulls = false;if (aBatch.selectedInUse){int[] sel = aBatch.selected;for (int j = 0; j != n; j++){int i = sel[j] ;outputIsNull[i] = inputIsNull[i] ;if(!outputIsNull[i])setDatetime(outputColVector, vector, i) ;}}else{System.arraycopy(inputIsNull, 0, outputIsNull, 0, n);for (int i = 0; i != n; i++){if(!outputIsNull[i])setDatetime(outputColVector, vector, i) ;}}}}@Overridepublic Descriptor getDescriptor(){return (new VectorExpressionDescriptor.Builder())// 不是过滤,都认为是投影(Projection)。投影是数据库理论中的专业术语// 投影是根据输入,构造输出,填充输出列// 过滤就是设置aBatch.selected.setMode(VectorExpressionDescriptor.Mode.PROJECTION)		.setNumArguments(2).setArgumentTypes(VectorExpressionDescriptor.ArgumentType.STRING, VectorExpressionDescriptor.ArgumentType.STRING).setInputExpressionTypes(VectorExpressionDescriptor.InputExpressionType.COLUMN, VectorExpressionDescriptor.InputExpressionType.SCALAR)		// 标量,指定的字符串常量,就是标量.build();}}

这篇关于Hive的UDF开发之向量化表达式(VectorizedExpressions)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/733811

相关文章

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

嵌入式QT开发:构建高效智能的嵌入式系统

摘要: 本文深入探讨了嵌入式 QT 相关的各个方面。从 QT 框架的基础架构和核心概念出发,详细阐述了其在嵌入式环境中的优势与特点。文中分析了嵌入式 QT 的开发环境搭建过程,包括交叉编译工具链的配置等关键步骤。进一步探讨了嵌入式 QT 的界面设计与开发,涵盖了从基本控件的使用到复杂界面布局的构建。同时也深入研究了信号与槽机制在嵌入式系统中的应用,以及嵌入式 QT 与硬件设备的交互,包括输入输出设

OpenHarmony鸿蒙开发( Beta5.0)无感配网详解

1、简介 无感配网是指在设备联网过程中无需输入热点相关账号信息,即可快速实现设备配网,是一种兼顾高效性、可靠性和安全性的配网方式。 2、配网原理 2.1 通信原理 手机和智能设备之间的信息传递,利用特有的NAN协议实现。利用手机和智能设备之间的WiFi 感知订阅、发布能力,实现了数字管家应用和设备之间的发现。在完成设备间的认证和响应后,即可发送相关配网数据。同时还支持与常规Sof

C++11第三弹:lambda表达式 | 新的类功能 | 模板的可变参数

🌈个人主页: 南桥几晴秋 🌈C++专栏: 南桥谈C++ 🌈C语言专栏: C语言学习系列 🌈Linux学习专栏: 南桥谈Linux 🌈数据结构学习专栏: 数据结构杂谈 🌈数据库学习专栏: 南桥谈MySQL 🌈Qt学习专栏: 南桥谈Qt 🌈菜鸡代码练习: 练习随想记录 🌈git学习: 南桥谈Git 🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈�

活用c4d官方开发文档查询代码

当你问AI助手比如豆包,如何用python禁止掉xpresso标签时候,它会提示到 这时候要用到两个东西。https://developers.maxon.net/论坛搜索和开发文档 比如这里我就在官方找到正确的id描述 然后我就把参数标签换过来

06 C++Lambda表达式

lambda表达式的定义 没有显式模版形参的lambda表达式 [捕获] 前属性 (形参列表) 说明符 异常 后属性 尾随类型 约束 {函数体} 有显式模版形参的lambda表达式 [捕获] <模版形参> 模版约束 前属性 (形参列表) 说明符 异常 后属性 尾随类型 约束 {函数体} 含义 捕获:包含零个或者多个捕获符的逗号分隔列表 模板形参:用于泛型lambda提供个模板形参的名

Linux_kernel驱动开发11

一、改回nfs方式挂载根文件系统         在产品将要上线之前,需要制作不同类型格式的根文件系统         在产品研发阶段,我们还是需要使用nfs的方式挂载根文件系统         优点:可以直接在上位机中修改文件系统内容,延长EMMC的寿命         【1】重启上位机nfs服务         sudo service nfs-kernel-server resta

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。