本文主要是介绍mapreduce源码分析作业分配过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
前面提到作业初始化将创建一系列的TaskInProgress缓存到内存,等待各个tasktracker结点向jobtracker发送心跳请求任务,由jobtracker端的调度器分配任务,默认JobQueueTaskScheduler,具体实现对应assignTasks方法
assignTasks核心算法 :
1、对个某个tasktracker,计算可用的slot数目,调度器会尽量将任务均匀分布各个结点上,负载均衡.
具体做法是:
分别针对reduce和map计算:
首先算出针对该结点的的一个因子factor:请求作业的总任务数 - 该作业已完成的任务数/集群总的任务数 (扫描jobqueue里各作业)
再算可用的slot数: factor*该结点总的slot数 - 该结点正在运行的任务数
2、先后调用jobinprogess的obtainNewLocalMapTask、obtainNewNonLocalMapTask、obtainNewReduceTask方法,返回Task类任务,再以LaunchAction的形式封装发回到tasktracker去执行,以obtainNewLocalMapTask为例,最终调用的是同一个类中findNewMapTask方法,findNewMapTask会返回离tasktracker最近的task(依次从本结点\本机架\本数据中心去选择,从未运行任务缓存去取,由作业初始化Map<Node, List<TaskInProgress>> createcache创建赋值)
部分核心代码:
assignTasks方法
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
throws IOException {
ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
final int numTaskTrackers = clusterStatus.getTaskTrackers();
final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();
Collection<JobInProgress> jobQueue =
jobQueueJobInProgressListener.getJobQueue();
//
// Get map + reduce counts for the current tracker.
//
final int trackerMapCapacity = taskTracker.getMaxMapTasks();
final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
final int trackerRunningMaps = taskTracker.countMapTasks();
final int trackerRunningReduces = taskTracker.countReduceTasks();
//此处taskTracker为心跳发送过来的
这篇关于mapreduce源码分析作业分配过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!