MongoDB的聚合操作

聚合简介

MongoDB的聚合框架,主要用来对集合中的文档进行变换和组合,从而对数据进行分析以加以利用。

聚合框架的基本思路是:采用多个构件来创建一个管道,用于对一连串的文档进行处理。这些构件包括:筛选(filtering)、投影(projecting)、分组(grouping)、排序(sorting)、限制(limiting)、跳过(skipping)。

聚合的使用方式

db.集合.aggregate(构件1,构件2…);

注意:由于聚合的结果要返回到客户端,因此聚合的结果必须限制在16M内,这是MongoDB支持的最大响应消息的大小。

示例

准备测试数据

向scores集合中插入400条测试数据,100个学生,每个学生有4门课程。

1
2
3
4
5
for (var i=0;i<100;i++ ) {
for (var j=0;j<4;j++) {
db.scores.insert({'studentId':'s'+i,'course':'课程'+j,'score':Math.random()*100});
}
}

查询下数据db.scores.find();

image-20191229155137908

下面要完场的功能是:找出score在80分以上的课程门数最多的3个学生。

实现查找

这里实现的思路如下:

  1. 过滤出分数在80分以上的,不区分课程

    构件为:{'$match':{'score':{$gte:80}}}

  2. 投影:提取id字段

    构件为:{'$project':{'studentId':1}}

  3. 按studentId分组,得到每个学生80分的课程出现的次数

    构件为:{'$group':{'_id':'$studentId','count':{$sum:1}}},studentId每出现一次就加1

  4. 对次数倒序排序

    构件为:{'$sort':{'count':-1}}

  5. 限制查询结果为3条。

    构件为:{'$limit':3}

所以结合起来就是:

1
2
3
4
5
6
7
db.scores.aggregate(
{'$match':{'score':{$gte:80}}}/*过滤出分数在80分以上的*/
,{'$project':{'studentId':1}}/*投影:提取id字段*/
,{'$group':{'_id':'$studentId','count':{$sum:1}}}/*按studentId分组*/
,{'$sort':{'count':-1}}/*倒序*/
,{'$limit':3}/*限制查询3条*/
);

执行得到如下结果:

image-20191229155627150

管道操作符

管道操作符介绍

每个操作符接受一系列的文档,对这些文档做相应的处理,然后把转换的文档作为结果传递给下一个操作符,最后一个操作符会将结果返回。不同的管道操作符可以任意顺序,任意个数组合在一起使用。

管道操作符$match

用于文档集合进行筛选,里面可以使用所有常规的查询操作符。通常会放置在管道的最前面的位置,原因如下:

  1. 快速将不需要的文档过滤,减少后续操作的数据量;
  2. 在投影和分组之前做筛选,查询可以使用索引。

管道操作符$project

用来从文档中提取字段,可以指定包含和排除字段,也可以重命名字段。比如将studentId改名为sid,如下:

1
db.scores.aggregate({'$project':{'sid':'$studentId'}});

管道操作符还可以使用表达式,来满足更复杂的需求。

管道操作符$project的数学表达式

支持的操作符和相应的语法:

  1. $add:[expr1[,expr2,…exprn]]
  2. $subtract:[expr1,expr2]
  3. $multiply:[expr1[,expr2,…exprn]]
  4. $divice:[expr1,expr2]
  5. $mod:[expr1,expr2]

比如将score整体加20分,如下(score被重命名为newScore):

1
2
3
db.scores.aggregate(
{'$project':{'studentId':1,'newScore':{'$add':['$score',20]}}}
);

管道操作符$project的日期表达式

聚合框架还包含了一些用于提取日期信息的表达式,如下:

$year,$month,$week,$dayOfMonth,$dayOfWeek,$dayOfYear,$hour,$minute,$second。

注意:这些只能操作日期型的字段,不能操作数据,使用示例:

1
{'$project':{'opeDay':{'$dayOfMonth':'$payTime'}}}

上面渔具,opeDay是新字段名,payTime是原字段名。

管道操作符$project的字符串表达式

  1. $substr:[expr,开始位置,要取的字节个数];
  2. $concat:[expr1[expr2,…exprn]];
  3. $toLower:expr
  4. $toUpper:expr

比如对studentId加上前缀ss:

