1.下载气象数据集
气象数据集下载地址为:
- ftp://ftp.ncdc.noaa.gov/pub/data/noaa
我们下载国内的气象数据,使用下面命令进行下载
wget -D --accept-regex=REGEX -P data -r -c ftp://ftp.ncdc.noaa.gov/pub/data/noaa/2017/5*
国内气象站ID区间为50001-59998
详细的可以在《1951—2007年中国地面气候资料日值数据集台站信息》中查看,不过应该不全。另外《StationIDs_Global_1509》中提供了世界各国气象站编号范围。
2.解压数据集,并保存在文本文件中
7月23号下载的,数据量为79w行,大小为182MB。所以即使年底也不过200w行。
[grid@tiny01 ~]$ zcat data/ftp.ncdc.noaa.gov/pub/data/noaa/2017/5*.gz > data.txt
在这里>表示输出重定向符
我们查看气象数据集:
0169501360999992017010100004+52130+122520FM-12+043399999V0202201N0010102600199003700199-03271-03631102641ADDAA124160092AJ199999999999999GA1081+026001101GA2999+999999101GA3999+999999101GE19MSL +99999+99999GF108991081999026001999999MA1999999096571MD1210041+0301REMSYN004BUFR
对数据格式进行解释
1-4 0169
5-10 501360 # USAF weather station identifier
11-15 99999 # WBAN weather station identifier
16-23 20170101 # 记录日期
24-27 0000 # 记录时间
28 4
29-34 +52130 # 纬度(1000倍)
35-41 +122520 # 经度(1000倍)
42-46 FM-12
47-51 +0433 # 海拔(米)
52-56 99999
57-60 V020
61-63 220 # 风向
64 1 # 质量代码
65 N
66-69 0010
70 1
71-75 02600 # 云高(米)
76 1
77 9
78 9
79-84 003700 # 能见距离(米)
85 1
86 9
87 9
88-92 -0327 # 空气温度(摄氏度*10)
93 1
94-98 -0363 # 露点温度(摄氏度*10)
99 1
100-104 10264 # 大气压力
105 1
其中第5-10位表示气象站编号:501360(取前五位),查表可得对应的是黑龙江漠河。我们主要分析的是月份:16-21位和空气温度:88-92位的极值关系。
3. 编写MapReduce程序
Mapper程序
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String data = line.substring(15, 21);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus
// signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(data), new IntWritable(airTemperature));
}
}
}
Reducer程序
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
M-R job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
public class MaxTemperature extends Configured implements Tool {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err
.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Configuration conf = new Configuration();
conf.set("mapred.jar", "MaxTemperature.jar");
Job job = Job.getInstance(conf);
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
@Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
return 0;
}
}
注意设置conf.set("mapred.jar", "MaxTemperature.jar");
第二个参数为即将打成的jar包的名称
4.编译java文件,打成jar包
此编译命令为:
[grid@tiny01 myclass]$ javac -classpath $HADOOP_HOME/share/hadoop/common/hadoop-common-2.7.2.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.2.jar:$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar *.java
[grid@tiny01 myclass]$ jar cvf MaxTemperature.jar *.class
[grid@tiny01 myclass]$ ll
total 28
-rw-rw-r--. 1 grid grid 1413 Jul 3 16:45 MaxTemperature.class
-rw-rw-r--. 1 grid grid 3085 Jul 9 19:04 MaxTemperature.jar
-rw-rw-r--. 1 grid grid 949 Jun 30 15:49 MaxTemperature.java
-rw-rw-r--. 1 grid grid 1876 Jul 3 16:45 MaxTemperatureMapper.class
-rw-rw-r--. 1 grid grid 953 Jun 30 15:37 MaxTemperatureMapper.java
-rw-rw-r--. 1 grid grid 1687 Jul 3 16:45 MaxTemperatureReducer.class
-rw-rw-r--. 1 grid grid 553 Jun 30 15:47 MaxTemperatureReducer.java
这里的classpath和之前的hadoop版本有所区别,需要按照新的设置方法,这一点网上很少提及!(注意Hadoop不同版本,包不一样)
5.将数据上传至hdfs上
[grid@tiny01 ~]$ hadoop fs -put data.txt /data.txt
6. 运行该程序
[grid@tiny01 ~]$ hadoop jar MaxTemperature.jar MaxTemperature /data.txt /out
17/07/24 00:13:20 INFO client.RMProxy: Connecting to ResourceManager at tiny01/192.168.1.101:8032
17/07/24 00:13:21 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/07/24 00:13:22 INFO input.FileInputFormat: Total input paths to process : 1
17/07/24 00:13:23 INFO mapreduce.JobSubmitter: number of splits:2
17/07/24 00:13:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1500807860144_0002
17/07/24 00:13:24 INFO impl.YarnClientImpl: Submitted application application_1500807860144_0002
17/07/24 00:13:24 INFO mapreduce.Job: The url to track the job: http://tiny01:8088/proxy/application_1500807860144_0002/
17/07/24 00:13:24 INFO mapreduce.Job: Running job: job_1500807860144_0002
17/07/24 00:13:44 INFO mapreduce.Job: Job job_1500807860144_0002 running in uber mode : false
17/07/24 00:13:44 INFO mapreduce.Job: map 0% reduce 0%
17/07/24 00:14:49 INFO mapreduce.Job: map 20% reduce 0%
17/07/24 00:14:52 INFO mapreduce.Job: map 33% reduce 0%
17/07/24 00:14:55 INFO mapreduce.Job: map 50% reduce 0%
17/07/24 00:16:02 INFO mapreduce.Job: map 51% reduce 0%
17/07/24 00:16:05 INFO mapreduce.Job: map 54% reduce 0%
17/07/24 00:16:08 INFO mapreduce.Job: map 57% reduce 0%
17/07/24 00:16:11 INFO mapreduce.Job: map 60% reduce 0%
17/07/24 00:16:14 INFO mapreduce.Job: map 62% reduce 0%
17/07/24 00:16:20 INFO mapreduce.Job: map 65% reduce 0%
17/07/24 00:16:40 INFO mapreduce.Job: map 69% reduce 0%
17/07/24 00:16:42 INFO mapreduce.Job: map 73% reduce 0%
17/07/24 00:16:44 INFO mapreduce.Job: map 83% reduce 0%
17/07/24 00:16:46 INFO mapreduce.Job: map 100% reduce 0%
17/07/24 00:17:22 INFO mapreduce.Job: map 100% reduce 100%
17/07/24 00:17:30 INFO mapreduce.Job: Job job_1500807860144_0002 completed successfully
17/07/24 00:17:32 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=10226664
FILE: Number of bytes written=20805407
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=190690631
HDFS: Number of bytes written=77
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Killed map tasks=1
Launched map tasks=3
Launched reduce tasks=1
Data-local map tasks=3
Total time spent by all maps in occupied slots (ms)=383699
Total time spent by all reduces in occupied slots (ms)=143422
Total time spent by all map tasks (ms)=383699
Total time spent by all reduce tasks (ms)=143422
Total vcore-milliseconds taken by all map tasks=383699
Total vcore-milliseconds taken by all reduce tasks=143422
Total megabyte-milliseconds taken by all map tasks=392907776
Total megabyte-milliseconds taken by all reduce tasks=146864128
Map-Reduce Framework
Map input records=789998
Map output records=786666
Map output bytes=8653326
Map output materialized bytes=10226670
Input split bytes=184
Combine input records=0
Combine output records=0
Reduce input groups=7
Reduce shuffle bytes=10226670
Reduce input records=786666
Reduce output records=7
Spilled Records=1573332
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=2436
CPU time spent (ms)=8470
Physical memory (bytes) snapshot=415924224
Virtual memory (bytes) snapshot=6170849280
Total committed heap usage (bytes)=267198464
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=190690447
File Output Format Counters
Bytes Written=77
查看结果
[grid@tiny01 ~]$ hadoop fs -cat /out/part-r-00000
201701 307
201702 350
201703 375
201704 399
201705 426
201706 444
201707 485
由于这里的气温是摄氏度的10倍,所以看起来很大。
我们来检查一下:
[grid@tiny01 ~]$ hadoop fs -copyToLocal /out/part-r-00000 result.txt
[grid@tiny01 ~]$ awk '{print $1".{66}+0"$2"1"}' result.txt |xargs -i grep --color=auto {} sample.txt | awk -v FS="" '{print substr($0,5,5),substr($0,16,6),substr($0,88,6)}'
59158 201704 +03071
59997 201701 +03071
56966 201702 +03501
56966 201703 +03751
56966 201704 +03991
51573 201705 +04261
51573 201706 +04441
51573 201706 +04441
51573 201707 +04851
正则表达式不会写,就将就着看吧,第一条是因为正则表达式匹配的问题,因此这条数据不算。但是其他条都吻合,我们可以看看这几个气象站:
- 51573:新疆吐鲁番
- 56966:云南元江
- 59997:没找到
测试成功!