MongoDB虽然不像我们常用的mysql,sqlserver,oracle等关系型数据库有group by函数那样方便分组,但是MongoDB要实现分组也有3个办法:
Mongodb三种分组方式:
下面就来看下mapreduce方式:
Mongodb官网对MapReduce介绍:
Map/reduce in MongoDB is useful for batch processing of data and aggregation operations. It is similar in spirit to using something like Hadoop with all input coming from a collection and output going to a collection. Often, in a situation where you would have used GROUP BY in SQL, map/reduce is the right tool in MongoDB. 大致意思是:Mongodb中的Map/reduce主要是用来对数据进行批量处理和聚合操作,有点类似于使用Hadoop对集合数据进行处理,所有输入数据都是从集合中获取,而MapReduce后输出的数据也都会写入到集合中。通常类似于我们在SQL中使用Group By语句一样。 使用MapReduce要实现两个函数:Map和Reduce。Map函数调用emit(key,value)遍历集合中所有的记录,将key与value传给Reduce函数进行处理。Map函数和Reduce函数是使用Javascript编写的,并可以通过db.runCommand或mapreduce命令来执行MapReduce操作。
MapReduce命令如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 db.runCommand( { mapreduce : <collection>, map : <mapfunction>, reduce : <reducefunction> [, query : <query filter object >] [, sort : <sort the query. useful for optimization>] [, limit : <number of objects to return from collection>] [, out : <output-collection name >] [, keeptemp: <true |false >] [, finalize : <finalizefunction>] [, scope : <object where fields go into javascript global scope >] [, verbose : true ] } );
参数说明:
mapreduce:要操作的目标集合
map:映射函数(生成键值对序列,作为Reduce函数的参数)
reduce:统计函数
query:目标记录过滤
sort:对目标记录排序
limit:限制目标记录数量
out:统计结果存放集合(如果不指定则使用临时集合,在客户端断开后自动删除)
keeptemp:是否保留临时集合
finalize:最终处理函数(对reduce返回结果执行最终整理后存入结果集合)
scope:向map、reduce、finalize导入外部变量
verbose:显示详细的时间统计信息
map函数 map函数调用当前对象,并处里对象的属性,传值给reduce,map方法使用this来操作当前对象,最少调用一次emit(key,value)方法来向reduce提供参数,其中emit的key为最终数据的id。
reduce函数 接收一个值和数组,根据需要对数组进行合并分组等处理,reduce的key就是emit(key,value)的key,value_array是同个key对应的多个value数组。
Finalize函数 此函数为可选函数,可在执行完map和reduce后执行,对最后的数据进行统一处理。
看完基本介绍,我们再来看一个实例:
已知集合feed,测试数据如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 { "_id" : ObjectId("50ccb3f91e937e2927000004" ), "feed_type" : 1 , "to_user" : 234 , "time_line" : "2012-12-16 01:26:00" } { "_id" : ObjectId("50ccb3ef1e937e0727000004" ), "feed_type" : 8 , "to_user" : 123 , "time_line" : "2012-12-16 01:26:00" } { "_id" : ObjectId("50ccb3e31e937e0a27000003" ), "feed_type" : 1 , "to_user" : 123 , "time_line" : "2012-12-16 01:26:00" } { "_id" : ObjectId("50ccb3d31e937e0927000001" ), "feed_type" : 1 , "to_user" : 123 , "time_line" : "2012-12-16 01:26:00" }
我们按动态类型feed_type和用户to_user进行分组统计,实现结果:
feed_type
to_user
cout
1
234
1
8
123
1
1
123
2
实现代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 $map = ' function() { var key = {to_user:this.to_user,feed_type:this.feed_type}; var value = {count :1 }; emit(key ,value ); } '; $reduce = ' function(key , values) { var ret = {count :0 }; for(var i in values) { ret.count += 1 ; } return ret; }'; $query = null ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 $mongo = new Mongo ('mongodb://root:root@127.0.0.1: 28017/' ); $instance = $mongo ->selectDB ("testdb" ); $cmd = $instance ->command (array ( 'mapreduce' => 'feed' , 'map' => $map , 'reduce' => $reduce , 'query' => $query , 'out' => 'feed_temp_res' )); $cursor = $instance ->selectCollection ('feed_temp_res' )->find (); $result = array (); try { while ($cursor ->hasNext ()) { $result [] = $cursor ->getNext (); } } catch (MongoConnectionException $e ) { echo $e ->getMessage (); } catch (MongoCursorTimeoutException $e ) { echo $e ->getMessage (); } catch (Exception $e ){ echo $e ->getMessage (); } var_dump ($result );
下面是输出的结果,和预期结果一致
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 { "_id" : { "to_user" : 234 , "feed_type" : 1 } , "value" : { "count" : 1 } } { "_id" : { "to_user" : 123 , "feed_type" : 8 } , "value" : { "count" : 1 } } { "_id" : { "to_user" : 123 , "feed_type" : 1 } , "value" : { "count" : 2 } }
以上只是简单的统计实现,你可以实现复杂的条件统计编写复杂的reduce函数,可以增加查询条件,排序等等。
附上mapReduce数据库处理函数(简单封装)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 /** * mapReduce分组 * * @param string $table_name 表名(要操作的目标集合名) * @param string $map 映射函数(生成键值对序列,作为 reduce 函数参数) * @param string $reduce 统计处理函数 * @param array $query 过滤条件 如:array ('uid' =>123 ) * @param array $sort 排序 * @param number $limit 限制的目标记录数 * @param string $out 统计结果存放集合 (不指定则使用tmp_mr_res_ $table_name , 1.8 以上版本需指定) * @param bool $keeptemp 是否保留临时集合 * @param string $finalize 最终处理函数 (对reduce返回结果进行最终整理后存入结果集合) * @param string $scope 向 map 、reduce、finalize 导入外部js变量 * @param bool $jsMode 是否减少执行过程中BSON和JS的转换,默认true (注:false 时 BSON-->JS-->map-->BSON-->JS-->reduce-->BSON,可处理非常大的mapreduce,//true时BSON-->js-->map-->reduce-->BSON) * @param bool $verbose 是否产生更加详细的服务器日志 * @param bool $returnresult 是否返回新的结果集 * @param array &$cmdresult 返回mp命令执行结果 array ("errmsg" =>"" ,"code" =>13606 ,"ok" =>0 ) ok=1 表示执行命令成功 * @return */ function mapReduce($table_name ,$map ,$reduce ,$query =null,$sort =null,$limit =0 ,$out ='' ,$keeptemp =true ,$finalize =null,$scope =null,$jsMode =true ,$verbose =true ,$returnresult =true ,&$cmdresult ){ if (empty ($table_name ) || empty ($map ) || empty ($reduce )){ return null; } $map = new MongoCode($map ); $reduce = new MongoCode($reduce ); if (empty ($out )){ $out = 'tmp_mr_res_' .$table_name ; } $cmd = array ( 'mapreduce' => $table_name , 'map' => $map , 'reduce' => $reduce , 'out' =>$out ); if (!empty ($query ) && is_array($query )){ array_push($cmd , array ('query' =>$query )); } if (!empty ($sort ) && is_array($sort )){ array_push($cmd , array ('sort' =>$query )); } if (!empty ($limit ) && is_int($limit ) && $limit >0 ){ array_push($cmd , array ('limit' =>$limit )); } if (!empty ($keeptemp ) && is_bool($keeptemp )){ array_push($cmd , array ('keeptemp' =>$keeptemp )); } if (!empty ($finalize )){ $finalize = new Mongocode($finalize ); array_push($cmd , array ('finalize' =>$finalize )); } if (!empty ($scope )){ array_push($cmd , array ('scope' =>$scope )); } if (!empty ($jsMode ) && is_bool($jsMode )){ array_push($cmd , array ('jsMode' =>$jsMode )); } if (!empty ($verbose ) && is_bool($verbose )){ array_push($cmd , array ('verbose' =>$verbose )); } $dbname = $this- >curr_db_name; $cmdresult = $this- >mongo->$dbname- >command($cmd ); if ($returnresult ){ if ($cmdresult && $cmdresult ['ok' ]==1 ){ $result = $this- >find($out , array ()); } } if ($keeptemp ==false ){ //删除集合 $this- >mongo->$dbname- >dropCollection($out ); } return $result ; }
MongoDB官方网站介绍:
MapReduce介绍 http://docs.mongodb.org/manual/core/map-reduce/
Aggregation介绍 http://docs.mongodb.org/manual/aggregation/