Hadoop pipes编程

2024-01-04 07:18
文章标签 编程 hadoop pipes

本文主要是介绍Hadoop pipes编程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. Hadoop pipes编程介绍

Hadoop pipes允许C++程序员编写mapreduce程序,它允许用户混用C++和Java的RecordReader, Mapper, Partitioner,Rducer和RecordWriter等五个组件。关于Hadoop pipes的设计思想,可参见我这篇文章:Hadoop Pipes设计原理

本文介绍了Hadoop pipes编程的基本方法,并给出了若干编程示例,最后介绍了Hadoop pipes高级编程方法,包括怎样在MapReduce中加载词典,怎么传递参数,怎样提高效率等。

2. Hadoop pipes编程初体验

Hadoop-0.20.2源代码中自带了三个pipes编程示例,它们位于目录src/examples/pipes/impl中,分别为wordcount-simple.cc,wordcount-part.cc和wordcount-nopipe.cc。下面简要介绍一下这三个程序。

(1) wordcount-simple.cc:Mapper和Reducer组件采用C++语言编写,RecordReader, Partitioner和RecordWriter采用Java语言编写,其中,RecordReader 为LineRecordReader(位于InputTextInputFormat中,按行读取数据,行所在的偏移量为key,行中的字符串为value),Partitioner为PipesPartitioner,RecordWriter为LineRecordWriter(位于InputTextOutputFormat中,输出格式为”key\tvalue\n”)

(2) wordcount-part.cc:Mapper,Partitioner和Reducer组件采用C++语言编写,其他采用Java编写

(3)wordcount-nopipe.cc:RecordReader,Mapper,Rducer和RecordWriter采用C++编写

接下来简单介绍一下wordcount-simple.cc的编译和运行方法。

在Hadoop的安装目录下,执行下面命令:

1
ant -Dcompile.c++=yes examples

则wordcount-simple.cc生成的可执行文件wordcount-simple被保存到了目录build/c++-examples/Linux-amd64-64/bin/中,然后将该可执行文件上传到HDFS的某一个目录下,如/user/XXX/ bin下:

1
bin/hadoop  -put  build/c++-examples/Linux-amd64-64/bin/wordcount-simple  /user/XXX/ bin/

上传一份数据到HDFS的/user/XXX /pipes_test_data目录下:

1
bin/hadoop  -put  data.txt  /user/XXX /pipes_test_data

直接使用下面命令提交作业:

1
2
3
4
5
6
7
8
9
10
11
12
13
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader= true \
-D hadoop.pipes.java.recordwriter= true \
-D mapred.job.name= wordcount \
-input /user/XXX /pipes_test_data \
-output /user/XXX /pipes_test_output \
-program /user/XXX/ bin/wordcount-simple

3. Hadoop pipes编程方法

先从最基础的两个组件Mapper和Reducer说起。

(1) Mapper编写方法

用户若要实现Mapper组件,需继承HadoopPipes::Mapper虚基类,它的定义如下:

1
2
3
4
5
6
7
class Mapper: public Closable {
public :
virtual void map(MapContext& context) = 0;
};

用户必须实现map函数,它的参数是MapContext,该类的声明如下:

1
2
3
4
5
6
7
8
9
10
11
class MapContext: public TaskContext {
public :
virtual const std::string& getInputSplit() = 0;
virtual const std::string& getInputKeyClass() = 0;
virtual const std::string& getInputValueClass() = 0;
};

而TaskContext类地声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class TaskContext {
public :
class Counter {
……
public :
Counter( int counterId) : id(counterId) {}
Counter( const Counter& counter) : id(counter.id) {}
……
};
virtual const JobConf* getJobConf() = 0;
virtual const std::string& getInputKey() = 0;
virtual const std::string& getInputValue() = 0;
virtual void emit( const std::string& key, const std::string& value) = 0;
virtual void progress() = 0;
…….
};

用户可以从context参数中获取当前的key,value,progress和inputsplit等数据信息,此外,还可以调用emit将结果回传给Java代码。

Mapper的构造函数带有一个HadoopPipes::TaskContext参数,用户可以通过它注册一些全局counter,对于程序调试和跟踪作业进度非常有用:

