大数据技术之_05_Hadoop学习_02_MapReduce_MapReduce框架原理+InputFormat数据输入+MapReduce工作流程(面试重点)+Shuffle机制(面试重点)

本文主要是介绍大数据技术之_05_Hadoop学习_02_MapReduce_MapReduce框架原理+InputFormat数据输入+MapReduce工作流程(面试重点)+Shuffle机制(面试重点),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据技术之_05_Hadoop学习_02_MapReduce

    • 第3章 MapReduce框架原理
      • 3.1 InputFormat数据输入
        • 3.1.1 切片与MapTask并行度决定机制
        • 3.1.2 Job提交流程源码和切片源码详解
        • 3.1.3 FileInputFormat切片机制
        • 3.1.4 CombineTextInputFormat切片机制
        • 3.1.5 CombineTextInputFormat案例实操
        • 3.1.6 FileInputFormat实现类
        • 3.1.7 KeyValueTextInputFormat使用案例
        • 3.1.8 NLineInputFormat使用案例
        • 3.1.9 自定义InputFormat
        • 3.1.10 自定义InputFormat案例实操
      • 3.2 MapReduce工作流程(面试重点)
      • 3.3 Shuffle机制(面试重点)
        • 3.3.1 Shuffle机制
        • 3.3.2 Partition分区
        • 3.3.3 Partition分区案例实操

第3章 MapReduce框架原理

3.1 InputFormat数据输入

3.1.1 切片与MapTask并行度决定机制

1、问题引出
  MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。
  思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

2、MapTask并行度决定机制
  数据块: Block是HDFS物理上把数据分成一块一块的。
  数据切片: 数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

3.1.2 Job提交流程源码和切片源码详解

1、Job提交流程源码详解,如下图所示:

boolean result = job.waitForCompletion(true);submit();// 1、建立连接connect();	// 1)创建提交Job的代理new Cluster(getConfiguration());// (1)判断是本地yarn还是远程initialize(jobTrackAddr, conf); // 2、提交jobsubmitter.submitJobInternal(Job.this, cluster)// 1)创建给集群提交数据的Stag路径Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);// 2)获取jobid,并创建job路径JobID jobId = submitClient.getNewJobID();// 3)拷贝jar包到集群copyAndConfigureFiles(job, submitJobDir);	rUploader.uploadFiles(job, jobSubmitDir);// 4)计算切片,生成切片规划文件writeSplits(job, submitJobDir);maps = writeNewSplits(job, jobSubmitDir);input.getSplits(job);// 5)向Stag路径写XML配置文件writeConf(conf, submitJobFile);conf.writeXml(out);// 6)提交job,返回提交状态status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

如下图所示:

2、FileInputFormat切片源码解析(input.getSplits(job))

3.1.3 FileInputFormat切片机制

FileInputFormat切片机制

FileInputFormat切片大小的参数配置

3.1.4 CombineTextInputFormat切片机制

  框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。之前处理小文件(har文件)使用的方法是归档。

  • 1、应用场景:
      CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
  • 2、虚拟存储切片最大值设置
      CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m
      注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
  • 3、切片机制
      生成切片过程包括:虚拟存储过程和切片过程二部分。

(1)虚拟存储过程:
  将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
  例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
