MapReduce项目之气温统计

2024-02-02 09:10
文章标签 统计 项目 mapreduce 气温

本文主要是介绍MapReduce项目之气温统计,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

这一批博文是博主由博客园搬移过来的,所以时间上可能存在混乱,希望大家见谅!


  在本博文中,我们要学习一个挖掘气象数据的程序。气象数据是通过分布在美国全国各地区的很多气象传感器每隔一小时进行收集,这些数据是半结构化数据且是按照记录方式存储的,因此非常适合使用 MapReduce 程序来统计分析。

  我们使用的数据来自美国国家气候数据中心、美国国家海洋和大气管理局(简称 NCDCNOAA),这些数据按行并以 ASCII 格式存储,其中每一行是一条记录。 下面我们展示一行采样数据,其中重要的字段被突出显示。该行数据被分割成很多行以突出每个字段,但在实际文件中,这些字段被整合成一行且没有任何分隔符。

 

  MapReduce 任务过程分为两个处理阶段:map 阶段和reduce阶段 。每个阶段都以键值对作为输入和输出,其类型由程序员自己来选择。程序员还需要写两个函数:map 函数和reduce 函数。在这里, map 阶段的输入是 NCDC NOAA 原始数据。我们选择文本格式作为输入格式,将数据集的每一行作为文本输入。键是某一行起始位置相对于文件起始位置的偏移量,不过我们不需要这个信息,所以将其忽略。我们的 map 函数很简单。由于我们只对气象站和气温感兴趣,所以只需要取出这两个字段数据。在本博文中,map 函数只是一个数据准备阶段, 通过这种方式来准备数据,使reducer 函数能够继续对它进行处理:即统计出每个气象站 30 年来的平均气温。map 函数还是一个比较合适去除已损记录的地方,在 map 函数里面,我们可以筛掉缺失的或者错误的气温数据。

  我们明白 MapReduce 程序的工作原理之后,下一步就是写代码实现它。我们需要编写三块代码内容:一个 map 函数、一个 reduce 函数和一些用来运行作业的代码。map 函数由 Mapper 类实现来表示,Mapper 声明一个 map() 虚方法,其内容由我们自己来实现。
  下面我们来编写 Mapper 类,实现 map() 方法,提取气象站和气温数据。

