map.py
#!/usr/bin/env python
"""Mapper for the max-temperature example.

Reads fixed-width weather records from stdin and, for every record with a
usable temperature, writes one ``year<TAB>temperature`` line to stdout.
Written so it runs under both Python 2 and Python 3.
"""
import re
import sys

# Accepted values of the one-character q field (presumably the quality
# code of the temperature reading -- TODO confirm against the data spec).
_GOOD_QUALITY = re.compile("[01459]")
# Temperature value that is treated as invalid and skipped.
_MISSING_TEMP = "+9999"


def parse_record(line):
    """Extract ``(year, temp)`` from one record, or return ``None``.

    Fixed-width fields: year = [15:19], temp = [87:92] (5 characters,
    e.g. ``+0022``), q = [92:93].  A record is rejected when temp equals
    ``+9999`` or when q does not match ``[01459]``; lines too short to
    contain q therefore also yield ``None``.
    """
    # Drop only the line terminator: strip() would also remove leading
    # whitespace and shift every fixed-width offset below.
    record = line.rstrip("\r\n")
    year, temp, q = record[15:19], record[87:92], record[92:93]
    if temp != _MISSING_TEMP and _GOOD_QUALITY.match(q):
        return year, temp
    return None


def main(stdin=None, stdout=None):
    """Map every line of *stdin* (default ``sys.stdin``) onto *stdout*."""
    stdin = sys.stdin if stdin is None else stdin
    stdout = sys.stdout if stdout is None else stdout
    for line in stdin:
        parsed = parse_record(line)
        if parsed is not None:
            # Output format: year<TAB>temperature
            stdout.write("%s\t%s\n" % parsed)


if __name__ == "__main__":
    main()
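MapReduce 框架在 map 和 reduce 之间会按 key 对 map 的输出做一次 sort(排序)操作,因此 reduce 读到的同一年份的记录总是连续的——reduce.py 正是依赖这一点,只需比较相邻行的 key 就能按年份分组。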
reduce.py
#!/usr/bin/env python
"""Reducer for the max-temperature example.

Reads ``key<TAB>value`` lines from stdin (value is an integer such as
``+0022`` or ``-11``) and writes ``key<TAB>max(value)`` for every run of
consecutive lines that share a key.  The input must already be sorted by
key (the MapReduce sort, or ``sort`` in a local pipeline); otherwise a key
is reported once per run.  Written to run under Python 2 and Python 3.
"""
import sys
from itertools import groupby
from operator import itemgetter


def reduce_max(lines):
    """Yield ``(key, max_value)`` for each run of consecutive equal keys.

    Blank lines are skipped; every other line must be ``key<TAB>int``
    (anything else raises ``ValueError``).  The maximum of each run is
    taken over that run's own values only, so a key whose values are all
    negative is reported correctly (the old version seeded the first key
    with 0).
    """
    stripped = (line.strip() for line in lines)
    records = (text.split("\t") for text in stripped if text)
    for key, group in groupby(records, key=itemgetter(0)):
        yield key, max(int(val) for _, val in group)


def main(stdin=None, stdout=None):
    """Reduce *stdin* (default ``sys.stdin``) onto *stdout*."""
    stdin = sys.stdin if stdin is None else stdin
    stdout = sys.stdout if stdout is None else stdout
    for key, max_val in reduce_max(stdin):
        # Output format: key<TAB>maximum
        stdout.write("%s\t%s\n" % (key, max_val))


if __name__ == "__main__":
    main()
按照 Linux 管道的形式在本地运行(其中 sort 模拟 MapReduce 框架在 map 与 reduce 之间的排序步骤):
# Local dry run: map, sort by key (as the framework would), then reduce.
src/main/ch02/python/max_temperature_map.py < input/ncdc/sample.txt |
  sort |
  src/main/ch02/python/max_temperature_reduce.py
1949 111
1950 22
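Hadoop 命令(注意:这里使用的是 Ruby 版本的 map/reduce 脚本;mapper 中的 `map | sort | reduce` 管道会先在每个 map 任务内做一次局部归约,作用类似 combiner):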
# Run the job on the cluster with Hadoop Streaming.
# - The jar name must stay on one line: splitting it without a trailing
#   backslash ends the command early and runs "streaming.jar" on its own.
# - The -mapper pipeline sorts and reduces each map task's output locally
#   (combiner-style) before the real reduce phase; it is kept on one line
#   so no newline ends up inside the quoted argument.
# - All script paths use the same src/main/ch02/ruby/ prefix; -file ships
#   both scripts with the job.
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -input input/ncdc/all \
  -output output \
  -mapper "src/main/ch02/ruby/max_temperature_map.rb | sort | src/main/ch02/ruby/max_temperature_reduce.rb" \
  -reducer src/main/ch02/ruby/max_temperature_reduce.rb \
  -file src/main/ch02/ruby/max_temperature_map.rb \
  -file src/main/ch02/ruby/max_temperature_reduce.rb