HIVE 处理日志,自定义inputformat 完整版

2023-12-09 04:09

本文主要是介绍HIVE 处理日志,自定义inputformat 完整版,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

网上找了很多材料都是写了部份代码的,今天在峰哥的帮助下实现了此功能。


为何要设置此功能是由于 hive fields terminated by '||||' 不支持 字符串导致


将你的inputformat类打成jar包,如MyInputFormat.jar
将MyInputFormat.jar放到 hive/lib里,然后就可以建表了
假设你的inputFormat类路径是com.hive.myinput
则建表语句为:create table tbname(name stirng,id int, ...) stored as INPUTFORMAT 'com.hive.myinput' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

HiveIgnoreKeyTextOutputFormat是系统自带的outputformat类,你也可以自定义

由于hive是基于hadoop集群运行的,所以hadoop/lib里面也必须放入MyInputFormat.jar,


此功能需要二个CLASS 类:ClickstreamInputFormat ClickstreamRecordReader


package com.jd.cloud.clickstore;import java.io.IOException; import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.InputSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.JobConfigurable; 
import org.apache.hadoop.mapred.RecordReader; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapred.TextInputFormat;/** 
* 自定义hadoop的 org.apache.hadoop.mapred.InputFormat 
* 
* @author winston 
* 
*/ 
public class ClickstreamInputFormat extends TextInputFormat implements JobConfigurable { public RecordReader<LongWritable, Text> getRecordReader( InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); return new ClickstreamRecordReader((FileSplit) genericSplit,job); } 
} 



package com.jd.cloud.clickstore;import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.mapred.RecordReader;public class ClickstreamRecordReader implementsRecordReader<LongWritable, Text> {private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private LineReader lineReader;int maxLineLength;public ClickstreamRecordReader(FileSplit inputSplit, Configuration job)throws IOException {maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength",Integer.MAX_VALUE);start = inputSplit.getStart();end = start + inputSplit.getLength();final Path file = inputSplit.getPath();compressionCodecs = new CompressionCodecFactory(job);final CompressionCodec codec = compressionCodecs.getCodec(file);// Open file and seek to the start of the splitFileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(file);boolean skipFirstLine = false;if (codec != null) {lineReader = new LineReader(codec.createInputStream(fileIn), job);end = Long.MAX_VALUE;} else {if (start != 0) {skipFirstLine = true;--start;fileIn.seek(start);}lineReader = new LineReader(fileIn, job);}if (skipFirstLine) {start += lineReader.readLine(new Text(), 0,(int) Math.min((long) Integer.MAX_VALUE, end - start));}this.pos = start;}public ClickstreamRecordReader(InputStream in, long offset, long endOffset,int maxLineLength) {this.maxLineLength = maxLineLength;this.lineReader = new LineReader(in);this.start = offset;this.pos = offset;this.end = endOffset;}public ClickstreamRecordReader(InputStream in, long offset, long endOffset,Configuration job) throws IOException {this.maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength", Integer.MAX_VALUE);this.lineReader = new LineReader(in, job);this.start = offset;this.pos = offset;this.end = endOffset;}public LongWritable createKey() {return new LongWritable();}public Text createValue() {return new Text();}/*** Reads the next record in the split. get usefull fields from the raw nginx* log.* * @param key* key of the record which will map to the byte offset of the* record's line* @param value* the record in text format* @return true if a record existed, false otherwise* @throws IOException*/public synchronized boolean next(LongWritable key, Text value)throws IOException {// Stay within the splitwhile (pos < end) {key.set(pos);int newSize = lineReader.readLine(value, maxLineLength,Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),maxLineLength));if (newSize == 0)return false;String str = value.toString().toLowerCase().replaceAll("\\@\\_\\@", "\001");value.set(str);pos += newSize;if (newSize < maxLineLength)return true;}return false;}public float getProgress() {if (start == end) {return 0.0f;} else {return Math.min(1.0f, (pos - start) / (float) (end - start));}}public synchronized long getPos() throws IOException {return pos;}public synchronized void close() throws IOException {if (lineReader != null)lineReader.close();}// 测试 输出//public static void main(String ags[]){// String str1 ="123@_@abcd@_@fk".replaceAll("\\@\\_\\@", "\001");// System.out.println(str1);//}
}