1
db.scores.aggregate({'$project':{'sid':{'$concat':['cc','$studentId']}}});

管道操作符$project的逻辑表达式

  1. $cmp:[expr1,expr2],比较2个表达式是否相等,整数前面的大,负数后面的大;
  2. $strcasecmp:[string1,string2],比较2个字符串,区分大小写,只对有罗马字符组成的字符串有效;
  3. $eq,$ne,$gt,$gte,$lt,$lte:[expr1,expr2];
  4. $and,$or,$not
  5. $cond:[boolean表达式,true时的表达式,false时的表达式]
  6. $ifNull:[null时的表达式,不为null时的表达式]

比如显示学生没门课程分数是否达到80分:

1
db.scores.aggregate({'$project':{'studentId':1,'course':1,'score':1,'result':{'$gte':['$score',80]}}});

管道操作符$group

用来将文档依据特定字段的不同值进行分组。选定了分组字段后,就可以把这些字段传递给$group函数的“_id”字段了。比如:

1
2
3
4
/*按studentId分组*/
db.scores.aggregate({"$group":{"_id":"$studentId"}});
/*按studentId和score分组*/
db.scores.aggregate({"$group":{"_id":{"sid":"$studentId","score":"$score"}}});

$group支持的操作符

  1. $sum:value:对每个文档,将value与计算结果相加;
  2. $avg:value:返回每个分组的平均值;
  3. $max:expr:返回分组内的最大值;
  4. $min:expr:返回分组内的最小值;
  5. $first:expr:返回分组的第一个值,忽略其他的值,一般只有在排序后,明确知道数据顺序的时候,这个操作才有意义;
  6. $last:expr:返回分组的最后一个值;
  7. $addToSet:expr:如果当前数组中不包含expr,就将它加入到数组中;
  8. $push:expr:把expr加入数组中。

比如,查询每个学生的平均分数:

1
2
3
4
db.scores.aggregate(
{"$project":{"studentId":1,"score":1}}
,{"$group":{"_id":"$studentId","avgScore":{"$avg":"$score"}}}
);

image-20191229171752023

管道操作符$lookup

$lookup可以用来实现表关联查询,它相当于mysql中的左连接。

比如,有下面的3个集合:

1
2
3
4
5
6
7
8
9
10
db.product.insert({"_id":1,"productname":"商品1","price":15});
db.product.insert({"_id":2,"productname":"商品2","price":36});

db.order.insert({"_id":1,"pid":1,"ordername":"订单1","uid":1});
db.order.insert({"_id":2,"pid":2,"ordername":"订单2","uid":2});
db.order.insert({"_id":3,"pid":2,"ordername":"订单3","uid":2});
db.order.insert({"_id":4,"pid":1,"ordername":"订单4","uid":1});

db.user.insert({"_id":1,"username":1});
db.user.insert({"_id":2,"username":2});

$lookup两表关联join

1
2
3
4
5
6
7
8
9
10
11
db.product.aggregate([
{
$lookup:
{
from: "order",
localField: "_id",
foreignField: "pid",
as: "inventory_docs"
}
}
]);

执行结果为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"_id" : 1.0,
"productname" : "商品1",
"price" : 15.0,
"inventory_docs" : [
{
"_id" : 1.0,
"pid" : 1.0,
"ordername" : "订单1"
},
{
"_id" : 4.0,
"pid" : 1.0,
"ordername" : "订单4"
}
]
}

lookup 就是使用 aggregate 的 $lookup 属性,$lookup 操作需要一个四个参数的对象,该对象的属性解释如下:

  • localField:在输入文档中的查找字段
  • from:需要连接的集合
  • foreignField:需要在from集合中查找的字段
  • as:输出的字段名字

在输出的结果中,会包含一个 inventory_docs 的字段,它会把 order 中所关联的数据在数组中展现。

$lookup 三表关联join

$lookup 的三表关联查询很简单,只需要配置两个 $lookup 即可。具体查询代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
db.order.aggregate([{
$lookup:
{
from: "product",
localField: "pid",
foreignField: "_id",
as: "inventory_docs"
}
},{
$lookup:
{
from: "user",
localField: "uid",
foreignField: "_id",
as: "user_docs"
}
}]);

