mapreduce将若干小文件合成大文件

2024-09-05 08:58
文章标签 若干 合成 mapreduce

本文主要是介绍mapreduce将若干小文件合成大文件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、思路:

http://blog.yfteach.com/?p=815,注意原文中有一个错误,就是FileInputformat中并没有找到createRecordReader这个方法,应该在TextInputFormat中有,而不是textFileInputFormat

2、编码:

      第一步:编写将整个文件作为一条记录处理的类,即实现FileInputFormat.

注意:FileInputFormat本身有很多子类,也实现了很多不同的输入格式,如下


       特别注意:KeyValueTextInputFormat这个是以一行的<key,value>形式作为输入的,默认分隔符是Tab键,比如可以使用KeyValueTextInputFormat对WordCount程序进行修改

package com.SmallFilesToSequenceFileConverter.hadoop;import java.io.IOException;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;//将整个文件作为一条记录处理
public class WholeFileInputFormat extends FileInputFormat<NullWritable,Text>{//表示文件不可分@Overrideprotected boolean isSplitable(JobContext context, Path filename) {return false;}@Overridepublic RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException {WholeRecordReader reader=new WholeRecordReader();reader.initialize(split, context);return reader;}}

第二步:实现RecordReader,为自定义的InputFormat服务

package com.SmallFilesToSequenceFileConverter.hadoop;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;//实现RecordReader,为自定义的InputFormat服务
public class WholeRecordReader extends RecordReader<NullWritable,Text>{private FileSplit fileSplit;private Configuration conf;private Text value=new Text();private boolean processed=false;//表示记录是否被处理过@Overridepublic NullWritable getCurrentKey() throws IOException,InterruptedException {return NullWritable.get();}@Overridepublic Text getCurrentValue() throws IOException,InterruptedException {return value;}@Overridepublic float getProgress() throws IOException, InterruptedException {return processed? 1.0f : 0.0f;}@Overridepublic void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {this.fileSplit=(FileSplit)split;this.conf=context.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if(!processed){byte[]contents=new byte[(int)fileSplit.getLength()];Path file=fileSplit.getPath();FileSystem fs=file.getFileSystem(conf);FSDataInputStream in=null;try{in=fs.open(file);IOUtils.readFully(in, contents, 0, contents.length);value.set(contents,0,contents.length);}finally{IOUtils.closeStream(in);}processed=true;return true;}return false;}@Overridepublic void close() throws IOException {// TODO Auto-generated method stub}}

第三步:编写主类

package com.SmallFilesToSequenceFileConverter.hadoop;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class SmallFilesToSequenceFileConverter{private static class SequenceFileMapper extends Mapper<NullWritable,Text,Text,Text>{private Text filenameKey;//setup在task之前调用,用来初始化filenamekey@Overrideprotected void setup(Context context)throws IOException, InterruptedException {InputSplit split=context.getInputSplit();Path path=((FileSplit)split).getPath();filenameKey=new Text(path.toString());}@Overrideprotected void map(NullWritable key,Text value,Context context)throws IOException, InterruptedException {context.write(filenameKey, value);}}public static void main(String[] args) throws Exception {Configuration conf=new Configuration();Job job=Job.getInstance(conf,"SmallFilesToSequenceFileConverter");job.setJarByClass(SmallFilesToSequenceFileConverter.class);job.setInputFormatClass(WholeFileInputFormat.class);//job.setOutputFormatClass(TextOutputFormat.class);job.setMapperClass(SequenceFileMapper.class);//设置最终的输出job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job,new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0:1);}
}

3、测试

原始文件:在input目录下有a、b、c、d、e五个文件,

a文件的数据是:a a a a a 

                           a a a a a

其他同理,

得到最终的结果如下:


可以看到文件名和数据为一行存在同一个文件当中!

这篇关于mapreduce将若干小文件合成大文件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1138506

相关文章

概率DP (由一道绿题引起的若干问题。目前为一些老题,蒟蒻的尝试学习1.0)

概率DP: 利用动态规划去解决 概率 期望 的题目。 概率DP 求概率(采用顺推) 从 初始状态推向结果,同一般的DP类似,只是经历了概率论知识的包装。 老题: 添加链接描述 题意: 袋子里有w只白鼠,b只黑鼠,A和B轮流从袋子里抓,谁先抓到白色谁就赢。A每次随机抓一只,B每次随机 抓完一只后 会有另外一只随机老鼠跑出来。如果两个人都没有抓到白色,那么B赢。A先抓,问A赢得概率。 w b 均在

【Hadoop|MapReduce篇】MapReduce概述

1. MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。 MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 2. MapReduce优缺点 2.1 优点 MapReduce易于编程 它简单的实现一些接口,就可以完成一个分布式

MapReduce算法 – 反转排序(Order Inversion)

译者注:在刚开始翻译的时候,我将Order Inversion按照字面意思翻译成“反序”或者“倒序”,但是翻译完整篇文章之后,我感觉到,将Order Inversion翻译成反序模式是不恰当的,根据本文的内容,很显然,Inversion并非是将顺序倒排的意思,而是如同Spring的IOC一样,表明的是一种控制权的反转。Spring将对象的实例化责任从业务代码反转给了框架,而在本文的模式中,在map

unity导入半透明webm + AE合成半透明视频

有些webm的文件导入unity后无法正常播报,踩坑好久才知道需要webm中的:VP8 标准 现在手上有几条mp4双通道的视频,当然unity中有插件是可以支持这种视频的,为了省事和代码洁癖,毅然决然要webm走到黑。 mp4导入AE合成半透明 打开 AE 软件,创建一个新的合成项目。在项目面板中,选择 “导入” 或直接将 MP4 视频文件拖放到项目面板中,导入视频素材。项目面板中,右击

三文带你轻松上手鸿蒙的AI语音03-文本合成声音

三文带你轻松上手鸿蒙的AI语音03-文本合成声音 前言 接上文 三文带你轻松上手鸿蒙的AI语音02-声音文件转文本 HarmonyOS NEXT 提供的AI 文本合并语音功能,可以将一段不超过10000字符的文本合成为语音并进行播报。 场景举例 手机在无网状态下,系统应用无障碍(屏幕朗读)接入文本转语音能力,为视障人士提供播报能力。类似微信读书,可以实现将文章内容通过语音朗读,可以

圆形缓冲区-MapReduce中的

这篇文章来自一个读者在面试过程中的一个问题,Hadoop在shuffle过程中使用了一个数据结构-环形缓冲区。 环形队列是在实际编程极为有用的数据结构,它是一个首尾相连的FIFO的数据结构,采用数组的线性空间,数据组织简单。能很快知道队列是否满为空。能以很快速度的来存取数据。 因为有简单高效的原因,甚至在硬件都实现了环形队列。 环形队列广泛用于网络数据收发,和不同程序间数据交换(比如内核与应用

【硬刚Hadoop】HADOOP MAPREDUCE(11):Join应用

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 1 Reduce Join Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。 Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在

【硬刚Hadoop】HADOOP MAPREDUCE(10):OutputFormat数据输出

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 1 OutputFormat接口实现类 2 自定义OutputFormat 3 自定义OutputFormat案例实操 1.需求 过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/o

【硬刚Hadoop】HADOOP MAPREDUCE(9):MapReduce内核源码解析(2)ReduceTask工作机制

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 1.ReduceTask工作机制 ReduceTask工作机制,如图4-19所示。 图4-19 ReduceTask工作机制 (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中

【硬刚Hadoop】HADOOP MAPREDUCE(8):MapReduce内核源码解析(1)MapTask工作机制

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 MapTask工作机制 MapTask工作机制如图4-12所示。 图4-12  MapTask工作机制 (1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。 (2)Map阶段:该节点主要是将解析出