如果你想注册全局counter,在构造函数添加一些类似的代码:

1
2
3
4
5
6
7
WordCountMap(HadoopPipes::TaskContext& context) {
inputWords1 = context.getCounter(“group”, ”counter1”);
inputWords2 = context.getCounter(“group”, ”counter2”);
}

当需要增加counter值时,可以这样:

1
2
3
context.incrementCounter(inputWords1, 1);
context.incrementCounter(inputWords2, 1);

其中getCounter的两个参数分别为组名和组内计数器名,一个组中可以存在多个counter。

用户自定义的counter会在程序结束时,输出到屏幕上,当然,用户可以用通过web界面看到。

(2) Reducer编写方法

Reducer组件的编写方法跟Mapper组件类似,它需要继承虚基类public HadoopPipes::Reducer。

与Mapper组件唯一不同的地方时,map函数的参数类型为HadoopPipes::ReduceContext,它包含一个nextValue()方法,这允许用于遍历当前key对应的value列表,依次进行处理。

接下来介绍RecordReader, Partitioner和RecordWriter的编写方法:

(3) RecordReader编写方法

用户自定义的RecordReader类需要继承虚基类HadoopPipes::RecordReader,它的声明如下:

1
2
3
4
5
6
7
8
9
class RecordReader: public Closable {
public :
virtual bool next(std::string& key, std::string& value) = 0;
virtual float getProgress() = 0;
};

用户需要实现next和 getProgress两个方法。

用户自定义的RecordReader的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getInputSplit()的方法,用户可以获取经过序列化的InpuSplit对象,Java端采用不同的InputFormat可导致InputSplit对象格式不同,但对于大多数InpuSplit对象,它们可以提供至少三个信息:当前要处理的InputSplit所在的文件名,所在文件中的偏移量,它的长度。用户获取这三个信息后,可使用libhdfs库读取文件,以实现next方法。

下面介绍一下反序列化InputSplit对象的方法:

【1】 如果Java端采用的InputFormat为WordCountInpuFormat,可以这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class XXXReader: public HadoopPipes::RecordReader {
public :
XXXReader (HadoopPipes::MapContext& context) {
std::string filename;
HadoopUtils::StringInStream stream(context.getInputSplit());
HadoopUtils::deserializeString(filename, stream);
……
};

【2】 如果Java端采用的InputFormat为TextInpuFormat,可以这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
class XXXReader: public HadoopPipes::RecordReader {
public :
XXXReader (HadoopPipes::MapContext& context) {
std::string filename;
HadoopUtils::StringInStream stream(context.getInputSplit());
readString(filename, stream);
int start = ( int )readLong(stream);
int len = ( int )readLong(stream);
……
private :
void readString(std::string& t, HadoopUtils::StringInStream& stream)
{
int len = readShort(stream);
if (len > 0) {
// resize the string to the right length
t.resize(len);
// read into the string in 64k chunks
const int bufSize = 65536;
int offset = 0;
char buf[bufSize];
while (len > 0) {
int chunkLength = len > bufSize ? bufSize : len;
stream.read(buf, chunkLength);
t.replace(offset, chunkLength, buf, chunkLength);
offset += chunkLength;
len -= chunkLength;
}
} else {
t.clear();
}
}
long readLong(HadoopUtils::StringInStream& stream) {
long n;
char b;
stream.read(&b, 1);
n = ( long )(b & 0xff) << 56 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) << 48 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) << 40 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) << 32 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) << 24 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) << 16 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) << 8 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) ;
return n;
}
};

(4) Partitioner编写方法

用户自定义的Partitioner类需要继承虚基类HadoopPipes:: Partitioner,它的声明如下:

1
2
3
4
5
6
7
8
9
class Partitioner {
public :
virtual int partition( const std::string& key, int numOfReduces) = 0;
virtual ~Partitioner() {}
};

用户需要实现partition方法和 析构函数。

对于partition方法,框架会自动为它传入两个参数,分别为key值和reduce task的个数numOfReduces,用户只需返回一个0~ numOfReduces-1的值即可。

(5) RecordWriter编写方法

用户自定义的RecordWriter类需要继承虚基类HadoopPipes:: RecordWriter,它的声明如下:

1
2
3
4
5
6
7
8
9
class RecordWriter: public Closable {
public :
virtual void emit( const std::string& key,
const std::string& value) = 0;
};

用户自定的RecordWriter的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getJobConf()可获取一个HadoopPipes::JobConf的对象,用户可从该对象中获取该reduce task的各种参数,如:该reduce task的编号(这对于确定输出文件名有用),reduce task的输出目录等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class MyWriter: public HadoopPipes::RecordWriter {
public :
MyWriter(HadoopPipes::ReduceContext& context) {
const HadoopPipes::JobConf* job = context.getJobConf();
int part = job->getInt( "mapred.task.partition" );
std::string outDir = job->get( "mapred.work.output.dir" );
……
}
}

用户需实现emit方法,将数据写入某个文件。

4. Hadoop pipes编程示例

网上有很多人怀疑Hadoop pipes自带的程序wordcount-nopipe.cc不能运行,各个论坛都有讨论,在此介绍该程序的设计原理和运行方法。

该运行需要具备以下前提:

(1) 采用的InputFormat为WordCountInputFormat,它位于src/test/下的org.apache.hadoop.mapred.pipes中

(2) 输入目录和输出目录需位于各个datanode的本地磁盘上,格式为:file:///home/xxx/pipes_test (注意,hdfs中的各种接口同时支持本地路径和HDFS路径,如果是HDFS上的路径,需要使用hdfs://host:9000/user/xxx,表示/user/xxx为namenode 为host的hdfs上的路径,而本地路径,需使用file:///home/xxx/pipes_test,表示/home/xxx/pipes_test为本地路径。例如,bin/hadoop fs –ls file:///home/xxx/pipes_test表示列出本地磁盘上/home/xxx/pipes_tes下的文件)

待确定好各个datanode的本地磁盘上有输入数据/home/xxx/pipes_test/data.txt后,用户首先上传可执行文件到HDFS中:

1
bin/hadoop  -put  build/c++-examples/Linux-amd64-64/bin/wordcount-nopipe  /user/XXX/bin/

然后使用下面命令提交该作业:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader= false \
-D hadoop.pipes.java.recordwriter= false \
-D mapred.job.name=wordcount \
-D mapred.input.format. class =org.apache.hadoop.mapred.pipes.WordCountInputFormat \
-libjars hadoop-0.20.2-test.jar \
-input file: ///home/xxx/pipes_test/data.txt \
-output file: ///home/xxx/pipes_output \
-program /user/XXX/bin/wordcount-nopipe

5. Hadoop pipes高级编程

如果用户需要在mapreduce作业中加载词典或者传递参数,可这样做:

(1) 提交作业时,用-files选项,将词典(需要传递参数可以放到一个配置文件中)上传给各个datanode,如:

1
2
3
4
5
6
7
8
9
10
11
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader= false \
-D hadoop.pipes.java.recordwriter= false \
-D mapred.job.name=wordcount \
-files dic.txt \
….

(2) 在Mapper或者Reducer的构造函数中,将字典文件以本地文件的形式打开,并把内容保存到一个map或者set中,然后再map()或者reduce()函数中使用即可,如:

1
2
3
4
5
6
7
WordCountMap(HadoopPipes::TaskContext& context) {
file = fopen (“dic.txt”, "r" ); //C库函数
…….
}

为了提高系能,RecordReader和RecordWriter最好采用Java代码实现(或者重用Hadoop中自带的),这是因为Hadoop自带的C++库libhdfs采用JNI实现,底层还是要调用Java相关接口,效率很低,此外,如果要处理的文件为二进制文件或者其他非文本文件,libhdfs可能不好处理。

6. 总结

Hadoop pipes使C++程序员编写MapReduce作业变得可能,它简单好用,提供了用户所需的大部分功能。

1. Hadoop pipes编程介绍

Hadoop pipes允许C++程序员编写mapreduce程序,它允许用户混用C++和Java的RecordReader,Mapper,Partitioner,Rducer和RecordWriter等五个组件。关于Hadoop pipes的设计思想,可参见我这篇文章:

