关于Maptask任务单线程与多线程执行器解读

2023-12-01 22:32

本文主要是介绍关于Maptask任务单线程与多线程执行器解读,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

相比Mpareduce 老版本的API, 新版本的API 在maptask执行map任务的接口设计上有比较大的改动。  

在老版的API中, MapRunner的run函数中:

public void run(RecordReader input, OutputCollector output, Reporter reporter)
        throws IOException
    {
        Object key = input.createKey();
        Object value = input.createValue();
        do
        {
            if(!input.next(key, value))
                break;
            mapper.map(key, value, output, reporter);
            if(incrProcCount)
                reporter.incrCounter("SkippingTaskCounters", "MapProcessedRecords", 1L);
        } while(true);
        mapper.close();
        break MISSING_BLOCK_LABEL_91;
        Exception exception;
        exception;
        mapper.close();
        throw exception;
    }

从代码中可以发现,对于当前maptask的input,就是一个recoredreader,调用nextkey函数,获取key value对, 然后直接交给mapper的map函数来执行。

新版的API中,在maptask的runNewMapper函数里面,先获取当前的mapper类, 

mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

然后直接调用mapper对象的run函数:

mapper.run(mapperContext);

在run函数里面,会将每个key value对 交给map函数进行处理。


一般情况下,我们都使用单线程处理器来完成map任务,mappreunner就是一个单线程处理器,同时hadoop也为maptask定义了多线程处理器,适合cpu多核条件下。MultithreadedMapRunner的代码如下:


 /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred.lib;
