Hadoop Map/Reduce Implementation


Original article: http://horicky.blogspot.com/2008/11/hadoop-mapreduce-implementation.html



In my previous post, I talked about the methodology of transforming a sequential algorithm into a parallel one. After that, we can implement the parallel algorithm; one of the popular frameworks we can use is the Apache open-source Hadoop Map/Reduce framework.

Functional Programming

Multithreading is one of the popular ways of doing parallel programming, but the major complexity of multi-threaded programming is coordinating each thread's access to shared data. We need things like semaphores and locks, and we must use them with great care, otherwise deadlocks will result.

If we can eliminate the shared state completely, then the complexity of coordination disappears. This is the fundamental concept of functional programming. Data is explicitly passed between functions as parameters or return values, and can only be changed by the function that is active at that moment. Imagine functions connected to each other in a directed acyclic graph. Since there is no hidden dependency (via shared state), functions in the DAG can run anywhere in parallel as long as one is not an ancestor of the other. In other words, analyzing the parallelism is much easier when there is no hidden dependency from shared state.

Map/Reduce functions

Map/reduce is a special form of such a DAG which is applicable to a wide range of use cases. It is organized around a "map" function which transforms a piece of data into some number of key/value pairs. Each of these elements is then sorted by its key and routed to the same node, where a "reduce" function is used to merge the values (of the same key) into a single result.


 

map(input_record) {
    emit(k1, v1)
    emit(k2, v2)
}

reduce(key, values) {
    aggregate = initialize()
    while (values.has_next) {
        aggregate = merge(values.next)
    }
    collect(key, aggregate)
}
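To make the pseudocode concrete, here is a word-count instantiation of the same map/reduce shape, run sequentially in plain Java with no framework involved; the class and method names are purely for illustration.

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Word count expressed in the map/reduce shape above, executed sequentially
// for illustration only (no Hadoop involved).
public class WordCountSketch {

    // map: one input record (a line of text) -> a list of (word, 1) pairs
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    // reduce: one key plus all of its values -> a single aggregated value
    static int reduce(String key, Iterable<Integer> values) {
        int aggregate = 0;
        for (int v : values) {
            aggregate += v;                      // aggregate = merge(values.next)
        }
        return aggregate;
    }

    public static void main(String[] args) {
        String[] records = { "the quick brown fox", "the lazy dog" };

        // The "shuffle": group every emitted value by its key, sorted by key.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String record : records) {
            for (Map.Entry<String, Integer> kv : map(record)) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
                       .add(kv.getValue());
            }
        }

        // Reduce each key group independently.
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            System.out.println(e.getKey() + "\t" + reduce(e.getKey(), e.getValue()));
        }
    }
}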


The Map/Reduce DAG is organized in this way.



A parallel algorithm is usually structured as multiple rounds of Map/Reduce.




Distributed File Systems

The distributed file system is designed to handle large files (multi-GB) with sequential read/write operations. Each file is broken into chunks, which are stored across multiple DataNodes as local OS files.



There is a master "NameNode" that keeps track of the overall file directory structure and the placement of chunks. This NameNode is the central control point and may redistribute replicas as needed.

To read a file, the client API will calculate the chunk index based on the offset of the file pointer and make a request to the NameNode. The NameNode will reply with which DataNodes have a copy of that chunk. From this point on, the client contacts the DataNode directly without going through the NameNode.
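From the client's point of view, the read path looks roughly like the minimal sketch below, using the standard Hadoop FileSystem API; the file path and offset are hypothetical, and the chunk lookup plus the direct DataNode transfer happen inside the client library.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Reading a byte range from an HDFS file. The NameNode lookup and the direct
// DataNode transfer described above are handled inside the client library.
public class HdfsReadSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();          // picks up the cluster settings
        FileSystem fs = FileSystem.get(conf);

        Path file = new Path("/data/input/part-00000");    // hypothetical path
        byte[] buffer = new byte[4096];

        FSDataInputStream in = fs.open(file);
        in.seek(1024L * 1024L);                            // offset -> chunk index is computed by the library
        int bytesRead = in.read(buffer, 0, buffer.length);
        in.close();

        System.out.println("read " + bytesRead + " bytes");
    }
}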

To write a file, the client API will first contact the NameNode, which will designate one of the replicas as the primary (by granting it a lease). The response from the NameNode indicates which replica is the primary and which are the secondaries. The client then pushes its changes to all DataNodes in any order, and each change is stored in a buffer at each DataNode. After the changes are buffered at all DataNodes, the client sends a "commit" request to the primary, which determines an update order and then pushes this order to all the secondaries. After all secondaries complete the commit, the primary responds to the client about the success.
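A corresponding write, again from the client's point of view, might look like the sketch below (the path is hypothetical); the lease, the buffered pushes to the replicas, and the commit ordering all happen behind the output stream.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Writing an HDFS file. The replica pipeline described above (primary lease,
// buffering at the DataNodes, commit ordering) is driven by the client library.
public class HdfsWriteSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        FSDataOutputStream out = fs.create(new Path("/data/output/example.txt")); // hypothetical path
        out.write("hello hdfs\n".getBytes("UTF-8"));
        out.close();    // closing the stream finalizes the file
    }
}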

All chunk-distribution changes and metadata changes will be written to an operation log file at the NameNode. This log file maintains an ordered list of operations, which is important for the NameNode to recover its view after a crash. The NameNode also maintains its persistent state by regularly checkpointing to a file.

In case of a NameNode crash, all lease-granting operations will fail, so any write operation effectively fails as well. Read operations should continue to work as long as the client program has a handle to the DataNode. To recover from a NameNode crash, a new NameNode can take over after restoring the state from the last checkpoint file and replaying the operation log.

When a DataNode crashes, it will be detected by the NameNode after its heartbeat has been missing for a while. The NameNode removes the crashed DataNode from the cluster and spreads its chunks to other surviving DataNodes. This way, the replication factor of each chunk is maintained across the cluster.

Later, when the DataNode recovers and rejoins the cluster, it reports all its chunks to the NameNode at boot time. Each chunk has a version number which is advanced at each update. Therefore, the NameNode can easily figure out whether any of the chunks of a DataNode have become stale. Those stale chunks will be garbage collected at a later time.


Job Execution

Hadoop MapRed is based on a "pull" model where multiple "TaskTrackers" poll the "JobTracker" for tasks (either map tasks or reduce tasks).

The job execution starts when the client program uploads three files to the HDFS location specified by the "mapred.system.dir" property in the "hadoop-default.conf" file: "job.xml" (the job configuration, including the map, combine, and reduce functions, the input/output data paths, etc.), "job.split" (which specifies the number of splits and their ranges, based on dividing the input files into pieces of roughly 16-64 MB), and "job.jar" (the actual Mapper and Reducer implementation classes). The client program then notifies the JobTracker about the job submission. The JobTracker returns a job id to the client program and starts allocating map tasks to the idle TaskTrackers when they poll for tasks.
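As a rough sketch of what this looks like from the client side, the classic org.apache.hadoop.mapred API packages and submits those files roughly as below; the input/output paths are hypothetical, and the WordCountMapper / WordCountReducer classes referenced here are the ones sketched in the following sections.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

// Submitting a word-count job with the classic mapred API. JobClient builds
// job.xml, job.split and job.jar, copies them to HDFS and notifies the JobTracker.
public class SubmitJobSketch {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(SubmitJobSketch.class);
        conf.setJobName("wordcount");

        conf.setMapperClass(WordCountMapper.class);
        conf.setCombinerClass(WordCountReducer.class);   // optional map-side partial reduction
        conf.setReducerClass(WordCountReducer.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path("/data/input"));    // hypothetical paths
        FileOutputFormat.setOutputPath(conf, new Path("/data/output"));

        // Asynchronous submission: the call returns a handle carrying the job id.
        RunningJob job = new JobClient(conf).submitJob(conf);
        System.out.println("submitted " + job.getID());
    }
}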



Each TaskTracker has a defined number of "task slots" based on the capacity of the machine. A heartbeat protocol allows the JobTracker to know how many free slots each TaskTracker has. The JobTracker will determine appropriate tasks for the TaskTrackers based on how busy they are and on their network proximity to the data sources (preferring the same node, then the same rack, then the same network switch). The assigned TaskTracker will fork a MapTask (a separate JVM process) to execute the map-phase processing. The MapTask extracts the input data from the splits using the "RecordReader" and "InputFormat", and it invokes the user-provided "map" function, which emits a number of key/value pairs into an in-memory buffer.
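For reference, a map function of this shape in the classic mapred API could look like the following word-count Mapper (a sketch, not code from the original post); the framework's RecordReader feeds it one (byte offset, line) record per call.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// A word-count Mapper in the classic mapred API. Each emitted (word, 1) pair
// goes into the MapTask's in-memory buffer described above.
public class WordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        for (String token : line.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                output.collect(word, ONE);    // emit(key, value)
            }
        }
    }
}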



When the buffer is full, the output collector will spill the memory buffer to disk. To optimize network bandwidth, an optional "combine" function can be invoked to partially reduce the values of each key. Afterwards, the "partition" function is invoked on each key to calculate its reducer node index. The memory buffer is eventually flushed into two files: the first, an index file, contains an offset pointer for each partition; the second, a data file, contains all records sorted by partition and then by key.
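The partition function is also user-replaceable; a sketch of one in the classic mapred API, mirroring the default hash-based behaviour, is shown below.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Maps each emitted key to one of the R reducer indices (a sketch that mirrors
// the default hash partitioning).
public class WordCountPartitioner implements Partitioner<Text, IntWritable> {

    public void configure(JobConf job) {
        // nothing to configure in this sketch
    }

    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}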

When the map task has finished processing all input records, it starts the commit process: it first flushes the in-memory buffer (even if it is not full) to an index + data file pair. Then a merge sort over all index + data file pairs is performed to create a single index + data file pair.

The index + data file pair is then split into R local directories, one for each partition. After the MapTask completes (all splits are done), the TaskTracker will notify the JobTracker, which keeps track of the overall progress of the job. The JobTracker also provides a web interface for viewing the job status.

When the JobTracker notices that some map tasks are completed, it will start allocating reduce tasks to subsequently polling TaskTrackers (R TaskTrackers will be allocated for the reduce tasks). These allocated TaskTrackers remotely download the region files (according to their assigned reducer index) from the completed map-phase nodes and concatenate (merge sort) them into a single file. Whenever more map tasks complete afterwards, the JobTracker will notify these allocated TaskTrackers to download more region files (merging them with the previous file). In this manner, downloading region files is interleaved with the progress of the map tasks. The reduce phase has not started at this point.

Eventually all the map tasks are completed. The JobTracker then notifies all the allocated TaskTrackers to proceed to the reduce phase. Each allocated TaskTracker will fork a ReduceTask (a separate JVM) to read the downloaded file (which is already sorted by key) and invoke the "reduce" function, which collects the key/aggregatedValue pairs into the final output file (one per reducer node). Note that each reduce task (and each map task as well) is single-threaded, and this thread will invoke the reduce(key, values) function in ascending (or descending) order of the keys assigned to this reduce task. This provides an interesting property: all entries written by the reduce() function are sorted in increasing order. The output of each reducer is written to a temporary output file in HDFS. When the reducer finishes processing all keys, the temporary output file is renamed atomically to its final output filename.
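A reduce function of this shape in the classic mapred API could look like the following word-count Reducer (again a sketch); the framework calls it once per key, in sorted key order, with an iterator over all of that key's values.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// A word-count Reducer in the classic mapred API. Everything it collects for a
// given reducer index ends up in that reducer's single output file in HDFS.
public class WordCountReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();            // aggregate = merge(values.next)
        }
        output.collect(key, new IntWritable(sum)); // collect(key, aggregate)
    }
}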

The Map/Reduce framework is resilient to crashes of any of its components. TaskTracker nodes periodically report their status to the JobTracker, which keeps track of the overall job progress. If the JobTracker hasn't heard from a TaskTracker node for a long time, it assumes the TaskTracker node has crashed and will reassign its tasks appropriately to other TaskTracker nodes. Since the map-phase result is stored on the local disk, it will not be available when the TaskTracker node crashes. In case a map-phase TaskTracker node crashes, the crashed MapTasks (regardless of whether they are complete or not) will be reassigned to a different TaskTracker node, which will rerun all the assigned splits. However, the reduce-phase result is stored in HDFS, which remains available even if the TaskTracker node crashes. Therefore, in case a reduce-phase TaskTracker node crashes, only the incomplete ReduceTasks need to be reassigned to a different TaskTracker node, where they will be re-run.

The job submission process is asynchronous. The client program can poll for the job status at any time by supplying the job id.
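A polling loop in the classic mapred API might look like the sketch below; the job id passed on the command line is just an example format.

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;

// Polling a previously submitted job by its id until it finishes.
public class JobStatusSketch {
    public static void main(String[] args) throws Exception {
        JobClient client = new JobClient(new JobConf());
        RunningJob job = client.getJob(JobID.forName(args[0]));  // e.g. "job_200811140001_0001"

        while (job != null && !job.isComplete()) {
            System.out.printf("map %.0f%% reduce %.0f%%%n",
                    job.mapProgress() * 100, job.reduceProgress() * 100);
            Thread.sleep(5000);
        }
        if (job != null) {
            System.out.println(job.isSuccessful() ? "job succeeded" : "job failed");
        }
    }
}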

 