本文介绍了Hadoop pipes编程的基本方法,并给出了若干编程示例,最后介绍了Hadoop pipes高级编程方法,包括怎样在MapReduce中加载词典,怎么传递参数,怎样提高效率等。

2. Hadoop pipes编程初体验

Hadoop-0.20.2源代码中自带了三个pipes编程示例,它们位于目录src/examples/pipes/impl中,分别为wordcount-simple.cc,wordcount-part.cc和wordcount-nopipe.cc。下面简要介绍一下这三个程序。

(1) wordcount-simple.cc:Mapper和Reducer组件采用C++语言编写,RecordReader, Partitioner和RecordWriter采用Java语言编写,其中,RecordReader 为LineRecordReader(位于InputTextInputFormat中,按行读取数据,行所在的偏移量为key,行中的字符串为value),Partitioner为PipesPartitioner,RecordWriter为LineRecordWriter(位于InputTextOutputFormat中,输出格式为”key\tvalue\n”)

(2) wordcount-part.cc:Mapper,Partitioner和Reducer组件采用C++语言编写,其他采用Java编写

(3)wordcount-nopipe.cc:RecordReader,Mapper,Rducer和RecordWriter采用C++编写

接下来简单介绍一下wordcount-simple.cc的编译和运行方法。

在Hadoop的安装目录下,执行下面命令:

ant -Dcompile.c++=yes examples

则wordcount-simple.cc生成的可执行文件wordcount-simple被保存到了目录build/c++-examples/Linux-amd64-64/bin/中,然后将该可执行文件上传到HDFS的某一个目录下,如/user/XXX/ bin下:

bin/hadoop -put build/c++-examples/Linux-amd64-64/bin/wordcount-simple /user/XXX/ bin/

上传一份数据到HDFS的/user/XXX /pipes_test_data目录下:

bin/hadoop -put data.txt /user/XXX /pipes_test_data

直接使用下面命令提交作业:

bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=true \

-D hadoop.pipes.java.recordwriter=true \

-D mapred.job.name= wordcount \

-input /user/XXX /pipes_test_data \

-output /user/XXX /pipes_test_output \

-program /user/XXX/ bin/wordcount-simple

3. Hadoop pipes编程方法

先从最基础的两个组件Mapper和Reducer说起。

(1) Mapper编写方法

用户若要实现Mapper组件,需继承HadoopPipes::Mapper虚基类,它的定义如下:

class Mapper: public Closable {

public:

virtual void map(MapContext& context) = 0;

};

用户必须实现map函数,它的参数是MapContext,该类的声明如下:

class MapContext: public TaskContext {

public:

virtual const std::string& getInputSplit() = 0;

virtual const std::string& getInputKeyClass() = 0;

virtual const std::string& getInputValueClass() = 0;

};

而TaskContext类地声明如下:

class TaskContext {

public:

class Counter {

……

public:

Counter(int counterId) : id(counterId) {}

Counter(const Counter& counter) : id(counter.id) {}

……

};

virtual const JobConf* getJobConf() = 0;

virtual const std::string& getInputKey() = 0;

virtual const std::string& getInputValue() = 0;

virtual void emit(const std::string& key, const std::string& value) = 0;

virtual void progress() = 0;

…….

};

用户可以从context参数中获取当前的key,value,progress和inputsplit等数据信息,此外,还可以调用emit将结果回传给Java代码。

Mapper的构造函数带有一个HadoopPipes::TaskContext参数,用户可以通过它注册一些全局counter,对于程序调试和跟踪作业进度非常有用:

如果你想注册全局counter,在构造函数添加一些类似的代码:

WordCountMap(HadoopPipes::TaskContext& context) {

inputWords1 = context.getCounter(“group”, ”counter1”);

inputWords2 = context.getCounter(“group”, ”counter2”);

}

当需要增加counter值时,可以这样:

context.incrementCounter(inputWords1, 1);

context.incrementCounter(inputWords2, 1);

其中getCounter的两个参数分别为组名和组内计数器名,一个组中可以存在多个counter。

用户自定义的counter会在程序结束时,输出到屏幕上,当然,用户可以用通过web界面看到。

(2) Reducer编写方法

