HIVE 处理日志,自定义inputformat 完整版

2023-12-09 04:09

本文主要是介绍HIVE 处理日志,自定义inputformat 完整版,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

网上找了很多材料都是写了部份代码的,今天在峰哥的帮助下实现了此功能。


为何要设置此功能是由于 hive fields terminated by '||||' 不支持 字符串导致


将你的inputformat类打成jar包,如MyInputFormat.jar
将MyInputFormat.jar放到 hive/lib里,然后就可以建表了
假设你的inputFormat类路径是com.hive.myinput
则建表语句为:create table tbname(name stirng,id int, ...) stored as INPUTFORMAT 'com.hive.myinput' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

HiveIgnoreKeyTextOutputFormat是系统自带的outputformat类,你也可以自定义

由于hive是基于hadoop集群运行的,所以hadoop/lib里面也必须放入MyInputFormat.jar,


此功能需要二个CLASS 类:ClickstreamInputFormat ClickstreamRecordReader


package com.jd.cloud.clickstore;import java.io.IOException; import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.InputSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.JobConfigurable; 
import org.apache.hadoop.mapred.RecordReader; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapred.TextInputFormat;/** 
* 自定义hadoop的 org.apache.hadoop.mapred.InputFormat 
* 
* @author winston 
* 
*/ 
public class ClickstreamInputFormat extends TextInputFormat implements JobConfigurable { public RecordReader<LongWritable, Text> getRecordReader( InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); return new ClickstreamRecordReader((FileSplit) genericSplit,job); } 
} 



package com.jd.cloud.clickstore;import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.mapred.RecordReader;public class ClickstreamRecordReader implementsRecordReader<LongWritable, Text> {private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private LineReader lineReader;int maxLineLength;public ClickstreamRecordReader(FileSplit inputSplit, Configuration job)throws IOException {maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength",Integer.MAX_VALUE);start = inputSplit.getStart();end = start + inputSplit.getLength();final Path file = inputSplit.getPath();compressionCodecs = new CompressionCodecFactory(job);final CompressionCodec codec = compressionCodecs.getCodec(file);// Open file and seek to the start of the splitFileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(file);boolean skipFirstLine = false;if (codec != null) {lineReader = new LineReader(codec.createInputStream(fileIn), job);end = Long.MAX_VALUE;} else {if (start != 0) {skipFirstLine = true;--start;fileIn.seek(start);}lineReader = new LineReader(fileIn, job);}if (skipFirstLine) {start += lineReader.readLine(new Text(), 0,(int) Math.min((long) Integer.MAX_VALUE, end - start));}this.pos = start;}public ClickstreamRecordReader(InputStream in, long offset, long endOffset,int maxLineLength) {this.maxLineLength = maxLineLength;this.lineReader = new LineReader(in);this.start = offset;this.pos = offset;this.end = endOffset;}public ClickstreamRecordReader(InputStream in, long offset, long endOffset,Configuration job) throws IOException {this.maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength", Integer.MAX_VALUE);this.lineReader = new LineReader(in, job);this.start = offset;this.pos = offset;this.end = endOffset;}public LongWritable createKey() {return new LongWritable();}public Text createValue() {return new Text();}/*** Reads the next record in the split. get usefull fields from the raw nginx* log.* * @param key* key of the record which will map to the byte offset of the* record's line* @param value* the record in text format* @return true if a record existed, false otherwise* @throws IOException*/public synchronized boolean next(LongWritable key, Text value)throws IOException {// Stay within the splitwhile (pos < end) {key.set(pos);int newSize = lineReader.readLine(value, maxLineLength,Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),maxLineLength));if (newSize == 0)return false;String str = value.toString().toLowerCase().replaceAll("\\@\\_\\@", "\001");value.set(str);pos += newSize;if (newSize < maxLineLength)return true;}return false;}public float getProgress() {if (start == end) {return 0.0f;} else {return Math.min(1.0f, (pos - start) / (float) (end - start));}}public synchronized long getPos() throws IOException {return pos;}public synchronized void close() throws IOException {if (lineReader != null)lineReader.close();}// 测试 输出//public static void main(String ags[]){// String str1 ="123@_@abcd@_@fk".replaceAll("\\@\\_\\@", "\001");// System.out.println(str1);//}
}




