Learning Hadoop from Scratch by the Shortest Path, Part 02: A First MapReduce Program for Processing Weather Data
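Writing a MapReduce program that mines weather data
1. Where is the weather data?
It comes from NCDC, the US National Climatic Data Center.
How to obtain the data is described at www.hadoopbook.com, specifically here: http://hadoopbook.com/code.html
The two sample data files can be downloaded from https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all ; the files are named 1901 and 1902.
The original records were spread over many small files; they have been concatenated by year into the two single files 1901 and 1902 above.
2. Section 5 is the most important part. If you do not want to read the whole article, you can go straight to section 5.
3. Analyzing the data with Hadoop
3.1 Analyzing one record: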
0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999
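Meaning of the fields in this record:
0029
029070 USAF weather station identifier
99999 WBAN weather station identifier
19010101 observation date
0600 observation time
4
+64333 latitude (degrees x 1000)
+023450 longitude (degrees x 1000)
FM-12
+0005 elevation (meters)
99999
V020
270 wind direction (degrees)
1 quality code
N
0159
1
99999 sky ceiling height (meters)
9 quality code
9
N
000000 visibility distance (meters)
1 quality code
N
9
-0078 air temperature (degrees Celsius x 10)
1 quality code
+9999 dew point temperature (degrees Celsius x 10)
9 quality code
10200 atmospheric pressure (hectopascals x 10)
1 quality code
ADDGF108991999999999999999999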
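The fixed character offsets used by the mapper in the following sections can be checked against the sample record without Hadoop. The sketch below is only an illustration: the class name NcdcRecordDemo and its helper methods are hypothetical and do not appear in the original code. It reuses the same offsets (15 to 19 for the year, 87 to 92 for the temperature, 92 for the quality code).

```java
// Illustrative only: NcdcRecordDemo and its helpers are hypothetical names,
// not part of the article's MapReduce code.
class NcdcRecordDemo {
    // The sample record from section 3.1.
    static final String SAMPLE =
        "0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999";

    // Value that marks a missing temperature reading.
    static final int MISSING = 9999;

    // Characters 15..18 hold the year of the observation date.
    static String parseYear(String line) {
        return line.substring(15, 19);
    }

    // Characters 87..91 hold the signed air temperature in tenths of a degree Celsius.
    static int parseTemperature(String line) {
        // Skip a leading '+', as the mapper does, so the sign never reaches parseInt.
        int start = (line.charAt(87) == '+') ? 88 : 87;
        return Integer.parseInt(line.substring(start, 92));
    }

    // Character 92 is the quality code of the temperature reading.
    static boolean isValid(String line) {
        String quality = line.substring(92, 93);
        return parseTemperature(line) != MISSING && quality.matches("[01459]");
    }

    public static void main(String[] args) {
        System.out.println(parseYear(SAMPLE));        // 1901
        System.out.println(parseTemperature(SAMPLE)); // -78, i.e. -7.8 degrees Celsius
        System.out.println(isValid(SAMPLE));          // true
    }
}
```

For this record the mapper would therefore emit the pair (1901, -78).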
3.2 Source file MaxTemperatureMapper.java
package com.ifis;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    // Value that marks a missing temperature reading
    private static final int MISSING = 9999;

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        // Skip a leading plus sign before parsing
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        // Emit (year, temperature) only for readings that are present and of acceptable quality
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            output.collect(new Text(year), new IntWritable(airTemperature));
        }
    }
}
3.3 Source file MaxTemperatureReducer.java
package com.ifis;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        // Find the largest temperature among all values for this year
        int maxValue = Integer.MIN_VALUE;
        while (values.hasNext()) {
            maxValue = Math.max(maxValue, values.next().get());
        }
        output.collect(key, new IntWritable(maxValue));
    }
}
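To make the data flow between the mapper and the reducer concrete, the grouping that the framework performs between the two phases can be imitated in plain Java. This is a minimal sketch that assumes nothing about Hadoop itself: the class LocalMaxTemperatureDemo, its methods, and the input pairs are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a local imitation of map output -> group by key -> reduce.
class LocalMaxTemperatureDemo {
    // "Reduce" step: the maximum of all values that share one key.
    static int reduceMax(List<Integer> values) {
        int maxValue = Integer.MIN_VALUE;
        for (int v : values) {
            maxValue = Math.max(maxValue, v);
        }
        return maxValue;
    }

    // Groups (year, temperature) pairs by year, then reduces each group to its maximum.
    static Map<String, Integer> maxByYear(String[] years, int[] temperatures) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (int i = 0; i < years.length; i++) {
            grouped.computeIfAbsent(years[i], k -> new ArrayList<>()).add(temperatures[i]);
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduceMax(e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up map output, not real NCDC readings.
        String[] years = {"1950", "1950", "1950", "1949", "1949"};
        int[] temps = {0, 22, -11, 111, 78};
        System.out.println(maxByYear(years, temps)); // {1949=111, 1950=22}
    }
}
```

In the real job the reducer only ever sees one key and its values at a time; the grouping and sorting by key is done by the framework.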
3.4 Source file MaxTemperature.java
package com.ifis;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MaxTemperature {
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("Max Temperature");
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setReducerClass(MaxTemperatureReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        JobClient.runJob(conf);
    }
}
3.5 Compile: brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ ./src/*.java
3.6 Build the jar: brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ jar -cvf maxtemp.jar -C ./classes/ .
3.7 Use the put command to upload the file 1901 to HDFS.
3.8 Run: brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar com.ifis.MaxTemperature 1901 o4
3.9 View the result: brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o4/part-00000
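4. A MapReduce program that uses the new API.
4.1 Notes
The book does not show which classes and interfaces this example imports. If you do not know where a class or interface lives, look it up here: http://hadoop.apache.org/docs/current/api/
Find the class name there and click through to see its package.
For classes and interfaces of Java itself, look here: http://www.javaweb.cc/JavaAPI1.6/
Note that with the new API, classes such as Context and Job must be imported from the org.apache.hadoop.mapreduce package, not from org.apache.hadoop.mapred.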
4.2 Source code
package com.ifis;

import java.io.IOException;
import java.lang.InterruptedException;
import java.lang.Iterable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class NewMaxTemperature {

    static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Value that marks a missing temperature reading
        private static final int MISSING = 9999;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String year = line.substring(15, 19);
            int airTemperature;
            // Skip a leading plus sign before parsing
            if (line.charAt(87) == '+') {
                airTemperature = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemperature = Integer.parseInt(line.substring(87, 92));
            }
            String quality = line.substring(92, 93);
            if (airTemperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    }

    static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Find the largest temperature among all values for this year
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable v : values) {
                maxValue = Math.max(maxValue, v.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: NewMaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        Job job = new Job();
        job.setJarByClass(NewMaxTemperature.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Exit code 0 if the job succeeds, 1 otherwise
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
4.3 Compile: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes ./src/*.java
4.4 Build the jar: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ jar -cvf newmaxtemp.jar -C ./classes/ .
4.5 Put the two files 1901 and 1902 into HDFS, renaming them to 1901.dat and 1902.dat.
4.6 Run the job to find the highest temperature of each year in the two files 1901.dat and 1902.dat: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar newmaxtemp.jar com.ifis.NewMaxTemperature 190*.dat o2
4.7 View the result: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o2/part-r-00000
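5. Rework the example from section 4 so that the mapper and the reducer live in separate files.
Assume the project directory is p3. Its directory and file layout is as follows:
|-- classes
`-- src
    |-- MaxTemperature.java
    |-- MaxTemperatureMapper.java
    `-- MaxTemperatureReducer.java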
5.1 Source file MaxTemperatureMapper.java
package com.ifis;

import java.io.IOException;
import java.lang.InterruptedException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Value that marks a missing temperature reading
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        // Skip a leading plus sign before parsing
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        // Emit (year, temperature) only for readings that are present and of acceptable quality
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
5.2 Source file MaxTemperatureReducer.java
package com.ifis;

import java.io.IOException;
import java.lang.InterruptedException;
import java.lang.Iterable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Find the largest temperature among all values for this year
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable v : values) {
            maxValue = Math.max(maxValue, v.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
5.3 Source file MaxTemperature.java
package com.ifis;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Exit code 0 if the job succeeds, 1 otherwise
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
5.4 Compile: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ ./src/*.java
5.5 Build the jar: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ jar -cvf maxtemp.jar -C ./classes/ .
5.6 The data files 1901.dat and 1902.dat were already put into HDFS in the previous section, so there is no need to upload them again.
5.7 Run: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar com.ifis.MaxTemperature 190*.dat o3
5.8 View the result: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o3/part-r-00000
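6. A combiner can improve efficiency by shrinking the map output before it is transferred to the reducers. Note that computing a maximum or a minimum can be sped up this way, but computing a mean cannot, because the mean of partial means is in general not the mean of all the values.
7. Hadoop Streaming uses Unix standard streams to let you write MapReduce programs in other languages. The details are not covered here; it is enough to know that the feature exists. We focus on Java.
8. Get familiar with the source layout in section 5, to the point where you can write it out by hand from memory. It is the basic form of a MapReduce program, and for the purposes of this article mastering it is enough.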
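The difference between max and mean can be checked with a few numbers. The sketch below is plain Java with made-up values; the class CombinerDemo is hypothetical and only imitates what a combiner does, namely pre-aggregating the output of each map task before the reducer sees it.

```java
// Illustrative only: imitates per-map-task pre-aggregation with made-up values.
class CombinerDemo {
    static int max(int... values) {
        int m = Integer.MIN_VALUE;
        for (int v : values) {
            m = Math.max(m, v);
        }
        return m;
    }

    static double mean(double... values) {
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.length;
    }

    public static void main(String[] args) {
        // Max: combining the per-map-task maxima gives the same answer as one global max.
        System.out.println(max(max(0, 20, 10), max(25, 15))); // 25
        System.out.println(max(0, 20, 10, 25, 15));           // 25

        // Mean: the mean of the per-map-task means differs from the global mean.
        System.out.println(mean(mean(0, 20, 10), mean(25, 15))); // 15.0
        System.out.println(mean(0, 20, 10, 25, 15));             // 14.0
    }
}
```

Because the max function gives the same result when applied in stages, the reducer from section 5 could plausibly double as the combiner, for example via job.setCombinerClass(MaxTemperatureReducer.class) in the driver. That line is not part of the original listings and is mentioned here only as an assumption about how one might wire it up.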