Reducer组件的编写方法跟Mapper组件类似,它需要继承虚基类public HadoopPipes::Reducer。

与Mapper组件唯一不同的地方时,map函数的参数类型为HadoopPipes::ReduceContext,它包含一个nextValue()方法,这允许用于遍历当前key对应的value列表,依次进行处理。

接下来介绍RecordReader, Partitioner和RecordWriter的编写方法:

(3) RecordReader编写方法

用户自定义的RecordReader类需要继承虚基类HadoopPipes::RecordReader,它的声明如下:

class RecordReader: public Closable {

public:

virtual bool next(std::string& key, std::string& value) = 0;

virtual float getProgress() = 0;

};

用户需要实现next和 getProgress两个方法。

用户自定义的RecordReader的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getInputSplit()的方法,用户可以获取经过序列化的InpuSplit对象,Java端采用不同的InputFormat可导致InputSplit对象格式不同,但对于大多数InpuSplit对象,它们可以提供至少三个信息:当前要处理的InputSplit所在的文件名,所在文件中的偏移量,它的长度。用户获取这三个信息后,可使用libhdfs库读取文件,以实现next方法。

(4) Partitioner编写方法

用户自定义的Partitioner类需要继承虚基类HadoopPipes:: Partitioner,它的声明如下:

class Partitioner {

public:

virtual int partition(const std::string& key, int numOfReduces) = 0;

virtual ~Partitioner() {}

};

用户需要实现partition方法和 析构函数。

对于partition方法,框架会自动为它传入两个参数,分别为key值和reduce task的个数numOfReduces,用户只需返回一个0~ numOfReduces-1的值即可。

(5) RecordWriter编写方法

用户自定义的RecordWriter类需要继承虚基类HadoopPipes:: RecordWriter,它的声明如下:

class RecordWriter: public Closable {

public:

virtual void emit(const std::string& key,

const std::string& value) = 0;

};

用户自定的RecordWriter的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getJobConf()可获取一个HadoopPipes::JobConf的对象,用户可从该对象中获取该reduce task的各种参数,如:该reduce task的编号(这对于确定输出文件名有用),reduce task的输出目录等。

class WordCountWriter: public HadoopPipes::RecordWriter {

public:

MyWriter(HadoopPipes::ReduceContext& context) {

const HadoopPipes::JobConf* job = context.getJobConf();

int part = job->getInt(“mapred.task.partition”);

std::string outDir = job->get(“mapred.work.output.dir”);

……

}

}

用户需实现emit方法,将数据写入某个文件。

4. Hadoop pipes编程示例

网上有很多人怀疑Hadoop pipes自带的程序wordcount-nopipe.cc不能运行,各个论坛都有讨论,在此介绍该程序的设计原理和运行方法。

该运行需要具备以下前提:

(1) 采用的InputFormat为WordCountInputFormat,它位于src/test/下的org.apache.hadoop.mapred.pipes中

(2) 输入目录和输出目录需位于各个datanode的本地磁盘上,格式为:file:///home/xxx/pipes_test (注意,hdfs中的各种接口同时支持本地路径和HDFS路径,如果是HDFS上的路径,需要使用hdfs://host:9000/user/xxx,表示/user/xxx为namenode 为host的hdfs上的路径,而本地路径,需使用file:///home/xxx/pipes_test,表示/home/xxx/pipes_test为本地路径)

待确定好各个datanode的本地磁盘上有输入数据/home/xxx/pipes_test/data.txt后,用户首先上传可执行文件到HDFS中:

bin/hadoop -put build/c++-examples/Linux-amd64-64/bin/wordcount-simple /user/XXX/ bin/

然后使用下面命令运行该程序:

bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=false \

-D hadoop.pipes.java.recordwriter=false \

-D mapred.job.name=wordcount \

-D mapred.input.format.class=org.apache.hadoop.mapred.pipes.WordCountInputFormat \

-libjars hadoop-0.20.2-test.jar \

-input file:/home/xxx/pipes_test/data.txt \

-output file:/home/xxx/pipes_output \

-program /user/XXX/ bin/wordcount-nopipe

5. Hadoop pipes高级编程

如果用户需要在mapreduce作业中加载词典或者传递参数,可这样做:

