MapReduce高级编程之自定义InputFormat

2024-03-19 13:58

本文主要是介绍MapReduce高级编程之自定义InputFormat,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢?
InputFormat其实是一个接口,包含了两个方法:
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
这两个方法有分别完成着以下工作:
方法getSplits将输入数据切分成splits, splits的个数即为map tasks的个数,splits的大小默认为块大小,即64M
方法getSplits将每个split解析成records, 再依次将record解析成<K,V>对
也就是说InputFormat完成以下工作:
InputFile --> splits--> <K,V>
系统常用的 InputFormat 又有哪些呢?
其中TextInputFormat便是最常用的,它的<K,V>就代表<行偏移,该行内容>
然而系统所提供的这几种固定的将 InputFile转换为<K,V>的方式有时候并不能满足我们的需求:
此时需要我们自定义InputFormat ,从而使 Hadoop框架按照我们预设的方式来将
InputFile解析为<K,V>
在领会自定义InputFormat 之前,需要弄懂一下几个抽象类、接口及其之间的关系:
InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),
RecordReader(interface), LineRecordReader(class)的关系
FileInputFormat implements InputFormat
TextInputFormat extends FileInputFormat
TextInputFormat.getRecordReader calls LineRecordReader
LineRecordReader  implements RecordReader
对于InputFormat接口,上面已经有详细的描述
再看看FileInputFormat,它实现了InputFormat接口中的getSplits方法,而将getRecordReader与isSplitable留给具体类(如TextInputFormat)实现,isSplitable方法通常不用修改,所以只需要在自定义的InputFormat中实现
getRecordReader方法即可,而该方法的核心是调用LineRecordReader(即由LineRecorderReader类来实现 "将每个split解析成records, 再依次将record解析成<K,V>对"),该方法实现了接口RecordReader
public interface RecordReader<K, V> {
booleannext(K key, V value) throws IOException;
KcreateKey();
VcreateValue();
longgetPos() throws IOException;
public voidclose() throws IOException;
floatgetProgress() throws IOException;
}

因此自定义InputFormat的核心是自定义一个实现接口RecordReader类似于LineRecordReader的类,该类的核心也正是重写接口RecordReader中的几大方法,

定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader
示例,数据每一行为 “物体,x坐标,y坐标,z坐标
ball 3.5,12.7,9.0
car 15,23.76,42.23
device 0.0,12.4,-67.1
每一行将要被解析为<Text, Point3D>(Point3D是我们在上一篇日志中自定义的数据类型)
方式一,自定义的RecordReader使用中LineRecordReader,
public class ObjectPositionInputFormat extends
FileInputFormat<Text, Point3D> {
public RecordReader<Text, Point3D> getRecordReader (
InputSplit input, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(input.toString());
return new ObjPosRecordReader(job, (FileSplit)input);
}
}
class ObjPosRecordReader implements RecordReader<Text, Point3D> {
private LineRecordReader lineReader;
private LongWritable lineKey;
private Text lineValue;
public ObjPosRecordReader (JobConf job, FileSplit split) throws IOException {
lineReader = new LineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
}
public boolean next (Text key, Point3D value) throws IOException {
// get the next line
if (!lineReader.next(lineKey, lineValue)) {
return false ;
}
// parse the lineValue which is in the format:
// objName, x, y, z
String [] pieces = lineValue.toString().split( "," );
if (pieces.length != 4) {
throw new IOException( "Invalid record received");
}
// try to parse floating point components of value
float fx, fy, fz;
try {
fx = Float.parseFloat(pieces[1].trim());
fy = Float.parseFloat(pieces[2].trim());
fz = Float.parseFloat(pieces[3].trim());
} catch (NumberFormatException nfe) {
throw new IOException( "Error parsing floating point value in record" );
}
// now that we know we'll succeed, overwrite the output objects
key.set(pieces[0].trim()); // objName is the output key.
value.x = fx;
value.y = fy;
value.z = fz;
return true ;
}
public Text createKey () {
return new Text( "" );
}
public Point3D createValue () {
return new Point3D();
}
public long getPos () throws IOException {
return lineReader.getPos();
}
public void close () throws IOException {
lineReader.close();
}
public float getProgress () throws IOException {
return lineReader.getProgress();
}
}
方式二:自定义的RecordReader中使用LineReader,
public class ObjectPositionInputFormat extends FileInputFormat<Text, Point3D> {
@ Override
protected boolean isSplitable (JobContext context, Path filename) {
// TODO Auto-generated method stub
return false ;
}
@ Override
public RecordReader<Text, Point3D> createRecordReader (InputSplit inputsplit,
TaskAttemptContext context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
return new objPosRecordReader();
}
public static class objPosRecordReader extends RecordReader<Text,Point3D>{
public LineReader in;
public Text lineKey;
public Point3D lineValue;
public StringTokenizer token= null ;
public Text line;
@ Override
public void close () throws IOException {
// TODO Auto-generated method stub
}
@ Override
public Text getCurrentKey () throws IOException, InterruptedException {
// TODO Auto-generated method stub
System.out.println( "key" );
//lineKey.set(token.nextToken());
System.out.println( "hello" );
return lineKey;
}
@ Override
public Point3D getCurrentValue () throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return lineValue;
}
@ Override
public float getProgress () throws IOException, InterruptedException {
// TODO Auto-generated method stub
return 0;
}
@ Override
public void initialize (InputSplit input, TaskAttemptContext context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
FileSplit split=(FileSplit)input;
Configuration job=context.getConfiguration();
Path file=split.getPath();
FileSystem fs=file.getFileSystem(job);
FSDataInputStream filein=fs.open(file);
in= new LineReader(filein,job);
line= new Text();
lineKey= new Text();
lineValue= new Point3D();
}
@ Override
public boolean nextKeyValue () throws IOException, InterruptedException {
// TODO Auto-generated method stub
int linesize=in.readLine(line);
if (linesize==0)
return false ;
token= new StringTokenizer(line.toString());
String []temp= new String[2];
if (token.hasMoreElements()){
temp[0]=token.nextToken();
if (token.hasMoreElements()){
temp[1]=token.nextToken();
}
}
System.out.println(temp[0]);
System.out.println(temp[1]);
String []points=temp[1].split( "," );
System.out.println(points[0]);
System.out.println(points[1]);
System.out.println(points[2]);
lineKey.set(temp[0]);
lineValue.set(Float.parseFloat(points[0]),Float.parseFloat(points[1]), Float.parseFloat(points[2]));
System.out.println( "pp" );
return true ;
}
}
}
从以上可以看出,自定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader,而在其中可能会到LineReader/LineRecordReader/KeyValueLineRecordReader
因此,要自定义InputFormat,这三个类的源码就必须很熟悉~

