The most important part of designing a MapReduce job is choosing the right key, and then building the data-processing flow around it. In the map phase, records with the same key are sent to the same reduce task, so a good key is some factor that the records you want to bring together have in common. The following examples illustrate this.
Finding common friends
We have each person's friend list and want to find every pair of people who share common friends, and who those common friends are. The test data is as follows (the first column is a person, followed by that person's friends):
A B,C,D,E,F,O
B A,C,E,K
C F,A,D,I
D A,E,F,L
E B,C,D,M,L
F A,B,C,D,E,O,M
G A,C,D,E,F
H A,C,D,E,O
I A,O
J B,O
K A,C,D
L D,E,F
M E,F,G
O A,H,I,H
Problem analysis: what we are solving for is common friends. If A has friend D and C also has friend D, the shared factor is the common friend D. This suggests using the common friend as the key and all owners of that friend as the values; in the reducer, pairing up the values two by two gives the answer.
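A minimal sketch of this approach, in the same style as the mutual-follow code below (the class names CommonFriendMapper and CommonFriendReducer and the package name commonfriend are illustrative, not from a tested implementation): the mapper emits (friend, owner) pairs, and the reducer pairs up the owners of each friend.

package commonfriend;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Emit (friend, owner): the shared friend becomes the key,
// so all people who have that friend meet in one reduce call.
public class CommonFriendMapper extends Mapper<LongWritable, Text, Text, Text> {
    Text outKey = new Text();
    Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] tmp = value.toString().split(" ");
        String owner = tmp[0];
        for (String friend : tmp[1].split(",")) {
            outKey.set(friend);
            outValue.set(owner);
            context.write(outKey, outValue);
        }
    }
}

package commonfriend;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// For each friend, pair up all people who have that friend.
public class CommonFriendReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        List<String> owners = new ArrayList<>();
        for (Text value : values) {
            owners.add(value.toString());
        }
        Collections.sort(owners); // so each pair is always written as "smaller-larger"
        for (int i = 0; i < owners.size(); i++) {
            for (int j = i + 1; j < owners.size(); j++) {
                // Key "X-Y", value: one common friend of X and Y.
                context.write(new Text(owners.get(i) + "-" + owners.get(j)), key);
            }
        }
    }
}

Each output line is a pair of people plus one of their common friends; grouping these lines by the pair key (for instance, with a second MapReduce pass) collects the full common-friend list for each pair. The driver wiring would mirror the MutualFriendDriver shown in the next example.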
Finding mutual follows
Problem analysis: if A's friend list contains B and B's friend list contains A, then A and B follow each other. Here the invariant shared factor is the relationship between the two people: we can join A and B into a pair and use that pair as the key, e.g. "A-B", with the value being the number of times that pair occurs. If the count for a pair reaches 2, then A is B's friend and B is A's friend, i.e. they follow each other. The code is as follows.
The mapper:

package mutualfriend;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MutualFriendMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    Text out = new Text();
    LongWritable times = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] tmp = line.split(" ");
        String owner = tmp[0];
        String[] friends = tmp[1].split(",");
        for (String friend : friends) {
            // Order the two names so that "A follows B" and "B follows A"
            // both produce the same key, e.g. "A-B".
            if (owner.compareTo(friend) <= 0) {
                out.set(owner + "-" + friend);
            } else {
                out.set(friend + "-" + owner);
            }
            context.write(out, times);
        }
    }
}

The reducer:

package mutualfriend;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MutualFriendReducer extends Reducer<Text, LongWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        // A pair that was emitted from both sides sums to at least 2,
        // which means the two people follow each other.
        if (sum >= 2) {
            context.write(key, NullWritable.get());
        }
    }
}

The driver:

package mutualfriend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MutualFriendDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Read input from HDFS, but run the job with the local runner.
        conf.set("fs.defaultFS", "hdfs://master:9000");
        conf.set("mapreduce.framework.name", "local");

        Job job = Job.getInstance(conf, "MutualFriend");
        job.setJarByClass(getClass());
        job.setMapperClass(MutualFriendMapper.class);
        job.setReducerClass(MutualFriendReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path("mutualfriend/input"));
        FileOutputFormat.setOutputPath(job, new Path("mutualfriend/output"));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MutualFriendDriver(), args));
    }
}
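Since the input and output paths are hard-coded in the driver, the job takes no path arguments; assuming the classes are packaged into a jar (the jar name here is illustrative), it can be launched with:

hadoop jar mutualfriend.jar mutualfriend.MutualFriendDriver

Note that the output directory mutualfriend/output must not already exist, or FileOutputFormat will fail the job.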
Running the code produces the following output; each line is one pair of people who follow each other:
A-B
A-C
A-D
A-F
A-O
B-E
C-F
D-E
D-F
D-L
E-L
E-M
F-M
H-O
I-O