020    
021    import org.apache.hadoop.util.ReflectionUtils;
022    import org.apache.hadoop.classification.InterfaceAudience;
023    import org.apache.hadoop.classification.InterfaceStability;
024    import org.apache.hadoop.mapred.MapRunnable;
025    import org.apache.hadoop.mapred.JobConf;
026    import org.apache.hadoop.mapred.Mapper;
027    import org.apache.hadoop.mapred.RecordReader;
028    import org.apache.hadoop.mapred.OutputCollector;
029    import org.apache.hadoop.mapred.Reporter;
030    import org.apache.hadoop.mapred.SkipBadRecords;
031    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    
035    import java.io.IOException;
036    import java.util.concurrent.*;
037    
038    /**
039     * Multithreaded implementation for @link org.apache.hadoop.mapred.MapRunnable.
040     * <p>
041     * It can be used instead of the default implementation,
042     * @link org.apache.hadoop.mapred.MapRunner, when the Map operation is not CPU
043     * bound in order to improve throughput.
044     * <p>
045     * Map implementations using this MapRunnable must be thread-safe.
046     * <p>
047     * The Map-Reduce job has to be configured to use this MapRunnable class (using
048     * the JobConf.setMapRunnerClass method) and
049     * the number of thread the thread-pool can use with the
050     * <code>mapred.map.multithreadedrunner.threads</code> property, its default
051     * value is 10 threads.
052     * <p>
053     */
054    @InterfaceAudience.Public
055    @InterfaceStability.Stable
056    public class MultithreadedMapRunner<K1, V1, K2, V2>
057        implements MapRunnable<K1, V1, K2, V2> {
058    
059      private static final Log LOG =
060        LogFactory.getLog(MultithreadedMapRunner.class.getName());
061    
062      private JobConf job;
063      private Mapper<K1, V1, K2, V2> mapper;
064      private ExecutorService executorService;
065      private volatile IOException ioException;
066      private volatile RuntimeException runtimeException;
067      private boolean incrProcCount;
068    
069      @SuppressWarnings("unchecked")
070      public void configure(JobConf jobConf) {
071        int numberOfThreads =
072          jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
073        if (LOG.isDebugEnabled()) {
074          LOG.debug("Configuring jobConf " + jobConf.getJobName() +
075                    " to use " + numberOfThreads + " threads");
076        }
077    
078        this.job = jobConf;
079        //increment processed counter only if skipping feature is enabled
080        this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
081          SkipBadRecords.getAutoIncrMapperProcCount(job);
082        this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
083            jobConf);
084    
085        // Creating a threadpool of the configured size to execute the Mapper
086        // map method in parallel.
087        executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 
088                                                 0L, TimeUnit.MILLISECONDS,
089                                                 new BlockingArrayQueue
090                                                   (numberOfThreads));
091      }
092    
093      /**
094       * A blocking array queue that replaces offer and add, which throws on a full
095       * queue, to a put, which waits on a full queue.
096       */
097      private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {
098     
099        private static final long serialVersionUID = 1L;
100        public BlockingArrayQueue(int capacity) {
101          super(capacity);
102        }
103        public boolean offer(Runnable r) {
104          return add(r);
105        }
106        public boolean add(Runnable r) {
107          try {
108            put(r);
109          } catch (InterruptedException ie) {
110            Thread.currentThread().interrupt();
111          }
112          return true;
113        }
114      }
115    
116      private void checkForExceptionsFromProcessingThreads()
117          throws IOException, RuntimeException {
118        // Checking if a Mapper.map within a Runnable has generated an
119        // IOException. If so we rethrow it to force an abort of the Map
120        // operation thus keeping the semantics of the default
121        // implementation.
122        if (ioException != null) {
123          throw ioException;
124        }
125    
126        // Checking if a Mapper.map within a Runnable has generated a
127        // RuntimeException. If so we rethrow it to force an abort of the Map
128        // operation thus keeping the semantics of the default
129        // implementation.
130        if (runtimeException != null) {
131          throw runtimeException;
132        }
133      }
134    
135      public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
136                      Reporter reporter)
137        throws IOException {
138        try {
139          // allocate key & value instances these objects will not be reused
140          // because execution of Mapper.map is not serialized.
141          K1 key = input.createKey();
142          V1 value = input.createValue();
143    
144          while (input.next(key, value)) {
145    
146            executorService.execute(new MapperInvokeRunable(key, value, output,
147                                    reporter));
148    
149            checkForExceptionsFromProcessingThreads();
150    
151            // Allocate new key & value instances as mapper is running in parallel
152            key = input.createKey();
153            value = input.createValue();
154          }
155    
156          if (LOG.isDebugEnabled()) {
157            LOG.debug("Finished dispatching all Mappper.map calls, job "
158                      + job.getJobName());
159          }
160    
161          // Graceful shutdown of the Threadpool, it will let all scheduled
162          // Runnables to end.
163          executorService.shutdown();
164    
165          try {
166    
167            // Now waiting for all Runnables to end.
168            while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
169              if (LOG.isDebugEnabled()) {
170                LOG.debug("Awaiting all running Mappper.map calls to finish, job "
171                          + job.getJobName());
172              }
173    
174              // NOTE: while Mapper.map dispatching has concluded there are still
175              // map calls in progress and exceptions would be thrown.
176              checkForExceptionsFromProcessingThreads();
177    
178            }
179    
180            // NOTE: it could be that a map call has had an exception after the
181            // call for awaitTermination() returing true. And edge case but it
182            // could happen.
183            checkForExceptionsFromProcessingThreads();
184    
185          } catch (IOException ioEx) {
186            // Forcing a shutdown of all thread of the threadpool and rethrowing
187            // the IOException
188            executorService.shutdownNow();
189            throw ioEx;
190          } catch (InterruptedException iEx) {
191            throw new RuntimeException(iEx);
192          }
193    
194        } finally {
195          mapper.close();
196        }
197      }
198    
199    
200      /**
201       * Runnable to execute a single Mapper.map call from a forked thread.
202       */
203      private class MapperInvokeRunable implements Runnable {
204        private K1 key;
205        private V1 value;
206        private OutputCollector<K2, V2> output;
207        private Reporter reporter;
208    
209        /**
210         * Collecting all required parameters to execute a Mapper.map call.
211         * <p>
212         *
213         * @param key
214         * @param value
215         * @param output
216         * @param reporter
217         */
218        public MapperInvokeRunable(K1 key, V1 value,
219                                   OutputCollector<K2, V2> output,
220                                   Reporter reporter) {
221          this.key = key;
222          this.value = value;
223          this.output = output;
224          this.reporter = reporter;
225        }
226    
227        /**
228         * Executes a Mapper.map call with the given Mapper and parameters.
229         * <p>
230         * This method is called from the thread-pool thread.
231         *
232         */
233        public void run() {
234          try {
235            // map pair to output
236            MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
237            if(incrProcCount) {
238              reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
239                  SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
240            }
241          } catch (IOException ex) {
242            // If there is an IOException during the call it is set in an instance
243            // variable of the MultithreadedMapRunner from where it will be
244            // rethrown.
245            synchronized (MultithreadedMapRunner.this) {
246              if (MultithreadedMapRunner.this.ioException == null) {
247                MultithreadedMapRunner.this.ioException = ex;
248              }
249            }
250          } catch (RuntimeException ex) {
251            // If there is a RuntimeException during the call it is set in an
252            // instance variable of the MultithreadedMapRunner from where it will be
253            // rethrown.
254            synchronized (MultithreadedMapRunner.this) {
255              if (MultithreadedMapRunner.this.runtimeException == null) {
256                MultithreadedMapRunner.this.runtimeException = ex;
257              }
258            }
259          }
260        }
261      }
262    
263    }

