从零开始最短路径学习Hadoop之02----处理气象数据的第一个MapReduce程序

本文主要是介绍从零开始最短路径学习Hadoop之02----处理气象数据的第一个MapReduce程序,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

编写一个气象数据挖掘的MapReduce程序

1. 气象数据在哪里?
NCDC  美国国家气候数据中心
获取数据的方式在www.hadoopbook.com里给出了,是这里 http://hadoopbook.com/code.html
两个示例的数据在这里下载 https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all  ,文件名分别是1901,1902
原始记录是许多 小文件,现在已经按照年份被拼接成上面的1901, 1902这两个单独的文件。

2. 最重要的是第5节,不想看长篇大论的,可以直接看第5节就行。

3. 用Hadoop分析数据
3.1 分析一条记录:
0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999
记录的含义:
0029029070
99999
19010101  观察日期
0600  观察时间
4+64333  经度
+023450 纬度
F M-12
    +0005
    99999
    V020
    270
    1
    N
    0159
    1
    99999
    9
    9
    N
    000000
    1
    N
    9-0078
   1
    + 99999
    102001ADDGF108991999999999999999999

    3.2 源代码文件MaxTemperatureMapper.java
package com.ifis;import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>{private static final int MISSING = 9999;public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException{String line = value.toString();String year = line.substring(15,19);int airTemperature;if(line.charAt(87) == '+'){airTemperature = Integer.parseInt(line.substring(88, 92));}else{airTemperature = Integer.parseInt(line.substring(87, 92));}String quality = line.substring(92, 93);if(airTemperature != MISSING && quality.matches("[01459]")){output.collect(new Text(year), new IntWritable(airTemperature));}}
}

    3.3 源代码MaxTemperatureReducer.java
package com.ifis;import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;public class MaxTemperatureReducer extends MapReduceBaseimplements Reducer<Text, IntWritable, Text, IntWritable>{public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException{int maxValue = Integer.MIN_VALUE;while(values.hasNext()){maxValue = Math.max(maxValue, values.next().get());}output.collect(key, new IntWritable(maxValue));}
}

    3.4 源代码MaxTemperature.java
