HadoopSpark解决二次排序问题(Hadoop篇)

2024-05-27 12:58

本文主要是介绍HadoopSpark解决二次排序问题(Hadoop篇),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

问题描述

  二次排序就是对每一个key对应的value进行排序,也就是对MapReduce的输出(KEY, Value(v1,v2,v3,......,vn))中的Value(v1,v2,v3,......,vn)值进行排序(升序或者降序),使得Value(s1,s2,s3,......,sn),si ∈ (v1,v2,v3,......,vn)且s1 < s2 < s3 < ...... < sn。假设我们有以下输入文件(逗号分割的分别是年,月,总数):

[root@iteblog.com /tmp ] # vim data.txt
2015,1,24
2015,3,56
2015,1,3
2015,2,-43
2015,4,5
2015,3,46
2014,2,64
2015,1,4
2015,1,21
2015,2,35
2015,2,0

我们期望的输出结果是

2014-2  64
2015-1  3,4,21,24
2015-2  -43,0,35
2015-3  46,56
2015-4  5

  但是Hadoop默认的输出结果只能对Key进行排序,其中Value中的值次序是不定的;也就是说,Hadoop默认的输出可能如下:

2014-2  64
2015-1  21,4,3,24
2015-2  0,35,-43
2015-3  46,56
2015-4  5

解决方案

  针对这个问题我们有两种方法来解决:(1)、将每个Key对应的Value全部存储到内存(这个只会存储到单台机器),然后对这些Value进行相应的排序。但是如果Value的数据量非常大,导致单台内存无法存储这些数据,这将会导致程序出现java.lang.OutOfMemoryError,所以这个方法不是很通用。(2)、这种方法将Value中的值和旧的Key组成一个新的Key,这样我们就可以利用Reduce来排序这个Key,其生成的结果就是我们需要的。过程如下:
  1、原始的键值对是(k,v)。这里的k就是就的key,也可以 称为natural key;
  2、我们可以将k和v组合成新的key(可以称为composite key),也就是((k,v), v)
  3、自定义分区函数,将k相同的键值对发送到同一个Reduce中;
  4、自定义分组函数,将k相同的键值对当作一个分组。
  文字比较枯燥,我们来看看下面实例:
  1、原始数据是

[root@iteblog.com /tmp ] # vim data.txt
2015,1,24
2015,3,56
2015,1,3
2015,2,-43
2015,4,5
2015,3,46
2014,2,64
2015,1,4
2015,1,21
2015,2,35
2015,2,0

我们将年、月组成key(natural key),总数作为value,结果变成:

(2015-1,24)
(2015-3,56)
(2015-1,3)
(2015-2,-43)
(2015-4,5)
(2015-3,46)
(2014-2,64)
(2015-1,4)
(2015-1,21)
(2015-2,35)
(2015-2,0)

  2、将value和key(natural key)组成新的key(composite key),如下:

((2015-1,24),24)
((2015-3,56),56)
((2015-1,3),3)
((2015-2,-43),-43)
((2015-4,5),5)
((2015-3,46),46)
((2014-2,64),64)
((2015-1,4),4)
((2015-1,21),21)
((2015-2,35),35)
((2015-2,0),0)

  3、自定义分区函数,将k相同的键值对发送到同一个Reduce中,结果如下:

