大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客案例

本文主要是介绍大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据技术之_05_Hadoop学习_04_MapReduce

    • 第6章 Hadoop企业优化(重中之重)
      • 6.1 MapReduce 跑的慢的原因
      • 6.2 MapReduce优化方法
        • 6.2.1 数据输入
        • 6.2.2 Map阶段
        • 6.2.3 Reduce阶段
        • 6.2.4 I/O传输
        • 6.2.5 数据倾斜问题
        • 6.2.6 常用的调优参数
      • 6.3 HDFS小文件优化方法
        • 6.3.1 HDFS小文件弊端
        • 6.3.2 HDFS小文件解决方案
    • 第7章 MapReduce扩展案例
      • 7.1 倒排索引案例(多job串联)
      • 7.2 TopN案例
      • 7.3 找博客共同粉丝案例
    • 第8章 常见错误及解决方案

第6章 Hadoop企业优化(重中之重)

6.1 MapReduce 跑的慢的原因

[外链图片转存失败(img-dpqE4AtA-1562298828595)(https://s2.ax1x.com/2019/02/20/kRiBEn.png)]

6.2 MapReduce优化方法

  MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。

6.2.1 数据输入

6.2.2 Map阶段

[外链图片转存失败(img-shQMMTWo-1562298828597)(https://s2.ax1x.com/2019/02/20/kRir40.png)]

6.2.3 Reduce阶段

[外链图片转存失败(img-JFRLNu8D-1562298828597)(https://s2.ax1x.com/2019/02/20/kRiDNq.png)]
[外链图片转存失败(img-d0RJ80q0-1562298828597)(https://s2.ax1x.com/2019/02/20/kRiwHs.png)]

6.2.4 I/O传输

6.2.5 数据倾斜问题


[外链图片转存失败(img-Ebtw4v9h-1562298828598)(https://s2.ax1x.com/2019/02/20/kRiWDJ.png)]

6.2.6 常用的调优参数

1、资源相关参数
(1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml)
[外链图片转存失败(img-gSkrzR9Z-1562298828598)(https://s2.ax1x.com/2019/02/20/kRicgU.png)]
(2)应该在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)
[外链图片转存失败(img-moX35UEj-1562298828598)(https://s2.ax1x.com/2019/02/20/kRigvF.png)]
(3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml)
[外链图片转存失败(img-YvNrOdth-1562298828598)(https://s2.ax1x.com/2019/02/20/kRi4ER.png)]
2、容错相关参数(MapReduce性能优化)
[外链图片转存失败(img-2YeSz33k-1562298828598)(https://s2.ax1x.com/2019/02/20/kRifb9.png)]

6.3 HDFS小文件优化方法

6.3.1 HDFS小文件弊端

  HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢

6.3.2 HDFS小文件解决方案

小文件的优化无非以下几种方式:
  (1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。
  (2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。
  (3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。
[外链图片转存失败(img-9hC5v0UA-1562298828598)(https://s2.ax1x.com/2019/02/20/kRiTC6.png)]

[外链图片转存失败(img-wFYJ8zhM-1562298828599)(https://s2.ax1x.com/2019/02/20/kRi78K.png)]

第7章 MapReduce扩展案例

7.1 倒排索引案例(多job串联)

1、需求
  有大量的文本(文档、网页),需要建立搜索索引,如下图所示。
(1)数据输入
  
(2)期望输出数据

atguigu	c.txt-->2	b.txt-->2	a.txt-->3	
pingping	c.txt-->1	b.txt-->3	a.txt-->1	
ss	c.txt-->1	b.txt-->1	a.txt-->2	

2、需求分析

3、第一次处理
(1)第一次处理,编写OneIndexMapper类

package com.atguigu.mr.index;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class OneIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {String name;Text k = new Text();IntWritable v = new IntWritable();@Overrideprotected void setup(Context context)throws IOException, InterruptedException {// 获取文件名称FileSplit split = (FileSplit) context.getInputSplit();name = split.getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// atguigu pingping// 1、获取一行数据String line = value.toString();// 2、切割String[] fields = line.split(" ");for (String word : fields) {// 3、拼接k.set(word + "---" + name); // atguigu---a.txtv.set(1);// 4、写出context.write(k, v); // <atguigu---a.txt,1>}}
}

(2)第一次处理,编写OneIndexReducer类

package com.atguigu.mr.index;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable> {IntWritable v = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {// 1、累加求和int sum = 0;for (IntWritable value : values) {sum += value.get();}v.set(sum);// 2、写出context.write(key, v);}
}

(3)第一次处理,编写OneIndexDriver类

package com.atguigu.mr.index;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class OneIndexDriver {public static void main(String[] args) throws Exception {// 输入输出路径需要根据自己电脑上实际的输入输出路径设置args = new String[] { "d:/temp/atguigu/0529/input/inputoneindex", "d:/temp/atguigu/0529/output17" };Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(OneIndexDriver.class);job.setMapperClass(OneIndexMapper.class);job.setReducerClass(OneIndexReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.waitForCompletion(true);}
}

(4)查看第一次输出结果

atguigu---a.txt	3
atguigu---b.txt	2
atguigu---c.txt	2
pingping---a.txt	1
pingping---b.txt	3
pingping---c.txt	1
ss---a.txt	2
ss---b.txt	1
ss---c.txt	1

4、第二次处理
(1)第二次处理,编写TwoIndexMapper类

package com.atguigu.mr.index;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text> {Text k = new Text();Text v = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// 输入为:// atguigu--a.txt  	3// atguigu--b.txt  	2// atguigu--c.txt  	2// 输出为:(atguigu,a.txt  	3)atguigu	c.txt-->2	b.txt-->2	a.txt-->3// 1、获取一行数据String line = value.toString();// 2、用“--”切割String[] fields = line.split("--"); // 结果为:(atguigu,a.txt  	3)// 3、封装数据k.set(fields[0]);v.set(fields[1]);// 4、写出context.write(k, v);}
}

(2)第二次处理,编写TwoIndexReducer类

package com.atguigu.mr.index;import java.io.IOException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class TwoIndexReducer extends Reducer<Text, Text, Text, Text> {Text v = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {// 输入为:(atguigu,a.txt  	3)(atguigu,b.txt  	2)(atguigu,c.txt  	2)// 输出为:atguigu	c.txt-->2	b.txt-->2	a.txt-->3StringBuffer sb = new StringBuffer();// 拼接for (Text value : values) {sb.append(value.toString().replace("\t", "-->") + "\t");}// 封装v.set(sb.toString());// 写出context.write(key, v);}
}

(3)第二次处理,编写TwoIndexDriver类

package com.atguigu.mr.index;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TwoIndexDriver {public static void main(String[] args) throws Exception {// 输入输出路径需要根据自己电脑上实际的输入输出路径设置args = new String[] { "d:/temp/atguigu/0529/input/inputtowindex", "d:/temp/atguigu/0529/output18" };Configuration config = new Configuration();Job job = Job.getInstance(config);job.setJarByClass(TwoIndexDriver.class);job.setMapperClass(TwoIndexMapper.class);job.setReducerClass(TwoIndexReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

(4)第二次查看最终结果

atguigu	c.txt-->2	b.txt-->2	a.txt-->3
pingping	c.txt-->1	b.txt-->3	a.txt-->1
ss	c.txt-->1	b.txt-->1	a.txt-->2

7.2 TopN案例

1、需求
  对需求2.3输出结果进行加工,输出流量使用量在前10的用户信息。
(1)输入数据 (2)输出数据
[外链图片转存失败(img-vviDipWg-1562298828600)(https://s2.ax1x.com/2019/02/20/kRiI4x.png)]
2、需求分析
  同上图。
3、实现代码
(1)编写FlowBean类

package com.atguigu.mr.topn;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class FlowBean implements WritableComparable<FlowBean> {private long upFlow; // 上行流量private long downFlow; // 下行流量private long sumFlow; // 总流量public FlowBean() {super();}public FlowBean(long upFlow, long downFlow) {super();this.upFlow = upFlow;this.downFlow = downFlow;}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}@Overridepublic void readFields(DataInput in) throws IOException {this.upFlow = in.readLong();this.downFlow = in.readLong();this.sumFlow = in.readLong();}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}public void set(long downFlow2, long upFlow2) {downFlow = downFlow2;upFlow = upFlow2;sumFlow = downFlow2 + upFlow2;}@Overridepublic int compareTo(FlowBean bean) {int result;// 按照总流量大小,倒序排列if (this.sumFlow > bean.getSumFlow()) {result = -1;} else if (this.sumFlow < bean.getSumFlow()) {result = 1;} else {result = 0;}return result;}
}

(2)编写TopNMapper类

package com.atguigu.mr.topn;import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, Text> {// 定义一个TreeMap作为存储数据的容器(天然按key排序,降序)private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();private FlowBean kBean;@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {kBean = new FlowBean();Text v = new Text();// 13470253144	180	180	360// 1、获取一行String line = value.toString();// 2、切割String[] fields = line.split("\t");// 3、封装数据String phoneNum = fields[0];long upFlow = Long.parseLong(fields[1]);long downFlow = Long.parseLong(fields[2]);long sumFlow = Long.parseLong(fields[3]);kBean.setUpFlow(upFlow);kBean.setDownFlow(downFlow);kBean.setSumFlow(sumFlow);v.set(phoneNum);// 4、向TreeMap中添加数据flowMap.put(kBean, v);// 5、限制TreeMap的数据量,超过10条就删除掉流量最小的一条数据if (flowMap.size() > 10) {// flowMap.remove(flowMap.firstKey()); // 升序删除第一个flowMap.remove(flowMap.lastKey()); // 降序删除最后一个}}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {// 6、遍历TreeMap集合,输出数据Iterator<FlowBean> bean = flowMap.keySet().iterator();while (bean.hasNext()) {FlowBean k = bean.next();context.write(k, flowMap.get(k));}}
}

(3)编写TopNReducer类

package com.atguigu.mr.topn;import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {// 定义一个TreeMap作为存储数据的容器(天然按key排序)TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();@Overrideprotected void reduce(FlowBean key, Iterable<Text> values, Context context)throws IOException, InterruptedException {for (Text value : values) {FlowBean bean = new FlowBean();bean.set(key.getDownFlow(), key.getUpFlow());// 1、向treeMap集合中添加数据flowMap.put(bean, new Text(value));// 2、限制TreeMap数据量,超过10条就删除掉流量最小的一条数据if (flowMap.size() > 10) {// flowMap.remove(flowMap.firstKey()); // 升序删除第一个flowMap.remove(flowMap.lastKey()); // 降序删除最后一个}}}@Overrideprotected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context)throws IOException, InterruptedException {// 3、遍历集合,输出数据Iterator<FlowBean> bean = flowMap.keySet().iterator();while (bean.hasNext()) {FlowBean v = bean.next();context.write(new Text(flowMap.get(v)), v);}}
}

(4)编写TopNDriver类

package com.atguigu.mr.topn;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TopNDriver {public static void main(String[] args) throws Exception {args = new String[] { "d:/temp/atguigu/0529/input/inputtopn", "d:/temp/atguigu/0529/output20" };// 1、获取配置信息,或者job对象实例Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 6、指定本程序的jar包所在的本地路径job.setJarByClass(TopNDriver.class);// 2、指定本业务job要使用的mapper/reducer业务类job.setMapperClass(TopNMapper.class);job.setReducerClass(TopNReducer.class);// 3、指定mapper输出数据的kv类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);// 4、指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);// 5、指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7、将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

7.3 找博客共同粉丝案例

1、需求
  以下是博客的粉丝列表数据,冒号前是一个用户,冒号后是该用户的所有粉丝(数据中的粉丝关系是单向的)
  求出哪些人两两之间有共同粉丝,及他俩的共同粉丝都有谁?
(1)数据输入

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

2、需求分析
先求出A、B、C、…等是谁的粉丝
第一次输出结果

A	I,K,C,B,G,F,H,O,D,
B	A,F,J,E,
C	A,E,B,H,F,G,K,
D	G,C,K,A,L,F,E,H,
E	G,M,L,H,A,F,B,D,
F	L,M,D,C,G,A,
G	M,
H	O,
I	O,C,
J	O,
K	B,
L	D,E,
M	E,F,
O	A,H,I,J,F,

第二次输出结果

A-B	E C 
A-C	D F 
A-D	E F 
A-E	D B C 
A-F	O B C D E 
A-G	F E C D 
A-H	E C D O 
A-I	O 
A-J	O B 
A-K	D C 
A-L	F E D 
A-M	E F 
B-C	A 
B-D	A E 
B-E	C 
B-F	E A C 
B-G	C E A 
B-H	A E C 
B-I	A 
B-K	C A 
B-L	E 
B-M	E 
B-O	A 
C-D	A F 
C-E	D 
C-F	D A 
C-G	D F A 
C-H	D A 
C-I	A 
C-K	A D 
C-L	D F 
C-M	F 
C-O	I A 
D-E	L 
D-F	A E 
D-G	E A F 
D-H	A E 
D-I	A 
D-K	A 
D-L	E F 
D-M	F E 
D-O	A 
E-F	D M C B 
E-G	C D 
E-H	C D 
E-J	B 
E-K	C D 
E-L	D 
F-G	D C A E 
F-H	A D O E C 
F-I	O A 
F-J	B O 
F-K	D C A 
F-L	E D 
F-M	E 
F-O	A 
G-H	D C E A 
G-I	A 
G-K	D A C 
G-L	D F E 
G-M	E F 
G-O	A 
H-I	O A 
H-J	O 
H-K	A C D 
H-L	D E 
H-M	E 
H-O	A 
I-J	O 
I-K	A 
I-O	A 
K-L	D 
K-O	A 
L-M	E F

3、代码实现
(1)第一次Mapper类

package com.atguigu.mr.friends;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{Text k = new Text();Text v = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// A:B,C,D,F,E,O// 1、获取一行String line = value.toString();// 2、切割String[] fields = line.split(":");// 3、获取用户和用户的粉丝String user = fields[0]; // person = AString[] friends = fields[1].split(","); // firends = [B, C, D, F, E, O]// 封装v.set(user);// 4、写出去for (String friend : friends) {k.set(friend);context.write(k, v); // <粉丝,用户>  <B,A><C,A><D,A>}}
}

(2)第一次Reducer类

package com.atguigu.mr.friends;import java.io.IOException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text> {Text v = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {StringBuffer sb = new StringBuffer();// <B,A><C,A><D,A>// 1、拼接for (Text user : values) {sb.append(user).append(","); // }v.set(sb.toString());// 2、写出context.write(key, v); // A	I,K,C,B,G,F,H,O,D,}
}

(3)第一次Driver类

package com.atguigu.mr.friends;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class OneShareFriendsDriver {public static void main(String[] args) throws Exception {// 0、根据自己电脑路径重新配置args = new String[] { "d:/temp/atguigu/0529/input/inputfriend", "d:/temp/atguigu/0529/output21" };// 1、获取job对象Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 2、指定jar包运行的路径job.setJarByClass(OneShareFriendsDriver.class);// 3、指定map/reduce使用的类job.setMapperClass(OneShareFriendsMapper.class);job.setReducerClass(OneShareFriendsReducer.class);// 4、指定map输出的数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);// 5、指定最终输出的数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 6、指定job的输入原始所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7、提交boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

(4)第二次Mapper类

package com.atguigu.mr.friends;import java.io.IOException;
import java.util.Arrays;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text> {@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// A   I,K,C,B,G,F,H,O,D,// 粉丝    用户,用户,用户// 1、获取一行String line = value.toString();// 2、切割String[] friend_users = line.split("\t");// AString friend = friend_users[0];// I,K,C,B,G,F,H,O,D,String[] users = friend_users[1].split(",");Arrays.sort(users); // B,C,D,F,G,H,I,K,Ofor (int i = 0; i < users.length - 1; i++) {for (int j = i + 1; j < users.length; j++) {context.write(new Text(users[i] + "-" + users[j]), new Text(friend));}}}
}

(5)第二次Reducer类

package com.atguigu.mr.friends;import java.io.IOException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values,Context context)throws IOException, InterruptedException {StringBuffer sb = new StringBuffer();for (Text friend : values) {sb.append(friend).append(" ");}context.write(key, new Text(sb.toString()));}
}

(6)第二次Driver类

package com.atguigu.mr.friends;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TwoShareFriendsDriver {public static void main(String[] args) throws Exception {// 0、根据自己电脑路径重新配置args = new String[] { "d:/temp/atguigu/0529/input/inputfriends", "d:/temp/atguigu/0529/output22" };// 1、获取job对象Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 2、指定jar包运行的路径job.setJarByClass(TwoShareFriendsDriver.class);// 3、指定map/reduce使用的类job.setMapperClass(TwoShareFriendsMapper.class);job.setReducerClass(TwoShareFriendsReducer.class);// 4、指定map输出的数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//// 5、指定最终输出的数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 6、指定job的输入原始所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7、提交boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

第8章 常见错误及解决方案

1)导包容易出错。尤其Text和CombineTextInputFormat。
2)Mapper中第一个输入的参数必须是LongWritable或者NullWritable,不可以是IntWritable,报的错误是类型转换异常。
3)java.lang.Exception: java.io.IOException: Illegal partition for 13926435656(4),说明Partition和ReduceTask个数没对上,调整ReduceTask个数。
4)如果分区数不是1,但是reducetask为1,是否执行分区过程。
  答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。
5)在Windows环境编译的jar包导入到Linux环境中运行:

hadoop jar wc.jar com.atguigu.mapreduce.wordcount.WordCountDriver /user/atguigu/ /user/atguigu/output

报如下错误:

Exception in thread "main" java.lang.UnsupportedClassVersionError: com/atguigu/mapreduce/wordcount/WordCountDriver : Unsupported major.minor version 52.0

  原因是Windows环境用的jdk1.7,Linux环境用的jdk1.8。
  解决方案:统一jdk版本。
6)缓存pd.txt小文件案例中,报找不到pd.txt文件
  原因:大部分为路径书写错误。还有就是要检查pd.txt.txt的问题。还有个别电脑写相对路径找不到pd.txt,可以修改为绝对路径。
7)报类型转换异常。
  通常都是在驱动函数中设置Map输出和最终输出时编写错误。
  Map输出的key如果没有排序,也会报类型转换异常。
8)集群中运行wc.jar时出现了无法获得输入文件。
  原因:WordCount案例的输入文件不能放在 HDFS 集群的根目录。
9)出现了如下相关异常

Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Zat org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:356)at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:371)at org.apache.hadoop.util.Shell.<clinit>(Shell.java:364)

解决方案一:拷贝hadoop.dll文件(文件位置:D:\work\Hadoop\hadoop-2.7.2\bin)到Windows目录C:\Windows\System32。个别同学电脑还需要修改Hadoop源码。
解决方案二:创建如下包名,并将NativeIO.java拷贝到该包名下

10)自定义Outputformat时,注意在RecordWirter中的close()方法必须关闭流资源。否则输出的文件内容中数据为空。

    @Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {if (atguigufos != null) {atguigufos.close();}if (otherfos != null) {otherfos.close();}}

我的GitHub地址:https://github.com/heizemingjun
我的博客园地址:https://www.cnblogs.com/chenmingjun
我的蚂蚁笔记博客地址:https://blog.leanote.com/chenmingjun
Copyright ©2018~2019 黑泽君
【转载文章务必保留出处和署名,谢谢!】

这篇关于大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1100804

相关文章

Vue3 的 shallowRef 和 shallowReactive:优化性能

大家对 Vue3 的 ref 和 reactive 都很熟悉,那么对 shallowRef 和 shallowReactive 是否了解呢? 在编程和数据结构中,“shallow”(浅层)通常指对数据结构的最外层进行操作,而不递归地处理其内部或嵌套的数据。这种处理方式关注的是数据结构的第一层属性或元素,而忽略更深层次的嵌套内容。 1. 浅层与深层的对比 1.1 浅层(Shallow) 定义

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

HDFS—存储优化(纠删码)

纠删码原理 HDFS 默认情况下,一个文件有3个副本,这样提高了数据的可靠性,但也带来了2倍的冗余开销。 Hadoop3.x 引入了纠删码,采用计算的方式,可以节省约50%左右的存储空间。 此种方式节约了空间,但是会增加 cpu 的计算。 纠删码策略是给具体一个路径设置。所有往此路径下存储的文件,都会执行此策略。 默认只开启对 RS-6-3-1024k