This article looks at how a MapTask executes its map work through a single-threaded or a multi-threaded runner in Hadoop MapReduce, and is intended as a reference for developers digging into that part of the framework.
Compared with the old MapReduce API, the new API makes fairly substantial changes to the interface through which a map task runs its map calls.
In the old API, MapRunner's run method looks like this (tidied up here from its decompiled form):
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
    throws IOException {
  try {
    // Allocate key & value instances; they are re-used for every record
    // because map calls run one after another in this single thread.
    K1 key = input.createKey();
    V1 value = input.createValue();

    while (input.next(key, value)) {
      // Hand each key/value pair directly to the user's map function.
      mapper.map(key, value, output, reporter);
      if (incrProcCount) {
        reporter.incrCounter("SkippingTaskCounters", "MapProcessedRecords", 1L);
      }
    }
  } finally {
    // Close the mapper whether or not an exception was thrown.
    mapper.close();
  }
}
As the code shows, the map task's input is a RecordReader: the runner calls next(key, value) to fetch each key/value pair and hands it straight to the mapper's map function.
In the new API, MapTask's runNewMapper method first creates an instance of the configured Mapper class,
mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
and then invokes the mapper object's run method directly:
mapper.run(mapperContext);
Inside run, each key/value pair is handed to the map function in turn; a simplified sketch of that loop follows.
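For reference, the new-API loop looks roughly like this (a simplified sketch of org.apache.hadoop.mapreduce.Mapper.run; depending on the Hadoop version, cleanup may additionally be wrapped in a finally block):

public void run(Context context) throws IOException, InterruptedException {
  setup(context);                       // per-task initialization hook
  while (context.nextKeyValue()) {
    // Each record is passed to the user's map method, one at a time.
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);                     // per-task teardown hook
}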
In most cases the map task is executed by the single-threaded runner, MapRunner. Hadoop also ships a multithreaded runner, MultithreadedMapRunner, which runs map calls on a thread pool; per its javadoc it is intended for map operations that are not CPU bound, where extra threads improve throughput. The code of MultithreadedMapRunner is as follows:
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.util.concurrent.*;

/**
 * Multithreaded implementation for {@link org.apache.hadoop.mapred.MapRunnable}.
 * <p>
 * It can be used instead of the default implementation,
 * {@link org.apache.hadoop.mapred.MapRunner}, when the Map operation is not CPU
 * bound in order to improve throughput.
 * <p>
 * Map implementations using this MapRunnable must be thread-safe.
 * <p>
 * The Map-Reduce job has to be configured to use this MapRunnable class (using
 * the JobConf.setMapRunnerClass method) and
 * the number of thread the thread-pool can use with the
 * <code>mapred.map.multithreadedrunner.threads</code> property, its default
 * value is 10 threads.
 * <p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultithreadedMapRunner<K1, V1, K2, V2>
    implements MapRunnable<K1, V1, K2, V2> {

  private static final Log LOG =
      LogFactory.getLog(MultithreadedMapRunner.class.getName());

  private JobConf job;
  private Mapper<K1, V1, K2, V2> mapper;
  private ExecutorService executorService;
  private volatile IOException ioException;
  private volatile RuntimeException runtimeException;
  private boolean incrProcCount;

  @SuppressWarnings("unchecked")
  public void configure(JobConf jobConf) {
    int numberOfThreads =
        jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring jobConf " + jobConf.getJobName() +
                " to use " + numberOfThreads + " threads");
    }

    this.job = jobConf;
    //increment processed counter only if skipping feature is enabled
    this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job) > 0 &&
        SkipBadRecords.getAutoIncrMapperProcCount(job);
    this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
        jobConf);

    // Creating a threadpool of the configured size to execute the Mapper
    // map method in parallel.
    executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
                                             0L, TimeUnit.MILLISECONDS,
                                             new BlockingArrayQueue(numberOfThreads));
  }

  /**
   * A blocking array queue that replaces offer and add, which throws on a full
   * queue, to a put, which waits on a full queue.
   */
  private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;

    public BlockingArrayQueue(int capacity) {
      super(capacity);
    }

    public boolean offer(Runnable r) {
      return add(r);
    }

    public boolean add(Runnable r) {
      try {
        put(r);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
      return true;
    }
  }

  private void checkForExceptionsFromProcessingThreads()
      throws IOException, RuntimeException {
    // Checking if a Mapper.map within a Runnable has generated an
    // IOException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (ioException != null) {
      throw ioException;
    }

    // Checking if a Mapper.map within a Runnable has generated a
    // RuntimeException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (runtimeException != null) {
      throw runtimeException;
    }
  }

  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                  Reporter reporter)
      throws IOException {
    try {
      // allocate key & value instances these objects will not be reused
      // because execution of Mapper.map is not serialized.
      K1 key = input.createKey();
      V1 value = input.createValue();

      while (input.next(key, value)) {

        executorService.execute(new MapperInvokeRunable(key, value, output,
                                                        reporter));

        checkForExceptionsFromProcessingThreads();

        // Allocate new key & value instances as mapper is running in parallel
        key = input.createKey();
        value = input.createValue();
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Finished dispatching all Mappper.map calls, job "
                  + job.getJobName());
      }

      // Graceful shutdown of the Threadpool, it will let all scheduled
      // Runnables to end.
      executorService.shutdown();

      try {

        // Now waiting for all Runnables to end.
        while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Awaiting all running Mappper.map calls to finish, job "
                      + job.getJobName());
          }

          // NOTE: while Mapper.map dispatching has concluded there are still
          // map calls in progress and exceptions would be thrown.
          checkForExceptionsFromProcessingThreads();

        }

        // NOTE: it could be that a map call has had an exception after the
        // call for awaitTermination() returing true. And edge case but it
        // could happen.
        checkForExceptionsFromProcessingThreads();

      } catch (IOException ioEx) {
        // Forcing a shutdown of all thread of the threadpool and rethrowing
        // the IOException
        executorService.shutdownNow();
        throw ioEx;
      } catch (InterruptedException iEx) {
        throw new RuntimeException(iEx);
      }

    } finally {
      mapper.close();
    }
  }


  /**
   * Runnable to execute a single Mapper.map call from a forked thread.
   */
  private class MapperInvokeRunable implements Runnable {
    private K1 key;
    private V1 value;
    private OutputCollector<K2, V2> output;
    private Reporter reporter;

    /**
     * Collecting all required parameters to execute a Mapper.map call.
     * <p>
     *
     * @param key
     * @param value
     * @param output
     * @param reporter
     */
    public MapperInvokeRunable(K1 key, V1 value,
                               OutputCollector<K2, V2> output,
                               Reporter reporter) {
      this.key = key;
      this.value = value;
      this.output = output;
      this.reporter = reporter;
    }

    /**
     * Executes a Mapper.map call with the given Mapper and parameters.
     * <p>
     * This method is called from the thread-pool thread.
     *
     */
    public void run() {
      try {
        // map pair to output
        MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
        if (incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
        }
      } catch (IOException ex) {
        // If there is an IOException during the call it is set in an instance
        // variable of the MultithreadedMapRunner from where it will be
        // rethrown.
        synchronized (MultithreadedMapRunner.this) {
          if (MultithreadedMapRunner.this.ioException == null) {
            MultithreadedMapRunner.this.ioException = ex;
          }
        }
      } catch (RuntimeException ex) {
        // If there is a RuntimeException during the call it is set in an
        // instance variable of the MultithreadedMapRunner from where it will be
        // rethrown.
        synchronized (MultithreadedMapRunner.this) {
          if (MultithreadedMapRunner.this.runtimeException == null) {
            MultithreadedMapRunner.this.runtimeException = ex;
          }
        }
      }
    }
  }

}
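To actually switch a job over to the multithreaded runner, the configuration described in the class javadoc boils down to something like the following sketch (old API). MultithreadedJobDriver, MyThreadSafeMapper and the argument paths are hypothetical placeholders, and the thread-count property name is the one quoted in the javadoc above:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultithreadedMapRunner;

public class MultithreadedJobDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(MultithreadedJobDriver.class);
    conf.setJobName("multithreaded-map-example");

    // MyThreadSafeMapper is a hypothetical old-API Mapper implementation;
    // it must be thread-safe because several map calls run concurrently.
    conf.setMapperClass(MyThreadSafeMapper.class);

    // Replace the default single-threaded MapRunner with the multithreaded one.
    conf.setMapRunnerClass(MultithreadedMapRunner.class);

    // Size of the thread pool used to run map calls (default is 10).
    conf.setInt("mapred.map.multithreadedrunner.threads", 8);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}

The key point is the interplay between the BlockingArrayQueue and the fixed-size ThreadPoolExecutor: because offer/add are turned into a blocking put, the dispatch loop in run() naturally stalls when all worker threads are busy instead of queueing records without bound.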
That wraps up this look at the single-threaded and multi-threaded runners behind a MapTask; hopefully it serves as a useful reference.