搭建好环境之后 ,就来跑个简单的Mapreduce试试看吧。这个比第一课难多了,需要多多练习并熟练掌握。
需要编写py脚本以及shell脚本, 所以需要学习Python和Linux的Shell编程。
虽然现在可以照抄老师的代码,但是代码有些方面还不太懂,更不能够理解透彻。所以,需要继续学习python
以下是笔记, bash shell和py都写在同一个文本文件中,仅以空行隔开:
# MapReduce基础 # 海量都能处理, GTP级都能处理,理论上是没有瓶颈的. # 一次性同时处理整个数据集, 数据必须一次全进来, # 批处理方式, 大数据输入,大批数据输出. #之所以可以用多种语言来开发,主要是有Hadoop Streaming的存在 #使用python来MR比较方便. ############################ word count 第一版############################ vim map.py #!/etc/bin/python import sys for line in sys.stdin: ss = line.strip().split(' ') for word in ss: print '\t'.join([word.strip(),"1"]) cat 1.txt | python map.py cat word2 | grep --color -w this # 查找并显示文件中this cat word2 | grep --color -wo this # 只显示this cat word2 | grep --color -wo this | wc -l # 显示this数量 vim reduce.py #!/etc/bin/python import sys cur_word = None sums = 0 for line in sys.stdin: ss = line.strip().split('\t') if len(ss) !=2: continue word = ss[0].strip() cnt = ss[1].strip() if cur_word == None: cur_word = word if cur_word != word: print '\t'.join([cur_word, str(sums)]) cur_word = word sums = 0 sums += int(cnt) print '\t'.join([cur_word, str(sums)]) # 本地测试程序流程, 通过再继续 cat word2 | python map.py | sort -k 1 | python reduce.py
下面把它放到hdfs上试试:
可以先把文本文件上传到hdfs, 如果脚本执行时发生错误:-bash: ./run.sh: /bin/bash^M: bad interpreter: No such file or directory
原因是windows编辑的sh文件,复制到Linux中执行,则格式不对。可用 vim打开,然后 :set ff=unix 再保存退出
####################### 为Hadoop 准备shell脚本 vim run.sh #!/bin/bash #引入 streaming Jar包 STREAM_JAR_PATH="/usr/local/src/hadoop-2.7.5/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar" # HDFS上要处理的文件 INPUT_FILE_PATH_1="/word2" OUTPUT_PATH="/output" #先删除原有的输出路径,如果存在,会报错 /usr/local/src/hadoop-2.7.5/bin/hdfs dfs -rm -r $OUTPUT_PATH
# 创建源文件路径
/bigdata/hadoop-2.7.5/bin/hdfs dfs -mkdir /test
# 上传文本文件
/bigdata/hadoop-2.7.5/bin/hdfs dfs -put 1.txt /test
/usr/local/src/hadoop-2.7.5/bin/hadoop jar $STREAM_JAR_PATH \ -input $INPUT_FILE_PATH_1 \ -output $OUTPUT_PATH \ -mapper "python map.py" \ -reducer "python reduce.py" \ -jobconf "mapred.reduce.tasks=2" \ -file ./map.py \ -file ./reduce.py # mapred.reduce.tasks=2 指定reduce个数, 默认为1,这个参数只是参考,不是很准. 受到数据源大小以及分片的影响 . # -jobconf mapred.job.name="Digital_log_count" 为job指定名称 # -cacheFile 向计算节点分发HDFS文件 # -cacheArchive 向计算节点分发HDFS压缩文件 ######################### 执行上面的脚本 , 等待结果. hdfs dfs -get /output/part-00000 cat word2 | python map.py | sort -k1 |python reduce.py > local.data #对比一下结果 cat local.data | sort > local.data.1 cat part-00000 | sort > cluster.data md5sum local.data.1 md5sum cluster.data #或者使用cmp命令, 结果是0就表示一致. cmp local.data.1 cluster.data echo $?
另一个版本的,白名单版的mapreduce
############################ word count 第二版只统计白名单 (white_list)############################ vim map.py #!/etc/bin/python import sys def read_local_file_func(f): word_set = set() file_in = open(f, 'r') for line in file_in: word = line.strip() word_set.add(word) return word_set def mapper_func(white_list_fd): word_set = read_local_file_func(white_list_fd) for line in sys.stdin: ss = line.strip().split(' ') for s in ss: word = s.strip() if word != "" and (word in word_set): print "%s\t%s" % (s, 1) if __name__ == "__main__": module = sys.modules[__name__] func = getattr(module, sys.argv[1]) args = None if len(sys.argv) > 1: args = sys.argv[2:] func(*args) vim reduce.py #!/etc/bin/python import sys def reduer_func(): current_word =None count_pool = [] sum = 0 for line in sys.stdin: word, val = line.strip().split('\t') if current_word ==None: current_word = word if current_word != word: for count in count_pool: sum += count print "%s\t%s" % (current_word, sum) current_word = word count_pool = [] sum = 0 count_pool.append(int(val)) for count in count_pool: sum += count print "%s\t%s" % (current_word, str(sum)) if __name__ == "__main__": module = sys.modules[__name__] func = getattr(module, sys.argv[1]) args = None if len(sys.argv) > 1: args = sys.argv[2:] func(*args) vim run.sh #!/bin/bash STREAM_JAR_PATH="/usr/local/src/hadoop-2.7.5/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar" INPUT_FILE_PATH_1="/word2" OUTPUT_PATH="/output" #先删除原有的输出路径,如果存在,会报错 /usr/local/src/hadoop-2.7.5/bin/hdfs dfs -rm -r $OUTPUT_PATH #step 1 /usr/local/src/hadoop-2.7.5/bin/hadoop jar $STREAM_JAR_PATH \ -input $INPUT_FILE_PATH_1 \ -output $OUTPUT_PATH \ -mapper "python map.py mapper_func white_list" \ -reducer "python reduce.py reduer_func" \ -jobconf "mapred.reduce.tasks=2" \ -file ./map.py \ -file ./reduce.py \ -file ./white_list #执行本地测试与hdfs测试 ############################ 结束: word count 第二版只统计白名单 (white_list)############################
老师在课堂上提到的2个知识点:
# Streaming 优点: 开发效率高, 程序运行效率高, 便于平台进行资源控制 #Streaming框架中通过limit等方式可以灵活地限制应用程序使用的内存资源 # Streaming缺点: 两次数据copy和解析(分割),带来一定的开销 #Streaming不能直接处理二进制数据,如果要处理二进制的数据,比较好的方法是将二进制的key和value进行base64的编码转化成文本 python import base64 base64.b64encode('abcdefg') base64.b64decode('YWJjZGVmZw==') ### jieba 中文分词 # wget --no-check-certificate https://pypi.python.org/packages/71/46/c6f9179f73b818d5827202ad1c4a94e371a29473b7f043b736b4dab6b8cd/jieba-0.39.zip file jieba-0.39.zip unzip jieba-0.39.zip ### 压缩解压命令 gzip 1.txt gunzip 1.txt.gz
两种文件的分发与打包:
############################# 文件分发与打包(-cacheFile) #如果本地配置和数据很大的时候,需要提前上传到HDFS目录上. #如果文件(如字典文件)存放在HDFS中,希望计算时在每个计算节点上将文件当作本地文件处理,可以使用-cacheFile hdfs://host:port/path/to/file#linkname选项在计算节点缓存文件. # Streaming程序通过./linkname访问文件 vim run.sh HADOOP_CMD="/usr/local/src/hadoop-2.7.5/bin/hadoop" STREAM_JAR_PATH="/usr/local/src/hadoop-2.7.5/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar" INPUT_FILE_PATH_1="/The_Man_of_property.txt" OUTPUT_PATH="/output_cachefile_broadcast" $HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_PATH #step 1 $HADOOP_CMD jar $STREAM_JAR_PATH \ -input $INPUT_FILE_PATH_1 \ -output $OUTPUT_PATH \ -mapper "python map.py mapper_func ABC" \ -reducer "python red.py reducer_func" \ -jobconf "mapred.reduce.tasks=2" \ -jobconf "mapred.job.name=cachefile_demo" \ -cacheFile "hdfs://master:9000/white_list.txt#ABC" \ -file "./map.py" \ -file "./red.py" \ #ABC是文件别名,代替white_list.txt 另外,map.py和reduce.py可以只用上面第一例的,不必更改, 因为只是文件分发方式变了. # tasks=2的时候,也可能在结果产生空文件, 因为hash到key的时候,分到一个文件里了. # map的输出也就是中间结果启用压缩, 主要是为了减少shuffle过程中的网络传输数据量 -jobconf "mapred.compress.map.output=true" \ -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \ # 输出时压缩, 减少输出结果占用HDFS存储 -jobconf "mapred.output.compress=true" \ -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \ ############################# 文件分发与打包(-cacheArchive) #如果要分发的文件有目录结构,可以先将整个目录打包,上传到HDFS,再用-cacheArchive hdfs://host:port/path/to/archivefile#linkname 分发压缩包 #有2个白名单文件, white_list_1, white_list_2 可以打包到一起 tar zcvf w.tar.gz white_list_* #可以复制到别的目录解压验证 hdfs dfs -put w.tar.gz / vim run.sh HADOOP_CMD="/usr/local/src/hadoop-2.7.5/bin/hadoop" STREAM_JAR_PATH="/usr/local/src/hadoop-2.7.5/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar" INPUT_FILE_PATH_1="/The_Man_of_property.txt" OUTPUT_PATH="/output_cachefile_broadcast" $HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_PATH #step 1 $HADOOP_CMD jar $STREAM_JAR_PATH \ -input $INPUT_FILE_PATH_1 \ -output $OUTPUT_PATH \ -mapper "python map.py mapper_func WH.gz" \ -reducer "python red.py reducer_func" \ -jobconf "mapred.reduce.tasks=2" \ -jobconf "mapred.job.name=cachefile_demo" \ -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \ -file "./map.py" \ -file "./red.py" \ vim map.py #!/usr/bin/python import os import sys import gzip def get_file_handler(f): file_in = open(f, 'r') return file_in def get_cachefile_handlers(f): f_handlers_list = [] if os.path.isdir(f): for fd in os.listdir(f): f_handlers_list.append(get_file_handler(f + '/' +fd)) return f_handlers_list def read_local_file_func(f): word_set = set() for cachefile in get_cachefile_handlers(f): for line in cachefile: word = line.strip() word_set.add(word) return word_set def mapper_func(white_list_fd): word_set = read_local_file_func(white_list_fd) for line in sys.stdin: ss = line.strip().split(' ') for s in ss: word = s.strip() if word != "" and (word in word_set): print "%s\t%s" % (s,1) if __name__ == "__main__": module = sys.modules[__name__] func = getattr(module, sys.argv[1]) args = None if len(sys.argv) > 1: args = sys.argv[2:] func(*args) vim reduce.py #!/etc/bin/python import sys def reduer_func(): current_word =None count_pool = [] sum = 0 for line in sys.stdin: word, val = line.strip().split('\t') if current_word ==None: current_word = word if current_word != word: for count in count_pool: sum += count print "%s\t%s" % (current_word, sum) current_word = word count_pool = [] sum = 0 count_pool.append(int(val)) for count in count_pool: sum += count print "%s\t%s" % (current_word, str(sum)) if __name__ == "__main__": module = sys.modules[__name__] func = getattr(module, sys.argv[1]) args = None if len(sys.argv) > 1: args = sys.argv[2:] func(*args)
老师给的不想开发代码,直接套模板的MR方案:
######################### 不想开发代码,只做单reduce, 如下是配置文件以及2个py脚本。 vim run.sh set -e -x HADOOP_CMD="/usr/local/src/hadoop-2.7.5/bin/hadoop" STREAM_JAR_PATH="/usr/local/src/hadoop-2.7.5/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar" INPUT_FILE_PATH_A="/a.txt" INPUT_FILE_PATH_B="/b.txt" OUTPUT_SORT_PATH="/output_sort" $HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_SORT_PATH #step 3 $HADOOP_CMD jar $STREAM_JAR_PATH \ -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \ -output $OUTPUT_SORT_PATH \ -mapper "python map_sort.py" \ -reducer "python red_sort.py" \ -file "./map_sort.py" \ -file "./red_sort.py" \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \ -jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \ -jobconf stream.num.map.output.key.fields=1 \ -jobconf stream.map.output.field.separator='^I' \ -jobconf map.output.key.field.separator='^I' \ -jobconf mapred.text.key.partitioner.options="-k1,1" \ -jobconf mapred.text.key.comparator.options="-k1,1n" \ -jobconf mapred.reduce.tasks=1 #上面的几行参数缺一个都不行. #下面这2行按第1列的数值去partition, 保证它分到哪个桶,保证它这数据能分到哪个reduce上处理. key和partition是不相等的,效果是按第1列数值去partition,按第2列去排序. # -jobconf stream.num.map.output.key.fields=2 \ # -jobconf num.key.fields.for.partition=1 \ #而下面这一行代替了partition # -jobconf mapred.text.key.partitioner.options="-k1,1" \ #第1列开始,1列结束,也就是将第1列作为partition # -jobconf mapred.text.key.comparator.options="-k1,1n" \ #按第1到1的列排序, n是将其转换为数值型. ### 最少代码的自然数排序,map只是将字符串解析一下。基本没做处理。 vim map_sort.py #!/etc/bin/python import sys for line in sys.stdin: ss = line.strip().split('\t') key = ss[0] val = ss[1] print "%s\t%s" % (key, val) ### reduce读什么出什么,基本没做处理。 vim red_sort.py #!/etc/bin/python import sys for line in sys.stdin: print line.strip()
以及MR join
########################## MapReduce Join 三个map文件, 前2个处理完成,合并到第3个mapreduce中 vim map_a.py #!/usr/bin/python import sys for line in sys.stdin: ss = line.strip().split('^I') key = ss[0] val = ss[1] print "%s\t1\t%s" % (key,val) # 示例数据 a.txt aaa1 123 aaa2 123 aaa3 123 aaa4 123 aaa5 123 aaa6 123 aaa7 123 aaa8 123 aaa9 123 aaa10 123 vim map_b.py #!/usr/bin/python import sys for line in sys.stdin: ss = line.strip().split('^I') key = ss[0] val = ss[1] print "%s\t2\t%s" % (key,val) # 示例数据 b.txt aaa1 hadoop aaa2 hadoop aaa3 hadoop aaa4 hadoop aaa5 hadoop aaa6 hadoop aaa7 hadoop aaa8 hadoop aaa9 hadoop aaa10 hadoop #### reduce_join脚本 这段代码可能有问题。 vim red_join.py #!/usr/bin/env python import sys val_1 = [] for line in sys.stdin: key, flag, val = line.strip().split('\t') if flag =='1': val_1.append(val) elif flag == '2' and val_1 != "" : val_2 = val for v in val_1: print "%s\t%s\t%s" % (key, v, val_2) val_1 = [] # 上述这段代码可能有问题,要求对输入的数据符合格式