hadoop入门7:自定义GroupingComparator进行分组

2024-06-07 12:32

本文主要是介绍hadoop入门7:自定义GroupingComparator进行分组,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

摘要:

GroupingComparator是在reduce阶段分组来使用的,由于reduce阶段,如果key相同的一组,只取第一个key作为key,迭代所有的values。 如果reduce的key是自定义的bean,我们只需要bean里面的某个属性相同就认为这样的key是相同的,这是我们就需要之定义GroupCoparator来“欺骗”reduce了。 我们需要理清楚的还有map阶段你的几个自定义: parttioner中的getPartition()这个是map阶段自定义分区, bean中定义CopmareTo()是在溢出和merge时用来来排序的。 

demo数据:

订单id            金额     产品名称

order_234578,4789,笔记本
order_123456,7789,笔记本
order_123456,1789,手机
order_234578,4789,手机
order_123456,3789,笔记本
order_00001,4789,笔记本
order_00002,7789,笔记本
order_00001,5789,洗衣机
order_00002,17789,服务器

根据上面的订单信息需要求出每一个订单中成交金额最大的一笔交易。

设计思路:

1、利用“订单id和金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce

2、在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值

groupingcomparator代码:

package com.zsy.mr.groupingcomparator;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class ItemIdGroupingComparator extends WritableComparator {protected ItemIdGroupingComparator() {super(OrderBean.class,true);}@SuppressWarnings("rawtypes")@Overridepublic int compare(WritableComparable a, WritableComparable b) {OrderBean aBean = (OrderBean)a;OrderBean bOrderBean = (OrderBean)b;return aBean.getItemId().compareTo(bOrderBean.getItemId());}
}

Partitioner代码:

