MapReduce—平均工资

2023-11-10 01:50
文章标签 mapreduce 平均工资

本文主要是介绍MapReduce—平均工资,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

MapReduce—平均工资

        • MapReduce—平均工资
          • 1. 需求分析
          • 2. 解答思路

MapReduce—平均工资
我这里是使用集群去处理这个日志数据,数据在我的github上,默认使用maven去管理所有的jar包
  • github地址
1. 需求分析

按照所给数据文件去统计每个部门的人数,最高工资,最低工资和平均工资

需求统计的日志数据如下:

在这里插入图片描述

需要将每个部门的人数,工资进行统计。比如10号部门有3个人,最高工资是5000元,最低工资是1300元,平均工资是2916.666666666667元。则以如下形式进行显示:

10 3 5000 1300 2916.666666666667

2. 解答思路

1.因为要统计部门的人数以及工资,那么在最后的reduce阶段,进行汇总时,可以设置一个计数器,在进行汇总时,就可以计算出部门人数,所以,我们只需要日志数据中的两列,分别是部门编号和工资,将部门编号作为key,工资作为value2.在reduce输出阶段,因为要输出人数,最高工资,最低工资和平均工资,一共四列,所以需要将计算出的结果拼接成一个Text进行输出3.在处理过程中我使用Partitioner将数据分开通过不同的reduce去处理4.如果需要本地运行,记得注释掉avgsal文件中的23/24/25行,并将47行和50行的文件路径修改为自己所使用的文件路径5.因为在数据扭转的过程中,<K2, V2>和<K3, V3>的数据类型发生了变化,所以要在avgsal中设置map端所输出的数据类型,也就是要指定<K2, V2>的数据类型

mapper端代码