1.上传到 HIVE 服务器上 JAVAC 编译

javac -cp ./:/usr/lib/hadoop/hadoop-common.jar:/home/op1/hadoop/hadoop-core-1.0.3.jar:/usr/lib/hadoop/lib/commons-logging-1.1.1.jar */**/*/*/*


2.JAR 打包 类文件

jar -cf ClickstreamInputFormat.jar /home/op1/uerdwdb/src/

3.复制 Hive/lib Hadoop/lib 文件夹内


4.Hive 创建表命令

create table hive_text(num int,name string,`add` string)
stored as INPUTFORMAT 'com.jd.cloud.clickstore.ClickstreamInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' 
location '/home/op1/uerdwdb/text.txt';

这篇关于HIVE 处理日志,自定义inputformat 完整版的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/472421

相关文章

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

C#使用SQLite进行大数据量高效处理的代码示例

《C#使用SQLite进行大数据量高效处理的代码示例》在软件开发中,高效处理大数据量是一个常见且具有挑战性的任务,SQLite因其零配置、嵌入式、跨平台的特性,成为许多开发者的首选数据库,本文将深入探... 目录前言准备工作数据实体核心技术批量插入:从乌龟到猎豹的蜕变分页查询:加载百万数据异步处理:拒绝界面

使用Sentinel自定义返回和实现区分来源方式

《使用Sentinel自定义返回和实现区分来源方式》:本文主要介绍使用Sentinel自定义返回和实现区分来源方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Sentinel自定义返回和实现区分来源1. 自定义错误返回2. 实现区分来源总结Sentinel自定

Springboot处理跨域的实现方式(附Demo)

《Springboot处理跨域的实现方式(附Demo)》:本文主要介绍Springboot处理跨域的实现方式(附Demo),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不... 目录Springboot处理跨域的方式1. 基本知识2. @CrossOrigin3. 全局跨域设置4.

SpringBoot日志配置SLF4J和Logback的方法实现

《SpringBoot日志配置SLF4J和Logback的方法实现》日志记录是不可或缺的一部分,本文主要介绍了SpringBoot日志配置SLF4J和Logback的方法实现,文中通过示例代码介绍的非... 目录一、前言二、案例一:初识日志三、案例二:使用Lombok输出日志四、案例三:配置Logback一

golang 日志log与logrus示例详解

《golang日志log与logrus示例详解》log是Go语言标准库中一个简单的日志库,本文给大家介绍golang日志log与logrus示例详解,感兴趣的朋友一起看看吧... 目录一、Go 标准库 log 详解1. 功能特点2. 常用函数3. 示例代码4. 优势和局限二、第三方库 logrus 详解1.

python+opencv处理颜色之将目标颜色转换实例代码

《python+opencv处理颜色之将目标颜色转换实例代码》OpenCV是一个的跨平台计算机视觉库,可以运行在Linux、Windows和MacOS操作系统上,:本文主要介绍python+ope... 目录下面是代码+ 效果 + 解释转HSV: 关于颜色总是要转HSV的掩膜再标注总结 目标:将红色的部分滤

Python实现自动化接收与处理手机验证码

《Python实现自动化接收与处理手机验证码》在移动互联网时代,短信验证码已成为身份验证、账号注册等环节的重要安全手段,本文将介绍如何利用Python实现验证码的自动接收,识别与转发,需要的可以参考下... 目录引言一、准备工作1.1 硬件与软件需求1.2 环境配置二、核心功能实现2.1 短信监听与获取2.

Vue中组件之间传值的六种方式(完整版)

《Vue中组件之间传值的六种方式(完整版)》组件是vue.js最强大的功能之一,而组件实例的作用域是相互独立的,这就意味着不同组件之间的数据无法相互引用,针对不同的使用场景,如何选择行之有效的通信方式... 目录前言方法一、props/$emit1.父组件向子组件传值2.子组件向父组件传值(通过事件形式)方

如何自定义Nginx JSON日志格式配置

《如何自定义NginxJSON日志格式配置》Nginx作为最流行的Web服务器之一,其灵活的日志配置能力允许我们根据需求定制日志格式,本文将详细介绍如何配置Nginx以JSON格式记录访问日志,这种... 目录前言为什么选择jsON格式日志?配置步骤详解1. 安装Nginx服务2. 自定义JSON日志格式各