package com.zsy.mr.groupingcomparator;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {//相同的id会发往相同的partitioner,产生的分区数是根据用户设置的reducetask数保持一致,即numReduceTasks数是用户在设置的数字@Overridepublic int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {return (key.getItemId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}

OrderBean代码:

package com.zsy.mr.groupingcomparator;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class OrderBean implements WritableComparable<OrderBean> {private String itemId;private String productName;private Float price;@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(itemId);out.writeUTF(productName);out.writeFloat(price);}@Overridepublic void readFields(DataInput in) throws IOException {this.itemId = in.readUTF();this.productName = in.readUTF();this.price = in.readFloat();}@Overridepublic int compareTo(OrderBean o) {// 如果订单号相同,在进行价格比较int result = this.itemId.compareTo(o.getItemId());if (result == 0) {result = -this.price.compareTo(o.price);}return result;}public String getItemId() {return itemId;}public void setItemId(String itemId) {this.itemId = itemId;}public String getProductName() {return productName;}public void setProductName(String productName) {this.productName = productName;}public float getPrice() {return price;}public void setPrice(float price) {this.price = price;}@Overridepublic String toString() {return "itemId=" + itemId + ", productName=" + productName + ", price=" + price;}}

GroupingCommparatorSort代码:

package com.zsy.mr.groupingcomparator;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import com.zsy.mr.groupingcomparator.GroupingCommparatorSort.GroupingCommparatorSortMapper.GroupingCommparatorSortReducer;public class GroupingCommparatorSort {static class GroupingCommparatorSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {OrderBean orderBean = new OrderBean();@Overrideprotected void map(LongWritable key, Text value,Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context)throws IOException, InterruptedException {String[] str = value.toString().split(",");orderBean.setItemId(str[0]);orderBean.setPrice(Float.parseFloat(str[1]));orderBean.setProductName(str[2]);context.write(orderBean, NullWritable.get());}static class GroupingCommparatorSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {@Overrideprotected void reduce(OrderBean arg0, Iterable<NullWritable> arg1,Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context)throws IOException, InterruptedException {context.write(arg0, NullWritable.get());}}}/*** main:(这里用一句话描述这个方法的作用).* * @author zhaoshouyun* @param args* @since 1.0*/public static void main(String[] args) throws Exception {Configuration conf = new Configuration();/** conf.set("mapreduce.framework.name", "yarn");* conf.set("yarn.resoucemanger.hostname", "hadoop01");*/Job job = Job.getInstance(conf);job.setJarByClass(GroupingCommparatorSort.class);// 指定本业务job要使用的业务类job.setMapperClass(GroupingCommparatorSortMapper.class);job.setReducerClass(GroupingCommparatorSortReducer.class);// 指定mapper输出的k v类型 如果map的输出和reduce的输出一样,只需要设置输出即可// job.setMapOutputKeyClass(Text.class);// job.setMapOutputValueClass(FlowBean.class);// 指定最终输出kv类型(reduce输出类型)job.setOutputKeyClass(OrderBean.class);job.setOutputValueClass(NullWritable.class);// 指定job的输入文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));// 指定job的输出结果目录FileOutputFormat.setOutputPath(job, new Path(args[1]));// 设置setGroupingComparatorClassjob.setGroupingComparatorClass(ItemIdGroupingComparator.class);// 设置自定义的setPartitionerClassjob.setPartitionerClass(ItemIdPartitioner.class);// 设置reducetask任务数为2job.setNumReduceTasks(2);// 将job中配置的相关参数,以及job所有的java类所在 的jar包,提交给yarn去运行// job.submit();无结果返回,建议不使用它boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}
}

运行结果-part-00000:

运行结果-part-00001:

这篇关于hadoop入门7:自定义GroupingComparator进行分组的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1039183

相关文章

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

Linux使用fdisk进行磁盘的相关操作

《Linux使用fdisk进行磁盘的相关操作》fdisk命令是Linux中用于管理磁盘分区的强大文本实用程序,这篇文章主要为大家详细介绍了如何使用fdisk进行磁盘的相关操作,需要的可以了解下... 目录简介基本语法示例用法列出所有分区查看指定磁盘的区分管理指定的磁盘进入交互式模式创建一个新的分区删除一个存

C#使用HttpClient进行Post请求出现超时问题的解决及优化

《C#使用HttpClient进行Post请求出现超时问题的解决及优化》最近我的控制台程序发现有时候总是出现请求超时等问题,通常好几分钟最多只有3-4个请求,在使用apipost发现并发10个5分钟也... 目录优化结论单例HttpClient连接池耗尽和并发并发异步最终优化后优化结论我直接上优化结论吧,

使用Python进行文件读写操作的基本方法

《使用Python进行文件读写操作的基本方法》今天的内容来介绍Python中进行文件读写操作的方法,这在学习Python时是必不可少的技术点,希望可以帮助到正在学习python的小伙伴,以下是Pyth... 目录一、文件读取:二、文件写入:三、文件追加:四、文件读写的二进制模式:五、使用 json 模块读写

使用zabbix进行监控网络设备流量

《使用zabbix进行监控网络设备流量》这篇文章主要为大家详细介绍了如何使用zabbix进行监控网络设备流量,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录安装zabbix配置ENSP环境配置zabbix实行监控交换机测试一台liunx服务器,这里使用的为Ubuntu22.04(

在Pandas中进行数据重命名的方法示例

《在Pandas中进行数据重命名的方法示例》Pandas作为Python中最流行的数据处理库,提供了强大的数据操作功能,其中数据重命名是常见且基础的操作之一,本文将通过简洁明了的讲解和丰富的代码示例,... 目录一、引言二、Pandas rename方法简介三、列名重命名3.1 使用字典进行列名重命名3.编

python安装完成后可以进行的后续步骤和注意事项小结

《python安装完成后可以进行的后续步骤和注意事项小结》本文详细介绍了安装Python3后的后续步骤,包括验证安装、配置环境、安装包、创建和运行脚本,以及使用虚拟环境,还强调了注意事项,如系统更新、... 目录验证安装配置环境(可选)安装python包创建和运行Python脚本虚拟环境(可选)注意事项安装

如何使用celery进行异步处理和定时任务(django)

《如何使用celery进行异步处理和定时任务(django)》文章介绍了Celery的基本概念、安装方法、如何使用Celery进行异步任务处理以及如何设置定时任务,通过Celery,可以在Web应用中... 目录一、celery的作用二、安装celery三、使用celery 异步执行任务四、使用celery

使用C#如何创建人名或其他物体随机分组

《使用C#如何创建人名或其他物体随机分组》文章描述了一个随机分配人员到多个团队的代码示例,包括将人员列表随机化并根据组数分配到不同组,最后按组号排序显示结果... 目录C#创建人名或其他物体随机分组此示例使用以下代码将人员分配到组代码首先将lstPeople ListBox总结C#创建人名或其他物体随机分组

SpringBoot使用minio进行文件管理的流程步骤

《SpringBoot使用minio进行文件管理的流程步骤》MinIO是一个高性能的对象存储系统,兼容AmazonS3API,该软件设计用于处理非结构化数据,如图片、视频、日志文件以及备份数据等,本文... 目录一、拉取minio镜像二、创建配置文件和上传文件的目录三、启动容器四、浏览器登录 minio五、