Understanding MapTask: Single-Threaded and Multi-Threaded Map Runners

2023-12-01 22:32

This article walks through the single-threaded and multi-threaded map runners used by a MapTask; hopefully it serves as a useful reference for developers working with this part of Hadoop.

Compared with the old MapReduce API, the new API changes the interface a MapTask uses to run the map phase quite substantially.

In the old API, the run method of MapRunner does the following (the decompiled original is cleaned up into its try/finally form below):

public void run(RecordReader input, OutputCollector output, Reporter reporter)
    throws IOException {
  try {
    // allocate a single key/value pair that is re-used for every record
    Object key = input.createKey();
    Object value = input.createValue();

    while (input.next(key, value)) {
      // hand each record directly to the user's map function
      mapper.map(key, value, output, reporter);
      if (incrProcCount) {
        reporter.incrCounter("SkippingTaskCounters", "MapProcessedRecords", 1L);
      }
    }
  } finally {
    // always close the mapper, even if map throws
    mapper.close();
  }
}

As the code shows, the input of the current MapTask is simply a RecordReader: run calls next to fetch each key/value pair and hands it directly to the mapper's map method.

In the new API, MapTask's runNewMapper method first instantiates the configured Mapper class:

mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

and then calls the mapper object's run method directly:

mapper.run(mapperContext);

Inside run, each key/value pair is handed to the map method for processing.
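
For reference, the default run provided by the new API's Mapper base class (org.apache.hadoop.mapreduce.Mapper) is essentially the following loop; this is a simplified sketch, omitting anything version-specific:

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    // pull records from the context and hand them to map() one at a time
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    cleanup(context);
  }
}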


Normally, a single-threaded runner is used to drive the map task, and MapRunner is exactly that. Hadoop also defines a multi-threaded runner for MapTask, which can make use of multiple CPU cores (its own Javadoc recommends it mainly when the map operation is not CPU bound, to improve throughput). The code of MultithreadedMapRunner is as follows:


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.util.concurrent.*;

/**
 * Multithreaded implementation for @link org.apache.hadoop.mapred.MapRunnable.
 * <p>
 * It can be used instead of the default implementation,
 * @link org.apache.hadoop.mapred.MapRunner, when the Map operation is not CPU
 * bound in order to improve throughput.
 * <p>
 * Map implementations using this MapRunnable must be thread-safe.
 * <p>
 * The Map-Reduce job has to be configured to use this MapRunnable class (using
 * the JobConf.setMapRunnerClass method) and
 * the number of thread the thread-pool can use with the
 * <code>mapred.map.multithreadedrunner.threads</code> property, its default
 * value is 10 threads.
 * <p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultithreadedMapRunner<K1, V1, K2, V2>
    implements MapRunnable<K1, V1, K2, V2> {

  private static final Log LOG =
    LogFactory.getLog(MultithreadedMapRunner.class.getName());

  private JobConf job;
  private Mapper<K1, V1, K2, V2> mapper;
  private ExecutorService executorService;
  private volatile IOException ioException;
  private volatile RuntimeException runtimeException;
  private boolean incrProcCount;

  @SuppressWarnings("unchecked")
  public void configure(JobConf jobConf) {
    int numberOfThreads =
      jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring jobConf " + jobConf.getJobName() +
                " to use " + numberOfThreads + " threads");
    }

    this.job = jobConf;
    // increment processed counter only if skipping feature is enabled
    this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job) > 0 &&
      SkipBadRecords.getAutoIncrMapperProcCount(job);
    this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
        jobConf);

    // Creating a threadpool of the configured size to execute the Mapper
    // map method in parallel.
    executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
                                             0L, TimeUnit.MILLISECONDS,
                                             new BlockingArrayQueue(numberOfThreads));
  }

  /**
   * A blocking array queue that replaces offer and add, which throws on a full
   * queue, to a put, which waits on a full queue.
   */
  private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;
    public BlockingArrayQueue(int capacity) {
      super(capacity);
    }
    public boolean offer(Runnable r) {
      return add(r);
    }
    public boolean add(Runnable r) {
      try {
        put(r);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
      return true;
    }
  }

  private void checkForExceptionsFromProcessingThreads()
      throws IOException, RuntimeException {
    // Checking if a Mapper.map within a Runnable has generated an
    // IOException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (ioException != null) {
      throw ioException;
    }

    // Checking if a Mapper.map within a Runnable has generated a
    // RuntimeException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (runtimeException != null) {
      throw runtimeException;
    }
  }

  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                  Reporter reporter)
    throws IOException {
    try {
      // allocate key & value instances these objects will not be reused
      // because execution of Mapper.map is not serialized.
      K1 key = input.createKey();
      V1 value = input.createValue();

      while (input.next(key, value)) {

        executorService.execute(new MapperInvokeRunable(key, value, output,
                                reporter));

        checkForExceptionsFromProcessingThreads();

        // Allocate new key & value instances as mapper is running in parallel
        key = input.createKey();
        value = input.createValue();
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Finished dispatching all Mappper.map calls, job "
                  + job.getJobName());
      }

      // Graceful shutdown of the Threadpool, it will let all scheduled
      // Runnables to end.
      executorService.shutdown();

      try {

        // Now waiting for all Runnables to end.
        while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Awaiting all running Mappper.map calls to finish, job "
                      + job.getJobName());
          }

          // NOTE: while Mapper.map dispatching has concluded there are still
          // map calls in progress and exceptions would be thrown.
          checkForExceptionsFromProcessingThreads();

        }

        // NOTE: it could be that a map call has had an exception after the
        // call for awaitTermination() returing true. And edge case but it
        // could happen.
        checkForExceptionsFromProcessingThreads();

      } catch (IOException ioEx) {
        // Forcing a shutdown of all thread of the threadpool and rethrowing
        // the IOException
        executorService.shutdownNow();
        throw ioEx;
      } catch (InterruptedException iEx) {
        throw new RuntimeException(iEx);
      }

    } finally {
      mapper.close();
    }
  }


  /**
   * Runnable to execute a single Mapper.map call from a forked thread.
   */
  private class MapperInvokeRunable implements Runnable {
    private K1 key;
    private V1 value;
    private OutputCollector<K2, V2> output;
    private Reporter reporter;

    /**
     * Collecting all required parameters to execute a Mapper.map call.
     * <p>
     *
     * @param key
     * @param value
     * @param output
     * @param reporter
     */
    public MapperInvokeRunable(K1 key, V1 value,
                               OutputCollector<K2, V2> output,
                               Reporter reporter) {
      this.key = key;
      this.value = value;
      this.output = output;
      this.reporter = reporter;
    }

    /**
     * Executes a Mapper.map call with the given Mapper and parameters.
     * <p>
     * This method is called from the thread-pool thread.
     *
     */
    public void run() {
      try {
        // map pair to output
        MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
        if (incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
        }
      } catch (IOException ex) {
        // If there is an IOException during the call it is set in an instance
        // variable of the MultithreadedMapRunner from where it will be
        // rethrown.
        synchronized (MultithreadedMapRunner.this) {
          if (MultithreadedMapRunner.this.ioException == null) {
            MultithreadedMapRunner.this.ioException = ex;
          }
        }
      } catch (RuntimeException ex) {
        // If there is a RuntimeException during the call it is set in an
        // instance variable of the MultithreadedMapRunner from where it will be
        // rethrown.
        synchronized (MultithreadedMapRunner.this) {
          if (MultithreadedMapRunner.this.runtimeException == null) {
            MultithreadedMapRunner.this.runtimeException = ex;
          }
        }
      }
    }
  }

}
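
To actually use this multi-threaded runner, the job has to be configured as the Javadoc above describes. A minimal sketch of the driver-side setup (MyThreadSafeMapper is a hypothetical, thread-safe old-API Mapper; the property name and its default of 10 threads are taken from the Javadoc):

JobConf conf = new JobConf();
// the Mapper must be thread-safe: its map() is invoked concurrently from the pool threads
conf.setMapperClass(MyThreadSafeMapper.class);      // hypothetical thread-safe Mapper
// replace the default single-threaded MapRunner with the multi-threaded one
conf.setMapRunnerClass(MultithreadedMapRunner.class);
// size of the thread pool; defaults to 10 if not set
conf.setInt("mapred.map.multithreadedrunner.threads", 8);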

That concludes this look at MapTask's single-threaded and multi-threaded map runners; hopefully it is helpful to fellow programmers.



http://www.chinasem.cn/article/442938
