HadoopSpark解决二次排序问题(Hadoop篇)

2024-05-27 12:58

本文主要是介绍HadoopSpark解决二次排序问题(Hadoop篇),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

问题描述

  二次排序就是对每一个key对应的value进行排序,也就是对MapReduce的输出(KEY, Value(v1,v2,v3,......,vn))中的Value(v1,v2,v3,......,vn)值进行排序(升序或者降序),使得Value(s1,s2,s3,......,sn),si ∈ (v1,v2,v3,......,vn)且s1 < s2 < s3 < ...... < sn。假设我们有以下输入文件(逗号分割的分别是年,月,总数):

[root@iteblog.com /tmp ] # vim data.txt
2015,1,24
2015,3,56
2015,1,3
2015,2,-43
2015,4,5
2015,3,46
2014,2,64
2015,1,4
2015,1,21
2015,2,35
2015,2,0

我们期望的输出结果是

2014-2  64
2015-1  3,4,21,24
2015-2  -43,0,35
2015-3  46,56
2015-4  5

  但是Hadoop默认的输出结果只能对Key进行排序,其中Value中的值次序是不定的;也就是说,Hadoop默认的输出可能如下:

2014-2  64
2015-1  21,4,3,24
2015-2  0,35,-43
2015-3  46,56
2015-4  5

解决方案

  针对这个问题我们有两种方法来解决:(1)、将每个Key对应的Value全部存储到内存(这个只会存储到单台机器),然后对这些Value进行相应的排序。但是如果Value的数据量非常大,导致单台内存无法存储这些数据,这将会导致程序出现java.lang.OutOfMemoryError,所以这个方法不是很通用。(2)、这种方法将Value中的值和旧的Key组成一个新的Key,这样我们就可以利用Reduce来排序这个Key,其生成的结果就是我们需要的。过程如下:
  1、原始的键值对是(k,v)。这里的k就是就的key,也可以 称为natural key;
  2、我们可以将k和v组合成新的key(可以称为composite key),也就是((k,v), v)
  3、自定义分区函数,将k相同的键值对发送到同一个Reduce中;
  4、自定义分组函数,将k相同的键值对当作一个分组。
  文字比较枯燥,我们来看看下面实例:
  1、原始数据是

[root@iteblog.com /tmp ] # vim data.txt
2015,1,24
2015,3,56
2015,1,3
2015,2,-43
2015,4,5
2015,3,46
2014,2,64
2015,1,4
2015,1,21
2015,2,35
2015,2,0

我们将年、月组成key(natural key),总数作为value,结果变成:

(2015-1,24)
(2015-3,56)
(2015-1,3)
(2015-2,-43)
(2015-4,5)
(2015-3,46)
(2014-2,64)
(2015-1,4)
(2015-1,21)
(2015-2,35)
(2015-2,0)

  2、将value和key(natural key)组成新的key(composite key),如下:

((2015-1,24),24)
((2015-3,56),56)
((2015-1,3),3)
((2015-2,-43),-43)
((2015-4,5),5)
((2015-3,46),46)
((2014-2,64),64)
((2015-1,4),4)
((2015-1,21),21)
((2015-2,35),35)
((2015-2,0),0)

  3、自定义分区函数,将k相同的键值对发送到同一个Reduce中,结果如下:

[((2014-2,64),64)]
[((2015-1,24),24),((2015-1,3),3),((2015-1,4),4),((2015-1,21),21)]
[((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)]
[((2015-3,56),56),((2015-3,46),46)]
[((2015-4,5),5)]

  4、自定义组排序函数,结果如下:

[((2014-2,64),64)]
[((2015-1,3),3),((2015-1,4),4),((2015-1,21),21),((2015-1,24),24)]
[((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)]
[((2015-3,46),46),((2015-3,56),56)]
[((2015-4,5),5)]

  5、自定义分组函数,结果如下:

((2014-2,64),(64))
((2015-1,24),(3,4,21,24))
((2015-2,35),(-43,0,35))
((2015-3,56),(46,56))
((2015-4,5),(5))

  6、最后输出的结果就是我们要的:

2014-2  64
2015-1  3,4,21,24
2015-2  -43,0,35
2015-3  46,56
2015-4  5

代码实例

  下面将贴出使用MapReduce解决这个问题的代码:

package com.iteblog;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
  * User: 过往记忆
  * Date: 2015-08-05
  * Time: 下午23:49
  * bolg: http://www.iteblog.com
  * 本文地址:http://www.iteblog.com/archives/1415
  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
  * 过往记忆博客微信公共帐号:iteblog_hadoop
  */
public class Entry implements WritableComparable<Entry> {
     private String yearMonth;
     private int count;
     public Entry() {
     }
     @Override
     public int compareTo(Entry entry) {
         int result = this .yearMonth.compareTo(entry.getYearMonth());
         if (result == 0 ) {
             result = compare(count, entry.getCount());
         }
         return result;
     }
     @Override
     public void write(DataOutput dataOutput) throws IOException {
         dataOutput.writeUTF(yearMonth);
         dataOutput.writeInt(count);
     }
     @Override
     public void readFields(DataInput dataInput) throws IOException {
         this .yearMonth = dataInput.readUTF();
         this .count = dataInput.readInt();
     }
     public String getYearMonth() {
         return yearMonth;
     }
     public void setYearMonth(String yearMonth) {
         this .yearMonth = yearMonth;
     }
     public int getCount() {
         return count;
     }
     public void setCount( int count) {
         this .count = count;
     }
     public static int compare( int a, int b) {
         return a < b ? - 1 : (a > b ? 1 : 0 );
     }
     @Override
     public String toString() {
         return yearMonth;
     }
}

  上面就是将旧的Key(natural key)和Value组合成新的Key(composite key)的代码,接下来看下自定义的分区类:

package com.iteblog;
import org.apache.hadoop.mapreduce.Partitioner;
public class EntryPartitioner extends Partitioner<Entry, Integer> {
     @Override
     public int getPartition(Entry entry, Integer integer, int numberPartitions) {
         return Math.abs((entry.getYearMonth().hashCode() % numberPartitions));
     }
}

  这个类使得natural key相同的数据分派到同一个Reduce中。然后看下自定义分组类:

package com.iteblog;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
  * User: 过往记忆
  * Date: 2015-08-05
  * Time: 下午23:49
  * bolg: http://www.iteblog.com
  * 本文地址:http://www.iteblog.com/archives/1415
  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
  * 过往记忆博客微信公共帐号:iteblog_hadoop
  */
public class EntryGroupingComparator extends WritableComparator {
     public EntryGroupingComparator() {
         super (Entry. class , true );
     }
     @Override
     public int compare(WritableComparable a, WritableComparable b) {
         Entry a1 = (Entry) a;
         Entry b1 = (Entry) b;
         return a1.getYearMonth().compareTo(b1.getYearMonth());
     }
}

  只要是natural key相同,我们就认为是同一个分组,这样Reduce内部才可以对Value中的值进行排序。接下来看下Map类

public class SecondarySortMapper extends Mapper<LongWritable, Text, Entry, Text> {
     private Entry entry = new Entry();
     private Text value = new Text();
     @Override
     protected void map(LongWritable key, Text lines, Context context)
             throws IOException, InterruptedException {
         String line = lines.toString();
         String[] tokens = line.split( "," );
         // YYYY = tokens[0]
         // MM = tokens[1]
         // count = tokens[2]
         String yearMonth = tokens[ 0 ] + "-" + tokens[ 1 ];
         int count = Integer.parseInt(tokens[ 2 ]);
         entry.setYearMonth(yearMonth);
         entry.setCount(count);
         value.set(tokens[ 2 ]);
         context.write(entry, value);
     }
}

  其实就是解析每一行的数据,然后将旧的Key(natural key)和Value组合成新的Key(composite key)。接下来看下Reduce类实现

public class SecondarySortReducer extends Reducer<Entry, Text, Entry, Text> {
     @Override
     protected void reduce(Entry key, Iterable<Text> values, Context context)
             throws IOException, InterruptedException {
         StringBuilder builder = new StringBuilder();
         for (Text value : values) {
             builder.append(value.toString());
             builder.append( "," );
         }
         context.write(key, new Text(builder.toString()));
     }
}

builder存储的就是排序好的Value序列,最后来看看启动程序的使用:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(Iteblog. class );
job.setJobName( "SecondarySort" );
FileInputFormat.setInputPaths(job, new Path(args[ 0 ]));
FileOutputFormat.setOutputPath(job, new Path(args[ 1 ]));
job.setOutputKeyClass(Entry. class );
job.setOutputValueClass(Text. class );
job.setMapperClass(SecondarySortMapper. class );
job.setReducerClass(SecondarySortReducer. class );
job.setPartitionerClass(EntryPartitioner. class );
job.setGroupingComparatorClass(EntryGroupingComparator. class );

关键看上面第12-15行的代码。下面是运行这个程序的方法和结果:

[root@iteblog.com /hadoop ] # bin/hadoop jar /tmp/iteblog-1.0-SNAPSHOT.jar 
     com.iteblog.Main  /iteblog/data .txt /iteblog/output
[root@iteblog.com /hadoop ] # bin/hadoop fs -cat /iteblog/output/pa*
2014-2  64,
2015-1  3,4,21,24,
2015-2  -43,0,35,
2015-3  46,56,
2015-4  5,
明天我将使用Spark来解决这个问题,敬请关注本博客。
本博客文章除特别声明,全部都是原创!
尊重原创,转载请注明: 转载自过往记忆(http://www.iteblog.com/)

这篇关于HadoopSpark解决二次排序问题(Hadoop篇)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007543

相关文章

mybatis和mybatis-plus设置值为null不起作用问题及解决

《mybatis和mybatis-plus设置值为null不起作用问题及解决》Mybatis-Plus的FieldStrategy主要用于控制新增、更新和查询时对空值的处理策略,通过配置不同的策略类型... 目录MyBATis-plusFieldStrategy作用FieldStrategy类型每种策略的作

linux下多个硬盘划分到同一挂载点问题

《linux下多个硬盘划分到同一挂载点问题》在Linux系统中,将多个硬盘划分到同一挂载点需要通过逻辑卷管理(LVM)来实现,首先,需要将物理存储设备(如硬盘分区)创建为物理卷,然后,将这些物理卷组成... 目录linux下多个硬盘划分到同一挂载点需要明确的几个概念硬盘插上默认的是非lvm总结Linux下多

Python Jupyter Notebook导包报错问题及解决

《PythonJupyterNotebook导包报错问题及解决》在conda环境中安装包后,JupyterNotebook导入时出现ImportError,可能是由于包版本不对应或版本太高,解决方... 目录问题解决方法重新安装Jupyter NoteBook 更改Kernel总结问题在conda上安装了

pip install jupyterlab失败的原因问题及探索

《pipinstalljupyterlab失败的原因问题及探索》在学习Yolo模型时,尝试安装JupyterLab但遇到错误,错误提示缺少Rust和Cargo编译环境,因为pywinpty包需要它... 目录背景问题解决方案总结背景最近在学习Yolo模型,然后其中要下载jupyter(有点LSVmu像一个

Goland debug失效详细解决步骤(合集)

《Golanddebug失效详细解决步骤(合集)》今天用Goland开发时,打断点,以debug方式运行,发现程序并没有断住,程序跳过了断点,直接运行结束,网上搜寻了大量文章,最后得以解决,特此在这... 目录Bug:Goland debug失效详细解决步骤【合集】情况一:Go或Goland架构不对情况二:

解决jupyterLab打开后出现Config option `template_path`not recognized by `ExporterCollapsibleHeadings`问题

《解决jupyterLab打开后出现Configoption`template_path`notrecognizedby`ExporterCollapsibleHeadings`问题》在Ju... 目录jupyterLab打开后出现“templandroidate_path”相关问题这是 tensorflo

如何解决Pycharm编辑内容时有光标的问题

《如何解决Pycharm编辑内容时有光标的问题》文章介绍了如何在PyCharm中配置VimEmulator插件,包括检查插件是否已安装、下载插件以及安装IdeaVim插件的步骤... 目录Pycharm编辑内容时有光标1.如果Vim Emulator前面有对勾2.www.chinasem.cn如果tools工

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

Java多线程父线程向子线程传值问题及解决

《Java多线程父线程向子线程传值问题及解决》文章总结了5种解决父子之间数据传递困扰的解决方案,包括ThreadLocal+TaskDecorator、UserUtils、CustomTaskDeco... 目录1 背景2 ThreadLocal+TaskDecorator3 RequestContextH

关于Spring @Bean 相同加载顺序不同结果不同的问题记录

《关于Spring@Bean相同加载顺序不同结果不同的问题记录》本文主要探讨了在Spring5.1.3.RELEASE版本下,当有两个全注解类定义相同类型的Bean时,由于加载顺序不同,最终生成的... 目录问题说明测试输出1测试输出2@Bean注解的BeanDefiChina编程nition加入时机总结问题说明