Defining and Using Custom Hive UDF and UDTF Functions
This post explains how to write custom Hive functions and deploy them to Hive. Their job is to break a complex JSON string down into plain fields. The raw data looks like this:
"1541217850324|{\"cm\":{\"mid\":\"m7856\",\"uid\":\"u8739\",\"ln\":\"-74.8\",\"sv\":\"V2.2.2\",\"os\":\"8.1.3\",\"g\":\"P7XC9126@gmail.com\",\"nw\":\"3G\",\"l\":\"es\",\"vc\":\"6\",\"hw\":\"640*960\",\"ar\":\"MX\",\"t\":\"1541204134250\",\"la\":\"-31.7\",\"md\":\"huawei-17\",\"vn\":\"1.1.2\",\"sr\":\"O\",\"ba\":\"Huawei\"},\"ap\":\"weather\",\"et\":[{\"ett\":\"1541146624055\",\"en\":\"display\",\"kv\":{\"goodsid\":\"n4195\",\"copyright\":\"ESPN\",\"content_provider\":\"CNN\",\"extend2\":\"5\",\"action\":\"2\",\"extend1\":\"2\",\"place\":\"3\",\"showtype\":\"2\",\"category\":\"72\",\"newstype\":\"5\"}},{\"ett\":\"1541213331817\",\"en\":\"loading\",\"kv\":{\"extend2\":\"\",\"loading_time\":\"15\",\"action\":\"3\",\"extend1\":\"\",\"type1\":\"\",\"type\":\"3\",\"loading_way\":\"1\"}},{\"ett\":\"1541126195645\",\"en\":\"ad\",\"kv\":{\"entry\":\"3\",\"show_style\":\"0\",\"action\":\"2\",\"detail\":\"325\",\"source\":\"4\",\"behavior\":\"2\",\"content\":\"1\",\"newstype\":\"5\"}},{\"ett\":\"1541202678812\",\"en\":\"notification\",\"kv\":{\"ap_time\":\"1541184614380\",\"action\":\"3\",\"type\":\"4\",\"content\":\"\"}},{\"ett\":\"1541194686688\",\"en\":\"active_background\",\"kv\":{\"active_source\":\"3\"}}]}"
Maven
The dependencies are as follows:
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.2.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
UDF
Function code:
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;

public class BaseFieldUDF extends UDF {

    public String evaluate(String line, String jsonkeysString) {
        // 0. prepare a StringBuilder for the output
        StringBuilder sb = new StringBuilder();

        // 1. split the key list: mid uid vc vn l sr os ar md
        String[] jsonkeys = jsonkeysString.split(",");

        // 2. split the line into server timestamp | json
        String[] logContents = line.split("\\|");

        // 3. validity check
        if (logContents.length != 2 || StringUtils.isBlank(logContents[1])) {
            return "";
        }

        // 4. parse the json part
        try {
            JSONObject jsonObject = new JSONObject(logContents[1]);

            // get the nested "cm" object
            JSONObject base = jsonObject.getJSONObject("cm");

            // look up every requested key and append its value (tab separated)
            for (int i = 0; i < jsonkeys.length; i++) {
                String fieldName = jsonkeys[i].trim();
                if (base.has(fieldName)) {
                    sb.append(base.getString(fieldName)).append("\t");
                } else {
                    sb.append("\t");
                }
            }

            // append the whole "et" json array as one string
            sb.append(jsonObject.getString("et")).append("\t");
            // append the server timestamp
            sb.append(logContents[0]).append("\t");
        } catch (JSONException e) {
            e.printStackTrace();
        }
        return sb.toString();
    }
}
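A rough sketch of how this UDF would be called once the jar is added and the function is registered; the function name base_analizer, the package of BaseFieldUDF and the ods_event_log table with its line column are placeholders for illustration, not names defined in this post:

add jar /opt/module/hive/xxxxxx.jar;
-- hypothetical registration name and class package
CREATE TEMPORARY FUNCTION base_analizer AS 'com.shufang.tdtf.BaseFieldUDF';
-- returns the requested cm fields, the et array string and the server
-- timestamp as one tab-separated string per raw log line
SELECT base_analizer(line, 'mid,uid,vc,vn,l,sr,os,ar,md') FROM ods_event_log;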
UDTF
Function code:
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;

import java.util.ArrayList;

public class EventJsonUDTF extends GenericUDTF {

    // declare the names and types of the output columns
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        fieldNames.add("event_name");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("event_json");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    // one input record in, any number of output rows out
    @Override
    public void process(Object[] objects) throws HiveException {
        // the incoming "et" json array string
        String input = objects[0].toString();

        // if the input is blank, filter the record out
        if (StringUtils.isBlank(input)) {
            return;
        } else {
            try {
                // parse the array of events (display, loading, ad, ...)
                JSONArray ja = new JSONArray(input);
                if (ja == null)
                    return;

                // iterate over every event
                for (int i = 0; i < ja.length(); i++) {
                    String[] result = new String[2];
                    try {
                        // the event name ("en")
                        result[0] = ja.getJSONObject(i).getString("en");
                        // the whole event object as a string
                        result[1] = ja.getString(i);
                    } catch (JSONException e) {
                        continue;
                    }
                    // emit one row
                    forward(result);
                }
            } catch (JSONException e) {
                e.printStackTrace();
            }
        }
    }

    // called once there are no more records; used for cleanup or extra output
    @Override
    public void close() throws HiveException {
    }
}
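To see how the UDTF fits into a query, here is a minimal sketch of the usual LATERAL VIEW pattern, assuming the function has been registered as explode_json_array (as it is further below) and that the et array string has already been extracted into a column of a staging table; dwd_base_event_log, mid and et are placeholder names:

-- one output row per event in the et JSON array; the aliases correspond to the
-- event_name/event_json columns declared in initialize()
SELECT mid, tmp.event_name, tmp.event_json
FROM dwd_base_event_log
LATERAL VIEW explode_json_array(et) tmp AS event_name, event_json;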
Package the jar with Maven and upload it
# NOTE: to create only a temporary function, it is enough to put the jar in a local lib directory on the Hive server;
# NOTE: to create a permanent function, the UDTF jar has to be uploaded to a directory on HDFS:
hadoop fs -mkdir -p /user/hive/jars
hadoop fs -put xxxxx.jar /user/hive/jars
Creating the custom functions in Hive
# Option 1: local jar
>hive
add jar /opt/module/hive/xxxxxx.jar;
CREATE FUNCTION explode_json_array AS 'com.shufang.tdtf.EventJsonUDTF';
DESC FUNCTION EXTENDED explode_json_array; -- show how the function is used
# Functions are namespaced too, just like Greenplum functions: they are called as schema.funname(), e.g.:
# select dws.fun1()
# Option 2: permanent function backed by a jar on HDFS
CREATE [TEMPORARY] FUNCTION [dws.]explode_json_array AS 'com.shufang.tdtf.EventJsonUDTF'
USING JAR 'hdfs:///user/hive/jars/xxxxxxx.jar';
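As a quick smoke test of the newly created function, a direct call with an inline, made-up two-event array should fan out into one row per event (this assumes the registration above succeeded):

-- expected output: two rows, (display, {...}) and (loading, {...})
SELECT explode_json_array('[{"ett":"1","en":"display","kv":{}},{"ett":"2","en":"loading","kv":{}}]');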
How to replace a function's jar
If the function logic turns out to be wrong, we only need to rewrite the logic, repackage the code and overwrite the previous jar with the new one; there is no need to drop the function and recreate it.