Hadoop之MapReduce框架Partitioner分区

2023-10-30 19:32

本文主要是介绍Hadoop之MapReduce框架Partitioner分区,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1 Partitioner分区

1.1 Partitioner分区描述

 在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,按照手机号码段划分的话,需要把同一手机号码段的数据放到一个文件中;按照省份划分的话,需要把同一省份的数据放到一个文件中;按照性别划分的话,需要把同一性别的数据放到一个文件中。我们知道最终的输出数据是来自于Reducer任务。那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行。Reducer任务的数据来自于Mapper任务,也就说Mapper任务要划分数据,对于不同的数据分配给不同的Reducer任务运行。Mapper任务划分数据的过程就称作Partition。负责实现划分数据的类称作Partitioner

 

1.2 MapReduce运行原理

 

  

MapReduce流程图 - 1.1

 

1.3 数据需求

将文件input_data.txt中的用户数据,根据用户的手机号,按照手机号进行分区。

附件地址链接:http://download.csdn.net/detail/yuan_xw/9459721

1.4 实现步骤

1、    编写UserMapper类,分析用户数据信息。

2、    编写UserReducer类,计算用户的年收数据信息。

3、    编写ProviderPartitioner类,Partitioner组件可以让Map对Key进行分区,从而可以根据不同的key来分发到不同的reduce中去处理。

 

1.5 UserMapper代码编写

UserMapper类,读取和分析用户数据。

[java]  view plain copy
  1. package com.hadoop.mapreduce;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.io.LongWritable;  
  6. import org.apache.hadoop.io.NullWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.mapreduce.Mapper;  
  9.   
  10. import entity.UserEntity;  
  11.   
  12. /*  
  13.  * 继承Mapper类需要定义四个输出、输出类型泛型: 
  14.  * 四个泛型类型分别代表: 
  15.  * KeyIn        Mapper的输入数据的Key,这里是每行文字的起始位置(0,11,...) 
  16.  * ValueIn      Mapper的输入数据的Value,这里是每行文字 
  17.  * KeyOut       Mapper的输出数据的Key,这里是序列化对象UserEntity 
  18.  * ValueOut     Mapper的输出数据的Value,不返回任何值 
  19.  *  
  20.  * Writable接口是一个实现了序列化协议的序列化对象。 
  21.  * 在Hadoop中定义一个结构化对象都要实现Writable接口,使得该结构化对象可以序列化为字节流,字节流也可以反序列化为结构化对象。 
  22.  * LongWritable类型:Hadoop.io对Long类型的封装类型 
  23.  */  
  24. public class UserMapper extends Mapper<LongWritable, Text, UserEntity, NullWritable> {  
  25.   
  26.     private UserEntity userEntity = new UserEntity();  
  27.       
  28.     @Override  
  29.     protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, UserEntity, NullWritable>.Context context)  
  30.             throws IOException, InterruptedException {  
  31.   
  32.         //将每行的数据以空格切分数据,获得每个字段数据 1 135****9365 林*彬 2484 北京市昌平区北七家东三旗村  
  33.         String[] fields = value.toString().split("\t");  
  34.           
  35.         // 赋值userEntity  
  36.         userEntity.set(Integer.parseInt(fields[0]), fields[1], fields[2],Double.parseDouble(fields[3]), fields[4],0.00);  
  37.           
  38.         // 将对象序列化  
  39.         context.write(userEntity,NullWritable.get());  
  40.     }  
  41. }  


1.6 UserReducer代码编写

UserReducer类,计算用户的年收数据信息。