执行结果为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"_id": 3,
"pid": 2,
"ordername": "订单3",
"uid": 2,
"inventory_docs": [
{
"_id": 2,
"productname": "商品2",
"price": 36
}
],
"user_docs": [
{
"_id": 2,
"username": 2
}
]
}

与populate类似,可以参考:https://blog.csdn.net/qq_40140699/article/details/86064932

以上内容来自:https://www.xttblog.com/?p=3787

拆分命令$unwind

用来把数组中的某个值拆分成单独的文档。

示例

1.准备一条数据

1
db.t1.insert({"userId":1,scores:[95,80,66]});

scores字段是一个数组,我们用$unwind将scores拆分成多个文档。

2.拆分scores

1
db.t1.aggregate({"$unwind":"$scores"});

拆分结果如下:

image-20191229172513424

排序命令$sort

可以根据任何字段进行排序,与普通查询中的语法相同。如果要对大量的文档进行排序,强烈建议在管道的第一个阶段进行排序(可以在$match后排序),这时可以使用索引。

比如,按score降序排序:

1
db.scores.aggregate({"$sort":{"score":-1}});

综合示例

下面通过例子来看下常用聚合函数的用法。

有集合game,数据如下:

image-20200102164458779

集合trade_item:

image-20200102164517813

其中gameId为game集合的_id。

还有s_ratio集合,数据为:

image-20200102164620765

s_ratio中的tid是trade_item中的_id。

现在的需求为:根据matchId和source和gameType,将s_ratio的ratio和changeTime按gameId和changeTime分组,并按changeTime倒序排序。

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
db.game.aggregate(
{
$match:{ // 过滤数据
matchId:10000,
source:"31",
gameType:"1"
}
},
{
$project:{ // 映射需要的字段
indexNum:1,
gameClass:1,
gid:{$toString:"$_id"} // 这里需要将ObjectId转换为String类型
},
},
{
$lookup:{ // 关联trade_item表,主要要查询出tid
from:"trade_item",
localField:"gid",
foreignField:"gameId",
as:"trade_items"
}
},
{
$unwind:"$trade_items" // 由于$lookup后trade_item是一个集合,所以这里先拆解开为一个一个Document
},
{
$project:{ // 在此映射需要的字段
gameClass:1,
indexNum:1,
tName:"$trade_items.tName",
tid:{$toString:"$trade_items._id"}
}
},
{
$lookup:{ // 关联查询s_ratio,这里主要查询出ratio、changeTime
from:"s_ratio",
localField:"tid",
foreignField:"tid",
as:"odds"
}
},
{
$unwind:"$odds" // 将数组扁平化为document
},
{
$project:{ // 过滤最终需要的字段
gameClass:1,
indexNum:1,
tName:1,
changeTime:"$odds.changeTime",
ratio:"$odds.ratio",
isEarly:"$odds.isEarly"
}
},
{
$group:{ // 按gameId和oddTime分组
_id:{gid:"$_id",oddTime:"$changeTime"},
tNames:{$push:"$tName"},
ratios:{$push:"$ratio"}
}
},
{
$sort:{'_id.oddTime':-1} // 按oddTime倒序排序
}
);

最终得到的数据为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
"_id" : {
"gid" : ObjectId("5e09c830e7f47f29f8378cca"),
"oddTime" : ISODate("2020-01-02T03:07:03.101Z")
},
"tNames" : [
"home",
"away"
],
"ratios" : [
"0.88",
"1.02"
]
}
{
"_id" : {
"gid" : ObjectId("5e09c830e7f47f29f8378cca"),
"oddTime" : ISODate("2019-12-30T11:40:06.941Z")
},
"tNames" : [
"home",
"away"
],
"ratios" : [
"0.86",
"1.05"
]
}

常见聚合函数

  1. count:用于返回集合中文档的数量;

  2. distinct:找出给定键的所有不同值,使用时必须制定集合和键,例如:

    查找所有的课程db.runCommand({"distinct":"scores","key":"course"});,distinct后面是集合,key后面是键。

MapReduce

在MongoDB的聚合框架中,还可以使用MapReduce,它非常灵活和强大,但具有一定的复杂性,专门用于实现一些复杂的聚合功能。

MongoDB中的MapReduce使用JavaScript来作为查询语言,因此能表达任意的逻辑,但是它运行非常慢,不应该用在实时的数据分析中。