这篇关于MapReduce高级编程之自定义InputFormat的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/826210

相关文章

Python中列表的高级索引技巧分享

《Python中列表的高级索引技巧分享》列表是Python中最常用的数据结构之一,它允许你存储多个元素,并且可以通过索引来访问这些元素,本文将带你深入了解Python列表的高级索引技巧,希望对... 目录1.基本索引2.切片3.负数索引切片4.步长5.多维列表6.列表解析7.切片赋值8.删除元素9.反转列表

SpringBoot 自定义消息转换器使用详解

《SpringBoot自定义消息转换器使用详解》本文详细介绍了SpringBoot消息转换器的知识,并通过案例操作演示了如何进行自定义消息转换器的定制开发和使用,感兴趣的朋友一起看看吧... 目录一、前言二、SpringBoot 内容协商介绍2.1 什么是内容协商2.2 内容协商机制深入理解2.2.1 内容

正则表达式高级应用与性能优化记录

《正则表达式高级应用与性能优化记录》本文介绍了正则表达式的高级应用和性能优化技巧,包括文本拆分、合并、XML/HTML解析、数据分析、以及性能优化方法,通过这些技巧,可以更高效地利用正则表达式进行复杂... 目录第6章:正则表达式的高级应用6.1 模式匹配与文本处理6.1.1 文本拆分6.1.2 文本合并6

C#反射编程之GetConstructor()方法解读

《C#反射编程之GetConstructor()方法解读》C#中Type类的GetConstructor()方法用于获取指定类型的构造函数,该方法有多个重载版本,可以根据不同的参数获取不同特性的构造函... 目录C# GetConstructor()方法有4个重载以GetConstructor(Type[]

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

Linux 网络编程 --- 应用层

一、自定义协议和序列化反序列化 代码: 序列化反序列化实现网络版本计算器 二、HTTP协议 1、谈两个简单的预备知识 https://www.baidu.com/ --- 域名 --- 域名解析 --- IP地址 http的端口号为80端口,https的端口号为443 url为统一资源定位符。CSDNhttps://mp.csdn.net/mp_blog/creation/editor

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

自定义类型:结构体(续)

目录 一. 结构体的内存对齐 1.1 为什么存在内存对齐? 1.2 修改默认对齐数 二. 结构体传参 三. 结构体实现位段 一. 结构体的内存对齐 在前面的文章里我们已经讲过一部分的内存对齐的知识,并举出了两个例子,我们再举出两个例子继续说明: struct S3{double a;int b;char c;};int mian(){printf("%zd\n",s

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。