概览
前言
本教程取材翻译于mrjob v0.5.10 documentation。有删减。最近在学mapreduce, 用到mrjob,在网上没有找到好的中文教程,就自己翻译了一下官方文档的重点。
简介
mrjob是用来写能在hadoop运行的python程序的最简便方法。其最突出的特点就是在mrjob的帮助下,无需安装hadoop或部署任何集群,我们可以在本地机器上运行代码(进行测试)。同时,mrjob可以轻松运行于Amazon Elastic MapReduce。
为了达到简便实用的目的,一些功能被mrjob省去了。如果追求更多的功能,可以尝试Dumbo,Pydoop等package。
安装
使用pip安装。
pip install mrjob
anaconda的使用者推荐使用conda安装。
conda install -c conda-forge mrjob
第一个mrjob程序
在这里我们使用一个统计文本中字符数的程序fc.py作为例子。
from mrjob.job import MRJob
class FrequencyCount(MRJob):
def mapper(self, _, line):
yield "chars", len(line)
yield "words", len(line.split())
yield "lines", 1
def reducer(self, key, values):
yield key, sum(values)
if __name__ == '__main__':
FrequencyCount.run()
要运行该程序,只需在命令行中运行
python fc.py testfile.txt
即可。其中testfile.txt可以是任意文本文件。
接下来我们简单地解释这段程序。 mrjob中所有的任务都是通过一个继承MRJob的类来定义的。在这个类中,可以包含mapper,combiner和reducer。这三个函数的参数均是一个(key, value)键值对。在本mapper函数中,键被忽略(写作_),值为文本的每一行line。在reducer中,对mapper生成的每一个键(chars,words,lines)求和,生成的和为对应的值输出。
另一个注意点是最后的if判断,该if判断是必须的。在这个if判断中,mrjob才明确我们的目标(job class)是什么。
MapReduce简介
mapreduce是一种用来在分布式系统上处理海量数据的系统。其基础是MapReduce: Simplified Data Processing on Large Clusters这篇论文。mapreduce将海量数据分成小的数据集,并行地进行相同的任务,最后将所有的子结果整理并合并成最终的结果。其中拆分数据进行相同的步骤称为mapper,后面合并整理的步骤称为reducer。而combiner可以看作是一个优化器,但不是必须的。
编写任务脚本
一步任务
一步任务(one step job)是最简单的mrjob脚本,前文中第一个mrjob程序 fc.py 就是一个一步工作脚本。
要编写一步工作脚本,只需继承(subclass)MRjob类,并覆盖(override)mapper, combiner, reducer 等方法即可。
多步任务
在编写多步任务(Multi step job)时,需要覆盖steps方法,并在step中返回一个由mapper, combiner, reducer等组成的list。
以下是一个多步任务的例子。在这个例子中,输入文件中的最高频词汇将被输出。
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
WORD_RE = re.compile(r"[\w']+")
class MRMostUsedWord(MRJob):
def steps(self):
return[
MRStep(mapper = self.mapper_get_words,
combiner = self.combiner_count_words,
reducer = self.reducer_count_words),
MRStep(reducer = self.reducer_find_max_word)
]
def mapper_get_words(self,_,line):
#yield each word in the line
for word in WORD_RE.findall(line):
yield (word.lower(),1)
def combiner_count_words(self, word, counts):
#optimization: count the words we have seen so far
yield (word, sum(counts))
def reducer_count_words(self, word, counts):
#send all (num_occupences, word) pairs to same reducer
#use sum(num_occupences) to get the total num of occupences of each word
yield None, (sum(counts), word)
def reducer_find_max_word(self, _, word_count_pairs):
#none key in this function because in reducer_count_words we discard the key
#each item of word_count_pairs is (count, word), yield one result: the value(word) of max count
yield max(word_count_pairs)
#never forget
if __name__ == '__main__':
MRMostUsedWord.run()
在step方法中共返回4个mapper, combiner 和 reducer。
开始和结束
在任务的开始前和结束后,可以通过特定的方法进行设置:*_init() 和 *_final()方法,前面的 * 可以是mapper, combiner, reducer 任意一种。
命令行语句的使用
第一种用法是在任务前先单独运行一条命令行指令,通过将*_cmd
设置为参数传入MRStep或在MRJob中覆盖同名方法。
另一种用法是用命令行指令过滤(filter)输入文件,方法是在MRStep中加入mapper_pre_filter
或reducer_pre_filter
,或在MRJob类中覆盖同名的方法。mapper_pre_filter='grep "kitty"'
就表示在mapper前,只输入含有kitty的行。
协议
协议(Protocol)主要是关于mrjob中数据的格式。每一个任务都有input protocol , output protocol 和 internal protocol。
每一个协议都有read()
和write()
方法,read()
将原始数据的字节转化为python使用的键值对,write()
将python使用的键值对转化回字节。
input protocol用来将input的字节读入第一个mapper(当不存在mapper时读入第一个reducer),output protocol用来最后输出output,internal protocol是将一步的输出转化成下一步的输入。
以上三种协议在应用中都可以自己设置,同时使用者也可以编写完全不同的全新协议。
运行器Runner
简介
MRJob类可以将任务置于MapReduce框架下运行,而运行器Runner包装并提交任务,在不同的环境下运行任务,并向使用者报告运行结果。
通常情况下,使用者是通过命令行以及设置文件(configuration file)与运行器进行交互的。当使用者通过命令行运行程序时,程序会根据不同的参数--runner
去创建不同的运行器,使任务运行在不同的环境。
使用者一般不需要手动创建运行器,当程序运行时,会自动为任务生成运行器。当然,使用者也可以用my_job.make_runner()
手动创建运行器。
本地环境运行
要在本地运行任务,只需要使用如下代码。
python your_mr_job_sub_class.py <log_file_or_whatever> output
使用者也可以单独运行若干个步骤。
python your_mr_job_sub_class.py --mapper
这行代码只运行了任务中的mapper部分。
Hadoop集群环境运行
首先对Hadoop集群进行设置.
接下来在运行任务时加入-r Hadoop
参数。
python your_mr_job_sub_class.py -r Hadoop input output
EMR环境运行
首先设置aws.
接下来在运行任务时加入-r emr
参数。
python your_mr_job_sub_class.py -r emr input output
Dataproc环境运行
首先谷歌云平台Google Cloud Platform.
接下来在运行任务时加入-r dataproc
参数。
python your_mr_job_sub_class.py -r dataproc input output
手动编写运行器脚本
使用者可以手动编写运行器脚本,并在这个脚本中用make_runner()
运行Runner
来调用其他脚本中的任务。
手动编写运行器也常被用在测试中。
mr_job = MRWordCounter(args=['-r', 'emr'])
with mr_job.make_runner() as runner:
runner.run()
for key, value in mr_job.parse_output(runner.cat_output()):
..# do something with the parsed output
在这段代码中,使用者实例化了一个MRJob
,用make_runner
创建了一个Runner
,用runner.run()
运行任务,并利用cat_output()
将结果转化为一个字节流(bytes stream),最后用parse_output
将字节流进行解析。
在手动编写运行器脚本时,必须格外注意,绝对不能将用来生成运行器的make_runner
和描述任务的类文件(job class)放在一个文件中。
以下是一个错误的示范:
from mrjob.job import MRJob
class MyJob(MRJob):
# (your job)
# no, stop, what are you doing?!?!
mr_job = MyJob(args=[args])
with mr_job.make_runner() as runner:
runner.run()
# ... etc
运行这段代码,将导致类似以下的报错信息出现
UsageError: make_runner() was called with --steps. This probably means you
tried to use it from __main__, which doesn't work.
正确的做法是将你的任务放入一个脚本,将手动编写的运行器放入另一个。以下是对刚才错误示范进行修正的两个对应文件.
# job.py
from mrjob.job import MRJob
class MyJob(MRJob):
# (your job)
if __name__ == '__main__':
MyJob.run()
# run.py
from job import MyJob
mr_job = MyJob(args=[args])
with mr_job.make_runner() as runner:
runner.run()
# ... etc
后记
在官方文档中还包含Spark和emr在mrjob中的使用等内容,因为精力有限,没有写进这篇教程中。我是第一次进行这种翻译教程的工作,因为能力有限,不免有些疏漏。如果有翻译和内容错误之处,欢迎大家指正。
如果用微信手机端的阅读体验不佳,欢迎点击链接,来我的博客阅读。