package com.hadoop.test;import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;/*** *@author 努力的凹凸曼* 统计美国每个气象站30年来的平均气温* 1.编写map()函数* 2.编写reduce()函数* 3.编写run()执行方法,负责运行MapReduce作业* 4.在main()方法中运行程序**/
public class Temperature extends Configured implements Tool{//定义TemperatureMapper继承自Mapper类,并在其中实现map()函数;//Mapper<>接口的数据类型为:Mapper<输入可有值相当于Java的int类型,输入value值相当于Java的String类型,输出key值,输出value值>public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{//Context实例用于键值对的输出//第一步:将每行气象站数据转换为String类型String line = value.toString();//第二步:提取气温值int temperature = Integer.parseInt(line.substring(14, 19).trim());if (temperature != -9999) {//第三步:获取气象站编号//通过context实例获取数据分片FileSplit fileSplit = (FileSplit) context.getInputSplit();//然后获取气象站编号String weatherStationId = fileSplit.getPath().getName().substring(5, 10);//第四步:输出数据context.write(new Text(weatherStationId), new IntWritable(temperature));}}}public static class TemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{//统计气象站的所有气温值int sum = 0;int count = 0;for(IntWritable val:values) {//对所有气温值累加sum += val.get();//统计集合大小count++;}//第二步:求同一个气象站的平均气温result.set(sum/count);//第三步:输出数据
            context.write(key, result);}}public static void main(String[] args) throws Exception {// TODO Auto-generated method stub//输入输出路径String[] args0 = {args[0], args[1]};//该写法用于在Hadoop集群上进行测试,如果在Eclipse上进行测试请采用下一行的写法//{"hdfs://Centpy:9000/weather/", "hdfs://Centpy:9000/weather/output"};//执行int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);System.exit(ec);}@Overridepublic int run(String[] arg0) throws Exception{// TODO Auto-generated method stub//第一步:读取配置文件Configuration conf = new Configuration();//第二步:如果输出路径已经存在,则删除Path mypath =new Path(arg0[1]);FileSystem hdfs = mypath.getFileSystem(conf);if (hdfs.isDirectory(mypath)) {hdfs.delete(mypath, true);}//第三步:构建Job对象Job job = new Job(conf, "temperature");job.setJarByClass(Temperature.class);//第四步:指定数据的输入输出路径FileInputFormat.addInputPath(job, new Path(arg0[0]));FileOutputFormat.setOutputPath(job, new Path(arg0[1]));//第五步:指定Mapper和Reducerjob.setMapperClass(TemperatureMapper.class);job.setReducerClass(TemperatureReducer.class);//第六步:设置map()函数和reducer()函数输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//第七步:提交作业job.waitForCompletion(true);return 0;}}

  写完之后我们需要将其导出为JAR包。

  然后上传到Hadoop集群上Hadoop的安装路径下(可用cd $HADOOP_HOME命令快速进入目录),然后使用rz命令上传,如果没装rz命令请先运行命令:

  
yum -y install lrzsz

  然后找到刚才导出的JAR包确认上传。

  最后,执行运行命令:

  hadoop jar Temperature.jar com.hadoop.test.Temperature /weather /weatherOutput //  第一个jar表示运行的对象是jar文件
                                          //  .jar文件为我们要运行的jar文件
                                          //  com.hadoop.test.Temperature为类的路径,注意请写全
                                          //  /weather表示我们的输入文件,对应代码中的args[0]
                                          //  /weatherOutput表示我的输出文件,对应代码中的args[1],注意必须是不存在的路径

  运行结果如下:

 

 以上就是博主为大家介绍的这一板块的主要内容,这都是博主自己的学习过程,希望能给大家带来一定的指导作用,有用的还望大家点个支持,如果对你没用也望包涵,有错误烦请指出。如有期待可关注博主以第一时间获取更新哦,谢谢!

 

 版权声明:本文为博主原创文章,未经博主允许不得转载。

 

 


这篇关于MapReduce项目之气温统计的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/670206

相关文章

javafx 如何将项目打包为 Windows 的可执行文件exe

《javafx如何将项目打包为Windows的可执行文件exe》文章介绍了三种将JavaFX项目打包为.exe文件的方法:方法1使用jpackage(适用于JDK14及以上版本),方法2使用La... 目录方法 1:使用 jpackage(适用于 JDK 14 及更高版本)方法 2:使用 Launch4j(

Docker集成CI/CD的项目实践

《Docker集成CI/CD的项目实践》本文主要介绍了Docker集成CI/CD的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录一、引言1.1 什么是 CI/CD?1.2 docker 在 CI/CD 中的作用二、Docke

SpringBoot项目引入token设置方式

《SpringBoot项目引入token设置方式》本文详细介绍了JWT(JSONWebToken)的基本概念、结构、应用场景以及工作原理,通过动手实践,展示了如何在SpringBoot项目中实现JWT... 目录一. 先了解熟悉JWT(jsON Web Token)1. JSON Web Token是什么鬼

手把手教你idea中创建一个javaweb(webapp)项目详细图文教程

《手把手教你idea中创建一个javaweb(webapp)项目详细图文教程》:本文主要介绍如何使用IntelliJIDEA创建一个Maven项目,并配置Tomcat服务器进行运行,过程包括创建... 1.启动idea2.创建项目模板点击项目-新建项目-选择maven,显示如下页面输入项目名称,选择

Jenkins中自动化部署Spring Boot项目的全过程

《Jenkins中自动化部署SpringBoot项目的全过程》:本文主要介绍如何使用Jenkins从Git仓库拉取SpringBoot项目并进行自动化部署,通过配置Jenkins任务,实现项目的... 目录准备工作启动 Jenkins配置 Jenkins创建及配置任务源码管理构建触发器构建构建后操作构建任务

opencv实现像素统计的示例代码

《opencv实现像素统计的示例代码》本文介绍了OpenCV中统计图像像素信息的常用方法和函数,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 统计像素值的基本信息2. 统计像素值的直方图3. 统计像素值的总和4. 统计非零像素的数量

如何使用 Bash 脚本中的time命令来统计命令执行时间(中英双语)

《如何使用Bash脚本中的time命令来统计命令执行时间(中英双语)》本文介绍了如何在Bash脚本中使用`time`命令来测量命令执行时间,包括`real`、`user`和`sys`三个时间指标,... 使用 Bash 脚本中的 time 命令来统计命令执行时间在日常的开发和运维过程中,性能监控和优化是不

Nginx、Tomcat等项目部署问题以及解决流程

《Nginx、Tomcat等项目部署问题以及解决流程》本文总结了项目部署中常见的four类问题及其解决方法:Nginx未按预期显示结果、端口未开启、日志分析的重要性以及开发环境与生产环境运行结果不一致... 目录前言1. Nginx部署后未按预期显示结果1.1 查看Nginx的启动情况1.2 解决启动失败的

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

如何用Docker运行Django项目

本章教程,介绍如何用Docker创建一个Django,并运行能够访问。 一、拉取镜像 这里我们使用python3.11版本的docker镜像 docker pull python:3.11 二、运行容器 这里我们将容器内部的8080端口,映射到宿主机的80端口上。 docker run -itd --name python311 -p