本文主要是介绍HadoopSpark解决二次排序问题(Hadoop篇),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
问题描述
二次排序就是对每一个key对应的value进行排序,也就是对MapReduce的输出(KEY, Value(v1,v2,v3,......,vn))中的Value(v1,v2,v3,......,vn)值进行排序(升序或者降序),使得Value(s1,s2,s3,......,sn),si ∈ (v1,v2,v3,......,vn)且s1 < s2 < s3 < ...... < sn。假设我们有以下输入文件(逗号分割的分别是年,月,总数):
[root@iteblog.com /tmp ] # vim data.txt 2015,1,24 2015,3,56 2015,1,3 2015,2,-43 2015,4,5 2015,3,46 2014,2,64 2015,1,4 2015,1,21 2015,2,35 2015,2,0 |
我们期望的输出结果是
2014-2 64 2015-1 3,4,21,24 2015-2 -43,0,35 2015-3 46,56 2015-4 5 |
但是Hadoop默认的输出结果只能对Key进行排序,其中Value中的值次序是不定的;也就是说,Hadoop默认的输出可能如下:
2014-2 64 2015-1 21,4,3,24 2015-2 0,35,-43 2015-3 46,56 2015-4 5 |
解决方案
针对这个问题我们有两种方法来解决:(1)、将每个Key对应的Value全部存储到内存(这个只会存储到单台机器),然后对这些Value进行相应的排序。但是如果Value的数据量非常大,导致单台内存无法存储这些数据,这将会导致程序出现java.lang.OutOfMemoryError
,所以这个方法不是很通用。(2)、这种方法将Value中的值和旧的Key组成一个新的Key,这样我们就可以利用Reduce来排序这个Key,其生成的结果就是我们需要的。过程如下:
1、原始的键值对是(k,v)。这里的k就是就的key,也可以 称为natural key
;
2、我们可以将k和v组合成新的key(可以称为composite key
),也就是((k,v), v)
3、自定义分区函数,将k相同的键值对发送到同一个Reduce中;
4、自定义分组函数,将k相同的键值对当作一个分组。
文字比较枯燥,我们来看看下面实例:
1、原始数据是
[root@iteblog.com /tmp ] # vim data.txt 2015,1,24 2015,3,56 2015,1,3 2015,2,-43 2015,4,5 2015,3,46 2014,2,64 2015,1,4 2015,1,21 2015,2,35 2015,2,0 |
我们将年、月组成key(natural key
),总数作为value,结果变成:
(2015-1,24) (2015-3,56) (2015-1,3) (2015-2,-43) (2015-4,5) (2015-3,46) (2014-2,64) (2015-1,4) (2015-1,21) (2015-2,35) (2015-2,0) |
2、将value和key(natural key
)组成新的key(composite key
),如下:
((2015-1,24),24) ((2015-3,56),56) ((2015-1,3),3) ((2015-2,-43),-43) ((2015-4,5),5) ((2015-3,46),46) ((2014-2,64),64) ((2015-1,4),4) ((2015-1,21),21) ((2015-2,35),35) ((2015-2,0),0) |
3、自定义分区函数,将k相同的键值对发送到同一个Reduce中,结果如下:
[((2014-2,64),64)] [((2015-1,24),24),((2015-1,3),3),((2015-1,4),4),((2015-1,21),21)] [((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)] [((2015-3,56),56),((2015-3,46),46)] [((2015-4,5),5)] |
4、自定义组排序函数,结果如下:
[((2014-2,64),64)] [((2015-1,3),3),((2015-1,4),4),((2015-1,21),21),((2015-1,24),24)] [((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)] [((2015-3,46),46),((2015-3,56),56)] [((2015-4,5),5)] |
5、自定义分组函数,结果如下:
((2014-2,64),(64)) ((2015-1,24),(3,4,21,24)) ((2015-2,35),(-43,0,35)) ((2015-3,56),(46,56)) ((2015-4,5),(5)) |
6、最后输出的结果就是我们要的:
2014-2 64 2015-1 3,4,21,24 2015-2 -43,0,35 2015-3 46,56 2015-4 5 |
代码实例
下面将贴出使用MapReduce解决这个问题的代码:
package com.iteblog; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * User: 过往记忆 * Date: 2015-08-05 * Time: 下午23:49 * bolg: http://www.iteblog.com * 本文地址:http://www.iteblog.com/archives/1415 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ public class Entry implements WritableComparable<Entry> { private String yearMonth; private int count; public Entry() { } @Override public int compareTo(Entry entry) { int result = this .yearMonth.compareTo(entry.getYearMonth()); if (result == 0 ) { result = compare(count, entry.getCount()); } return result; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(yearMonth); dataOutput.writeInt(count); } @Override public void readFields(DataInput dataInput) throws IOException { this .yearMonth = dataInput.readUTF(); this .count = dataInput.readInt(); } public String getYearMonth() { return yearMonth; } public void setYearMonth(String yearMonth) { this .yearMonth = yearMonth; } public int getCount() { return count; } public void setCount( int count) { this .count = count; } public static int compare( int a, int b) { return a < b ? - 1 : (a > b ? 1 : 0 ); } @Override public String toString() { return yearMonth; } } |
上面就是将旧的Key(natural key
)和Value组合成新的Key(composite key
)的代码,接下来看下自定义的分区类:
package com.iteblog; import org.apache.hadoop.mapreduce.Partitioner; public class EntryPartitioner extends Partitioner<Entry, Integer> { @Override public int getPartition(Entry entry, Integer integer, int numberPartitions) { return Math.abs((entry.getYearMonth().hashCode() % numberPartitions)); } } |
这个类使得natural key
相同的数据分派到同一个Reduce中。然后看下自定义分组类:
package com.iteblog; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * User: 过往记忆 * Date: 2015-08-05 * Time: 下午23:49 * bolg: http://www.iteblog.com * 本文地址:http://www.iteblog.com/archives/1415 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ public class EntryGroupingComparator extends WritableComparator { public EntryGroupingComparator() { super (Entry. class , true ); } @Override public int compare(WritableComparable a, WritableComparable b) { Entry a1 = (Entry) a; Entry b1 = (Entry) b; return a1.getYearMonth().compareTo(b1.getYearMonth()); } } |
只要是natural key
相同,我们就认为是同一个分组,这样Reduce内部才可以对Value中的值进行排序。接下来看下Map类
public class SecondarySortMapper extends Mapper<LongWritable, Text, Entry, Text> { private Entry entry = new Entry(); private Text value = new Text(); @Override protected void map(LongWritable key, Text lines, Context context) throws IOException, InterruptedException { String line = lines.toString(); String[] tokens = line.split( "," ); // YYYY = tokens[0] // MM = tokens[1] // count = tokens[2] String yearMonth = tokens[ 0 ] + "-" + tokens[ 1 ]; int count = Integer.parseInt(tokens[ 2 ]); entry.setYearMonth(yearMonth); entry.setCount(count); value.set(tokens[ 2 ]); context.write(entry, value); } } |
其实就是解析每一行的数据,然后将旧的Key(natural key
)和Value组合成新的Key(composite key
)。接下来看下Reduce类实现
public class SecondarySortReducer extends Reducer<Entry, Text, Entry, Text> { @Override protected void reduce(Entry key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuilder builder = new StringBuilder(); for (Text value : values) { builder.append(value.toString()); builder.append( "," ); } context.write(key, new Text(builder.toString())); } } |
builder存储的就是排序好的Value序列,最后来看看启动程序的使用:
Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(Iteblog. class ); job.setJobName( "SecondarySort" ); FileInputFormat.setInputPaths(job, new Path(args[ 0 ])); FileOutputFormat.setOutputPath(job, new Path(args[ 1 ])); job.setOutputKeyClass(Entry. class ); job.setOutputValueClass(Text. class ); job.setMapperClass(SecondarySortMapper. class ); job.setReducerClass(SecondarySortReducer. class ); job.setPartitionerClass(EntryPartitioner. class ); job.setGroupingComparatorClass(EntryGroupingComparator. class ); |
关键看上面第12-15行的代码。下面是运行这个程序的方法和结果:
[root@iteblog.com /hadoop ] # bin/hadoop jar /tmp/iteblog-1.0-SNAPSHOT.jar com.iteblog.Main /iteblog/data .txt /iteblog/output [root@iteblog.com /hadoop ] # bin/hadoop fs -cat /iteblog/output/pa* 2014-2 64, 2015-1 3,4,21,24, 2015-2 -43,0,35, 2015-3 46,56, 2015-4 5, |
尊重原创,转载请注明: 转载自过往记忆(http://www.iteblog.com/)
这篇关于HadoopSpark解决二次排序问题(Hadoop篇)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!