MapReduce的helloworld

实现的功能:找出集合中所有的键,并统计每个键出现的次数。

  1. Map函数使用emit函数来返回要处理的值,如下:

    1
    2
    3
    4
    5
    var map = function() {
    for (var key in this) {
    emit(key,{count:1});
    }
    }

    this表示对当前文档的引用。

  2. reduce函数需要处理Map阶段或前一个reduce的数据,因此reduce返回的文档必须要能作为reduce的第二个参数的一个元素,如下:

    1
    2
    3
    4
    5
    6
    7
    var reduce = function(key,emits) {
    var total = 0;
    for (var i in emits) {
    total += emits[i].count;
    }
    return {"count":total};
    }
  1. 运行MapReduce,如下:

    1
    var mr = db.runCommand({"mapreduce":"scores","map":map,"reduce":reduce,"out":"mrout"});

    mapreduce:指定集合,map:指定map函数,reduce:指定reduce函数,out:指定存放执行结果的变量名。

  2. 查询最终的结果,如下:

    1
    db.mrout.find();

更多的MapReduce可以使用的键

  1. finalize:function,可以将reduce的结果发送到finalize,这是整个处理的最后一步;
  2. keeptemp:boolean,是否在连接关闭的时候,保存临时结果集合;
  3. query:document,在发送给map前对文档进行过滤;
  4. sort:document,在发送给map前对文档进行排序;
  5. limit:integer,在发送给map前对文档进行限制;
  6. scope:document,可以在javascript中使用的变量;
  7. verbose:boolean,是否纪录详细的服务器日志。

示例:

1
2
3
4
5
6
var query = {"studentId":{"$gt":"s2"}};
var sort = {"studentId":1};
var finalize = function(key,value) {
return {"theKey":key,"theValue":value};
}
var mr = db.runCommand({"mapreduce":"scores","map":map,"reduce":reduce,"out":"myOut","query":query,"sort":sort,"limit":2,"finalize":finalize});

聚合命令group

用来对集合进行分组,分组之后,再对每个分组内的文档进行聚合。比如要对studentId进行分组,找到每个学生的最高分数,可以如下步骤执行:

  1. 测试数据就用聚合框架一开始的准备的数据;

  2. 使用group,命令如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    db.runCommand({"group":{
    "ns":"scores",
    "key":{"studentId":1},
    "initial":{"score":0},
    "$reduce":function(doc,prev) {
    if (doc.score > pre.score) {
    prev.score = doc.score;
    }
    }
    }
    });

    ns:指定要分组的集合;

    key:指定分组的键;

    initial:每一组的reduce函数调用的时候,在开始的时候调用一次,以做初始化;

    $reduce:在每组的每个文档上执行,系统会自动传入2个参数,doc是当前处理的文档,prev是本组前一次执行的结果文档。

还可以在分组的时候加入condition,如:

1
"condition":{"studentId":{"$1t":"s2"}}

同样还可以使用finalizer对reduce的结果进行最后的处理,比如要求每个学生的平均分,就可以先按照studentId分组,求出一个总的分数,然后在finalizer里面求平均分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
db.runCommand({
"group":{
"ns":"scores",
"key":{"studentId":1},
"initial":{"total":0},
"$reduce":function(doc,prev) {
prev.total += doc.score;
},
"condition":{"studentId":{"$lt":"s2"}},
"finalize":function(prev) {
prev.avg = prev.total / 3;
}
}
});

注意:finalize是只在每组结果返回给客户端前调用一次,也就是每组结果只调用一次。

对于分组的key较为复杂的时候,还可以使用函数来作为键,比如让键不区分大小写,就可以如下定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
db.runCommand({
"group":{
"ns":"score",
$keyf:function(doc) {
return {studentId:doc.studentId.toLowerCase()};
},
"initial":{"total":0},
"$reduce":function(doc,prev) {
prev.total += doc.score;
},
"condition":{"studentId":{"$lt":"s2"}},
"finalize":function(prev) {
prev.avg = prev.total / 3;
}
}
});

注意:要使用$keyf来定义函数作为键,另外一定要返回对象的格式。

注:文章内容整理自《跟着CC学架构》。

Author: Donny
Link: https://tommy88.top/2019/12/29/MongoDB的聚合操作/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.
微信打赏