(1) 提交作业时,用-files选项,将词典(需要传递参数可以放到一个配置文件中)上传给各个datanode,如

bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=false \

-D hadoop.pipes.java.recordwriter=false \

-D mapred.job.name=wordcount \

-files dic.txt \

….

(2) 在Mapper或者Reducer的构造函数中,将字典文件以本地文件的形式打开,并把内容保存到一个map或者set中,然后再map()或者reduce()函数中使用即可,如

WordCountMap(HadoopPipes::TaskContext& context) {

file = fopen(“dic.txt”, “r”); //C库函数

…….

}

为了提高系能,RecordReader和RecordWriter最好采用Java代码实现(或者重用Hadoop中自带的),这是因为Hadoop自带的C++库libhdfs采用JNI实现,底层还是要调用Java相关接口,效率很低,此外,如果要处理的文件为二进制文件或者其他非文本文件,libhdfs可能不好处理。

6. 总结

1. Hadoop pipes编程介绍

Hadoop pipes允许C++程序员编写mapreduce程序,它允许用户混用C++和Java的RecordReader,Mapper,Partitioner,Rducer和RecordWriter等五个组件。关于Hadoop pipes的设计思想,可参见我这篇文章:

本文介绍了Hadoop pipes编程的基本方法,并给出了若干编程示例,最后介绍了Hadoop pipes高级编程方法,包括怎样在MapReduce中加载词典,怎么传递参数,怎样提高效率等。

2. Hadoop pipes编程初体验

Hadoop-0.20.2源代码中自带了三个pipes编程示例,它们位于目录src/examples/pipes/impl中,分别为wordcount-simple.cc,wordcount-part.cc和wordcount-nopipe.cc。下面简要介绍一下这三个程序。

(1) wordcount-simple.cc:Mapper和Reducer组件采用C++语言编写,RecordReader, Partitioner和RecordWriter采用Java语言编写,其中,RecordReader 为LineRecordReader(位于InputTextInputFormat中,按行读取数据,行所在的偏移量为key,行中的字符串为value),Partitioner为PipesPartitioner,RecordWriter为LineRecordWriter(位于InputTextOutputFormat中,输出格式为”key\tvalue\n”)

(2) wordcount-part.cc:Mapper,Partitioner和Reducer组件采用C++语言编写,其他采用Java编写

(3)wordcount-nopipe.cc:RecordReader,Mapper,Rducer和RecordWriter采用C++编写

接下来简单介绍一下wordcount-simple.cc的编译和运行方法。

在Hadoop的安装目录下,执行下面命令:

ant -Dcompile.c++=yes examples

则wordcount-simple.cc生成的可执行文件wordcount-simple被保存到了目录build/c++-examples/Linux-amd64-64/bin/中,然后将该可执行文件上传到HDFS的某一个目录下,如/user/XXX/ bin下:

bin/hadoop -put build/c++-examples/Linux-amd64-64/bin/wordcount-simple /user/XXX/ bin/

上传一份数据到HDFS的/user/XXX /pipes_test_data目录下:

bin/hadoop -put data.txt /user/XXX /pipes_test_data

直接使用下面命令提交作业:

bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=true \

-D hadoop.pipes.java.recordwriter=true \

-D mapred.job.name= wordcount \

-input /user/XXX /pipes_test_data \

-output /user/XXX /pipes_test_output \

-program /user/XXX/ bin/wordcount-simple

3. Hadoop pipes编程方法

先从最基础的两个组件Mapper和Reducer说起。

(1) Mapper编写方法

用户若要实现Mapper组件,需继承HadoopPipes::Mapper虚基类,它的定义如下:

class Mapper: public Closable {

public:

virtual void map(MapContext& context) = 0;

};

用户必须实现map函数,它的参数是MapContext,该类的声明如下:

class MapContext: public TaskContext {

public:

virtual const std::string& getInputSplit() = 0;

virtual const std::string& getInputKeyClass() = 0;

virtual const std::string& getInputValueClass() = 0;

};

而TaskContext类地声明如下:

class TaskContext {

public:

class Counter {

……

public:

Counter(int counterId) : id(counterId) {}

Counter(const Counter& counter) : id(counter.id) {}

……

};

virtual const JobConf* getJobConf() = 0;

virtual const std::string& getInputKey() = 0;

virtual const std::string& getInputValue() = 0;

virtual void emit(const std::string& key, const std::string& value) = 0;

virtual void progress() = 0;

…….

};

用户可以从context参数中获取当前的key,value,progress和inputsplit等数据信息,此外,还可以调用emit将结果回传给Java代码。

Mapper的构造函数带有一个HadoopPipes::TaskContext参数,用户可以通过它注册一些全局counter,对于程序调试和跟踪作业进度非常有用:

如果你想注册全局counter,在构造函数添加一些类似的代码:

WordCountMap(HadoopPipes::TaskContext& context) {

inputWords1 = context.getCounter(“group”, ”counter1”);

inputWords2 = context.getCounter(“group”, ”counter2”);

}

当需要增加counter值时,可以这样:

context.incrementCounter(inputWords1, 1);

context.incrementCounter(inputWords2, 1);

其中getCounter的两个参数分别为组名和组内计数器名,一个组中可以存在多个counter。

用户自定义的counter会在程序结束时,输出到屏幕上,当然,用户可以用通过web界面看到。

(2) Reducer编写方法

Reducer组件的编写方法跟Mapper组件类似,它需要继承虚基类public HadoopPipes::Reducer。

与Mapper组件唯一不同的地方时,map函数的参数类型为HadoopPipes::ReduceContext,它包含一个nextValue()方法,这允许用于遍历当前key对应的value列表,依次进行处理。

接下来介绍RecordReader, Partitioner和RecordWriter的编写方法:

(3) RecordReader编写方法

用户自定义的RecordReader类需要继承虚基类HadoopPipes::RecordReader,它的声明如下:

class RecordReader: public Closable {

public:

virtual bool next(std::string& key, std::string& value) = 0;

virtual float getProgress() = 0;

};

用户需要实现next和 getProgress两个方法。

用户自定义的RecordReader的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getInputSplit()的方法,用户可以获取经过序列化的InpuSplit对象,Java端采用不同的InputFormat可导致InputSplit对象格式不同,但对于大多数InpuSplit对象,它们可以提供至少三个信息:当前要处理的InputSplit所在的文件名,所在文件中的偏移量,它的长度。用户获取这三个信息后,可使用libhdfs库读取文件,以实现next方法。

(4) Partitioner编写方法

用户自定义的Partitioner类需要继承虚基类HadoopPipes:: Partitioner,它的声明如下:

class Partitioner {

public:

virtual int partition(const std::string& key, int numOfReduces) = 0;

virtual ~Partitioner() {}

};

用户需要实现partition方法和 析构函数。

对于partition方法,框架会自动为它传入两个参数,分别为key值和reduce task的个数numOfReduces,用户只需返回一个0~ numOfReduces-1的值即可。

(5) RecordWriter编写方法

用户自定义的RecordWriter类需要继承虚基类HadoopPipes:: RecordWriter,它的声明如下:

class RecordWriter: public Closable {

public:

virtual void emit(const std::string& key,

const std::string& value) = 0;

};

用户自定的RecordWriter的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getJobConf()可获取一个HadoopPipes::JobConf的对象,用户可从该对象中获取该reduce task的各种参数,如:该reduce task的编号(这对于确定输出文件名有用),reduce task的输出目录等。

class WordCountWriter: public HadoopPipes::RecordWriter {

public:

MyWriter(HadoopPipes::ReduceContext& context) {

const HadoopPipes::JobConf* job = context.getJobConf();

int part = job->getInt(“mapred.task.partition”);

std::string outDir = job->get(“mapred.work.output.dir”);

……

}

}

用户需实现emit方法,将数据写入某个文件。

4. Hadoop pipes编程示例

网上有很多人怀疑Hadoop pipes自带的程序wordcount-nopipe.cc不能运行,各个论坛都有讨论,在此介绍该程序的设计原理和运行方法。

该运行需要具备以下前提:

(1) 采用的InputFormat为WordCountInputFormat,它位于src/test/下的org.apache.hadoop.mapred.pipes中

