Running Python code on Hadoop

The trick to writing MapReduce code in Python is to let Hadoop Streaming pass the data between the map and reduce steps through stdin and stdout. We simply read input with Python's sys.stdin and write output with sys.stdout; Hadoop Streaming takes care of everything else. It really is that simple!

First, write the map and reduce programs in Python.
mapper.py:

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))

reducer.py:

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
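
As a side note that is not part of the original tutorial: the same key grouping can also be written with itertools.groupby from the Python standard library. The sketch below (a hypothetical reducer_groupby.py) relies on exactly the same property as the IF-switch above, namely that Hadoop sorts the map output by key before it reaches the reducer.

#!/usr/bin/env python
# reducer_groupby.py -- optional variant sketch, not from the original post

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(stream, separator='\t'):
    # yield [word, count] pairs from the mapper's tab-delimited lines
    for line in stream:
        yield line.rstrip('\n').split(separator, 1)

def main():
    data = read_mapper_output(sys.stdin)
    # groupby only works here for the same reason as the IF-switch above:
    # Hadoop sorts the map output by key before handing it to the reducer
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for _, count in group)
            print('%s\t%d' % (current_word, total_count))
        except ValueError:
            # count was not a number (or the line had no tab), so ignore this group
            pass

if __name__ == '__main__':
    main()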

Save both files, for example under /home/hadoop as I did, and make them executable (e.g. chmod +x /home/hadoop/mapper.py /home/hadoop/reducer.py).
There is no need to jump straight to Hadoop; first verify that the map and reduce code works correctly on the local machine:

# very basic test
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py
foo     1
foo     1
quux    1
labs    1
foo     1
bar     1
quux    1

hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
bar     1
foo     3
labs    1
quux    2

# using one of the ebooks as example input
# (see below on where to get the ebooks)
hduser@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hduser/mapper.py
 The     1
 Project 1
 Gutenberg       1
 EBook   1
 of      1
 [...]
 (you get the idea)

If the local tests pass, the map and reduce logic is correct. Next, download three text documents to use as input:
The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson
The Notebooks of Leonardo Da Vinci
Ulysses by James Joyce
Then upload the three texts to the HDFS filesystem:
bin/hdfs dfs -copyFromLocal /home/hadoop/gutenberg /user/hadoop/gutenberg
You can check that the upload succeeded with bin/hdfs dfs -ls /user/hadoop/gutenberg:

[hadoop@localhost hadoop-2.7.4]$ bin/hdfs dfs -ls /user/hadoop/gutenberg
Found 3 items
-rw-r--r--   1 hadoop supergroup      21293 2017-09-04 11:41 /user/hadoop/gutenberg/20417
-rw-r--r--   1 hadoop supergroup      23403 2017-09-04 11:41 /user/hadoop/gutenberg/4300
-rw-r--r--   1 hadoop supergroup      22178 2017-09-04 11:41 /user/hadoop/gutenberg/5000
[hadoop@localhost hadoop-2.7.4]$ 

Next, locate the Hadoop Streaming jar that ships with your local Hadoop installation. On my machine it is:

/home/hadoop/hadoop-2.7.4/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar

Then run the job with this streaming jar. Note that the -input and -output paths must match where you actually put the data on HDFS; the transcripts below use /user/test/gutenberg, so substitute /user/hadoop/gutenberg if you uploaded the files with the command above:

[hadoop@localhost hadoop-2.7.4]$ bin/hadoop jar /home/hadoop/hadoop-2.7.4/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar -file /home/hadoop/mapper.py    -mapper /home/hadoop/mapper.py -file /home/hadoop/reducer.py   -reducer /home/hadoop/reducer.py -input /user/test/gutenberg/* -output /user/test/gutenberg-output

Or, since the two scripts already exist at the same local path on this single-node machine, the -file options can be omitted:

[hadoop@localhost hadoop-2.7.4]$ bin/hadoop jar /home/hadoop/hadoop-2.7.4/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar   -mapper /home/hadoop/mapper.py  -reducer /home/hadoop/reducer.py -input /user/test/gutenberg/* -output /user/test/gutenberg-output

A complete run of the job looks like this:

[hadoop@localhost hadoop-2.7.4]$ bin/hadoop jar /home/hadoop/hadoop-2.7.4/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar  -mapper /home/hadoop/mapper.py  -reducer /home/hadoop/reducer.py -input /user/test/gutenberg/* -output /user/test/gutenberg-output3
packageJobJar: [/tmp/hadoop-unjar2137675126277149757/] [] /tmp/streamjob3391053950303419829.jar tmpDir=null
17/09/04 14:33:18 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/09/04 14:33:18 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/09/04 14:33:19 INFO mapred.FileInputFormat: Total input paths to process : 3
17/09/04 14:33:19 INFO mapreduce.JobSubmitter: number of splits:3
17/09/04 14:33:19 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1504425057280_0005
17/09/04 14:33:20 INFO impl.YarnClientImpl: Submitted application application_1504425057280_0005
17/09/04 14:33:20 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1504425057280_0005/
17/09/04 14:33:20 INFO mapreduce.Job: Running job: job_1504425057280_0005
17/09/04 14:33:31 INFO mapreduce.Job: Job job_1504425057280_0005 running in uber mode : false
17/09/04 14:33:31 INFO mapreduce.Job:  map 0% reduce 0%
17/09/04 14:33:51 INFO mapreduce.Job:  map 100% reduce 0%
17/09/04 14:34:00 INFO mapreduce.Job:  map 100% reduce 100%
17/09/04 14:34:00 INFO mapreduce.Job: Job job_1504425057280_0005 completed successfully
17/09/04 14:34:00 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=86853
        FILE: Number of bytes written=748071
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=67169
        HDFS: Number of bytes written=24946
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=52498
        Total time spent by all reduces in occupied slots (ms)=5713
        Total time spent by all map tasks (ms)=52498
        Total time spent by all reduce tasks (ms)=5713
        Total vcore-milliseconds taken by all map tasks=52498
        Total vcore-milliseconds taken by all reduce tasks=5713
        Total megabyte-milliseconds taken by all map tasks=53757952
        Total megabyte-milliseconds taken by all reduce tasks=5850112
    Map-Reduce Framework
        Map input records=1665
        Map output records=5029
        Map output bytes=76784
        Map output materialized bytes=86865
        Input split bytes=295
        Combine input records=0
        Combine output records=0
        Reduce input groups=998
        Reduce shuffle bytes=86865
        Reduce input records=5029
        Reduce output records=998
        Spilled Records=10058
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=847
        CPU time spent (ms)=3130
        Physical memory (bytes) snapshot=712916992
        Virtual memory (bytes) snapshot=8411308032
        Total committed heap usage (bytes)=444870656
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=66874
    File Output Format Counters 
        Bytes Written=24946
17/09/04 14:34:00 INFO streaming.StreamJob: Output directory: /user/test/gutenberg-output

The job completed successfully, and the output can now be inspected:

[hadoop@localhost hadoop-2.7.4]$ bin/hdfs dfs -cat  /user/test/gutenberg-output/part-00000
!   3
""; 6
"-//W3C//DTD    3
"//m.gutenberg.org/ebooks/20417.mobile";    1
"//m.gutenberg.org/ebooks/4300.mobile"; 1
"//m.gutenberg.org/ebooks/5000.mobile"; 1
"/ebooks/suggest/"; 3
"Load   3
"Quixote"</td>  3
"en_US";    6
"http://www.gutenberg.org/ebooks/20417";    1
"http://www.gutenberg.org/ebooks/4300"; 1
"http://www.gutenberg.org/ebooks/5000"; 1
"http://www.w3.org/MarkUp/DTD/xhtml-rdfa-1.dtd">    3
"screen";   3
<Enter>.    3
<enter>">   3
<h>"    3
<s>"    3
...

And there are the word counts. (The HTML-looking tokens such as "-//W3C//DTD appear because the files I downloaded were gutenberg.org web pages rather than the plain-text ebooks; with plain-text input the output would contain only ordinary words.)
As for data sources: they can be web server logs such as access.log, or data gathered by a crawler, which is again a job for Python. The point is to see what useful information can be extracted from the data.
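
As a rough illustration of that last point, here is a minimal sketch of a mapper (a hypothetical log_mapper.py, not part of the original post) that counts requests per HTTP status code. It assumes each access.log line is in the common/combined log format, where the status code is the 9th whitespace-separated field; adjust the index for your own format. The word-count reducer above can be reused unchanged.

#!/usr/bin/env python
# log_mapper.py -- hypothetical sketch, assumes common/combined log format

import sys

for line in sys.stdin:
    fields = line.strip().split()
    # in the common/combined log format the status code is the 9th field;
    # adjust the index if your access.log is formatted differently
    if len(fields) < 9:
        continue
    status_code = fields[8]
    # emit (status_code, 1); reducer.py will sum these per status code
    print('%s\t%s' % (status_code, 1))

It can be tested locally the same way as before, e.g. cat access.log | ./log_mapper.py | sort -k1,1 | ./reducer.py, and then submitted through the streaming jar exactly as shown above.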
References:
The classic tutorial on using Python with Hadoop
Building a Hadoop-based MapReduce log-analysis platform with Python

    Original author: 舒小贱
    Original article: https://www.jianshu.com/p/229c7ec48110