[java]  view plain copy
  1. package com.hadoop.mapreduce;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.io.NullWritable;  
  6. import org.apache.hadoop.mapreduce.Reducer;  
  7.   
  8. import entity.UserEntity;  
  9.   
  10. /* 
  11.  * Reducer需要定义四个输出、输出类型泛型: 
  12.  * 四个泛型类型分别代表: 
  13.  * KeyIn        Reducer的输入数据的Key,这里是序列化对象UserEntity 
  14.  * ValueIn      Reducer的输入数据的Value,这里是NullWritable 
  15.  * KeyOut       Reducer的输出数据的Key,这里是序列化对象UserEntity 
  16.  * ValueOut     Reducer的输出数据的Value,NullWritable 
  17.  */  
  18. public class UserReducer extends Reducer<UserEntity, NullWritable, UserEntity, NullWritable>{  
  19.   
  20.     @Override  
  21.     protected void reduce(UserEntity userEntity, Iterable<NullWritable> values,  
  22.             Reducer<UserEntity, NullWritable, UserEntity, NullWritable>.Context context)  
  23.                     throws IOException, InterruptedException {  
  24.           
  25.             // 年收入 = 月收入 * 12  四舍五入  
  26.             String yearIncome = String.format("%.2f", userEntity.getMonthIncome() * 12);  
  27.             userEntity.setYearIncome(Double.parseDouble(yearIncome));  
  28.             context.write(userEntity, NullWritable.get());  
  29.     }  
  30. }  

1.7 ProviderPartitioner代码编写

Partitioner用于划分键值空间(key space)。

 Partitioner组件可以让Map对Key进行分区,从而可以根据不同的key来分发到不同的reduce中去处理。分区的数量与一个作业的reduce任务的数量是一样的。它控制将中间过程的key(也就是这条记录)应该发送给m个reduce任务中的哪一个来进行reduce操作。HashPartitioner是默认的Partitioner。

[java]  view plain copy
  1. package com.hadoop.mapreduce;  
  2.   
  3. import java.util.HashMap;  
  4. import java.util.Map;  
  5.   
  6. import org.apache.hadoop.io.NullWritable;  
  7. import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;  
  8.   
  9. import entity.UserEntity;  
  10.   
  11. /* 
  12.  * Partitioner用于划分键值空间(key space)。 
  13.  * Partitioner组件可以让Map对Key进行分区,从而可以根据不同的key来分发到不同的reduce中去处理。 
  14.  * 分区的数量与一个作业的reduce任务的数量是一样的。 
  15.  * 它控制将中间过程的key(也就是这条记录)应该发送给m个reduce任务中的哪一个来进行reduce操作。 
  16.  * HashPartitioner是默认的 Partitioner。  
  17.  */  
  18.   
  19. /** 
  20.  * 继承抽象类Partitioner,实现自定义的getPartition()方法 
  21.  * 通过job.setPartitionerClass()来设置自定义的Partitioner; 
  22.  */  
  23. public class ProviderPartitioner extends HashPartitioner<UserEntity, NullWritable> {  
  24.       
  25.     // 声明providerMap,并且在static静态块中初始化  
  26.     private static Map<String, Integer> providerMap = new HashMap<String, Integer>();  
  27.     static {  
  28.         providerMap.put("130"0);  
  29.         providerMap.put("133"0);  
  30.         providerMap.put("134"0);  
  31.         providerMap.put("135"0);  
  32.         providerMap.put("136"0);  
  33.         providerMap.put("137"0);  
  34.         providerMap.put("138"0);  
  35.         providerMap.put("139"0);  
  36.         providerMap.put("150"1);  
  37.         providerMap.put("151"1);  
  38.         providerMap.put("153"1);  
  39.         providerMap.put("158"1);  
  40.         providerMap.put("159"1);  
  41.         providerMap.put("170"2);  
  42.         providerMap.put("180"3);  
  43.         providerMap.put("181"3);  
  44.         providerMap.put("183"3);  
  45.         providerMap.put("185"3);  
  46.         providerMap.put("186"3);  
  47.         providerMap.put("187"3);  
  48.         providerMap.put("188"3);  
  49.         providerMap.put("189"3);  
  50.     }  
  51.   
  52.     /** 
  53.      * 实现自定义的getPartition()方法,自定义分区规则 
  54.      */  
  55.     @Override  
  56.     public int getPartition(UserEntity key, NullWritable value, int numPartitions) {  
  57.         String prefix = key.getMobile().substring(03);  
  58.         return providerMap.get(prefix);  
  59.     }  
  60. }  

1.8 UserAnalysis代码编写

UserAnalysis类,程序执行入口类。

