This article is a detailed yet concise summary of the big-data technologies Hadoop MapReduce and Yarn, complete with examples, code, and diagrams. Hopefully it provides a useful reference for developers working through these topics.
MapReduce
The files needed for the examples are linked at the end of the article.
Chapter 1 MapReduce Overview
1.1 What MapReduce Is
MapReduce is a programming framework for distributed computation and the core framework for developing "data analysis applications based on Hadoop".
1.3 The Core Idea of MapReduce
1.6 Common Data Serialization Types
Java type | Hadoop Writable type |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Integer | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
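To make the mapping concrete, here is a minimal illustration of wrapping plain Java values in their Writable counterparts and unwrapping them again. This is a sketch for reference only; the class name WritableTypesDemo and the sample values are not from the original material.
//WritableTypesDemo
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableTypesDemo {
    public static void main(String[] args) {
        // Wrap ordinary Java values in Hadoop Writable types
        Text word = new Text("hello");               // String  -> Text
        IntWritable one = new IntWritable(1);        // Integer -> IntWritable
        LongWritable offset = new LongWritable(0L);  // Long    -> LongWritable

        // get()/toString() recover the underlying Java values
        System.out.println(word.toString() + " -> " + one.get() + ", offset = " + offset.get());
    }
}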
1.8 WordCount Hands-On Case
Requirement: count the number of occurrences of each word in a file.
//Mapper
package com.atguigu.mr.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
Purpose: the business logic that MapTask needs to run.
Notes:
1. MapTask calls the Mapper class.
2. Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> has two pairs of type parameters:
   First pair:
     KEYIN   : type of the offset used when reading data
     VALUEIN : type of the data being read (one line at a time)
   Second pair:
     KEYOUT   : type of the key written out
     VALUEOUT : type of the value written out
*/
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();                 // key to write out
    private IntWritable outValue = new IntWritable(); // value to write out

    /*
     * Notes:
     * 1. map() is called by MapTask, not by us.
     * 2. map() is called in a loop, once per line of input.
     * @param key     offset of the data being read
     * @param value   the data being read (one line at a time)
     * @param context context, used here to write data out
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Do NOT call the parent implementation:
        // super.map(key, value, context);

        // 1. Split the data
        // 1.1 Convert value to a String
        String line = value.toString();
        // 1.2 Split the line
        String[] words = line.split(" ");

        // 2. Build the k,v pairs
        // 2.1 Iterate over the words
        for (String word : words) {
            // 2.2 Set the key
            outKey.set(word);
            // 2.3 Set the value
            outValue.set(1);
            // 3. Write the K,V pair out. Note: this must stay inside the for loop (given this logic).
            context.write(outKey, outValue);
        }
    }
}
//Reducer
package com.atguigu.mr.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
Purpose: the business logic that ReduceTask needs to run.
Notes:
1. ReduceTask calls the Reducer class.
2. Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> has two pairs of type parameters:
   First pair:
     KEYIN   : type of the key being read (the key type output by the Mapper)
     VALUEIN : type of the value being read (the value type output by the Mapper)
   Second pair:
     KEYOUT   : type of the key written out (here: the word)
     VALUEOUT : type of the value written out (here: the word count)
*/
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outValue = new IntWritable(); // value to write out

    /*
     * Notes:
     * 1. reduce() is called by ReduceTask (think of it as being called by the framework).
     * 2. reduce() is called in a loop, once per group of data, e.g.
     *    aaa 1
     *    aaa 1
     *    aaa 1
     * @param key     the key of the group
     * @param values  all values of the group
     * @param context context, used here to write the k,v out
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0; // sum of all values
        // 1. Iterate over all values
        for (IntWritable value : values) {
            // 2. Accumulate the values
            // 2.1 Convert the IntWritable to an int
            int v = value.get();
            sum += v;
        }
        // 3. Build the output value
        outValue.set(sum);
        // 4. Write K,V out
        context.write(key, outValue);
    }
}
1. Local-mode Driver
//Driver
package com.atguigu.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/*
Entry point of the program.
Common mistakes:
1. Calling the parent-class implementation inside map() or reduce().
2. Calling setMapOutputKeyClass (or setOutputKeyClass) twice when configuring the Job.
3. Importing the wrong package for the k,v types when configuring the Job.
4. On Hadoop 3.1, if the error is not caused by your code, try switching to 3.0
   (and remember to replace hadoop.dll and winutils.exe on the C drive as well).
*/
public class WCDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create the Job object
        Configuration conf = new Configuration(); // configuration can be set on this object
        Job job = Job.getInstance(conf);

        // 2. Configure the Job
        // 2.1 Set the jar load path (optional in local mode)
        job.setJarByClass(WCDriver.class);
        // 2.2 Set the Mapper and Reducer classes
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        // 2.3 Set the k,v types output by the Mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 2.4 Set the final output K,V types (here: the types output by the Reducer)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 2.5 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\io\\input"));
        // Note: the output path must not already exist!
        FileOutputFormat.setOutputPath(job, new Path("D:\\io\\output"));

        // 3. Submit (run) the Job
        /*
         Parameter    : whether to print progress
         Return value : true means the job succeeded
        */
        boolean b = job.waitForCompletion(true);
        // JVM exit code: 0 for normal exit, 1 otherwise
        System.exit(b ? 0 : 1);
    }
}
2. Cluster-mode Driver2
/*
1. Package Driver2 into a jar and run it on the cluster:
2. hadoop jar xxx.jar com.atguigu.mr.wordcount.WCDriver2 /input /output
*/
package com.atguigu.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/*
Package the MR program and run it on the cluster:
hadoop jar xxx.jar com.atguigu.mr.wordcount.WCDriver2 arg1 arg2
*/
public class WCDriver2 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WCDriver2.class);
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 2.5 Input and output paths come from the command-line arguments
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Note: the output path must not already exist!
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
3. Driver3: submitting a job to the cluster from the local machine
Typically used for debugging.
/*
Submitting a Job to the cluster from the local machine:
1. Set the configuration values.
2. Package the project into a jar.
3. Set the path to the jar: point job.setJar() at the packaged jar, much like setting an input path.
4. Comment out job.setJarByClass(WCDriver3.class);
5. In Edit Configurations set
   VM Options        : -DHADOOP_USER_NAME=atguigu
   Program Arguments : hdfs://hadoop102:9820/input hdfs://hadoop102:9820/output2
*/
package com.atguigu.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/*
Submit the Job to the cluster from the local machine (see the five steps above).
*/
public class WCDriver3 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        /* 1. Configuration */
        // Parameters needed to run on the cluster: the HDFS NameNode address
        conf.set("fs.defaultFS", "hdfs://hadoop102:9820");
        // Run MR on Yarn
        conf.set("mapreduce.framework.name", "yarn");
        // Allow MR to be submitted to a remote cluster (cross-platform submission)
        conf.set("mapreduce.app-submission.cross-platform", "true");
        // Location of the Yarn ResourceManager
        conf.set("yarn.resourcemanager.hostname", "hadoop103");

        Job job = Job.getInstance(conf);
        // 4. commented out:
        // job.setJarByClass(WCDriver3.class);
        /* 3. Set the jar path */
        job.setJar("G:\\0521\\06-hadoop\\3.代码\\MRDemo\\target\\MRDemo-1.0-SNAPSHOT.jar");

        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
Chapter 2 Hadoop Serialization
2.1 What Serialization Is
Serialization converts in-memory objects into a byte sequence so that they can be stored on disk (persisted).
Deserialization converts a byte sequence, or data persisted on disk, back into in-memory objects.
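As a concrete illustration of what this means in Hadoop terms, the following small sketch (my example, not part of the original material) writes an IntWritable to a byte stream and reads it back, using the same write()/readFields() methods that the framework calls internally:
//Round-trip example
import org.apache.hadoop.io.IntWritable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SerializationDemo {
    public static void main(String[] args) throws IOException {
        // Serialization: object in memory -> byte sequence
        IntWritable original = new IntWritable(42);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialization: byte sequence -> object in memory
        IntWritable restored = new IntWritable();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println("restored value = " + restored.get()); // 42
    }
}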
2.2 Implementing the Serialization Interface (Writable) on a Custom Bean
Implementing serialization on a bean takes the following 7 steps:
(1) The class must implement the Writable interface.
(2) Deserialization creates the object reflectively through the no-arg constructor, so a no-arg constructor is required.
(3) Override the serialization method:
@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
}
(4) Override the deserialization method:
@Override
public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    downFlow = in.readLong();
    sumFlow = in.readLong();
}
(5) Note that the deserialization order must exactly match the serialization order.
(6) To make the result readable in the output file, override toString(); fields can be separated with "\t" for easier downstream processing.
(7) If the custom bean is transmitted as a key, it must also implement the Comparable interface, because the shuffle phase of the MapReduce framework requires keys to be sortable.
A minimal skeleton combining all seven steps is shown right after this list.
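Below is a minimal sketch that puts the seven steps together in one class. The class name MyBean and its fields are illustrative only (they mirror the FlowBean used in the next section); this is not the original code.
//MyBean (illustrative skeleton)
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// (1)(7) WritableComparable combines Writable with Comparable, so the bean can also be used as a key
public class MyBean implements WritableComparable<MyBean> {

    private long upFlow;
    private long downFlow;

    // (2) no-arg constructor, needed for reflective instantiation during deserialization
    public MyBean() {}

    // (3) serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    // (4)(5) deserialization: read the fields in exactly the order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
    }

    // (6) tab-separated string for the output file
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow;
    }

    // (7) define the sort order used by the shuffle when the bean is a key
    @Override
    public int compareTo(MyBean o) {
        return Long.compare(this.upFlow, o.upFlow);
    }
}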
2.3 Serialization Hands-On Case
//FlowBean
package com.atguigu.mr.writable2;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {}

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /* Called when the object is serialized */
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /*
     Called when the object is deserialized.
     Note: the read order must match the write order used during serialization.
    */
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Getters used by the Reducer
    public long getUpFlow() {
        return upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    @Override
    public String toString() {
        return upFlow + " " + downFlow + " " + sumFlow;
    }
}
//Mapper
package com.atguigu.mr.writable2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>:
  KEYIN    : type of the offset used when reading data
  VALUEIN  : type of the data being read (one line at a time)
  KEYOUT   : type of the key written out (the phone number)
  VALUEOUT : type of the value written out (FlowBean)
*/
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /*
     * map() is called by MapTask, once per input line.
     * @param key     offset of the data being read
     * @param value   the line being read
     * @param context context, used here to write data out
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Convert value to a String
        String line = value.toString();
        // 2. Split the data
        String[] flowInfo = line.split("\t");
        // 3. Build k,v: key is the phone number, value is a FlowBean of (upFlow, downFlow)
        Text outkey = new Text(flowInfo[1]);
        FlowBean outValue = new FlowBean(
                Long.parseLong(flowInfo[flowInfo.length - 3]),
                Long.parseLong(flowInfo[flowInfo.length - 2]));
        // 4. Write K,V out
        context.write(outkey, outValue);
    }
}
//Reducer
package com.atguigu.mr.writable2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>:
  KEYIN    : key type read in (the key type output by the Mapper - the phone number)
  VALUEIN  : value type read in (the value type output by the Mapper - FlowBean)
  KEYOUT   : key type written out (the phone number)
  VALUEOUT : value type written out (a FlowBean holding the totals)
*/
public class FLowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    /*
     * reduce() is called by ReduceTask, once per group, e.g.
     *   phone1, (upFlow, downFlow, sumFlow)
     *   phone1, (upFlow, downFlow, sumFlow)
     * @param key     the phone number
     * @param values  all FlowBeans of that phone number
     * @param context context, used here to write the k,v out
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long sumUpFlow = 0;   // total upstream flow
        long sumDownFlow = 0; // total downstream flow
        // 1. Iterate over all values
        for (FlowBean value : values) {
            // 2. Accumulate the upstream and downstream flow
            sumUpFlow += value.getUpFlow();
            sumDownFlow += value.getDownFlow();
        }
        // 3. Build the output value
        FlowBean outValue = new FlowBean(sumUpFlow, sumDownFlow);
        // 4. Write K,V out
        context.write(key, outValue);
    }
}
//Driver
package com.atguigu.mr.writable2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create the Job object
        Job job = Job.getInstance(new Configuration());

        // 2. Configure the Job
        // 2.1 Set the jar load path (optional in local mode)
        job.setJarByClass(FlowDriver.class);
        // 2.2 Set the Mapper and Reducer classes
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FLowReducer.class);
        // 2.3 Set the k,v types output by the Mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 2.4 Set the final output K,V types (here: the types output by the Reducer)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 2.5 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\io\\input2"));
        // Note: the output path must not already exist!
        FileOutputFormat.setOutputPath(job, new Path("D:\\io\\output2"));

        // 3. Submit (run) the Job; the parameter controls progress printing,
        //    and the return value is true if the job succeeded
        job.waitForCompletion(true);
    }
}
Chapter 3 How the MapReduce Framework Works
3.1 InputFormat Data Input
3.1.1 What Determines How Many MapTasks Run in Parallel
3.1.2 Job Submission Flow and Split Computation in the Source Code
// 1. Submit the Job
job.waitForCompletion(true);
  // 1.1
  submit();
    // 1.1.1 Establish the connection
    connect();
      // Create the proxy used to submit the Job
      new Cluster(getConfiguration());
        // Decide whether this is local or remote Yarn
        // (local -> LocalJobRunner, cluster -> YarnRunner)
        initialize(jobTrackAddr, conf);
    // 1.1.2 Submit the job
    submitter.submitJobInternal(Job.this, cluster)
      // Create the staging path used for submitting data to the cluster
      // (local submission: on the drive where the MR program lives; cluster: on HDFS)
      Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
      // Get the jobid and create the job path
      JobID jobId = submitClient.getNewJobID();
      // Copy the jar to the cluster
      copyAndConfigureFiles(job, submitJobDir);
        // When submitting to a cluster, the jar is uploaded
        rUploader.uploadFiles(job, jobSubmitDir);
      // Compute the splits and generate the split-planning file
      writeSplits(job, submitJobDir);
        maps = writeNewSplits(job, jobSubmitDir);
          // getSplits() of the InputFormat generates the split information
          input.getSplits(job);
      // Write the XML configuration (job.xml) to the staging path
      writeConf(conf, submitJobFile);
        conf.writeXml(out);
      // Submit the Job and return its status
      // (submitClient is LocalJobRunner locally, YarnRunner otherwise)
      status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
3.1.3 The FileInputFormat Split Mechanism
Roles:
1. The getSplits method of InputFormat produces the input splits.
   FileInputFormat overrides getSplits.
2. The RecordReader created by InputFormat reads the data.
   TextInputFormat overrides createRecordReader so that data is read one line at a time.

Input --> InputFormat (splitting; line-by-line reading) --> Mapper --> Shuffle --> Reducer --> OutputFormat --> Output

1. The InputFormat class hierarchy
|-----InputFormat (abstract class)
      |------FileInputFormat (abstract class)
             |-------TextInputFormat (the InputFormat used by default)

2. InputFormat (abstract class)
/* Generates the split information */
public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
/* Creates the RecordReader object, which actually reads the data. */
public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context)

3. FileInputFormat (abstract class)
Overrides getSplits().

4. TextInputFormat (the InputFormat used by default)
/*
1. Overrides createRecordReader().
2. createRecordReader() creates a LineRecordReader, which reads the data one line at a time.
3. LineRecordReader is a subclass of RecordReader.
*/
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new LineRecordReader(recordDelimiterBytes);
}
3.2 The MapReduce Workflow
1. After InputFormat computes the splits, the configuration files generated during job submission are uploaded to HDFS.
2. The client submits the job to Yarn.
3. The ResourceManager (RM) in Yarn allocates an ApplicationMaster, which requests resources from the RM according to the size of the submitted job.
4. The RM allocates Containers to run the MapTasks.
5. The RecordReader of the InputFormat reads the text into the Mapper, one line at a time.
6. The Mapper runs its map method on the data it reads.
3.3 The Shuffle Mechanism
3.3.2 Partitioning
1. The default partitioner when there are multiple ReduceTasks:
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    /*
     Purpose: return the partition number.
       K              : key output by the map
       V              : value output by the map
       numReduceTasks : number of ReduceTasks
    */
    public int getPartition(K key, V value, int numReduceTasks) {
        /*
         partition number = key.hashCode() % numReduceTasks
         key.hashCode() & Integer.MAX_VALUE keeps the result non-negative
        */
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

2. The default partitioner when there is only 1 ReduceTask:
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K, V>() {
    @Override
    public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1; // always 0
    }
};
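As a quick sanity check of the default formula, the snippet below (an illustration of my own; the sample keys are arbitrary and the resulting numbers depend on the hash codes) computes which partition a few keys would land in with 3 ReduceTasks:
//PartitionDemo
import org.apache.hadoop.io.Text;

public class PartitionDemo {
    public static void main(String[] args) {
        int numReduceTasks = 3;
        String[] keys = {"hadoop", "spark", "flink"};
        for (String k : keys) {
            Text key = new Text(k);
            // Same formula as HashPartitioner: mask the sign bit, then mod by the number of ReduceTasks
            int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(k + " -> partition " + partition);
        }
    }
}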
3.3.3 Partitioning Hands-On Case
1. Set the number of ReduceTasks:
job.setNumReduceTasks(5);
2. Register the custom partitioner class:
job.setPartitionerClass(MyPartitioner.class);
3. The custom partitioner class:
package com.atguigu.mr.partition2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/*
Custom partitioner:
1. Define a class that extends Partitioner.
   Partitioner<KEY, VALUE>:
     KEY   : type of the key written by the mapper
     VALUE : type of the value written by the mapper
2. Override getPartition.
*/
public class MyPartitioner extends Partitioner<Text, FlowBean> {

    /**
     * Return the partition number.
     * @param text          key output by the mapper (the phone number)
     * @param flowBean      value output by the mapper (the FlowBean)
     * @param numPartitions number of ReduceTasks
     */
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        /*
         Requirement: phone numbers starting with 136, 137, 138 and 139 each go
         into their own file; all other prefixes go into a fifth file.
        */
        String phoneNumber = text.toString();
        // Match the phone-number prefix
        if (phoneNumber.startsWith("136")) {
            return 0;
        } else if (phoneNumber.startsWith("137")) {
            return 1;
        } else if (phoneNumber.startsWith("138")) {
            return 2;
        } else if (phoneNumber.startsWith("139")) {
            return 3;
        } else {
            return 4;
        }
    }
}
3.3.4 WritableComparable Sorting
How custom sorting with WritableComparable works:
When a bean object is transmitted as the key, it must implement the WritableComparable interface and override compareTo; that is all it takes to enable sorting.
Remember to add the generic type parameter.
3.3.5 WritableComparable Sorting Hands-On Case (Total Order)
Requirement: take the output produced by the case in section 2.3 and sort it by total flow.
//FlowBean
public class FlowBean implements WritableComparable<FlowBean> { // remember the generic type

    /* This method specifies which field to sort by */
    @Override
    public int compareTo(FlowBean o) {
        // sort by total flow
        return Long.compare(this.sumFlow, o.sumFlow);
    }

    // ... the remaining fields and methods are the same as the FlowBean in section 2.3
}
//Mapper
package com.atguigu.mr.comparable2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
Note: FlowBean must be the key, because only keys are sorted.
*/
public class CPMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Convert value to a String
        String line = value.toString();
        // 2. Split the line
        String[] phoneInfo = line.split("\t");
        // 3. Build K,V: the FlowBean (upFlow, downFlow, sumFlow) is the key, the phone number is the value
        FlowBean outKey = new FlowBean(
                Long.parseLong(phoneInfo[1]),
                Long.parseLong(phoneInfo[2]),
                Long.parseLong(phoneInfo[3]));
        Text outValue = new Text(phoneInfo[0]);
        // 4. Write the k,v out
        context.write(outKey, outValue);
    }
}
//Reducer
package com.atguigu.mr.comparable2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
Simply swaps K and V back.
Note: the generic types must be swapped accordingly; each value (phone number) is unique.
*/
public class CPReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Iterate over all values
        for (Text outKey : values) {
            // Write K,V out with the original key and value swapped back
            context.write(outKey, key);
        }
    }
}
//Driver
package com.atguigu.mr.comparable2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CPDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create the Job object
        Job job = Job.getInstance(new Configuration());

        // 2. Configure the Job
        job.setMapperClass(CPMapper.class);
        job.setReducerClass(CPReducer.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path("D:\\io\\input10"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\io\\output1111"));

        // 3. Submit the Job
        job.waitForCompletion(true);
    }
}
3.3.6 WritableComparable Sorting Hands-On Case (Sorting Within Partitions)
Requirement: each province's output file of phone numbers must be sorted internally by total flow. Building on the previous case, add a custom partitioner class that partitions by phone-number prefix.
//Driver
package com.atguigu.mr.comparable3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CPDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        // 1. Use the custom partitioner class
        job.setPartitionerClass(MyPartitioner.class);
        // 2. Set the number of ReduceTasks
        job.setNumReduceTasks(5);

        job.setMapperClass(CPMapper.class);
        job.setReducerClass(CPReducer.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path("D:\\io\\input10"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\io\\output11"));
        job.waitForCompletion(true);
    }
}
//MyPartitioner
package com.atguigu.mr.comparable3;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/*
Custom partitioner:
  key   : the key output by the mapper (FlowBean)
  value : the value output by the mapper (the phone number)
*/
public class MyPartitioner extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        // 1. Convert text to a String
        String phoneNumber = text.toString();
        // 2. Match the phone-number prefix
        if (phoneNumber.startsWith("136")) {
            return 0;
        } else if (phoneNumber.startsWith("137")) {
            return 1;
        } else if (phoneNumber.startsWith("138")) {
            return 2;
        } else if (phoneNumber.startsWith("139")) {
            return 3;
        } else {
            return 4;
        }
    }
}
3.6 OutputFormat Data Output
3.6.1 OutputFormat Interface Implementations
Roles:
1. The checkOutputSpecs method of OutputFormat checks the output settings.
   FileOutputFormat overrides checkOutputSpecs to verify that
   1. the output path does not already exist, and
   2. an output path has been set.
2. The RecordWriter returned by OutputFormat writes the data.
   TextOutputFormat overrides getRecordWriter so that data is written out line by line.

I. The OutputFormat class hierarchy
|-----OutputFormat (abstract class)
      |------FileOutputFormat (abstract class)
             |-----TextOutputFormat (the OutputFormat used by default)

II. OutputFormat (abstract class)
/* Returns a RecordWriter object, which actually writes the data. */
public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;
/* Checks the output settings (1. the output path must not already exist  2. an output path must be set) */
public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;

III. FileOutputFormat (abstract class)
Overrides checkOutputSpecs to perform the two checks above.

IV. TextOutputFormat (the OutputFormat used by default)
/*
1. Overrides getRecordWriter and returns a LineRecordWriter.
2. LineRecordWriter is what actually writes the data, one line at a time.
3. LineRecordWriter is a subclass of RecordWriter.
*/
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
    return new LineRecordWriter<>(fileOut, keyValueSeparator);
}
3.6.2 Custom OutputFormat
1. To control the final output path and output format, you can define a custom OutputFormat.
2. Steps for a custom OutputFormat:
(1) Define a class MyOutputFormat that extends FileOutputFormat.
(2) Provide your own RecordWriter, in particular its write() method, which writes the output data.
3.6.3 Custom OutputFormat Hands-On Case
1) Requirement
Filter the input log: websites containing "atguigu" go to e:/atguigu.log, all other websites go to e:/other.log.
2) Steps
1. Register the custom OutputFormat in the Driver
//Custom OutputFormat; if none is set, the default (TextOutputFormat) is used
job.setOutputFormatClass(MyOutputFormat.class);
2. Define the MyOutputFormat class
3. Define the MyRecordWriter class used by MyOutputFormat
// 1. Configure the custom OutputFormat in the Driver
package com.atguigu.mr.output;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OutputDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Set the custom OutputFormat class; if not set, TextOutputFormat is used
        job.setOutputFormatClass(MyOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\io\\input7"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\io\\output"));
        job.waitForCompletion(true);
    }
}
//2. The custom MyOutputFormat class
package com.atguigu.mr.output;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/*
Custom OutputFormat:
1. Extend FileOutputFormat; then only getRecordWriter needs to be overridden, which is exactly what we need.
2. The generic types of FileOutputFormat are the K,V types output by the Reducer.
   Without a Reducer they are the Mapper's output K,V; without a Mapper either, they are the K,V produced by the InputFormat.
*/
public class MyOutputFormat extends FileOutputFormat<LongWritable, Text> {

    /* Return the RecordWriter object. */
    @Override
    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // Pass the job through the constructor
        return new MyRecordWriter(job);
    }
}
//3. The custom MyRecordWriter class
package com.atguigu.mr.output;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/* The custom RecordWriter class. */
public class MyRecordWriter extends RecordWriter<LongWritable, Text> {

    private FSDataOutputStream atguigu;
    private FSDataOutputStream other;

    /*
     Create the output streams.
     TaskAttemptContext job is passed in through the constructor so that the same Configuration
     set up in the Driver is used here (FileSystem.get(configuration) must see the same settings
     as the running environment).
    */
    public MyRecordWriter(TaskAttemptContext job) {
        try {
            // 1. Get the file system
            FileSystem fs = FileSystem.get(job.getConfiguration());
            // 2. Create the output streams
            // 2.1 Get the output path configured in the Driver
            Path outputPath = FileOutputFormat.getOutputPath(job);
            // 2.2 Create the streams, appending the file names to the output path
            //     (e.g. outputPath/atguigu.txt)
            atguigu = fs.create(new Path(outputPath, "atguigu.txt"));
            other = fs.create(new Path(outputPath, "other.txt"));
        } catch (Exception e) {
            // Abort the program: turn the checked exception into a runtime exception
            throw new RuntimeException(e.getMessage());
        }
    }

    /**
     * Write the data. This method is called in a loop, once per line.
     * @param key   the offset
     * @param value one line of data
     */
    @Override
    public void write(LongWritable key, Text value) throws IOException, InterruptedException {
        // 1. Check whether the website contains "atguigu"
        String address = value.toString() + "\n";
        if (address.contains("atguigu")) {
            // write to atguigu.txt
            atguigu.write(address.getBytes());
        } else {
            // write to other.txt
            other.write(address.getBytes());
        }
    }

    /* Close the resources; called by the framework once all writing has finished. */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(atguigu);
        IOUtils.closeStream(other);
    }
}
3.7 Join: Multiple Approaches
3.7.2 Reduce Join Hands-On Case
Requirement: join Table 4-4 with Table 4-5 to produce Table 4-6.
Table 4-4 Order table t_order
id | pid | amount |
---|---|---|
1001 | 01 | 1 |
1002 | 02 | 2 |
1003 | 03 | 3 |
1004 | 01 | 4 |
1005 | 02 | 5 |
1006 | 03 | 6 |
Table 4-5 Product table t_product
pid | pname |
---|---|
01 | 小米 |
02 | 华为 |
03 | 格力 |
Table 4-6 Target output
id | pname | amount |
---|---|---|
1001 | 小米 | 1 |
1004 | 小米 | 4 |
1002 | 华为 | 2 |
1005 | 华为 | 5 |
1003 | 格力 | 3 |
1006 | 格力 | 6 |
//OrderBean
package com.atguigu.mr.reducejoin;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/* JavaBean for the join */
public class OrderBean implements WritableComparable<OrderBean> {

    private String id;
    private String pid;
    private String amount;
    private String pname;

    public OrderBean() {}

    public OrderBean(String id, String pid, String amount, String pname) {
        this.id = id;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
    }

    /*
     Sorting: first by pid; if the pids are equal, then by pname (descending),
     so that the record carrying the product name comes first in each group, e.g.
       ""    1  ""  小米
       1001  1  1   ""
       1004  1  4   ""
    */
    @Override
    public int compareTo(OrderBean o) {
        int pidValue = this.pid.compareTo(o.pid);
        if (pidValue == 0) {
            // same pid: sort by pname in descending order
            return -this.pname.compareTo(o.pname);
        }
        return pidValue;
    }

    /* Serialization */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeUTF(pname);
        out.writeUTF(amount);
    }

    /* Deserialization (same field order as write) */
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        pid = in.readUTF();
        pname = in.readUTF();
        amount = in.readUTF();
    }

    @Override
    public String toString() {
        return id + " " + pid + " " + amount + " " + pname;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }
    public String getAmount() { return amount; }
    public void setAmount(String amount) { this.amount = amount; }
    public String getPname() { return pname; }
    public void setPname(String pname) { this.pname = pname; }
}
//Mapper
package com.atguigu.mr.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class RJMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    private String fileName;

    /*
     setup() runs exactly once when the task starts, before any call to map().
     Used here for initialization.
    */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 1. Get the input split of this task
        FileSplit fs = (FileSplit) context.getInputSplit();
        // 2. Get the file name from the split
        fileName = fs.getPath().getName();
    }

    /*
     cleanup() runs exactly once when the task ends, after the last call to map().
     Used to release resources (nothing to do here).
    */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Split the line
        String[] info = line.split("\t");
        // Create the bean
        OrderBean bean = new OrderBean();
        // Fill it depending on which file the record came from
        if ("order.txt".equals(fileName)) {
            bean.setId(info[0]);
            bean.setPid(info[1]);
            bean.setAmount(info[2]);
            // Note: pname must be set to the empty string "",
            // because sorting is first by pid and then by pname
            bean.setPname("");
        } else if ("pd.txt".equals(fileName)) {
            bean.setPid(info[0]);
            bean.setPname(info[1]);
            bean.setId("");
            bean.setAmount("");
        }
        // Write K,V out
        context.write(bean, NullWritable.get());
    }
}
//Reducer
package com.atguigu.mr.reducejoin;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class RJReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    /*
     One group (same pid) arrives in this order, e.g.
       key                     value
       ""    1  ""   小米       null
       1001  1  1    ""        null
       1004  1  4    ""        null
    */
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Get the iterator over the group
        Iterator<NullWritable> iterator = values.iterator();
        // 1. Consume the first record of the group (the pd.txt record);
        //    next() advances the pointer and the key object is updated accordingly
        iterator.next();
        // 2. Remember its pname, e.g. "小米"
        String pname = key.getPname();
        // 3. Replace the pname of all remaining records in the group
        while (iterator.hasNext()) {  // is there another record?
            iterator.next();          // advance; the key now holds an order record
            // set the real product name
            key.setPname(pname);
            // 4. Write k,v out
            context.write(key, NullWritable.get());
        }
    }
}
//Driver
package com.atguigu.mr.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class RJDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        // Set the custom grouping comparator (groups records by pid; see the sketch below)
        job.setGroupingComparatorClass(MyCompartor.class);

        job.setMapperClass(RJMapper.class);
        job.setReducerClass(RJReducer.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("D:\\io\\input16"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\io\\output17"));
        job.waitForCompletion(true);
    }
}
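The driver above registers a grouping comparator class MyCompartor whose source is not included in this article. Its job is to group records by pid only, so that the order records and the matching product record arrive in the same reduce() call. Below is a minimal sketch of what such a class could look like (my reconstruction based on standard WritableComparator usage, not the original code):
//MyCompartor (illustrative sketch)
package com.atguigu.mr.reducejoin;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/*
 Grouping comparator: records with the same pid are treated as one group,
 regardless of pname, so the order rows and the product row meet in a single reduce() call.
*/
public class MyCompartor extends WritableComparator {

    public MyCompartor() {
        // true: create OrderBean instances so compare() receives deserialized objects
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean o1 = (OrderBean) a;
        OrderBean o2 = (OrderBean) b;
        // Group only by pid
        return o1.getPid().compareTo(o2.getPid());
    }
}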
3.9 Data Cleansing (ETL)
Before running the core business MapReduce program, the data usually needs to be cleansed first to remove records that do not meet the requirements. The cleansing process usually only needs a Mapper; no Reducer is required.
Requirement:
Remove log records that have 11 or fewer fields (counting fields, not characters; for example, a line with only 4 fields would be removed).
//Mapper
package com.atguigu.mrfinal.ETL;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Split the line and store the fields in an array
        String[] lineSize = line.split(" ");
        // Check the number of fields
        if (lineSize.length > 11) {
            // Keep the line: write out the Text whose field count is greater than 11
            context.write(value, NullWritable.get());
        }
    }
}
//Driver
package com.atguigu.mrfinal.ETL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create the Job object
        Job job = Job.getInstance(new Configuration());

        // 2. Configure the Job
        // 2.1 Set the jar load path (optional in local mode)
        job.setJarByClass(FDriver.class);
        // 2.2 Set the Mapper class. ETL only needs the map phase, so no Reducer is configured;
        //     setting the number of ReduceTasks to 0 skips the reduce phase entirely.
        job.setMapperClass(Mapper.class);
        job.setNumReduceTasks(0);
        // 2.3 Set the k,v types output by the Mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 2.4 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\io\\input"));
        // Note: the output path must not already exist!
        FileOutputFormat.setOutputPath(job, new Path("D:\\io\\output"));

        // 3. Submit (run) the Job
        job.waitForCompletion(true);
    }
}
Chapter 4 The Yarn Resource Scheduler
Yarn is a resource-scheduling platform that provides server computing resources to computation programs. It acts like a distributed operating system, while computation programs such as MapReduce are the applications running on top of it.
In short: Yarn allocates the resources that MR needs in order to run.
4.1 Basic Yarn Architecture
1. The client submits a job to the ResourceManager (RM).
2. The RM receives the job and puts it into the task queue.
3. A NodeManager (NM) takes a task from the RM, creates a Container, and launches the MR AppMaster in it.
4. The MR AppMaster requests resources from the RM to run the MapTasks (MT).
5. The RM puts those tasks into the queue.
6. NMs take the tasks from the RM, create Containers, and run the MapTasks.
7. The MR AppMaster sends the Map program's startup script to the NMs running the MapTasks.
8. When the MapTasks finish, the MR AppMaster requests resources from the RM to run the ReduceTasks (RT).
9. The Reducers fetch the data of their partitions from the Map side and merge-sort it.
NodeManager: manages the resources of a single node.
ResourceManager: manages all NodeManagers.
App Mstr: the coordinator of the tasks (MapTask, ReduceTask);
it is responsible for data splitting, handing the actual splits to the MapTasks.
Container: MapTasks and ReduceTasks both run inside Containers.
4.2 How Yarn Works
(1) YarnRunner requests an Application from the ResourceManager.
(2) The RM returns the application's resource path to YarnRunner.
(3) The program uploads the resources it needs to run to HDFS.
(4) Once the resources are uploaded, it requests to run the MRAppMaster.
(5) The RM turns the user's request into a Task and queues it.
(6) One of the NodeManagers picks up the Task.
(7) That NodeManager creates a Container and launches the MRAppMaster.
(8) The Container copies the resources from HDFS to the local node.
(9) The MRAppMaster requests resources from the RM to run the MapTasks.
(10) The RM assigns the MapTasks to two other NodeManagers, which each pick up a task and create a Container.
(11) The MRAppMaster sends the program startup script to the two NodeManagers that received tasks; they start the MapTasks, which partition and sort the data.
(12) When all MapTasks have finished, the MRAppMaster requests Containers from the RM to run the ReduceTasks.
(13) Each ReduceTask fetches the data of its partition from the MapTasks.
(14) When the program finishes, the MRAppMaster asks the RM to deregister it.
Materials for the examples:
Link: https://pan.baidu.com/s/1jKsZAiHEFormIJJjRW_mBg
Extraction code: a2a2
Reference:
尚硅谷大数据技术之Hadoop (MapReduce & Yarn)
(Author: 尚硅谷 Big Data R&D Department)
That concludes this detailed yet concise summary of the big-data technologies Hadoop MapReduce and Yarn, with examples, code, and diagrams. I hope it proves helpful.