MongoDB聚合

MongoDB聚合操作用于对数据的批量操作,将集合按条件分组后在进行一系列操作,诸如求和、求均值等。聚合操作能对集合进行复杂的操作,主要用于数理统计和数据挖掘。MongoDB中聚合操作的输入是i集合中的文档,输出可以是一个文档,也可是多条文档。MongoDB提供非常强大的聚合操作,可分为三种方式

  • 聚合管道(Aggregation Pipeline)
  • 单目聚合操作(Single Purpose Aggregation Operation)
  • MapReduce 编程模型

聚合管道

POSIX多线程使用方式中,有种叫做管道(流水线)的方式,其数据元素流串行地被一组线程按顺序执行。

聚合管道由阶段(Stage)组成,文档在一个阶段处理完毕后,聚合管道会把处理结果传到下一个阶段。聚合管的功能

  • 对文档进行过滤,查询出符合条件的文档。
  • 对文档进行变换,改变文档的输出形式。

聚合管道的每个阶段使用阶段操作符(Stage Operators)定义,在每个阶段操作符中可使用表达式操作符(Expression Operators)计算总和、均值、拼接或分割字符串等操作,直到每个阶段完结最后返回结果,返回的结果可直接输出,也可存储到集合中。

《MongoDB聚合》 聚合管道到的用法

处理流程

  • db.collection.aggregate() 可同时使用多个管道,方便数据处理。
  • db.collection.aggregate() 使用 MongoDB 内置原生操作,聚合效率高且支持类似SQL中GroupBy的操作,不在需要用户编写自定义的JS例程。
  • 每个阶段管道限制100M的内存,若单节点管道超出极限,MongoDB产生错误。为了能够处理大型数据集,可设置 allowDiskUsetrue 为聚合管道节点把数据写入临时文件,以解决100M内存限制。
  • db.collection.aggregate() 可作用于分片集合,但结果不能输在分片集合,MapReduce可作用于在分片集合其结果也可输在分片集合中。
  • db.collection.aggregate() 返回一个指针(cursor),数据存放在内存中可直接操作,跟MongoShell一样。
  • db.collection.aggregate() 输出的结果只能保存在文档中,BSON Document大小限制为16M。

语法解析

db.collection.aggregate(pipeline, options)
db.collection.aggregate([{<stage>,...}], ...)

pipeline 参数

  • $project 对输入文档添加新字段或删除现有字段,可自定义显示哪些字段。
  • $match 根据条件过滤仅输出符合条件的文档,若放在pipeline前面,根据条件过滤数据并传输到写一个阶段管道,可提高后续数据处理效率。也可放在out之前用以对结果再一次过滤。
  • $redact 字段所处的document结构的级别
  • $limit 用来限制MongoDB聚合管道返回的文档数量
  • $skip 在聚合管道中跳过指定数量的文档并返回剩余的文档
  • $unwind 将文档中某个数组类型字段拆分成多条,每条包含数组中的一个值。
  • $sample 随机选择从起输入指定数量的文档,若大于或等于5%的collection的文档,$sample进行收集扫描并排序随后选择顶部文件。因此$sample在收集阶段是受排序的内存限制。
  • $sort 将输入文档排序后输出
  • $geoNear 用于地理位置数据分析
  • $out 必须为pipeline最后一个阶段管道,将最后计算结果写入到指定collection中。
  • $indexStats 返回数据集合的每个索引使用情况
  • $group 将集合中的文档分组,可用于统计结果,$group 首先将数据根据 key 进行分组。

$match

筛选条件,过滤不满足条件的文档,可使用常规查询操作符。

db.users.aggregate( {$match: {'age':{$gte:18}} } )

$project

  • 用于包含、排除字段,设置需查询或过滤的字段,0为过滤掉字段不显示,1为需查询的字段。
  • 用于对字段重命名
  • 投射中可使用表达式
db.users.aggregate(
  {$match: {age: {$gte: 18}}},
  {$project: {_id:0, username:1, created_at:1}}
)

// $project 当字段值为0或1时用于过滤字段,当键名为一个自定义的字符串,键值为$紧跟原字段表示要对该字段进行重命名。
db.users.aggregate(
  {$match: {age: {$gte: 18}}},
  {$project: {_id:0, username:1, nickname:$username }}
)