[java]  view plain copy
  1. package com.hadoop.mapreduce;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.io.NullWritable;  
  8. import org.apache.hadoop.mapreduce.Job;  
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  10. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  11.   
  12. import entity.UserEntity;  
  13.   
  14. public class UserAnalysis {  
  15.     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
  16.         // 创建job对象  
  17.         Job job = Job.getInstance(new Configuration());  
  18.         // 指定程序的入口  
  19.         job.setJarByClass(UserAnalysis.class);  
  20.           
  21.         // 指定自定义的Mapper阶段的任务处理类  
  22.         job.setMapperClass(UserMapper.class);  
  23.         job.setMapOutputKeyClass(UserEntity.class);  
  24.         job.setMapOutputValueClass(NullWritable.class);  
  25.         // 数据HDFS文件服务器读取数据路径  
  26.         FileInputFormat.setInputPaths(job, new Path("/mapreduce/partitioner/input_data.txt"));  
  27.   
  28.         // 指定自定义的Reducer阶段的任务处理类  
  29.         job.setReducerClass(UserReducer.class);  
  30.         // 设置最后输出结果的Key和Value的类型  
  31.         job.setOutputKeyClass(UserEntity.class);  
  32.         job.setOutputValueClass(NullWritable.class);  
  33.           
  34.         // 设置定义分区的处理类  
  35.         job.setPartitionerClass(ProviderPartitioner.class);  
  36.           
  37.         // 默认ReduceTasks数是1  
  38.         // 我们对手机号分成4类,所以应该设置为4  
  39.         job.setNumReduceTasks(4);  
  40.           
  41.         // 将计算的结果上传到HDFS服务  
  42.         FileOutputFormat.setOutputPath(job, new Path("/mapreduce/partitioner/output_data"));  
  43.   
  44.         // 执行提交job方法,直到完成,参数true打印进度和详情  
  45.         job.waitForCompletion(true);  
  46.         System.out.println("Finished");  
  47.     }  
  48. }  

转自:http://blog.csdn.net/yuan_xw/article/details/50867819

这篇关于Hadoop之MapReduce框架Partitioner分区的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/310123

相关文章

修改若依框架Token的过期时间问题

《修改若依框架Token的过期时间问题》本文介绍了如何修改若依框架中Token的过期时间,通过修改`application.yml`文件中的配置来实现,默认单位为分钟,希望此经验对大家有所帮助,也欢迎... 目录修改若依框架Token的过期时间修改Token的过期时间关闭Token的过期时js间总结修改若依

mysql数据库分区的使用

《mysql数据库分区的使用》MySQL分区技术通过将大表分割成多个较小片段,提高查询性能、管理效率和数据存储效率,本文就来介绍一下mysql数据库分区的使用,感兴趣的可以了解一下... 目录【一】分区的基本概念【1】物理存储与逻辑分割【2】查询性能提升【3】数据管理与维护【4】扩展性与并行处理【二】分区的

MyBatis框架实现一个简单的数据查询操作

《MyBatis框架实现一个简单的数据查询操作》本文介绍了MyBatis框架下进行数据查询操作的详细步骤,括创建实体类、编写SQL标签、配置Mapper、开启驼峰命名映射以及执行SQL语句等,感兴趣的... 基于在前面几章我们已经学习了对MyBATis进行环境配置,并利用SqlSessionFactory核

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

cross-plateform 跨平台应用程序-03-如果只选择一个框架,应该选择哪一个?

跨平台系列 cross-plateform 跨平台应用程序-01-概览 cross-plateform 跨平台应用程序-02-有哪些主流技术栈? cross-plateform 跨平台应用程序-03-如果只选择一个框架,应该选择哪一个? cross-plateform 跨平台应用程序-04-React Native 介绍 cross-plateform 跨平台应用程序-05-Flutte

Spring框架5 - 容器的扩展功能 (ApplicationContext)

private static ApplicationContext applicationContext;static {applicationContext = new ClassPathXmlApplicationContext("bean.xml");} BeanFactory的功能扩展类ApplicationContext进行深度的分析。ApplicationConext与 BeanF

数据治理框架-ISO数据治理标准

引言 "数据治理"并不是一个新的概念,国内外有很多组织专注于数据治理理论和实践的研究。目前国际上,主要的数据治理框架有ISO数据治理标准、GDI数据治理框架、DAMA数据治理管理框架等。 ISO数据治理标准 改标准阐述了数据治理的标准、基本原则和数据治理模型,是一套完整的数据治理方法论。 ISO/IEC 38505标准的数据治理方法论的核心内容如下: 数据治理的目标:促进组织高效、合理地