package com.ifis;import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;public class MaxTemperature{public static void main(String[] args) throws IOException{if (args.length != 2){System.err.println("Usage: MaxTemperature <intput path> <output path>");System.exit(-1);}JobConf conf = new JobConf(MaxTemperature.class);conf.setJobName("Max Temperature");FileInputFormat.addInputPath(conf, new Path(args[0]));FileOutputFormat.setOutputPath(conf, new Path(args[1]));conf.setMapperClass(MaxTemperatureMapper.class);conf.setReducerClass(MaxTemperatureReducer.class);conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(IntWritable.class);JobClient.runJob(conf);}
}

    3.5 编译命令:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar  -d ./classes/ ./src/*.java
    3.6 打jar包:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ jar -cvf maxtemp.jar -C ./classes/ .
    3.7 用put命令将1901文件放到hdfs。
    3.8 执行:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar  com.ifis.MaxTemperature 1901 o4
    3.9 查看结果:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o4/part-00000

4. 一个新版api的MapReducer。
    4.1 说明
    这个例子书里没有给出import的类和接口的细节。如果有些类或者接口不知道import,到这里找http://hadoop.apache.org/docs/current/api/
    从这里找到类名,然后点击进入,可以看到包的信息。
    如果是java的类和接口,在这里找http://www.javaweb.cc/JavaAPI1.6/
    注意,这个新的api里,各种类要import org.apache.hadoop.mapreduce包,而不是org.apache.hadoop.mapred包,如Context,Job等等。

    4.2 源代码
package com.ifis;import java.io.IOException;
import java.lang.InterruptedException;
import java.lang.Iterable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;public class NewMaxTemperature{static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{private static final int MISSING = 9999;public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{String line = value.toString();String year = line.substring(15, 19);int airTemperature;if (line.charAt(87) == '+'){airTemperature = Integer.parseInt(line.substring(88, 92));}else{airTemperature = Integer.parseInt(line.substring(87, 92));}String quality = line.substring(92, 93);if (airTemperature != MISSING && quality.matches("[01459]")){context.write(new Text(year), new IntWritable(airTemperature));}}}static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable>{public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{int maxValue = Integer.MIN_VALUE;for(IntWritable v : values){maxValue = Math.max(maxValue, v.get());}context.write(key, new IntWritable(maxValue));}}public static void main(String[] args) throws Exception{if (args.length != 2){System.err.println("Usage NewMaxTemerapture <input path> <output path>");System.exit(-1);}Job job = new Job();job.setJarByClass(NewMaxTemperature.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.setMapperClass(MaxTemperatureMapper.class);job.setReducerClass(MaxTemperatureReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);System.exit(job.waitForCompletion(true)?0:1);}
}

4.3 编译: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes ./src/*.java
4.4 打包: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ jar -cvf newmaxtemp.jar -C ./classes/ .
4.5 将1901和1902两个文件put到hdfs上,分别更名为1901.dat和1902.dat
4.6 执行,从1901.dat和1902.dat这两个文件里找当年的最高温度: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar newmaxtemp.jar com.ifis.NewMaxTemperature 190*.dat o2
4.7 查看结果: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o2/part-r-00000

5. 将4的例子改造成mapper,reducer分开写的形式。
假设项目的目录是p3,此目录下的目录结构和文件结构如下:
|-- classes
    `-- src
        |-- MaxTemperature.java
        |-- MaxTemperatureMapper.java
        `-- MaxTemperatureReducer.java

5.1 源代码MaxTemperatureMapper.java
package com.ifis;import java.io.IOException;
import java.lang.InterruptedException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{private static final int MISSING = 9999;public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{String line = value.toString();String year = line.substring(15, 19);int airTemperature;if (line.charAt(87) == '+'){airTemperature = Integer.parseInt(line.substring(88, 92));}else{airTemperature = Integer.parseInt(line.substring(87, 92));}String quality = line.substring(92, 93);if (airTemperature != MISSING && quality.matches("[01459]")){context.write(new Text(year), new IntWritable(airTemperature));}}
}


5.2 源代码MaxTemperatureReducer.java
package com.ifis;import java.io.IOException;
import java.lang.InterruptedException;
import java.lang.Iterable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable>{public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{int maxValue = Integer.MIN_VALUE;for(IntWritable v : values){maxValue = Math.max(maxValue, v.get());}context.write(key, new IntWritable(maxValue));}
}

5.3 源代码MaxTemperature.java
package com.ifis;import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;public class MaxTemperature{public static void main(String[] args) throws Exception{if (args.length != 2){System.err.println("Usage NewMaxTemerapture <input path> <output path>");System.exit(-1);}Job job = new Job();job.setJarByClass(MaxTemperature.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.setMapperClass(MaxTemperatureMapper.class);job.setReducerClass(MaxTemperatureReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);System.exit(job.waitForCompletion(true)?0:1);}
}

5.4 编译: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ ./src/*.java
5.5 打包: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ jar -cvf maxtemp.jar -C ./classes/ . 
5.6 要处理的数据文件1901.dat和1902.dat在上一步已经put到hdfs,这次不需要再做这个动作了。
5.7 执行: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar  com.ifis.MaxTemperature 190*.dat o3
5.8 查看执行结果: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o3/part-r-00000 

6. Combiner可以提高map阶段的效率。注意,求最大值和最小值可以通过combiner提升效率,但求平均值就不行了。

7. Hadoop的streaming,通过unix标准流的方式,允许用其他语言写MapReduce程序。具体细节在这里不讲了,知道有这个功能就行。我们主要关注java语言。

8. 请熟悉第5节的源代码形式,熟悉到可以手工默写出来,这是MapReducer的主要形式,对本文来说,能掌握这个就足够了。

这篇关于从零开始最短路径学习Hadoop之02----处理气象数据的第一个MapReduce程序的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1050870

相关文章

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

C#使用SQLite进行大数据量高效处理的代码示例

《C#使用SQLite进行大数据量高效处理的代码示例》在软件开发中,高效处理大数据量是一个常见且具有挑战性的任务,SQLite因其零配置、嵌入式、跨平台的特性,成为许多开发者的首选数据库,本文将深入探... 目录前言准备工作数据实体核心技术批量插入:从乌龟到猎豹的蜕变分页查询:加载百万数据异步处理:拒绝界面

Springboot处理跨域的实现方式(附Demo)

《Springboot处理跨域的实现方式(附Demo)》:本文主要介绍Springboot处理跨域的实现方式(附Demo),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不... 目录Springboot处理跨域的方式1. 基本知识2. @CrossOrigin3. 全局跨域设置4.

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T

python+opencv处理颜色之将目标颜色转换实例代码

《python+opencv处理颜色之将目标颜色转换实例代码》OpenCV是一个的跨平台计算机视觉库,可以运行在Linux、Windows和MacOS操作系统上,:本文主要介绍python+ope... 目录下面是代码+ 效果 + 解释转HSV: 关于颜色总是要转HSV的掩膜再标注总结 目标:将红色的部分滤

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

Python实现自动化接收与处理手机验证码

《Python实现自动化接收与处理手机验证码》在移动互联网时代,短信验证码已成为身份验证、账号注册等环节的重要安全手段,本文将介绍如何利用Python实现验证码的自动接收,识别与转发,需要的可以参考下... 目录引言一、准备工作1.1 硬件与软件需求1.2 环境配置二、核心功能实现2.1 短信监听与获取2.