// 通过修改字段名达到生成字段副本,以便后续操作符使用。
db.users.aggregate(
  { $match: {age: {$gte: 18 } } },
  { $project: {id:$_id, username:1 } }
)

算术表达式可对数值运算

db.users.aggregate(
  // 对 score 字段值加1后作为 scores 的值
  { $project: {scores: { $add : [$score,$score,1]  } } },
)
db.users.aggregate(
  // $subtract:[exp1, exp2] 数组中第一个元素减去第二个元素
  { $project: {scores: { $subtract: [$score,1]  } } },
)
db.users.aggregate(
  // $multiply:[exp1, exp2] 数组中多个元素相乘
  { $project: {scores: { $multiply: [$score, 2, 5]  } } },
)
db.users.aggregate(
  // $divide:[exp1, exp2] 数组中第一个元素除以第二个元素
  { $project: {scores: { $divide: [$score, 2, 5]  } } },
)
db.users.aggregate(
  // $mod:[exp1, exp2] 数组中第一个元素除以第二个元素的余数
  { $project: {scores: { $mod: [$score, 2]  } } },
)

字符串操作

db.users.aggregate(
  // $substr:[exp, startOffset, numToReturn] 字符串截取
  { $project: {nickname: { $substr: [$nickname, 2, 6]  } } },
)

db.users.aggregate(
  // $concat:[exp1, exp2, exp3...] 字符串拼接,将数组中多个元素拼接在一起
  { $project: {fullname: { $concat: [$firstname, $lastname]  } } },
)

db.users.aggregate(
  // $toLower:exp 字符串转为小写
  { $project: {nickname: { $toLower: $username } } },
)

db.users.aggregate(
  // $toUpper:exp 字符串转为大写
  { $project: {nickname: { $toUpper: $username } } },
)

为所有文档新增字段

db.users.update({}, {$set, {publish_at:new Date()} }, true, true)

日期表达式

db.users.aggregate(
  { $project: { 
      'year': { $year: $created_at },
      'month': { $month: $created_at }, 
      'dayOfMonth': { $dayOfMonth: $created_at }, 
      'dayOfWeek': { $dayOfWeek: $created_at }, 
      'dayOfYear': { $dayOfYear: $created_at },  
      'hour': { $hour: $created_at }, 
      'minute': { $minute: $created_at }, 
      'second': { $second: $created_at }, 
    } 
  }
)

时间间隔(秒数)

db.users.aggregate(
  { 
    $project: { 
      'fasttime': {
         $subtract: [ { $second: new Date() }, { $second: $created_at } ]
        } 
     } 
  }
)

字符串比较

db.users.aggregate(
  // $cmp:[exp1, exp2] 字符串比较,相同为0,小于返回负数, 大于返回正数
  { $project: { result : { $cmp: [ $age, 18 ] } } }
)
db.users.aggregate(
  // $strcasecmp:[exp1, exp2] 字符串比较,相同为0,小于返回-1, 大于返回1
  { $project: { result : { $strcasecmp: [ $username, 'junchow' ] } } }
)

逻辑条件

db.users.aggregate(
  // $eq 判断表达式是否相等
  { $project: { result : {$eq: [$username, 'junchow' ] } } }
)

db.users.aggregate(
  // $and [exp1, exp2...expn] 连接多条件,所有条件为真则表达式为真
  { $project: { result: { $and : [ {$eq : [$username:'junchow']}, {$gt:[$age : 18]} ] } }  }
)

db.users.aggregate({
  // $not exp 用于取反操作
  $project: {result: {$not:{$eq:[$username, 'junchow']}} }
})

db.users.aggregate({
  // $cond:[booleanExp, trueExp, falseExp] 三目运算符
  $project : { result: {$cond: [{$eq:[$username:'junchow']}, true, false] } }
})

db.users.aggregate({
  // $ifNull:[exp, replacementExpr] 若条件为null则返回表达式值,若字段不存在时字段值为null
  $project: { result: { $ifNull : [ $notExistField, 'not exist is null' ] } }
})

$group

$group分组使用_id指定要分组的键名,用来自定义字段统计。

db.users.aggregate({
  $match : { age: { $gte : 18 } }
},{
  $group : { _id:$username, count:{$sum:1} }
});

// 多字段分组
db.users.aggregate({
  $match: {age: {$gte:18}  }},
  $group: {_id:{username:$username, age:$ge}, 'count':{$sum:1} }        
})