[((2014-2,64),64)]
[((2015-1,24),24),((2015-1,3),3),((2015-1,4),4),((2015-1,21),21)]
[((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)]
[((2015-3,56),56),((2015-3,46),46)]
[((2015-4,5),5)]

  4、自定义组排序函数,结果如下:

[((2014-2,64),64)]
[((2015-1,3),3),((2015-1,4),4),((2015-1,21),21),((2015-1,24),24)]
[((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)]
[((2015-3,46),46),((2015-3,56),56)]
[((2015-4,5),5)]

  5、自定义分组函数,结果如下:

((2014-2,64),(64))
((2015-1,24),(3,4,21,24))
((2015-2,35),(-43,0,35))
((2015-3,56),(46,56))
((2015-4,5),(5))

  6、最后输出的结果就是我们要的:

2014-2  64
2015-1  3,4,21,24
2015-2  -43,0,35
2015-3  46,56
2015-4  5

代码实例

  下面将贴出使用MapReduce解决这个问题的代码:

package com.iteblog;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
  * User: 过往记忆
  * Date: 2015-08-05
  * Time: 下午23:49
  * bolg: http://www.iteblog.com
  * 本文地址:http://www.iteblog.com/archives/1415
  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
  * 过往记忆博客微信公共帐号:iteblog_hadoop
  */
public class Entry implements WritableComparable<Entry> {
     private String yearMonth;
     private int count;
     public Entry() {
     }
     @Override
     public int compareTo(Entry entry) {
         int result = this .yearMonth.compareTo(entry.getYearMonth());
         if (result == 0 ) {
             result = compare(count, entry.getCount());
         }
         return result;
     }
     @Override
     public void write(DataOutput dataOutput) throws IOException {
         dataOutput.writeUTF(yearMonth);
         dataOutput.writeInt(count);
     }
     @Override
     public void readFields(DataInput dataInput) throws IOException {
         this .yearMonth = dataInput.readUTF();
         this .count = dataInput.readInt();
     }
     public String getYearMonth() {
         return yearMonth;
     }
     public void setYearMonth(String yearMonth) {
         this .yearMonth = yearMonth;
     }
     public int getCount() {
         return count;
     }
     public void setCount( int count) {
         this .count = count;
     }
     public static int compare( int a, int b) {
         return a < b ? - 1 : (a > b ? 1 : 0 );
     }
     @Override
     public String toString() {
         return yearMonth;
     }
}

  上面就是将旧的Key(natural key)和Value组合成新的Key(composite key)的代码,接下来看下自定义的分区类:

package com.iteblog;
import org.apache.hadoop.mapreduce.Partitioner;
public class EntryPartitioner extends Partitioner<Entry, Integer> {
     @Override
     public int getPartition(Entry entry, Integer integer, int numberPartitions) {
         return Math.abs((entry.getYearMonth().hashCode() % numberPartitions));
     }
}

  这个类使得natural key相同的数据分派到同一个Reduce中。然后看下自定义分组类:

package com.iteblog;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
  * User: 过往记忆
  * Date: 2015-08-05
  * Time: 下午23:49
  * bolg: http://www.iteblog.com
  * 本文地址:http://www.iteblog.com/archives/1415
  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
  * 过往记忆博客微信公共帐号:iteblog_hadoop
  */
public class EntryGroupingComparator extends WritableComparator {
     public EntryGroupingComparator() {
         super (Entry. class , true );
     }
     @Override
     public int compare(WritableComparable a, WritableComparable b) {
         Entry a1 = (Entry) a;
         Entry b1 = (Entry) b;
         return a1.getYearMonth().compareTo(b1.getYearMonth());
     }
}

  只要是natural key相同,我们就认为是同一个分组,这样Reduce内部才可以对Value中的值进行排序。接下来看下Map类

public class SecondarySortMapper extends Mapper<LongWritable, Text, Entry, Text> {
     private Entry entry = new Entry();
     private Text value = new Text();
     @Override
     protected void map(LongWritable key, Text lines, Context context)
             throws IOException, InterruptedException {
         String line = lines.toString();
         String[] tokens = line.split( "," );
         // YYYY = tokens[0]
         // MM = tokens[1]
         // count = tokens[2]
         String yearMonth = tokens[ 0 ] + "-" + tokens[ 1 ];
         int count = Integer.parseInt(tokens[ 2 ]);
         entry.setYearMonth(yearMonth);
         entry.setCount(count);
         value.set(tokens[ 2 ]);
         context.write(entry, value);
     }
}

  其实就是解析每一行的数据,然后将旧的Key(natural key)和Value组合成新的Key(composite key)。接下来看下Reduce类实现

public class SecondarySortReducer extends Reducer<Entry, Text, Entry, Text> {
     @Override
     protected void reduce(Entry key, Iterable<Text> values, Context context)
             throws IOException, InterruptedException {
         StringBuilder builder = new StringBuilder();
         for (Text value : values) {
             builder.append(value.toString());
             builder.append( "," );
         }
         context.write(key, new Text(builder.toString()));
     }
}

builder存储的就是排序好的Value序列,最后来看看启动程序的使用:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(Iteblog. class );
job.setJobName( "SecondarySort" );
FileInputFormat.setInputPaths(job, new Path(args[ 0 ]));
FileOutputFormat.setOutputPath(job, new Path(args[ 1 ]));
job.setOutputKeyClass(Entry. class );
job.setOutputValueClass(Text. class );
job.setMapperClass(SecondarySortMapper. class );
job.setReducerClass(SecondarySortReducer. class );
job.setPartitionerClass(EntryPartitioner. class );
job.setGroupingComparatorClass(EntryGroupingComparator. class );

关键看上面第12-15行的代码。下面是运行这个程序的方法和结果:

[root@iteblog.com /hadoop ] # bin/hadoop jar /tmp/iteblog-1.0-SNAPSHOT.jar 
     com.iteblog.Main  /iteblog/data .txt /iteblog/output
[root@iteblog.com /hadoop ] # bin/hadoop fs -cat /iteblog/output/pa*
2014-2  64,
2015-1  3,4,21,24,
2015-2  -43,0,35,
2015-3  46,56,
2015-4  5,
明天我将使用Spark来解决这个问题,敬请关注本博客。
本博客文章除特别声明,全部都是原创!
尊重原创,转载请注明: 转载自过往记忆(http://www.iteblog.com/)

这篇关于HadoopSpark解决二次排序问题(Hadoop篇)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007543

相关文章

MybatisGenerator文件生成不出对应文件的问题

《MybatisGenerator文件生成不出对应文件的问题》本文介绍了使用MybatisGenerator生成文件时遇到的问题及解决方法,主要步骤包括检查目标表是否存在、是否能连接到数据库、配置生成... 目录MyBATisGenerator 文件生成不出对应文件先在项目结构里引入“targetProje

C#使用HttpClient进行Post请求出现超时问题的解决及优化

《C#使用HttpClient进行Post请求出现超时问题的解决及优化》最近我的控制台程序发现有时候总是出现请求超时等问题,通常好几分钟最多只有3-4个请求,在使用apipost发现并发10个5分钟也... 目录优化结论单例HttpClient连接池耗尽和并发并发异步最终优化后优化结论我直接上优化结论吧,

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

numpy求解线性代数相关问题

《numpy求解线性代数相关问题》本文主要介绍了numpy求解线性代数相关问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 在numpy中有numpy.array类型和numpy.mat类型,前者是数组类型,后者是矩阵类型。数组

解决systemctl reload nginx重启Nginx服务报错:Job for nginx.service invalid问题

《解决systemctlreloadnginx重启Nginx服务报错:Jobfornginx.serviceinvalid问题》文章描述了通过`systemctlstatusnginx.se... 目录systemctl reload nginx重启Nginx服务报错:Job for nginx.javas

Redis缓存问题与缓存更新机制详解

《Redis缓存问题与缓存更新机制详解》本文主要介绍了缓存问题及其解决方案,包括缓存穿透、缓存击穿、缓存雪崩等问题的成因以及相应的预防和解决方法,同时,还详细探讨了缓存更新机制,包括不同情况下的缓存更... 目录一、缓存问题1.1 缓存穿透1.1.1 问题来源1.1.2 解决方案1.2 缓存击穿1.2.1

Mysql DATETIME 毫秒坑的解决

《MysqlDATETIME毫秒坑的解决》本文主要介绍了MysqlDATETIME毫秒坑的解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 今天写代码突发一个诡异的 bug,代码逻辑大概如下。1. 新增退款单记录boolean save = s

Python中lambda排序的六种方法

《Python中lambda排序的六种方法》本文主要介绍了Python中使用lambda函数进行排序的六种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录1.对单个变量进行排序2. 对多个变量进行排序3. 降序排列4. 单独降序1.对单个变量进行排序

vue解决子组件样式覆盖问题scoped deep

《vue解决子组件样式覆盖问题scopeddeep》文章主要介绍了在Vue项目中处理全局样式和局部样式的方法,包括使用scoped属性和深度选择器(/deep/)来覆盖子组件的样式,作者建议所有组件... 目录前言scoped分析deep分析使用总结所有组件必须加scoped父组件覆盖子组件使用deep前言

解决Cron定时任务中Pytest脚本无法发送邮件的问题

《解决Cron定时任务中Pytest脚本无法发送邮件的问题》文章探讨解决在Cron定时任务中运行Pytest脚本时邮件发送失败的问题,先优化环境变量,再检查Pytest邮件配置,接着配置文件确保SMT... 目录引言1. 环境变量优化:确保Cron任务可以正确执行解决方案:1.1. 创建一个脚本1.2. 修