MapReduce的自定义inputFormat(合并小文件)

2024-04-07 12:38

本文主要是介绍MapReduce的自定义inputFormat(合并小文件),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

需求
无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案

分析(小文件的优化无非以下几种方式:)
1、 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
2、 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
3、 在mapreduce处理时,可采用combineInputFormat提高效率

实现(本节实现的是上述第二种方式)
自定义一个InputFormat
改写RecordReader,实现一次读取一个完整文件封装为KV
在输出时使用SequenceFileOutPutFormat输出合并文件

自定义InputFromat:

public class Inputfromat extends FileInputFormat {@Override
public class Inputfromat extends FileInputFormat {@Overridepublic RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {RR rr = new RR();rr.initialize(inputSplit,context);return rr;}
}

自定义RecordReader(RR):

public class RR extends RecordReader {private FileSplit fileSplit;private Configuration configuration;private BytesWritable value = new BytesWritable();private Boolean b=false;@Overridepublic void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {fileSplit = (FileSplit) inputSplit;configuration = taskAttemptContext.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if (!b){Path path = fileSplit.getPath();FileSystem fileSystem = path.getFileSystem(configuration);FSDataInputStream open = fileSystem.open(path);byte[] contents = new byte[(int) fileSplit.getLength()];IOUtils.readFully(open,contents,0, contents.length);value.set(contents,0,contents.length);b=true;return true;}return false;}@Overridepublic Object getCurrentKey() throws IOException, InterruptedException {return new Text(fileSplit.getPath().getName());}@Overridepublic Object getCurrentValue() throws IOException, InterruptedException {return value;}@Overridepublic float getProgress() throws IOException, InterruptedException {return 0;}@Overridepublic void close() throws IOException {}
}

定义map:

public class Map extends Mapper<Text, BytesWritable,Text,BytesWritable> {@Overrideprotected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {context.write(key,value);}
}

定义程序运行main方法:

public class Dirver {public static void main(String[] args)throws Exception {Job job = Job.getInstance(new Configuration(), "inputfromat");job.setJarByClass(Dirver.class);job.setMapperClass(Map.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);job.setInputFormatClass(Inputfromat.class);Inputfromat.addInputPath(job,new Path("E:\\自定义inputformat_小文件合并\\input"));job.setOutputFormatClass(SequenceFileOutputFormat.class);SequenceFileOutputFormat.setOutputPath(job,new Path("E:\\output"));boolean b = job.waitForCompletion(true);System.exit(b?0:1);}
}

这篇关于MapReduce的自定义inputFormat(合并小文件)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/882624

相关文章

Java实现自定义table宽高的示例代码

《Java实现自定义table宽高的示例代码》在桌面应用、管理系统乃至报表工具中,表格(JTable)作为最常用的数据展示组件,不仅承载对数据的增删改查,还需要配合布局与视觉需求,而JavaSwing... 目录一、项目背景详细介绍二、项目需求详细介绍三、相关技术详细介绍四、实现思路详细介绍五、完整实现代码

一文详解Java Stream的sorted自定义排序

《一文详解JavaStream的sorted自定义排序》Javastream中的sorted方法是用于对流中的元素进行排序的方法,它可以接受一个comparator参数,用于指定排序规则,sorte... 目录一、sorted 操作的基础原理二、自定义排序的实现方式1. Comparator 接口的 Lam

如何自定义一个log适配器starter

《如何自定义一个log适配器starter》:本文主要介绍如何自定义一个log适配器starter的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录需求Starter 项目目录结构pom.XML 配置LogInitializer实现MDCInterceptor

Druid连接池实现自定义数据库密码加解密功能

《Druid连接池实现自定义数据库密码加解密功能》在现代应用开发中,数据安全是至关重要的,本文将介绍如何在​​Druid​​连接池中实现自定义的数据库密码加解密功能,有需要的小伙伴可以参考一下... 目录1. 环境准备2. 密码加密算法的选择3. 自定义 ​​DruidDataSource​​ 的密码解密3

spring-gateway filters添加自定义过滤器实现流程分析(可插拔)

《spring-gatewayfilters添加自定义过滤器实现流程分析(可插拔)》:本文主要介绍spring-gatewayfilters添加自定义过滤器实现流程分析(可插拔),本文通过实例图... 目录需求背景需求拆解设计流程及作用域逻辑处理代码逻辑需求背景公司要求,通过公司网络代理访问的请求需要做请

Python中合并列表(list)的六种方法小结

《Python中合并列表(list)的六种方法小结》本文主要介绍了Python中合并列表(list)的六种方法小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋... 目录一、直接用 + 合并列表二、用 extend() js方法三、用 zip() 函数交叉合并四、用

利用Python实现Excel文件智能合并工具

《利用Python实现Excel文件智能合并工具》有时候,我们需要将多个Excel文件按照特定顺序合并成一个文件,这样可以更方便地进行后续的数据处理和分析,下面我们看看如何使用Python实现Exce... 目录运行结果为什么需要这个工具技术实现工具的核心功能代码解析使用示例工具优化与扩展有时候,我们需要将

Python实现获取带合并单元格的表格数据

《Python实现获取带合并单元格的表格数据》由于在日常运维中经常出现一些合并单元格的表格,如果要获取数据比较麻烦,所以本文我们就来聊聊如何使用Python实现获取带合并单元格的表格数据吧... 由于在日常运维中经常出现一些合并单元格的表格,如果要获取数据比较麻烦,现将将封装成类,并通过调用list_exc

Spring Security自定义身份认证的实现方法

《SpringSecurity自定义身份认证的实现方法》:本文主要介绍SpringSecurity自定义身份认证的实现方法,下面对SpringSecurity的这三种自定义身份认证进行详细讲解,... 目录1.内存身份认证(1)创建配置类(2)验证内存身份认证2.JDBC身份认证(1)数据准备 (2)配置依

使用Sentinel自定义返回和实现区分来源方式

《使用Sentinel自定义返回和实现区分来源方式》:本文主要介绍使用Sentinel自定义返回和实现区分来源方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Sentinel自定义返回和实现区分来源1. 自定义错误返回2. 实现区分来源总结Sentinel自定