这篇关于关于Maptask任务单线程与多线程执行器解读的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/442938

相关文章

Redis实现延迟任务的三种方法详解

《Redis实现延迟任务的三种方法详解》延迟任务(DelayedTask)是指在未来的某个时间点,执行相应的任务,本文为大家整理了三种常见的实现方法,感兴趣的小伙伴可以参考一下... 目录1.前言2.Redis如何实现延迟任务3.代码实现3.1. 过期键通知事件实现3.2. 使用ZSet实现延迟任务3.3

Linux中的计划任务(crontab)使用方式

《Linux中的计划任务(crontab)使用方式》:本文主要介绍Linux中的计划任务(crontab)使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、前言1、linux的起源与发展2、什么是计划任务(crontab)二、crontab基础1、cro

java之Objects.nonNull用法代码解读

《java之Objects.nonNull用法代码解读》:本文主要介绍java之Objects.nonNull用法代码,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录Java之Objects.nonwww.chinasem.cnNull用法代码Objects.nonN

Spring定时任务只执行一次的原因分析与解决方案

《Spring定时任务只执行一次的原因分析与解决方案》在使用Spring的@Scheduled定时任务时,你是否遇到过任务只执行一次,后续不再触发的情况?这种情况可能由多种原因导致,如未启用调度、线程... 目录1. 问题背景2. Spring定时任务的基本用法3. 为什么定时任务只执行一次?3.1 未启用

如何使用Python实现一个简单的window任务管理器

《如何使用Python实现一个简单的window任务管理器》这篇文章主要为大家详细介绍了如何使用Python实现一个简单的window任务管理器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 任务管理器效果图完整代码import tkinter as tkfrom tkinter i

SpringCloud负载均衡spring-cloud-starter-loadbalancer解读

《SpringCloud负载均衡spring-cloud-starter-loadbalancer解读》:本文主要介绍SpringCloud负载均衡spring-cloud-starter-loa... 目录简述主要特点使用负载均衡算法1. 轮询负载均衡策略(Round Robin)2. 随机负载均衡策略(

解读spring.factories文件配置详情

《解读spring.factories文件配置详情》:本文主要介绍解读spring.factories文件配置详情,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录使用场景作用内部原理机制SPI机制Spring Factories 实现原理用法及配置spring.f

Spring MVC使用视图解析的问题解读

《SpringMVC使用视图解析的问题解读》:本文主要介绍SpringMVC使用视图解析的问题解读,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Spring MVC使用视图解析1. 会使用视图解析的情况2. 不会使用视图解析的情况总结Spring MVC使用视图

Spring Boot 集成 Quartz 使用Cron 表达式实现定时任务

《SpringBoot集成Quartz使用Cron表达式实现定时任务》本文介绍了如何在SpringBoot项目中集成Quartz并使用Cron表达式进行任务调度,通过添加Quartz依赖、创... 目录前言1. 添加 Quartz 依赖2. 创建 Quartz 任务3. 配置 Quartz 任务调度4. 启

Java使用多线程处理未知任务数的方案介绍

《Java使用多线程处理未知任务数的方案介绍》这篇文章主要为大家详细介绍了Java如何使用多线程实现处理未知任务数,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 知道任务个数,你可以定义好线程数规则,生成线程数去跑代码说明:1.虚拟线程池:使用 Executors.newVir