This article walks through a custom InputFormat for MapReduce that merges small files. Hopefully it offers developers a useful reference for solving this kind of problem. Let's work through it together!
Requirement
Small files hurt the efficiency of both HDFS and MapReduce, yet in practice it is hard to avoid scenarios with large numbers of small files, so a corresponding solution is needed.
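To make the cost concrete: every file, directory, and block in HDFS is tracked as an object in NameNode heap memory, each commonly estimated at roughly 150 bytes. A back-of-the-envelope calculation (the 150-byte figure is a rule of thumb, not an exact number):

```java
public class SmallFileCost {
    public static void main(String[] args) {
        long numFiles = 10_000_000L;  // ten million small files
        long bytesPerObject = 150L;   // rough NameNode heap cost per metadata object
        // Each one-block file costs one file object plus one block object
        long heapBytes = numFiles * 2 * bytesPerObject;
        // Roughly 3 GB of NameNode heap consumed by metadata alone
        System.out.println(heapBytes / (1024.0 * 1024 * 1024) + " GB");
    }
}
```

Merging ten million small files into a few thousand large ones shrinks this metadata footprint by orders of magnitude, which is exactly what the techniques below aim for.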
Analysis (small-file optimization essentially comes down to the following approaches:)
1. At data-collection time, merge small files or small batches of data into large files before uploading them to HDFS.
2. Before business processing, run a MapReduce program on HDFS to merge the small files.
3. At MapReduce processing time, use a CombineFileInputFormat (e.g. CombineTextInputFormat) to improve efficiency.
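Approach 3 requires no custom code: Hadoop ships CombineTextInputFormat, which packs many small files into few input splits. A minimal driver sketch (the input path and the 4 MB split ceiling are illustrative assumptions, not values from this article):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class CombineDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "combine-demo");
        // Pack many small text files into a small number of splits
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Cap each combined split at 4 MB (illustrative value)
        CombineTextInputFormat.setMaxInputSplitSize(job, 4L * 1024 * 1024);
        CombineTextInputFormat.addInputPath(job, new Path("/input/small-files"));
        // ... set mapper, reducer, and output as usual, then job.waitForCompletion(true)
    }
}
```

Note this only reduces the number of map tasks; the small files themselves still sit on HDFS, which is why the merge-on-HDFS approach below can be preferable.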
Implementation (this section implements approach 2 above)
Define a custom InputFormat.
Override the RecordReader so that each call reads one complete file and wraps it as a key-value pair.
Use SequenceFileOutputFormat to write the merged output.
Custom InputFormat:
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class Inputfromat extends FileInputFormat<Text, BytesWritable> {
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        RR rr = new RR();
        rr.initialize(inputSplit, context);
        return rr;
    }
}
Custom RecordReader (RR):
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class RR extends RecordReader<Text, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration configuration;
    private BytesWritable value = new BytesWritable();
    // Marks whether the single file in this split has already been read
    private boolean processed = false;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        fileSplit = (FileSplit) inputSplit;
        configuration = taskAttemptContext.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            Path path = fileSplit.getPath();
            FileSystem fileSystem = path.getFileSystem(configuration);
            FSDataInputStream in = fileSystem.open(path);
            // Read the whole file into memory in one shot
            byte[] contents = new byte[(int) fileSplit.getLength()];
            try {
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        // The key is the original file name
        return new Text(fileSplit.getPath().getName());
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Either the one file has been read (done) or it hasn't (not started)
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
    }
}
Define the Mapper:
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Map extends Mapper<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // Identity map: pass (fileName, fileBytes) straight through
        context.write(key, value);
    }
}
Define the driver main method:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class Dirver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "inputfromat");
        job.setJarByClass(Dirver.class);
        job.setMapperClass(Map.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        // Plug in the custom InputFormat defined above
        job.setInputFormatClass(Inputfromat.class);
        Inputfromat.addInputPath(job, new Path("E:\\自定义inputformat_小文件合并\\input"));
        // Merge all small files into SequenceFile output
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job, new Path("E:\\output"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
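To verify the merge, the resulting SequenceFile can be read back record by record. A sketch using SequenceFile.Reader (the part-file name is an assumption about the default reducer output; keys are file names, values are the raw bytes):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ReadMerged {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical output part file produced by the driver above
        Path path = new Path("E:\\output\\part-r-00000");
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            // Each record is one original small file: (fileName, fileBytes)
            while (reader.next(key, value)) {
                System.out.println(key + " : " + value.getLength() + " bytes");
            }
        }
    }
}
```

Each original small file should appear as one record, keyed by its file name, confirming that the contents survived the merge intact.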
That wraps up this article on a custom InputFormat for MapReduce (merging small files); we hope the material is helpful to fellow developers!