上一篇文章:
MongoDB指南—16、聚合下一篇文章:
MongoDB指南—18、聚合命令
MapReduce是聚合工具中的明星,它非常强大、非常灵活。有些问题过于复杂,无法使用聚合框架的查询语言来表达,这时可以使用MapReduce。MapReduce使用JavaScript作为“查询语言”,因此它能够表达任意复杂的逻辑。然而,这种强大是有代价的:MapReduce非常慢,不应该用在实时的数据分析中。
MapReduce能够在多台服务器之间并行执行。它会将一个大问题分割为多个小问题,将各个小问题发送到不同的机器上,每台机器只负责完成一部分工作。所有机器都完成时,再将这些零碎的解决方案合并为一个完整的解决方案。
MapReduce需要几个步骤。最开始是映射(map),将操作映射到集合中的每个文档。这个操作要么“无作为”,要么“产生一些键和X个值”。然后就是中间环节,称作洗牌(shuffle),按照键分组,并将产生的键值组成列表放到对应的键中。化简(reduce)则把列表中的值化简成一个单值。这个值被返回,然后接着进行洗牌,直到每个键的列表只有一个值为止,这个值也就是最终结果。
下面会多举几个MapReduce的例子,这个工具非常强大,但也有点复杂。
示例1:找出集合中的所有键
用MapReduce来解决这个问题有点大材小用,不过还是一种了解其机制的不错的方式。要是已经知道MapReduce的原理,则直接跳到本节最后,看看MongoDB中MapReduce的使用注意事项。
MongoDB会假设你的模式是动态的,所以并不跟踪记录每个文档中的键。通常找到集合中所有文档所有键的最好方式就是用MapReduce。在本例中,会记录每个键出现了多少次。内嵌文档中的键就不计算了,但给map函数做个简单修改就能实现这个功能了。
在映射环节,我们希望得到集合中每个文档的所有键。map函数使用特别的emit函数“返回”要处理的值。emit会给MapReduce一个键(类似于前面$group所使用的键)和一个值。这里用emit将文档某个键的计数(count)返回({count : 1})。我们想为每个键单独计数,所以为文档中的每个键调用一次emit。this就是当前映射文档的引用:
> map = function() {
... for (var key in this) {
... emit(key, {count : 1});
... }};
这样就有了许许多多{count : 1}文档,每一个都与集合中的一个键相关。这种由一个或多个{count : 1}文档组成的数组,会传递给reduce函数。reduce函数有两个参数,一个是key,也就是emit返回的第一个值,还有另外一个数组,由一个或者多个与键对应的{count : 1}文档组成。
> reduce = function(key, emits) {
... total = 0;
... for (var i in emits) {
... total += emits[i].count;
... }
... return {"count" : total};
... }
reduce一定要能够在之前的map阶段或者前一个reduce阶段的结果上反复执行。所以reduce返回的文档必须能作为reduce的第二个参数的一个元素。例如,x键映射到了3个文档{count : 1,id : 1}、{count : 1,id : 2}和{count : 1,id : 3},其中id键只用于区分不同的文档。MongoDB可能会这样调用reduce:
> r1 = reduce("x", [{count : 1, id : 1}, {count : 1, id : 2}])
{count : 2}
> r2 = reduce("x", [{count : 1, id : 3}])
{count : 1}
> reduce("x", [r1, r2])
{count : 3}
不能认为第二个参数总是初始文档之一(比如{count:1})或者长度固定。reduce应该能处理emit文档和其他reduce返回结果的各种组合。
总之,MapReduce函数可能会是下面这样:
> mr = db.runCommand({"mapreduce" : "foo", "map" : map, "reduce" : reduce})
{
"result" : "tmp.mr.mapreduce_1266787811_1",
"timeMillis" : 12,
"counts" : {
"input" : 6
"emit" : 14
"output" : 5
},
"ok" : true
}
MapReduce返回的文档包含很多与操作有关的元信息。
- “result” : “tmp.mr.mapreduce_1266787811_1”
这是存放MapReduce结果的集合名。这是个临时集合,MapReduce的连接关闭后它就被自动删除了。本章稍后会介绍如何指定一个好一点的名字以及将结果集合持久化。
- “timeMillis” : 12
操作花费的时间,单位是毫秒。
- “counts” : { … }
这个内嵌文档主要用作调试,其中包含3个键。
- “input” : 6
发送到map函数的文档个数。
- “emit” : 14
在map函数中emit被调用的次数。
- “output” : 5
结果集合中的文档数量。
对结果集合进行查询会发现原有集合的所有键及其计数:
···
db[mr.result].find()
{ “_id” : “_id”, “value” : { “count” : 6 } }
{ “_id” : “a”, “value” : { “count” : 4 } }
{ “_id” : “b”, “value” : { “count” : 2 } }
{ “_id” : “x”, “value” : { “count” : 1 } }
{ “_id” : “y”, “value” : { “count” : 1 } }
···
这个结果集中的每个”_id”对应原集合中的一个键,”value”键的值就是reduce的最终结果。
示例2:网页分类
假设有个网站,人们可以提交其他网页的链接,比如reddit(http://www.reddit.com)。提交者可以给这个链接添加标签,表明主题,比如politics、geek或者icanhascheezburger。可以用MapReduce找出哪个主题最为热门,热门与否由最近的投票决定。
首先,建立一个map函数,发出(emit)标签和一个基于流行度和新旧程度的值。
map = function() {
for (var i in this.tags) {
var recency = 1/(new Date() - this.date);
var score = recency * this.score;
emit(this.tags[i], {"urls" : [this.url], "score" : score});
}
};
现在就化简同一个标签的所有值,以得到这个标签的分数:
reduce = function(key, emits) {
var total = {urls : [], score : 0}
for (var i in emits) {
emits[i].urls.forEach(function(url) {
total.urls.push(url);
}
total.score += emits[i].score;
}
return total;
};
最终的集合包含每个标签的URL列表和表示该标签流行程度的分数。
MongoDB和MapReduce
前面两个例子只用到了mapreduce、map和reduce键。这3个键是必需的,但是MapReduce命令还有很多可选的键。
- “finalize” : function
可以将reduce的结果发送给这个键,这是整个处理过程的最后一步。
- “keeptemp” : boolean
如果为值为true,那么在连接关闭时会将临时结果集合保存下来,否则不保存。
- “out” : string
输出集合的名称。如果设置了这选项,系统会自动设置keeptemp : true。
- “query” : document
在发往map函数前,先用指定条件过滤文档。
- “sort” : document
在发往map前先给文档排序(与limit一同使用非常有用)。
- “limit” : integer
发往map函数的文档数量的上限。
- “scope” : document
可以在JavaScript代码中使用的变量。
- “verbose” : boolean
是否记录详细的服务器日志。
1. finalize函数
和group命令一样,MapReduce也可以使用finalize函数作为参数。它会在最后一个reduce输出结果后执行,然后将结果存到临时集合中。
返回体积比较大的结果集对MapReduce不是什么大不了的事情,因为它不像group那样有4 MB的限制。然而,信息总是要传递出去的,通常来说,finalize是计算平均数、裁剪数组、清除多余信息的好时机。
2. 保存结果集合
默认情况下,Mongo会在执行MapReduce时创建一个临时集合,集合名是系统选的一个不太常用的名字,将”mr”、执行MapReduce的集合名、时间戳以及数据库作业ID,用“.”连成一个字符串,这就是临时集合的名字。结果产生形如mr.stuff.18234210220.2这样的名字。MongoDB会在调用的连接关闭时自动销毁这个集合(也可以在用完之后手动删除)。如果希望保存这个集合,就要将keeptemp选项指定为true。
如果要经常使用这个临时集合,你可能想给它起个好点的名字。利用out选项(该选项接受字符串作为参数)就可以为临时集合指定一个易读易懂的名字。如果用了out选项,就不必指定keeptemp : true了,因为指定out选项时系统会将keeptemp设置为true。即便你取了一个非常好的名字,MongoDB也会在MapReduce的中间过程使用自动生成的集合名。处理完成后,会自动将临时集合的名字更改为你指定的集合名,这个重命名的过程是原子性的。也就是说,如果多次对同一个集合调用MapReduce,也不会在操作中遇到集合不完整的情况。
MapReduce产生的集合就是一个普通的集合,在这个集合上执行MapReduce完全没有问题,或者在前一个MapReduce的结果上执行MapReduce也没有问题,如此往复直到无穷都没问题!
3. 对文档子集执行MapReduce
有时需要对集合的一部分执行MapReduce。只需在传给map函数前使用查询对文档进行过滤就好了。
每个传递给map函数的文档都要先反序列化,从BSON对象转换为JavaScript对象,这个过程非常耗时。如果事先知道只需要对集合的一部分文档执行MapReduce,那么在map之前先对文档进行过滤可以极大地提高map速度。可以通过”query”、”limit”和”sort”等键对文档进行过滤。
“query”键的值是一个查询文档。通常查询返回的结果会传递给map函数。例如,有一个做跟踪分析的应用程序,现在我们需要上周的总结摘要,只要使用如下命令对上周的文档执行MapReduce就好了:
> db.runCommand({"mapreduce" : "analytics", "map" : map, "reduce" : reduce,
"query" : {"date" : {"$gt" : week_ago}}})
sort选项和limit一起使用时通常能够发挥非常大的作用。limit也可以单独使用,用来截取一部分文档发送给map函数。
如果在上个例子中想分析最近10 000个页面的访问次数(而不是最近一周的),就可以使用limit和sort:
> db.runCommand({"mapreduce" : "analytics", "map" : map, "reduce" : reduce,
"limit" : 10000, "sort" : {"date" : -1}})
query、limit、sort可以随意组合,但是如果不使用limit的话,sort就不能有效发挥作用。
4. 使用作用域
MapReduce可以为map、reduce、finalize函数都采用一种代码类型。但多数语言里,可以指定传递代码的作用域。然而MapReduce会忽略这个作用域。它有自己的作用域键”scope”,如果想在MapReduce中使用客户端的值,则必须使用这个参数。可以用“变量名 : 值”这样的普通文档来设置该选项,然后在map、reduce和finalize函数中就能使用了。作用域在这些函数内部是不变的。例如,上一节的例子使用1/(newDate() – this.date)计算页面的新旧程度。可以将当前日期作为作用域的一部分传递进去:
> db.runCommand({"mapreduce" : "webpages", "map" : map, "reduce" : reduce,
"scope" : {now : new Date()}})
这样,在map函数中就能计算1/(now – this.date)了。
5. 获得更多的输出
还有个用于调试的详细输出选项。如果想看看MapReduce的运行过程,可以将”verbose”指定为true。
也可以用print把map、reduce、finalize过程中的信息输出到服务器日志上。
上一篇文章:
MongoDB指南—16、聚合下一篇文章:
MongoDB指南—18、聚合命令