package com.yangqi.avgsal;import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author xiaoer* @date 2019/11/12 12:44*/
public class MyMapper extends Mapper<LongWritable, Text, IntWritable, DoubleWritable> {IntWritable num = new IntWritable();DoubleWritable result = new DoubleWritable();/*** 针对每一行的数据,都会执行一次下面的map方法** @param key* @param value* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] split = line.split(",");String str1 = split[split.length - 1];String str2 = split[split.length - 3];num.set(Integer.parseInt(str1));result.set(Double.parseDouble(str2));context.write(num, result);}
}

reduce端代码

package com.yangqi.avgsal;import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** @author xiaoer* @date 2019/11/12 12:47*/
public class MyReducer extends Reducer<IntWritable, DoubleWritable, IntWritable, Text> {Text result = new Text();@Overrideprotected void reduce(IntWritable key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {// 记录部门的人数int num = 0;// 记录部门的工资和double sum = 0;// 记录最大工资double max = Double.MIN_VALUE;// 记录最小工资double min = Double.MAX_VALUE;for (DoubleWritable value : values) {num++;sum += value.get();if (max < value.get()) {max = value.get();}if (min > value.get()) {min = value.get();}}// 将结果进行拼接,拼接成Text进行输出String str = "\t" + num + "" + "\t" + max + "" + "\t" + min + "\t" + (sum / num);result.set(str);// 以<K3, V3>形式进行写出context.write(key, result);}
}

partitioner端代码

package com.yangqi.avgsal;import org.apache.hadoop.mapreduce.Partitioner;/*** @author xiaoer* @date 2019/11/13 11:54*/
public class MyPartitioner<K, V> extends Partitioner<K, V> {public int getPartition(K key, V value, int numPartitions) {int emp = Integer.parseInt(key.toString());if (emp == 10 || emp == 30) {return 0;} elsereturn 1;}
}

avgsal

package com.yangqi.avgsal;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author xiaoer* @date 2019/11/12 12:50*/
public class AvgSal {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 获取配置对象:读取四个默认配置文件Configuration conf = new Configuration();System.setProperty("HADOOP_USER_NAME", "hadoop");conf.set("mapreduce.app-submission.cross-platform", "true");conf.set("mapred.jar", "AvgSal/target/AvgSal-1.0-SNAPSHOT.jar");FileSystem fs = FileSystem.get(conf);// 创建Job实例对象Job job = Job.getInstance(conf, "avgsal");// 用于指定驱动类型job.setJarByClass(AvgSal.class);// 用于指定Map阶段的类型job.setMapperClass(MyMapper.class);// 用于指定Reduce阶段的类型job.setReducerClass(MyReducer.class);job.setNumReduceTasks(2);// 设置Partition的类型job.setPartitionerClass(MyPartitioner.class);job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(DoubleWritable.class);// 设置K3的输出类型job.setOutputKeyClass(IntWritable.class);// 设置V3的输出类型job.setOutputValueClass(Text.class);// 设置要统计的文件的路径FileInputFormat.addInputPath(job, new Path("/emp"));// FileInputFormat.addInputPath(job, new Path(args[0]));// 设置文件的输出路径Path path = new Path("/output");// Path path = new Path(args[1]);if (fs.exists(path)) {fs.delete(path, true);}FileOutputFormat.setOutputPath(job, path);// 等到作业执行,并退出System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

这篇关于MapReduce—平均工资的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/379713

相关文章

【Hadoop|MapReduce篇】MapReduce概述

1. MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。 MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 2. MapReduce优缺点 2.1 优点 MapReduce易于编程 它简单的实现一些接口,就可以完成一个分布式

MapReduce算法 – 反转排序(Order Inversion)

译者注:在刚开始翻译的时候,我将Order Inversion按照字面意思翻译成“反序”或者“倒序”,但是翻译完整篇文章之后,我感觉到,将Order Inversion翻译成反序模式是不恰当的,根据本文的内容,很显然,Inversion并非是将顺序倒排的意思,而是如同Spring的IOC一样,表明的是一种控制权的反转。Spring将对象的实例化责任从业务代码反转给了框架,而在本文的模式中,在map

圆形缓冲区-MapReduce中的

这篇文章来自一个读者在面试过程中的一个问题,Hadoop在shuffle过程中使用了一个数据结构-环形缓冲区。 环形队列是在实际编程极为有用的数据结构,它是一个首尾相连的FIFO的数据结构,采用数组的线性空间,数据组织简单。能很快知道队列是否满为空。能以很快速度的来存取数据。 因为有简单高效的原因,甚至在硬件都实现了环形队列。 环形队列广泛用于网络数据收发,和不同程序间数据交换(比如内核与应用

【硬刚Hadoop】HADOOP MAPREDUCE(11):Join应用

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 1 Reduce Join Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。 Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在

【硬刚Hadoop】HADOOP MAPREDUCE(10):OutputFormat数据输出

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 1 OutputFormat接口实现类 2 自定义OutputFormat 3 自定义OutputFormat案例实操 1.需求 过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/o

【硬刚Hadoop】HADOOP MAPREDUCE(9):MapReduce内核源码解析(2)ReduceTask工作机制

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 1.ReduceTask工作机制 ReduceTask工作机制,如图4-19所示。 图4-19 ReduceTask工作机制 (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中

【硬刚Hadoop】HADOOP MAPREDUCE(8):MapReduce内核源码解析(1)MapTask工作机制

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 MapTask工作机制 MapTask工作机制如图4-12所示。 图4-12  MapTask工作机制 (1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。 (2)Map阶段:该节点主要是将解析出

【硬刚Hadoop】HADOOP MAPREDUCE(7):Shuffle机制(3)

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 7 Combiner合并 (6)自定义Combiner实现步骤 (a)自定义一个Combiner继承Reducer,重写Reduce方法 public class WordcountCombiner extends Reducer<Text, IntWritable, Text,

【硬刚Hadoop】HADOOP MAPREDUCE(6):Shuffle机制(2)

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 4 WritableComparable排序 1.排序的分类 2.自定义排序WritableComparable (1)原理分析 bean对象做为key传输,需要实现WritableComp

【硬刚Hadoop】HADOOP MAPREDUCE(5):Shuffle机制(1)

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 1 Shuffle机制 Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。如图4-14所示。 2 Partition分区