本文主要是介绍Hadoop Map/Reduce Implementation,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
原文链接: http://horicky.blogspot.com/2008/11/hadoop-mapreduce-implementation.html
HadoopMap/Reduce Implementation
In my previous post, I talk aboutthe methodology of transforming a sequential algorithm into parallel. Afterthat, we can implement the parallel algorithm, one of the popular framework wecan use is the Apache Opensource Hadoop Map/Reduce framework.
Functional Programming
Multithreading is one of the popular way of doing parallel programming, butmajor complexity of multi-thread programming is to co-ordinate the access ofeach thread to the shared data. We need things like semaphores, locks, and alsouse them with great care, otherwise dead locks will result.
If we can eliminate the shared state completely, then the complexity ofco-ordination will disappear. This is the fundamental concept of functionalprogramming. Data is explicitly passed between functions as parameters orreturn values which can only be changed by the active function at that moment.Imagine functions are connected to each other via a directed acyclic graph.Since there is no hidden dependency (via shared state), functions in the DAGcan run anywhere in parallel as long as one is not an ancestor of the other. Inother words, analyze the parallelism is much easier when there is no hiddendependency from shared state.
Map/Reduce functions
Map/reduce is a special form of such a DAG which is applicable in a wide rangeof use cases. It is organized as a “map” function which transform a piece ofdata into some number of key/value pairs. Each of these elements will then besorted by their key and reach to the same node, where a “reduce” function isuse to merge the values (of the same key) into a single result.
map(input_record) {
…
emit(k1, v1)
…
emit(k2, v2)
…
}
reduce (key, values) {
aggregate = initialize()
while (values.has_next) {
aggregate = merge(values.next)
}
collect(key, aggregate)
}
The Map/Reduce DAG is organized in this way.
A parallel algorithm is usually structure as multiple rounds of Map/Reduce
Distributed File Systems
The distributed file system is designed to handle large files (multi-GB) withsequential read/write operation. Each file is broken into chunks, and storedacross multiple data nodes as local OS files.
There is a master “NameNode” to keep track of overall file directory structureand the placement of chunks. This NameNode is the central control point and mayre-distributed replicas as needed.
To read a file, the client API will calculate the chunk index based on theoffset of the file pointer and make a request to the NameNode. The NameNodewill reply which DataNodes has a copy of that chunk. From this points, theclient contacts the DataNode directly without going through the NameNode.
To write a file, client API will first contact the NameNode who will designateone of the replica as the primary (by granting it a lease). The response of theNameNode contains who is the primary and who are the secondary replicas. Thenthe client push its changes to all DataNodes in any order, but this change isstored in a buffer of each DataNode. After changes are buffered at allDataNodes, the client send a “commit” request to the primary, which determinesan order to update and then push this order to all other secondaries. After allsecondaries complete the commit, the primary will response to the client aboutthe success.
All changes of chunk distribution and metadata changes will be written to anoperation log file at the NameNode. This log file maintain an order list ofoperation which is important for the NameNode to recover its view after acrash. The NameNode also maintain its persistent state by regularlycheck-pointing to a file.
In case of the NameNode crash, all lease granting operation will fail and soany write operation is effectively fail also. Read operation shouldcontinuously to work as long as the clinet program has a handle to theDataNode. To recover from NameNode crash, a new NameNode can take over afterrestoring the state from the last checkpoint file and replay the operation log.
When a DataNode crashes, it will be detected by the NameNode after missing itshearbeat for a while. The NameNode removes the crashed DataNode from thecluster and spread its chunks to other surviving DataNodes. This way, thereplication factor of each chunk will be maintained across the cluster.
Later when the DataNode recover and rejoin the cluster, it reports all itschunks to the NameNode at boot time. Each chunk has a version number which willadvanced at each update. Therefore, the NameNode can easily figure out if anyof the chunks of a DataNode becomes stale. Those stale chunks will be garbagecollected at a later time.
Job Execution
Hadoop MapRed is based on a “pull” model where multiple “TaskTrackers” poll the“JobTracker” for tasks (either map task or reduce task).
The job execution starts when the client program uploading three files:“job.xml” (the job config including map, combine, reduce function andinput/output data path, etc.), “job.split” (specifies how many splits and rangebased on dividing files into ~16 – 64 MB size), “job.jar” (the actual Mapperand Reducer implementation classes) to the HDFS location (specified by the“mapred.system.dir” property in the “hadoop-default.conf” file). Then theclient program notifies the JobTracker about the Job submission. The JobTrackerreturns a Job id to the client program and starts allocating map tasks to theidle TaskTrackers when they poll for tasks.
Each TaskTracker has a defined number of "task slots" based on thecapacity of the machine. There are heartbeat protocol allows the JobTracker toknow how many free slots from each TaskTracker. The JobTracker will determineappropriate jobs for the TaskTrackers based on how busy thay are, their networkproximity to the data sources (preferring same node, then same rack, then samenetwork switch). The assigned TaskTrackers will fork a MapTask (separate JVMprocess) to execute the map phase processing. The MapTask extracts the inputdata from the splits by using the “RecordReader” and “InputFormat” and itinvokes the user provided “map” function which emits a number of key/value pairin the memory buffer.
When the buffer is full, the output collector will spill thememory buffer into disk. For optimizing the network bandwidth, an optional“combine” function can be invoked to partially reduce values of each key.Afterwards, the “partition” function is invoked on each key to calculate itsreducer node index. The memory buffer is eventually flushed into 2 files, thefirst index file contains an offset pointer of each partition. The second datafile contains all records sorted by partition and then by key.
When the map task has finished executing all input records, it start the commitprocess, it first flush the in-memory buffer (even it is not full) to the index+ data file pair. Then a merge sort for all index + data file pairs will beperformed to create a single index + data file pair.
The index + data file pair will then be splitted into are R local directories,one for each partition. After all the MapTask completes (all splits are done),the TaskTracker will notify the JobTracker which keeps track of the overallprogress of job. JobTracker also provide a web interface for viewing the jobstatus.
When the JobTracker notices that some map tasks are completed, it will startallocating reduce tasks to subsequent polling TaskTrackers (there are RTaskTrackers will be allocated for reduce task). These allocated TaskTrackersremotely download the region files (according to the assigned reducer index)from the completed map phase nodes and concatenate (merge sort) them into asingle file. Whenever more map tasks are completed afterwards, JobTracker willnotify these allocated TaskTrackers to download more region files (merge withprevious file). In this manner, downloading region files are interleaved withthe map task progress. The reduce phase is not started at this moment yet.
Eventually all the map tasks are completed. The JobTracker then notifies allthe allocated TaskTrackers to proceed to the reduce phase. Each allocatedTaskTracker will fork a ReduceTask (separate JVM) to read the downloaded file(which is already sorted by key) and invoke the “reduce” function, which collectsthe key/aggregatedValue into the final output file (one per reducer node). Notethat each reduce task (and map task as well) is single-threaded. And thisthread will invoke the reduce(key, values) function in assending (ordescending) order of the keys assigned to this reduce task. This provides aninteresting property that all entries written by the reduce() function issorted in increasing order. The output of each reducer is written to a tempoutput file in HDFS. When the reducer finishes processing all keys, the tempoutput file will be renamed atomically to its final output filename.
The Map/Reduce framework is resilient to crashes of any components. TaskTrackernodes periodically report their status to the JobTracker which keeps track ofthe overall job progress. If the JobTracker hasn’t heard from any TaskTrackernodes for a long time, it assumes the TaskTracker node has been crashed andwill reassign its tasks appropriately to other TaskTracker nodes. Since the mapphase result is stored in the local disk, which will not be available when theTaskTracker node crashes. In case a map-phase TaskTracker node crashes, thecrashed MapTasks (regardless of whether it is complete or not) will bereassigned to a different TaskTracker node, which will rerun all the assignedsplits. However, the reduce phase result is stored in HDFS, which is availableeven the TaskTracker node crashes. Therefore, in case a reduce-phaseTaskTracker node crashes, only the incomplete ReduceTasks need to be reassignedto a different TaskTracker node, where the incompleted reduce tasks will bere-run.
The job submission process is asynchronous. Client program can poll for the jobstatus at any time by supplying the job id.
这篇关于Hadoop Map/Reduce Implementation的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!