(2)切片过程:
  (a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
  (b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
  (c)测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:
  1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
  最终会形成3个切片,大小分别为:
  (1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

3.1.5 CombineTextInputFormat案例实操

1、需求
  将输入的大量小文件合并成一个切片统一处理。
(1)输入数据
  准备4个小文件
(2)期望
  期望一个切片处理4个文件
2、实现过程
(1)不做任何处理,运行1.6节的WordCount案例程序,观察切片个数为4。

(2)在WordcountDriver中增加如下代码,运行程序,并观察运行的切片个数为3。
(a)驱动类中添加代码如下:

// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);// 虚拟存储切片最大值设置4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

(b)运行如果为3个切片。

(3)在WordcountDriver中增加如下代码,运行程序,并观察运行的切片个数为1。
(a)驱动中添加代码如下:

// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);// 虚拟存储切片最大值设置20m
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);

(b)运行如果为1个切片。

3.1.6 FileInputFormat实现类


Ctrl + t 可得:

1、TextInputFormat

2、KeyValueTextInputFormat

3、NLineInputFormat

3.1.7 KeyValueTextInputFormat使用案例

1、需求
统计输入文件中每一行的第一个单词相同的行数。
(1)输入数据

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang

(2)期望结果数据

banzhang	2
xihuan	2

2、需求分析

3、代码实现
(1)编写Mapper类

package com.atguigu.mr.kv;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class KVTextMapper extends Mapper<Text, Text, Text, LongWritable> {// 1、设置valueLongWritable v = new LongWritable(1);@Overrideprotected void map(Text key, Text value, Context context)throws IOException, InterruptedException {// banzhang ni hao --> banzhang,1// 2、写出context.write(key, v);}
}

(2)编写Reducer类

package com.atguigu.mr.kv;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable> {LongWritable v = new LongWritable();@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {// bangzhang,1// bangzhang,1Long sum = 0L;// 1、汇总统计for (LongWritable value : values) {sum += value.get();}v.set(sum);// bangzhang,2// 2、输出context.write(key, v);}
}

(3)编写Driver类

package com.atguigu.mr.kv;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class KVTextDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 输入输出路径需要根据自己电脑上实际的输入输出路径设置args = new String[] { "d:/temp/atguigu/0529/input/inputkv", "d:/temp/atguigu/0529/output5" };Configuration configuration = new Configuration();// 设置切割符configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");// 1、获取job对象Job job = Job.getInstance(configuration);// 2、设置jar包位置job.setJarByClass(KVTextDriver.class);// 3、关联mapper和reducerjob.setMapperClass(KVTextMapper.class);job.setReducerClass(KVTextReducer.class);// 4、设置map输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);// 5、设置最终输出数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);// 6、设置输入输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));// 设置输入格式job.setInputFormatClass(KeyValueTextInputFormat.class);FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7、提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}
3.1.8 NLineInputFormat使用案例

1、需求
  对每个单词进行个数统计,要求根据每个输入文件的行数来规定输出多少个切片。此案例要求每三行放入一个切片中。
(1)输入数据

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang banzhang ni hao
xihuan hadoop banzhang

(2)期望输出数据

Number of splits:4

2、需求分析

3、代码实现
(1)编写Mapper类

package com.atguigu.mr.nline;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class NLineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {private Text k = new Text();private LongWritable v = new LongWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// 1、获取一行String line = value.toString();// 2、切割String[] splited = line.split(" ");// 3、循环写出for (String string : splited) {k.set(string);context.write(k, v);}}
}

(2)编写Reducer类

package com.atguigu.mr.nline;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class NLineReducer extends Reducer<Text, LongWritable, Text, LongWritable> {LongWritable v = new LongWritable();@Overrideprotected void reduce(Text k, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {Long sum = 0L;// 1、汇总求和for (LongWritable value : values) {sum += value.get();}v.set(sum);// 2、写出context.write(k, v);}
}

(3)编写Driver类

package com.atguigu.mr.nline;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class NLineDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 输入输出路径需要根据自己电脑上实际的输入输出路径设置args = new String[] { "d:/temp/atguigu/0529/input/inputnline", "d:/temp/atguigu/0529/output6" };Configuration configuration = new Configuration();// 1、获取job对象Job job = Job.getInstance(configuration);// 设置每个切片InputSplit中划分三条记录NLineInputFormat.setNumLinesPerSplit(job, 3);// 使用NLineInputFormat处理记录数  job.setInputFormatClass(NLineInputFormat.class);  // 2、设置jar包位置job.setJarByClass(NLineDriver.class);// 3、关联mapper和reducerjob.setMapperClass(NLineMapper.class);job.setReducerClass(NLineReducer.class);// 4、设置map输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);// 5、设置最终输出数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);// 6、设置输入输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7、提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

4.测试
(1)输入数据

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang banzhang ni hao
xihuan hadoop banzhang

(2)输出结果的切片数,如下图所示:

3.1.9 自定义InputFormat

3.1.10 自定义InputFormat案例实操

  无论HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。
1、需求
  将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key文件内容为value
(1)输入数据

(2)期望输出文件格式

2、需求分析

3.程序实现
(1)自定义InputFromat

package com.atguigu.mr.inputformat;import java.io.IOException;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {@Overrideprotected boolean isSplitable(JobContext context, Path filename) {return false;}@Overridepublic RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {WholeRecordReader recordReader = new WholeRecordReader();recordReader.initialize(split, context);return recordReader;}
}

(2)自定义RecordReader类

package com.atguigu.mr.inputformat;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class WholeRecordReader extends RecordReader<Text, BytesWritable> {private FileSplit split;private Configuration configuration;private Text k = new Text();private BytesWritable v = new BytesWritable();private boolean isProgress = true;@Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {// 初始化this.split = (FileSplit) split;configuration = context.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {// 核心业务逻辑处理if (isProgress) {// 1 、定义缓存区byte[] buf = new byte[(int) split.getLength()];FileSystem fs = null;FSDataInputStream fis = null;try {// 2、获取文件系统fsPath path = split.getPath();fs = path.getFileSystem(configuration);// 3、获取输入流fis = fs.open(path);// 4、读取文件内容IOUtils.readFully(fis, buf, 0, buf.length);// 5、输出文件内容(封装v)v.set(buf, 0, buf.length);// 6、获取文件路径及名称(封装k)k.set(path.toString());} catch (Exception e) {} finally {// 7、关闭资源IOUtils.closeStream(fis);}isProgress = false;return true;}return false;}@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {return k;}@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return v;}@Overridepublic float getProgress() throws IOException, InterruptedException {return 0;}@Overridepublic void close() throws IOException {}
}

(3)编写SequenceFileMapper类处理流程

package com.atguigu.mr.inputformat;import java.io.IOException;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {@Overrideprotected void map(Text key, BytesWritable value, Context context)throws IOException, InterruptedException {context.write(key, value);}
}

(4)编写SequenceFileReducer类处理流程

package com.atguigu.mr.inputformat;import java.io.IOException;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {@Overrideprotected void reduce(Text key, Iterable<BytesWritable> values,Context context) throws IOException, InterruptedException {// 循环写出for (BytesWritable value : values) {context.write(key, value);}}
}

(5)编写SequenceFileDriver类处理流程

package com.atguigu.mr.inputformat;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;public class SequenceFileDriver {public static void main(String[] args) throws IOException, Exception {// 输入输出路径需要根据自己电脑上实际的输入输出路径设置args = new String[] { "d:/temp/atguigu/0529/input/inputself", "d:/temp/atguigu/0529/output7" };// 1、获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2、设置jar包存储位置、关联自定义的mapper和reducerjob.setJarByClass(SequenceFileDriver.class);job.setMapperClass(SequenceFileMapper.class);job.setReducerClass(SequenceFileReducer.class);// 7、设置输入的inputFormatjob.setInputFormatClass(WholeFileInputformat.class);// 8、设置输出的outputFormatjob.setOutputFormatClass(SequenceFileOutputFormat.class);// 3、设置map输出端的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(BytesWritable.class);// 4、设置最终输出端的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);// 5、设置输入输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7、提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

3.2 MapReduce工作流程(面试重点)

1、流程示意图,如下图所示:

2、流程详解
  上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下:

  • 1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中。
  • 2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件。
  • 3)多个溢出文件会被合并成大的溢出文件。
  • 4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序。
  • 5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据。
  • 6)ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)。
  • 7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)。

3、注意
  Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。
  缓冲区的大小可以通过参数调整,参数:io.sort.mb默认100M。
4、源码解析流程

context.write(k, NullWritable.get());output.write(key, value);collector.collect(key, value, partitioner.getPartition(key, value, partitions));HashPartitioner();  --> Hash分片collect()close()collector.flush()sortAndSpill()sort() --> QuickSort快排mergeParts();file.outfile.out.indexcollector.close();

3.3 Shuffle机制(面试重点)

3.3.1 Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。如下图所示。

3.3.2 Partition分区


自定义Partitioner分区步骤:

分区总结:

3.3.3 Partition分区案例实操

1、需求
  将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据

1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
2	13846544121	192.196.100.2			        264     0	    200
3 	13956435636	192.196.100.3			        132	    1512	200
4 	13966251146	192.168.100.1			        240	    0	    404
5 	18271575951	192.168.100.2	www.atguigu.com	1527	2106	200
6 	84188413	192.168.100.3	www.atguigu.com	4116	1432	200
7 	13590439668	192.168.100.4			        1116	954	    200
8 	15910133277	192.168.100.5	www.hao123.com	3156	2936	200
9 	13729199489	192.168.100.6			        240	    0	    200
10 	13630577991	192.168.100.7	www.shouhu.com	6960	690	    200
11 	15043685818	192.168.100.8	www.baidu.com	3659	3538	200
12 	15959002129	192.168.100.9	www.atguigu.com	1938	180	    500
13 	13560439638	192.168.100.10			        918	    4938	200
14 	13470253144	192.168.100.11			        180	    180	    200
15 	13682846555	192.168.100.12	www.qq.com	    1938	2910	200
16 	13992314666	192.168.100.13	www.gaga.com	3008	3720	200
17 	13509468723	192.168.100.14	www.qinghua.com	7335	110349	404
18 	18390173782	192.168.100.15	www.sogou.com	9531	2412	200
19 	13975057813	192.168.100.16	www.baidu.com	11058	48243	200
20 	13768778790	192.168.100.17			        120	    120	    200
21 	13568436656	192.168.100.18	www.alibaba.com	2481	24681	200
22 	13568436656	192.168.100.19			        1116	954	    200

(2)期望输出数据
  手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
2、需求分析

3、在案例2.4的基础上,增加一个分区类

package com.atguigu.mr.flowsum;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class ProvincePartitioner extends Partitioner<Text, FlowBean> {@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {// key是手机号,value是流量信息// 1、获取电话号码的前三位String prePhoneNum = key.toString().substring(0, 3);int partition = 4;// 2、判断是哪一个省份if ("136".equals(prePhoneNum)) {partition = 0;} else if ("137".equals(prePhoneNum)) {partition = 1;} else if ("138".equals(prePhoneNum)) {partition = 2;} else if ("139".equals(prePhoneNum)) {partition = 3;}return partition;}
}

4、在驱动函数中增加自定义数据分区设置和ReduceTask设置

package com.atguigu.mr.flowsum;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowsumDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 输入输出路径需要根据自己电脑上实际的输入输出路径设置args = new String[] { "d:/temp/atguigu/0529/input/inputflow", "d:/temp/atguigu/0529/output2" };// 1、获取配置信息,或者获取job对象实例Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 2、指定本程序的jar包所在的本地路径job.setJarByClass(FlowsumDriver.class);// 3、指定本业务job要使用的Mapper/Reducer业务类job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);// 4、指定Mapper输出的数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);// 5、指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);// 8、指定自定义数据分区job.setPartitionerClass(ProvincePartitioner.class);// 9、同时指定相应数量的reduce taskjob.setNumReduceTasks(5);// 6、指定job的输入输出原始文件所在的目录FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7、将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

我的GitHub地址:https://github.com/heizemingjun
我的博客园地址:https://www.cnblogs.com/chenmingjun
我的蚂蚁笔记博客地址:https://blog.leanote.com/chenmingjun
Copyright ©2018~2019 黑泽君
【转载文章务必保留出处和署名,谢谢!】

这篇关于大数据技术之_05_Hadoop学习_02_MapReduce_MapReduce框架原理+InputFormat数据输入+MapReduce工作流程(面试重点)+Shuffle机制(面试重点)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1100802

相关文章

Python中随机休眠技术原理与应用详解

《Python中随机休眠技术原理与应用详解》在编程中,让程序暂停执行特定时间是常见需求,当需要引入不确定性时,随机休眠就成为关键技巧,下面我们就来看看Python中随机休眠技术的具体实现与应用吧... 目录引言一、实现原理与基础方法1.1 核心函数解析1.2 基础实现模板1.3 整数版实现二、典型应用场景2

Java的IO模型、Netty原理解析

《Java的IO模型、Netty原理解析》Java的I/O是以流的方式进行数据输入输出的,Java的类库涉及很多领域的IO内容:标准的输入输出,文件的操作、网络上的数据传输流、字符串流、对象流等,这篇... 目录1.什么是IO2.同步与异步、阻塞与非阻塞3.三种IO模型BIO(blocking I/O)NI

java中反射(Reflection)机制举例详解

《java中反射(Reflection)机制举例详解》Java中的反射机制是指Java程序在运行期间可以获取到一个对象的全部信息,:本文主要介绍java中反射(Reflection)机制的相关资料... 目录一、什么是反射?二、反射的用途三、获取Class对象四、Class类型的对象使用场景1五、Class

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

基于Flask框架添加多个AI模型的API并进行交互

《基于Flask框架添加多个AI模型的API并进行交互》:本文主要介绍如何基于Flask框架开发AI模型API管理系统,允许用户添加、删除不同AI模型的API密钥,感兴趣的可以了解下... 目录1. 概述2. 后端代码说明2.1 依赖库导入2.2 应用初始化2.3 API 存储字典2.4 路由函数2.5 应

Python GUI框架中的PyQt详解

《PythonGUI框架中的PyQt详解》PyQt是Python语言中最强大且广泛应用的GUI框架之一,基于Qt库的Python绑定实现,本文将深入解析PyQt的核心模块,并通过代码示例展示其应用场... 目录一、PyQt核心模块概览二、核心模块详解与示例1. QtCore - 核心基础模块2. QtWid

Redis 中的热点键和数据倾斜示例详解

《Redis中的热点键和数据倾斜示例详解》热点键是指在Redis中被频繁访问的特定键,这些键由于其高访问频率,可能导致Redis服务器的性能问题,尤其是在高并发场景下,本文给大家介绍Redis中的热... 目录Redis 中的热点键和数据倾斜热点键(Hot Key)定义特点应对策略示例数据倾斜(Data S

Python实现将MySQL中所有表的数据都导出为CSV文件并压缩

《Python实现将MySQL中所有表的数据都导出为CSV文件并压缩》这篇文章主要为大家详细介绍了如何使用Python将MySQL数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到... python将mysql数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到另一个