1.上传到 HIVE 服务器上 JAVAC 编译

javac -cp ./:/usr/lib/hadoop/hadoop-common.jar:/home/op1/hadoop/hadoop-core-1.0.3.jar:/usr/lib/hadoop/lib/commons-logging-1.1.1.jar */**/*/*/*


2.JAR 打包 类文件

jar -cf ClickstreamInputFormat.jar /home/op1/uerdwdb/src/

3.复制 Hive/lib Hadoop/lib 文件夹内


4.Hive 创建表命令

create table hive_text(num int,name string,`add` string)
stored as INPUTFORMAT 'com.jd.cloud.clickstore.ClickstreamInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' 
location '/home/op1/uerdwdb/text.txt';

这篇关于HIVE 处理日志,自定义inputformat 完整版的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/472421

相关文章

SpringBoot项目注入 traceId 追踪整个请求的日志链路(过程详解)

《SpringBoot项目注入traceId追踪整个请求的日志链路(过程详解)》本文介绍了如何在单体SpringBoot项目中通过手动实现过滤器或拦截器来注入traceId,以追踪整个请求的日志链... SpringBoot项目注入 traceId 来追踪整个请求的日志链路,有了 traceId, 我们在排

Python自动化处理手机验证码

《Python自动化处理手机验证码》手机验证码是一种常见的身份验证手段,广泛应用于用户注册、登录、交易确认等场景,下面我们来看看如何使用Python自动化处理手机验证码吧... 目录一、获取手机验证码1.1 通过短信接收验证码1.2 使用第三方短信接收服务1.3 使用ADB读取手机短信1.4 通过API获取

Python自动化Office文档处理全攻略

《Python自动化Office文档处理全攻略》在日常办公中,处理Word、Excel和PDF等Office文档是再常见不过的任务,手动操作这些文档不仅耗时耗力,还容易出错,幸运的是,Python提供... 目录一、自动化处理Word文档1. 安装python-docx库2. 读取Word文档内容3. 修改

Spring Boot整合log4j2日志配置的详细教程

《SpringBoot整合log4j2日志配置的详细教程》:本文主要介绍SpringBoot项目中整合Log4j2日志框架的步骤和配置,包括常用日志框架的比较、配置参数介绍、Log4j2配置详解... 目录前言一、常用日志框架二、配置参数介绍1. 日志级别2. 输出形式3. 日志格式3.1 PatternL

使用C++将处理后的信号保存为PNG和TIFF格式

《使用C++将处理后的信号保存为PNG和TIFF格式》在信号处理领域,我们常常需要将处理结果以图像的形式保存下来,方便后续分析和展示,C++提供了多种库来处理图像数据,本文将介绍如何使用stb_ima... 目录1. PNG格式保存使用stb_imagephp_write库1.1 安装和包含库1.2 代码解

java如何通过Kerberos认证方式连接hive

《java如何通过Kerberos认证方式连接hive》该文主要介绍了如何在数据源管理功能中适配不同数据源(如MySQL、PostgreSQL和Hive),特别是如何在SpringBoot3框架下通过... 目录Java实现Kerberos认证主要方法依赖示例续期连接hive遇到的问题分析解决方式扩展思考总

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

CSS自定义浏览器滚动条样式完整代码

《CSS自定义浏览器滚动条样式完整代码》:本文主要介绍了如何使用CSS自定义浏览器滚动条的样式,包括隐藏滚动条的角落、设置滚动条的基本样式、轨道样式和滑块样式,并提供了完整的CSS代码示例,通过这些技巧,你可以为你的网站添加个性化的滚动条样式,从而提升用户体验,详细内容请阅读本文,希望能对你有所帮助...

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed

开启mysql的binlog日志步骤详解

《开启mysql的binlog日志步骤详解》:本文主要介绍MySQL5.7版本中二进制日志(bin_log)的配置和使用,文中通过图文及代码介绍的非常详细,需要的朋友可以参考下... 目录1.查看是否开启bin_log2.数据库会把日志放进logs目录中3.查看log日志总结 mysql版本5.71.查看