// $sum:val 对每个文档加val求和
// $avg:val 对每个文档求均值
db.users.aggregate({
  $group: { _id:$username, count:{$avg:$age} }
})

db.users.aggregate({
  $group: { _id:$username, count:{$max:$age}  }
})

db.users.aggregate({
  $group: {_id:$username, count:{$min:$age} }
})

// $first:val 获取分组中首个
db.users.aggregate({
  $group:{_id:$username, count:{$first: $age} }
})
db.users.aggregate({
  $group:{_id:$username, count:{$last: $age} }
})
db.users.aggregate({
  $group: {_id:$username, count:{$addToSet: $age} }
})
db.users.aggregate({
  $group:{_id:$username, count:{$push: $age} }
})

聚合运算

group

先选定分组所依据的键,而后将集合依据选定键值的不同分成若干组。然后可通过聚合每一组内的文档,产生一个结果文档。

group不支持分片集群,无法进行分布式运算(shard cluster)。若需要支持分布式需使用aggregatemapReduce

db.collection.group(document)

{
  # 分组字段
  key:{key1, key2:1},
  # 查询条件
  cond:{},
  # 聚合函数
  reduce:function(current, result){},
  # 初始化
  initial:{},
  # 统计一组后的回调函数
  finalize:function(){}
}

计算每个栏目下商品个数

SELECT COUNT(*) FROM goods GROUP BY category_id;

db.goods.group({
  key:{category_id:1},
  cond:{},//所有
  reduce:function(current, result){//current对应当前行,result对应分组中的多行
    result.total += 1;
  },
  initial:{total:0}
})

查看每个栏目下商品价格大于100的数量

SELECT category,goods_name FROM goods WHERE 1=1 AND price>100 GROUP BY category_id

db.goods.group({
  key:{category_id:1},
  cond:{price:{$gt:100}},
  reduce:function(current,result){
    result.count += 1;
  },
  initial:{count:0}
})

计算每个栏目下商品库存量

SELECT category_id,SUM(store) FROM goods WHERE 1=1 AND GROUP BY category_id

db.goods.group({
  key:{category_id:1},
  cond:{},
  initial:{sum:0},
  reduce:function(current,result){
    result += current.store;
  }
});

获取每个栏目下最贵的商品价格

SELECT catetory_id,MAX(price) FROM goods GROUP BY category_id

db.goods.group({
  key:{category_id:1},
  cond:{},
  initial:{max:0},
  reduce:function(current,result){
    if(current.price > result.max){
      result.max = current.price;
    }
  }
});

查询每个栏目下商品的平均价格

SELECT category_id,AVERAGE(price) FROM goods GROUP BY category_id

db.goods.group({
  key:{category_id:1},
  cond:{},
  reduce:function(current,result){
    result.total += current.price;
    result.count  += 1;
  },
  initial:{total:0, count:0},//进组result
  finalize:functioin(result){//出组 result
    result.average = result.total/result.count;
  }
})

aggregate

aggregate聚合框架与sql对比

  • $match WHERE
  • $group GROUP BY
  • $project SELECT
  • $sort ORDER BY
  • $limit LIMIT
  • $sum SUM()
  • $sum COUNT()

查询每个栏目下商品数量

SELECT COUNT(*) FROM goods GROUP BY category_id

db.goods.aggregate([
  {$group:{_id:'$category_id', count:{$sum:1}}},
  {$project:{_id:0, category_id:'$category_id', count:'$count'}},
  {$sort:{count:1}}
])

查询每个栏目下价格大于100的商品个数

SELECT COUNT(*) FROM goods WHERE 1=1 AND price>100 GROUP BY category_id

db.goods.aggregate([
  {$match:{price:{$gt:100}}},
  {$group:{_id:'$category_id'}, count:{$sum:1}},
  {$project:{_id:0, category_id:'$_id.category_id', count:'$count'}},
  {$sort:{count:1}}
])

查询每个栏目下价格大于100的商品个数,仅显示个数大于3的。

SELECT category_id,COUNT(*) AS count WHERE 1=1 AND price>100 GROUP BY category_id HAVING count>3

db.goods.aggregate([
  {$match:{price:{$gt:100}}},
  {$group:{_id:'$category_id', count:{$sum:1}}},
  {$match:{count:{$gt:3}}},
  {$project:{_id:0, category_id:'$_id.category_id', count:'$count'}},
  {$sort:{count:1}}
])

