代码编织梦想

问题回顾

今天,同事小张 Q 我, 说自己辛苦花了一天的时间,基于 mongodb 数据库开发的待办统计功能一直报错!

于是笔者花了近半小时了解小张的开发需求以及代码实现方式,大致明白问题出在对待办 collection 做统计时,调用 collection 的分组 group 函数、聚合 aggregate 函数的使用方式不对。

待办 collection 文档分组(group )函数代码

GroupByResults groupByResults = mongoTemplate.group(new Criteria().andOperator(criteriaArray),mongoTemplate.getCollectionName(PendingEntity.class), groupBy, PendingEntity.class);long resultCount = ((List)groupByResults.getRawResults().get("retval")).size();

待办 collection 文档聚合(aggregate)函数代码

AggregationResults<PendingEntity> results = mongoTemplate.aggregate(aggregation, "studentScore", PendingEntity.class);double totleScore = results.getUniqueMappedResult().getCollect();

问题定位

异常信息

Map-reduce supports operations on sharded collections, both as an input and as an output. This p describes the behaviors of mapReduce specific to sharded collections.However, starting in version 4.2, MongoDB deprecates the map-reduce option to create a new sharded collection as well as the use of the sharded option for map-reduce. To output to a sharded collection, create the sharded collection first. MongoDB 4.2 also deprecates the replacement of an existing sharded collection.Sharded Collection as InputWhen using sharded collection as the input for a map-reduce operation, mongos will automatically dispatch the map-reduce job to each shard in parallel. There is no special option required. mongos will wait for jobs on all shards to finish.Sharded Collection as OutputIf the out field for mapReduce has the sharded value, MongoDB shards the output collection using the _idfield as the shard key.

从异常信息提示来看,我注意到 errmsg 字段值:“can't do command: group on sharded collection”,大意是说分片文档(sharded collection)不能使用分组 group 函数。

笔者猜测是 sharded collection 的问题,于是笔者从一些技术博客和 mongodb 官网查了下使用 group 函数的一些限制,大致如下:

  • 分片表不能 group 分组

can't do command: group on sharded collection
  • group 操作不会处理超过 20000 个唯一键( group by 的关键字具有唯一性约束条件下)

exception: group() can't handle more than 20000 unique keys

显然,分片表不能 group 的限制,也验证了我的当初的猜想。

于是我问了下运维组的同事,也证实了 mongodb 在创建 collection 文档时,会指定文档数据分片到不同服务器上 ,这是出于对 mongodb 稳定性的考虑吧。

解决方案

既然分片表不能 group ,那如何解决分组统计的问题呢?

答案是用 “mapReduce” 。

想到什么呢?

是不是很类似 Hadoop 中的 Map-Reduce 的思想:

MapReduce最重要的一个思想: 分而治之. 就是将负责的大任务分解成若干个小任务, 并行执行. 完成后在合并到一起. 适用于大量复杂的任务处理场景, 大规模数据处理场景.

Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。

Reduce负责“合”,即对map阶段的结果进行全局汇总。

Hadoop 中的 Map-Reduce 执行流程

来源网络

翻阅 mongodb 官网文档,对 mapReduce 函数介绍如下:

Map-reduce supports operations on sharded collections, both as an input and as an output. This p describes the behaviors of mapReduce specific to sharded collections.

However, starting in version 4.2, MongoDB deprecates the map-reduce option to create a new sharded collection as well as the use of the sharded option for map-reduce. To output to a sharded collection, create the sharded collection first. MongoDB 4.2 also deprecates the replacement of an existing sharded collection

.

Sharded Collection as Input

When using sharded collection as the input for a map-reduce operation, mongos will automatically dispatch the map-reduce job to each shard in parallel. There is no special option required. mongos will wait for jobs on all shards to finish.

Sharded Collection as Output

If the out field for mapReduce has the sharded value, MongoDB shards the output collection using the _idfield as the shard key.

大意是 mapReduce 支持对 sharded collections 分片文档 input / output 操作,其处理逻辑如下:

  1. mongos接收到mapreduce的操作请求后,根据query条件,将map-reduce任务发给持有数据的shards(sharding collection将会被分裂成多个chunks并分布在多个shards中,shard即为mongod节点)。

  2. 每个shards都依次执行mapper和reducer,并将结果写入到本地的临时collection中,结果数据是根据_id(即reducer的key)正序排列。

  3. 当所有的shards都reduce完成之后,将各自结果数据中_id的最大值和最小值(即min、max key)返回给mongos。

  4. mongos负责shuffle和partition,将所有shards反馈的min、max key进行汇总,并将整个key区间分成多个partitions,每个partition包含[min,max]区间,此后mongos将partiton信息封装在finalReduce指令中并发给每个shard,最终每个shard都会收到一个特定的partition的任务;partition不会重叠。

  5. 此后每个shard将与其他所有的shards建立链接,根据partition信息,从min到max,遍历每个key。对于任何一个key,当前shard都将从其他shards获取此key的所有数据,然后执行reduce和finalize方法,每个key可能会执行多次reduce,这取决于values的条数,但是finalize只会执行一次,最终将此key的finalize的结果通过本地方式写入sharding collection中。

  6. 当所有的shards都处理完毕后,mongos将处理结果返回给客户端(inline)。

mapReduce 语法格式:

db.collection.mapReduce(<map>,         <reduce>,                         {                           out: <collection>,                           query: <document>,                           sort: <document>,                           limit: <number>,                           finalize: <function>,                           scope: <document>,                           jsMode: <boolean>,                           verbose: <boolean>,                           bypassDocumentValidation: <boolean>                         }                       )

参数说明:

  • map:映射函数(生成键值对序列,作为reduce函数参数)

  • reduce:统计函数

  • query:目标记录过滤

  • sort:目标记录排序

  • limit:限制目标记录数量

  • out:统计结果存放集合(不指定使用临时集合,在客户端断开后自动删除)

  • finalize:最终处理函数(对 reduce 返回结果进行最终整理后存入结果集合)

  • Scope:向map、reduce、finalize导入外部变量

  • jsMode说明:为 false 时 BSON-->JS-->map-->BSON-->JS-->reduce-->BSON,可处理非常大的mapreduce,为 true 时 BSON-->js-->map-->reduce-->BSON

  • verbose:显示详细的时间统计信息

于是,我让小张同学把 group 换成 mapReduce 函数,问题解决!

String reducef = "function(key,values){var total = {count:0};for(var i=0;i<values.length;i++){total.count += values[i].count;} return total;}";MapReduceResults<BasicDBObject> mrr = readMongoTemplate.mapReduce(query,readMongoTemplate.getCollectionName(ReadingEntity.class), map, reducef, BasicDBObject.class);

问题总结

有时候,问题就出在最显眼的问题描述上,需要有心人去细细琢磨。

另外,其实大部分问题都可以在官网上找到相关技术解决方案,却又苦于受英语单词的折磨。。。

参考

https://docs.mongodb.com/manual/aggregation/

https://docs.mongodb.com/manual/core/map-reduce-sharded-collections/

https://www.cnblogs.com/chenpingzhao/p/7913247.html

https://blog.csdn.net/weixin_42582592/article/details/83080900

https://blog.csdn.net/iteye_19607/article/details/82644559

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接: https://blog.csdn.net/sshduanzhijun/article/details/106670580