(2) 输入目录和输出目录需位于各个datanode的本地磁盘上,格式为:file:///home/xxx/pipes_test (注意,hdfs中的各种接口同时支持本地路径和HDFS路径,如果是HDFS上的路径,需要使用hdfs://host:9000/user/xxx,表示/user/xxx为namenode 为host的hdfs上的路径,而本地路径,需使用file:///home/xxx/pipes_test,表示/home/xxx/pipes_test为本地路径)

待确定好各个datanode的本地磁盘上有输入数据/home/xxx/pipes_test/data.txt后,用户首先上传可执行文件到HDFS中:

bin/hadoop -put build/c++-examples/Linux-amd64-64/bin/wordcount-simple /user/XXX/ bin/

然后使用下面命令运行该程序:

bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=false \

-D hadoop.pipes.java.recordwriter=false \

-D mapred.job.name=wordcount \

-D mapred.input.format.class=org.apache.hadoop.mapred.pipes.WordCountInputFormat \

-libjars hadoop-0.20.2-test.jar \

-input file:/home/xxx/pipes_test/data.txt \

-output file:/home/xxx/pipes_output \

-program /user/XXX/ bin/wordcount-nopipe

5. Hadoop pipes高级编程

如果用户需要在mapreduce作业中加载词典或者传递参数,可这样做:

(1) 提交作业时,用-files选项,将词典(需要传递参数可以放到一个配置文件中)上传给各个datanode,如

bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=false \

-D hadoop.pipes.java.recordwriter=false \

-D mapred.job.name=wordcount \

-files dic.txt \

….

(2) 在Mapper或者Reducer的构造函数中,将字典文件以本地文件的形式打开,并把内容保存到一个map或者set中,然后再map()或者reduce()函数中使用即可,如

WordCountMap(HadoopPipes::TaskContext& context) {

file = fopen(“dic.txt”, “r”); //C库函数

…….

}

为了提高系能,RecordReader和RecordWriter最好采用Java代码实现(或者重用Hadoop中自带的),这是因为Hadoop自带的C++库libhdfs采用JNI实现,底层还是要调用Java相关接口,效率很低,此外,如果要处理的文件为二进制文件或者其他非文本文件,libhdfs可能不好处理。

6. 总结

Hadoop pipes使C++程序员编写MapReduce作业变得可能,它简单好用,提供了用户所需的大部分功能。

Hadoop pipes使C++程序员编写MapReduce作业变得可能,它简单好用,提供了用户所需的大部分功能。

转自:http://dongxicheng.org/mapreduce/hadoop-pipes-programming/


这篇关于Hadoop pipes编程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/568555

相关文章

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Linux 网络编程 --- 应用层

一、自定义协议和序列化反序列化 代码: 序列化反序列化实现网络版本计算器 二、HTTP协议 1、谈两个简单的预备知识 https://www.baidu.com/ --- 域名 --- 域名解析 --- IP地址 http的端口号为80端口,https的端口号为443 url为统一资源定位符。CSDNhttps://mp.csdn.net/mp_blog/creation/editor

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。

Go Playground 在线编程环境

For all examples in this and the next chapter, we will use Go Playground. Go Playground represents a web service that can run programs written in Go. It can be opened in a web browser using the follow

深入理解RxJava:响应式编程的现代方式

在当今的软件开发世界中,异步编程和事件驱动的架构变得越来越重要。RxJava,作为响应式编程(Reactive Programming)的一个流行库,为Java和Android开发者提供了一种强大的方式来处理异步任务和事件流。本文将深入探讨RxJava的核心概念、优势以及如何在实际项目中应用它。 文章目录 💯 什么是RxJava?💯 响应式编程的优势💯 RxJava的核心概念

函数式编程思想

我们经常会用到各种各样的编程思想,例如面向过程、面向对象。不过笔者在该博客简单介绍一下函数式编程思想. 如果对函数式编程思想进行概括,就是f(x) = na(x) , y=uf(x)…至于其他的编程思想,可能是y=a(x)+b(x)+c(x)…,也有可能是y=f(x)=f(x)/a + f(x)/b+f(x)/c… 面向过程的指令式编程 面向过程,简单理解就是y=a(x)+b(x)+c(x)