查看每个栏目下商品的库存量

SELECT category_id,SUM(store) WHERE 1=1 GROUP BY category_id

db.goods.aggregate([
  {$group:{_id:'$category_id', count:{$sum:1}, total:{$sum:'$store'}}},
  {$project:{_id:0, category_id:'$_id.category_id', count:'$count', total:'$total'}},
  {$sort:{total:1}}
])

查询每个栏目下商品的平均价格并升序排序

SELECT category_id,AVG(price) AS avg FROM goods GROUP BY category_id ORDER BY avg ASC

db.goods.aggregate([
  {$group:{_id:'$category_id', count:{$sum:1}, average:{$avg:'$price'}}},
  {$sort:average:-1}
])

mapReduce

mapReduce是一个轻松并行化到多台服务器的聚合方法,它会拆分问题,再将各个部分发送到不同机器上,让每台机器都完成一部分。当所有机器都完成后,再将结果汇集起来形成最终完整的结果。

mapReduce最开始是映射(map),将操作映射到集合中的每个文档。这个操作要么无作为,要么产生一些键和x个值。接着进入中间环节(洗牌shuffle),按照分组并将产生的键值组成列表放到对应的键中。化简(reduce)则把列表中的值化简成一个单值。这个值被返回,然后接着进行洗牌,直到每个键的列表只有一个值为止,这个值就是最后的结果。

mapReduce的代价是速度,group不是很快,mapReduce更慢,绝不要用在实时环境中,要作为后台任务运行,将创建一个保存结果的结合,可对这个集合进行实时查询。

mapReduce的工作过程

  • map 映射
    现将同一个组的数据映射到一个文档(数组)上,在映射环节想要得到文档中每个键,map()使用emit()返回要处理的值。edit()会给mapReduce一个键和一个值,键类似group所使用键key。
  • reduce 归约
    将数组(同一组)数据进行运算,reduce()由两个参数,一个是key也就是emit()返回的第一个值,另一个是数组,由一个或多个对应于键的文档构成。reduce()一定要能够被反复调用,不论是映射环节还是前一个简化环节。

mapReduce语法

db.runCommand({
  mapreduce:字符串,集合名
  map:函数,
  reduce:函数,
  [query:文档,发往map()前先给过渡的文档],
  [sort:文档,发往map()前先给文档排序],
  [limit:整数,发往map()的文档数量上限],
  [out:字符串,统计结果保存的集合],
  [keeptemp:布尔值,链接关闭时临时结果集合是否保存],
  [finalize:函数,将reduce的结果发给此函数做最后处理],
  [scope:文档,js代码中要用到的变量],
  //jsMode=true时 BSON>JS>map>reduce>BSON
  //jsMode=false时 BSON>JS>map>BSON>JS>reduce>BSON,可处理非常大的mapreduce
  [jsMode:布尔值,是否减少执行过程中BSON和JS的转换,默认为true],
  [verbose:布尔值,是否产生更加详细的服务器日期,默认为true]
})

《MongoDB聚合》 mapReduce

MongoDB没有模式,所以并不晓得每个文档由多少个键,通常找到集合的所有键的最好方法就是用MapReduce。

查询结合中所有键

var map = function(){
  for(var k in this){ //this当前映射文档的引用
    emit(k, {count:1});//将文档某个键的计数返回
  }
}
var reduce = function(key,emits){
  var total = 0;
  for(var k in emits){

    }
}

计算每个栏目下商品库存总量

SELECT category_id,SUM(store) AS sum FROM goods GROUP BY category_id

var map = function(){
  emit(this.category_id, this.store);//获得栏目下商品的库存量
};
var reduce = function(key,store){
  return Array.sum(store);
}
db.goods.mapReduce(map,reduce,query,{out:'result'});

查询每个栏目下商品的平均价格

var map = function(){
  emit(this.category_id, this.price);
}
var reduce = function(category_id,price){
  return Array.avg(price);
}
db.goods.mapReduce(map, reduce, {out:'result'})

将MongoDB组成的shard分片集群把地震数据分布到各节点上,将中国区域按10个经度10个维度为一组约30块,并用mapReduce计算地震数据,统计每组上每月的地震次数及地震级别。分析出结果把地震高发区用偏红色标注,低发区偏绿标注。

    原文作者:JunChow520
    原文地址: https://www.jianshu.com/p/42845d117587
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