hadoop入门7:自定义GroupingComparator进行分组

2024-06-07 12:32

本文主要是介绍hadoop入门7:自定义GroupingComparator进行分组,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

摘要:

GroupingComparator是在reduce阶段分组来使用的,由于reduce阶段,如果key相同的一组,只取第一个key作为key,迭代所有的values。 如果reduce的key是自定义的bean,我们只需要bean里面的某个属性相同就认为这样的key是相同的,这是我们就需要之定义GroupCoparator来“欺骗”reduce了。 我们需要理清楚的还有map阶段你的几个自定义: parttioner中的getPartition()这个是map阶段自定义分区, bean中定义CopmareTo()是在溢出和merge时用来来排序的。 

demo数据:

订单id            金额     产品名称

order_234578,4789,笔记本
order_123456,7789,笔记本
order_123456,1789,手机
order_234578,4789,手机
order_123456,3789,笔记本
order_00001,4789,笔记本
order_00002,7789,笔记本
order_00001,5789,洗衣机
order_00002,17789,服务器

根据上面的订单信息需要求出每一个订单中成交金额最大的一笔交易。

设计思路:

1、利用“订单id和金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce

2、在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值

groupingcomparator代码:

package com.zsy.mr.groupingcomparator;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class ItemIdGroupingComparator extends WritableComparator {protected ItemIdGroupingComparator() {super(OrderBean.class,true);}@SuppressWarnings("rawtypes")@Overridepublic int compare(WritableComparable a, WritableComparable b) {OrderBean aBean = (OrderBean)a;OrderBean bOrderBean = (OrderBean)b;return aBean.getItemId().compareTo(bOrderBean.getItemId());}
}

Partitioner代码:

package com.zsy.mr.groupingcomparator;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {//相同的id会发往相同的partitioner,产生的分区数是根据用户设置的reducetask数保持一致,即numReduceTasks数是用户在设置的数字@Overridepublic int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {return (key.getItemId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}

OrderBean代码:

package com.zsy.mr.groupingcomparator;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class OrderBean implements WritableComparable<OrderBean> {private String itemId;private String productName;private Float price;@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(itemId);out.writeUTF(productName);out.writeFloat(price);}@Overridepublic void readFields(DataInput in) throws IOException {this.itemId = in.readUTF();this.productName = in.readUTF();this.price = in.readFloat();}@Overridepublic int compareTo(OrderBean o) {// 如果订单号相同,在进行价格比较int result = this.itemId.compareTo(o.getItemId());if (result == 0) {result = -this.price.compareTo(o.price);}return result;}public String getItemId() {return itemId;}public void setItemId(String itemId) {this.itemId = itemId;}public String getProductName() {return productName;}public void setProductName(String productName) {this.productName = productName;}public float getPrice() {return price;}public void setPrice(float price) {this.price = price;}@Overridepublic String toString() {return "itemId=" + itemId + ", productName=" + productName + ", price=" + price;}}

GroupingCommparatorSort代码:

package com.zsy.mr.groupingcomparator;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import com.zsy.mr.groupingcomparator.GroupingCommparatorSort.GroupingCommparatorSortMapper.GroupingCommparatorSortReducer;public class GroupingCommparatorSort {static class GroupingCommparatorSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {OrderBean orderBean = new OrderBean();@Overrideprotected void map(LongWritable key, Text value,Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context)throws IOException, InterruptedException {String[] str = value.toString().split(",");orderBean.setItemId(str[0]);orderBean.setPrice(Float.parseFloat(str[1]));orderBean.setProductName(str[2]);context.write(orderBean, NullWritable.get());}static class GroupingCommparatorSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {@Overrideprotected void reduce(OrderBean arg0, Iterable<NullWritable> arg1,Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context)throws IOException, InterruptedException {context.write(arg0, NullWritable.get());}}}/*** main:(这里用一句话描述这个方法的作用).* * @author zhaoshouyun* @param args* @since 1.0*/public static void main(String[] args) throws Exception {Configuration conf = new Configuration();/** conf.set("mapreduce.framework.name", "yarn");* conf.set("yarn.resoucemanger.hostname", "hadoop01");*/Job job = Job.getInstance(conf);job.setJarByClass(GroupingCommparatorSort.class);// 指定本业务job要使用的业务类job.setMapperClass(GroupingCommparatorSortMapper.class);job.setReducerClass(GroupingCommparatorSortReducer.class);// 指定mapper输出的k v类型 如果map的输出和reduce的输出一样,只需要设置输出即可// job.setMapOutputKeyClass(Text.class);// job.setMapOutputValueClass(FlowBean.class);// 指定最终输出kv类型(reduce输出类型)job.setOutputKeyClass(OrderBean.class);job.setOutputValueClass(NullWritable.class);// 指定job的输入文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));// 指定job的输出结果目录FileOutputFormat.setOutputPath(job, new Path(args[1]));// 设置setGroupingComparatorClassjob.setGroupingComparatorClass(ItemIdGroupingComparator.class);// 设置自定义的setPartitionerClassjob.setPartitionerClass(ItemIdPartitioner.class);// 设置reducetask任务数为2job.setNumReduceTasks(2);// 将job中配置的相关参数,以及job所有的java类所在 的jar包,提交给yarn去运行// job.submit();无结果返回,建议不使用它boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}
}

运行结果-part-00000:

运行结果-part-00001:

这篇关于hadoop入门7:自定义GroupingComparator进行分组的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1039183

相关文章

通过Spring层面进行事务回滚的实现

《通过Spring层面进行事务回滚的实现》本文主要介绍了通过Spring层面进行事务回滚的实现,包括声明式事务和编程式事务,具有一定的参考价值,感兴趣的可以了解一下... 目录声明式事务回滚:1. 基础注解配置2. 指定回滚异常类型3. ​不回滚特殊场景编程式事务回滚:1. ​使用 TransactionT

Java中使用Hutool进行AES加密解密的方法举例

《Java中使用Hutool进行AES加密解密的方法举例》AES是一种对称加密,所谓对称加密就是加密与解密使用的秘钥是一个,下面:本文主要介绍Java中使用Hutool进行AES加密解密的相关资料... 目录前言一、Hutool简介与引入1.1 Hutool简介1.2 引入Hutool二、AES加密解密基础

Mysql如何将数据按照年月分组的统计

《Mysql如何将数据按照年月分组的统计》:本文主要介绍Mysql如何将数据按照年月分组的统计方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql将数据按照年月分组的统计要的效果方案总结Mysql将数据按照年月分组的统计要的效果方案① 使用 DA

SpringSecurity6.0 如何通过JWTtoken进行认证授权

《SpringSecurity6.0如何通过JWTtoken进行认证授权》:本文主要介绍SpringSecurity6.0通过JWTtoken进行认证授权的过程,本文给大家介绍的非常详细,感兴趣... 目录项目依赖认证UserDetailService生成JWT token权限控制小结之前写过一个文章,从S

使用Jackson进行JSON生成与解析的新手指南

《使用Jackson进行JSON生成与解析的新手指南》这篇文章主要为大家详细介绍了如何使用Jackson进行JSON生成与解析处理,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 核心依赖2. 基础用法2.1 对象转 jsON(序列化)2.2 JSON 转对象(反序列化)3.

C#使用SQLite进行大数据量高效处理的代码示例

《C#使用SQLite进行大数据量高效处理的代码示例》在软件开发中,高效处理大数据量是一个常见且具有挑战性的任务,SQLite因其零配置、嵌入式、跨平台的特性,成为许多开发者的首选数据库,本文将深入探... 目录前言准备工作数据实体核心技术批量插入:从乌龟到猎豹的蜕变分页查询:加载百万数据异步处理:拒绝界面

Python使用自带的base64库进行base64编码和解码

《Python使用自带的base64库进行base64编码和解码》在Python中,处理数据的编码和解码是数据传输和存储中非常普遍的需求,其中,Base64是一种常用的编码方案,本文我将详细介绍如何使... 目录引言使用python的base64库进行编码和解码编码函数解码函数Base64编码的应用场景注意

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

使用Sentinel自定义返回和实现区分来源方式

《使用Sentinel自定义返回和实现区分来源方式》:本文主要介绍使用Sentinel自定义返回和实现区分来源方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Sentinel自定义返回和实现区分来源1. 自定义错误返回2. 实现区分来源总结Sentinel自定

Java进行文件格式校验的方案详解

《Java进行文件格式校验的方案详解》这篇文章主要为大家详细介绍了Java中进行文件格式校验的相关方案,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、背景异常现象原因排查用户的无心之过二、解决方案Magandroidic Number判断主流检测库对比Tika的使用区分zip