Understanding the Single-Threaded and Multithreaded Map Runners of a MapTask

2023-12-01 22:32

This article walks through how a MapTask executes its map work with a single-threaded runner versus a multithreaded runner, comparing how the old and new MapReduce APIs drive the Mapper; hopefully it serves as a useful reference.

Compared with the old MapReduce API, the new API changes the interface through which a MapTask drives map execution quite substantially.

In the old API, the run method of MapRunner looks like this (the snippet below is decompiler output, tidied here into valid Java):

public void run(RecordReader input, OutputCollector output, Reporter reporter)
        throws IOException
    {
        Object key = input.createKey();
        Object value = input.createValue();
        try
        {
            // read each key/value pair from the RecordReader and hand it
            // straight to the user's map() implementation
            while(input.next(key, value))
            {
                mapper.map(key, value, output, reporter);
                if(incrProcCount)
                    reporter.incrCounter("SkippingTaskCounters", "MapProcessedRecords", 1L);
            }
        }
        finally
        {
            // the decompiler's MISSING_BLOCK_LABEL / duplicated close() pattern
            // is simply this try/finally: close the mapper on both paths
            mapper.close();
        }
    }

As the code shows, the input of the current MapTask is a RecordReader: run repeatedly calls next to fetch a key/value pair and hands it directly to the mapper's map method.
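For context, the old-API path inside MapTask simply instantiates whichever MapRunnable the job configured (MapRunner by default) and hands it the reader, collector, and reporter. The lines below are a paraphrased sketch of that dispatch, with illustrative variable names, not the verbatim MapTask source:

// Paraphrased sketch of MapTask's old-API dispatch (runOldMapper);
// recordReader, outputCollector and reporter are illustrative names.
MapRunnable runner =
    ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
runner.run(recordReader, outputCollector, reporter);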

In the new API, MapTask's runNewMapper method first instantiates the configured Mapper class:

mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

and then invokes the mapper object's run method directly:

mapper.run(mapperContext);

Inside run, every key/value pair read from the context is handed to the map method for processing.
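For reference, the default run loop of the new-API org.apache.hadoop.mapreduce.Mapper looks roughly like the sketch below (paraphrased, not copied verbatim from the Hadoop source): the Context wraps the RecordReader, and map is called once per pair, with setup and cleanup around the loop.

// Paraphrased sketch of the new-API Mapper.run contract
public void run(Context context) throws IOException, InterruptedException {
  setup(context);                      // one-time per-task initialization
  try {
    while (context.nextKeyValue()) {   // the context wraps the RecordReader
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    cleanup(context);                  // always runs, even if map() throws
  }
}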


In most cases the map task is driven by a single-threaded runner, and MapRunner is exactly that. Hadoop also provides a multithreaded runner for MapTask, MultithreadedMapRunner, which its Javadoc recommends when the map operation is not CPU-bound, so that overlapping map calls can improve throughput. Its code is as follows:


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.util.concurrent.*;

/**
 * Multithreaded implementation for {@link org.apache.hadoop.mapred.MapRunnable}.
 * <p>
 * It can be used instead of the default implementation,
 * {@link org.apache.hadoop.mapred.MapRunner}, when the Map operation is not CPU
 * bound in order to improve throughput.
 * <p>
 * Map implementations using this MapRunnable must be thread-safe.
 * <p>
 * The Map-Reduce job has to be configured to use this MapRunnable class (using
 * the JobConf.setMapRunnerClass method) and
 * the number of thread the thread-pool can use with the
 * <code>mapred.map.multithreadedrunner.threads</code> property, its default
 * value is 10 threads.
 * <p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultithreadedMapRunner<K1, V1, K2, V2>
    implements MapRunnable<K1, V1, K2, V2> {

  private static final Log LOG =
    LogFactory.getLog(MultithreadedMapRunner.class.getName());

  private JobConf job;
  private Mapper<K1, V1, K2, V2> mapper;
  private ExecutorService executorService;
  private volatile IOException ioException;
  private volatile RuntimeException runtimeException;
  private boolean incrProcCount;

  @SuppressWarnings("unchecked")
  public void configure(JobConf jobConf) {
    int numberOfThreads =
      jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring jobConf " + jobConf.getJobName() +
                " to use " + numberOfThreads + " threads");
    }

    this.job = jobConf;
    //increment processed counter only if skipping feature is enabled
    this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 &&
      SkipBadRecords.getAutoIncrMapperProcCount(job);
    this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
        jobConf);

    // Creating a threadpool of the configured size to execute the Mapper
    // map method in parallel.
    executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
                                             0L, TimeUnit.MILLISECONDS,
                                             new BlockingArrayQueue
                                               (numberOfThreads));
  }

  /**
   * A blocking array queue that replaces offer and add, which throws on a full
   * queue, to a put, which waits on a full queue.
   */
  private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;
    public BlockingArrayQueue(int capacity) {
      super(capacity);
    }
    public boolean offer(Runnable r) {
      return add(r);
    }
    public boolean add(Runnable r) {
      try {
        put(r);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
      return true;
    }
  }

  private void checkForExceptionsFromProcessingThreads()
      throws IOException, RuntimeException {
    // Checking if a Mapper.map within a Runnable has generated an
    // IOException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (ioException != null) {
      throw ioException;
    }

    // Checking if a Mapper.map within a Runnable has generated a
    // RuntimeException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (runtimeException != null) {
      throw runtimeException;
    }
  }

  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                  Reporter reporter)
    throws IOException {
    try {
      // allocate key & value instances these objects will not be reused
      // because execution of Mapper.map is not serialized.
      K1 key = input.createKey();
      V1 value = input.createValue();

      while (input.next(key, value)) {

        executorService.execute(new MapperInvokeRunable(key, value, output,
                                reporter));

        checkForExceptionsFromProcessingThreads();

        // Allocate new key & value instances as mapper is running in parallel
        key = input.createKey();
        value = input.createValue();
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Finished dispatching all Mappper.map calls, job "
                  + job.getJobName());
      }

      // Graceful shutdown of the Threadpool, it will let all scheduled
      // Runnables to end.
      executorService.shutdown();

      try {

        // Now waiting for all Runnables to end.
        while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Awaiting all running Mappper.map calls to finish, job "
                      + job.getJobName());
          }

          // NOTE: while Mapper.map dispatching has concluded there are still
          // map calls in progress and exceptions would be thrown.
          checkForExceptionsFromProcessingThreads();

        }

        // NOTE: it could be that a map call has had an exception after the
        // call for awaitTermination() returing true. And edge case but it
        // could happen.
        checkForExceptionsFromProcessingThreads();

      } catch (IOException ioEx) {
        // Forcing a shutdown of all thread of the threadpool and rethrowing
        // the IOException
        executorService.shutdownNow();
        throw ioEx;
      } catch (InterruptedException iEx) {
        throw new RuntimeException(iEx);
      }

    } finally {
      mapper.close();
    }
  }


  /**
   * Runnable to execute a single Mapper.map call from a forked thread.
   */
  private class MapperInvokeRunable implements Runnable {
    private K1 key;
    private V1 value;
    private OutputCollector<K2, V2> output;
    private Reporter reporter;

    /**
     * Collecting all required parameters to execute a Mapper.map call.
     * <p>
     *
     * @param key
     * @param value
     * @param output
     * @param reporter
     */
    public MapperInvokeRunable(K1 key, V1 value,
                               OutputCollector<K2, V2> output,
                               Reporter reporter) {
      this.key = key;
      this.value = value;
      this.output = output;
      this.reporter = reporter;
    }

    /**
     * Executes a Mapper.map call with the given Mapper and parameters.
     * <p>
     * This method is called from the thread-pool thread.
     *
     */
    public void run() {
      try {
        // map pair to output
        MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
        if(incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
        }
      } catch (IOException ex) {
        // If there is an IOException during the call it is set in an instance
        // variable of the MultithreadedMapRunner from where it will be
        // rethrown.
        synchronized (MultithreadedMapRunner.this) {
          if (MultithreadedMapRunner.this.ioException == null) {
            MultithreadedMapRunner.this.ioException = ex;
          }
        }
      } catch (RuntimeException ex) {
        // If there is a RuntimeException during the call it is set in an
        // instance variable of the MultithreadedMapRunner from where it will be
        // rethrown.
        synchronized (MultithreadedMapRunner.this) {
          if (MultithreadedMapRunner.this.runtimeException == null) {
            MultithreadedMapRunner.this.runtimeException = ex;
          }
        }
      }
    }
  }

}
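To actually use the multithreaded runner with the old API, the job points its runner class at MultithreadedMapRunner and sizes the thread pool. Below is a minimal sketch under the assumption of an old-API job with a thread-safe mapper; MyMapper is a hypothetical class name, and the thread-count key is referenced through the same MultithreadedMapper.NUM_THREADS constant that configure() above reads.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultithreadedMapRunner;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

public class MultithreadedJobSetup {
  // Minimal sketch, assuming an old-API job whose MyMapper.map is thread-safe,
  // as the Javadoc above requires. MyMapper is a hypothetical mapper class.
  public static JobConf configureJob() {
    JobConf conf = new JobConf(MultithreadedJobSetup.class);
    conf.setMapperClass(MyMapper.class);

    // Swap the default single-threaded MapRunner for the multithreaded runner.
    conf.setMapRunnerClass(MultithreadedMapRunner.class);

    // Thread-pool size; configure() above reads it via
    // MultithreadedMapper.NUM_THREADS and defaults to 10 when unset.
    conf.setInt(MultithreadedMapper.NUM_THREADS, 16);
    return conf;
  }
}

As the class Javadoc notes, this only pays off when the map operation is not CPU-bound and the Mapper implementation is thread-safe, since multiple map calls run concurrently on the thread pool.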
