Map-Reduce¶
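Map-reduce operations can handle complex aggregation tasks. To perform map-reduce operations, MongoDB provides the [mapReduce](mapReduce.html#mapReduce) command and, in the [mongo](mongo-1.html#mongo) shell, the [db.collection.mapReduce()](db.collection.mapReduce.html#db.collection.mapReduce) wrapper method.
For many simple aggregation tasks, see the aggregation framework.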
Examples¶
This section provides some examples of using the [db.collection.mapReduce()](db.collection.mapReduce.html#db.collection.mapReduce) method in the [mongo](mongo-1.html#mongo) shell:
db.collection.mapReduce(
<mapfunction>,
<reducefunction>,
{
out: <collection>,
query: <document>,
sort: <document>,
limit: <number>,
finalize: <function>,
scope: <document>,
jsMode: <boolean>,
verbose: <boolean>
}
)
For more information on the parameters, see the [db.collection.mapReduce()](db.collection.mapReduce.html#db.collection.mapReduce) reference page.
Consider the following map-reduce operations on a collection `orders` that contains documents of the following prototype:
{
_id: ObjectId("50a8240b927d5d8b5891743c"),
cust_id: "abc123",
ord_date: new Date("Oct 04, 2012"),
status: 'A',
price: 250,
items: [ { sku: "mmm", qty: 5, price: 2.5 },
{ sku: "nnn", qty: 5, price: 2.5 } ]
}
Return the Total Price Per Customer ID¶
Perform a map-reduce operation on the `orders` collection to group by `cust_id` and, for each `cust_id`, calculate the sum of the `price`:
- Define the map function to process each input document:
* In the function, `this` refers to the document that the map-reduce operation is processing.
* The function maps the `price` to the `cust_id` for each document and emits the `cust_id` and `price` pair.
var mapFunction1 = function() {
emit(this.cust_id, this.price);
};
- Define the corresponding reduce function with two arguments `keyCustId` and `valuesPrices`:
  * `valuesPrices` is an array whose elements are the `price` values emitted by the map function and grouped by `keyCustId`.
  * The function reduces the `valuesPrices` array to the sum of its elements.

var reduceFunction1 = function(keyCustId, valuesPrices) {
   return Array.sum(valuesPrices);
};
- Perform the map-reduce on all documents in the `orders` collection using the `mapFunction1` map function and the `reduceFunction1` reduce function.

db.orders.mapReduce(
   mapFunction1,
   reduceFunction1,
   { out: "map_reduce_example" }
)
This operation outputs the results to a collection named `map_reduce_example`. If the `map_reduce_example` collection already exists, the operation will replace the contents with the results of this map-reduce operation.
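To see what this operation computes without a running server, here is a minimal sketch in plain JavaScript (for example, Node.js). It rests on stated assumptions: `runMapReduce` and `sampleOrders` are hypothetical names invented for illustration, the global `emit` is temporarily replaced by a collector, and `Array.sum` (a mongo shell helper) is swapped for `Array.prototype.reduce`. It mimics only the map, group, and reduce flow, not MongoDB's actual execution.

```javascript
// Hypothetical helper: an in-memory imitation of the map -> group -> reduce flow.
// It is NOT part of MongoDB; it only illustrates what mapReduce computes.
function runMapReduce(docs, mapFn, reduceFn) {
  const groups = new Map(); // key -> array of emitted values
  const previousEmit = globalThis.emit;
  // Stand-in for the shell's global emit(): collect each value under its key.
  globalThis.emit = (key, value) => {
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(value);
  };
  try {
    for (const doc of docs) mapFn.call(doc); // `this` is the current document
  } finally {
    globalThis.emit = previousEmit;
  }
  const results = [];
  for (const [key, values] of groups) {
    // A key with a single emitted value is not passed to reduce.
    const value = values.length === 1 ? values[0] : reduceFn(key, values);
    results.push({ _id: key, value: value });
  }
  return results;
}

var mapFunction1 = function () {
  emit(this.cust_id, this.price);
};

// Array.sum is a mongo shell helper; plain JavaScript uses Array.prototype.reduce.
var reduceFunction1 = function (keyCustId, valuesPrices) {
  return valuesPrices.reduce((sum, price) => sum + price, 0);
};

// Made-up sample documents following the orders prototype (only the used fields).
const sampleOrders = [
  { cust_id: "abc123", price: 250 },
  { cust_id: "abc123", price: 100 },
  { cust_id: "xyz789", price: 50 },
];

console.log(runMapReduce(sampleOrders, mapFunction1, reduceFunction1));
```

With these made-up documents the sketch produces one result per `cust_id`: 350 for `abc123` and 50 for `xyz789`.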
Calculate Order and Total Quantity with Average Quantity Per Item¶
In this example, you will perform a map-reduce operation on the `orders` collection for all documents that have an `ord_date` value greater than `01/01/2012`. The operation groups by the `items.sku` field, and for each `sku` calculates the number of orders and the total quantity ordered. The operation concludes by calculating the average quantity per order for each `sku` value:
- Define the map function to process each input document:
  * In the function, `this` refers to the document that the map-reduce operation is processing.
  * For each item, the function associates the `sku` with a new object `value` that contains the `count` of `1` and the item `qty` for the order, and emits the `sku` and `value` pair.

var mapFunction2 = function() {
   for (var idx = 0; idx < this.items.length; idx++) {
      var key = this.items[idx].sku;
      var value = {
         count: 1,
         qty: this.items[idx].qty
      };
      emit(key, value);
   }
};
- Define the corresponding reduce function with two arguments `keySKU` and `valuesCountObjects`:
  * `valuesCountObjects` is an array whose elements are the objects mapped to the grouped `keySKU` values passed by the map function to the reduce function.
  * The function reduces the `valuesCountObjects` array to a single object `reducedValue` that also contains the `count` and the `qty` fields.
  * In `reducedValue`, the `count` field contains the sum of the `count` fields from the individual array elements, and the `qty` field contains the sum of the `qty` fields from the individual array elements.

var reduceFunction2 = function(keySKU, valuesCountObjects) {
   // declare with var so reducedValue does not leak into the global scope
   var reducedValue = { count: 0, qty: 0 };

   for (var idx = 0; idx < valuesCountObjects.length; idx++) {
      reducedValue.count += valuesCountObjects[idx].count;
      reducedValue.qty += valuesCountObjects[idx].qty;
   }

   return reducedValue;
};
- Define a finalize function with two arguments `key` and `reducedValue`. The function modifies the `reducedValue` object to add a computed field named `average` and returns the modified object:

var finalizeFunction2 = function (key, reducedValue) {
   reducedValue.average = reducedValue.qty / reducedValue.count;
   return reducedValue;
};
- Perform the map-reduce operation on the `orders` collection using the `mapFunction2`, `reduceFunction2`, and `finalizeFunction2` functions.

db.orders.mapReduce(
   mapFunction2,
   reduceFunction2,
   {
      out: { merge: "map_reduce_example" },
      query: { ord_date: { $gt: new Date('01/01/2012') } },
      finalize: finalizeFunction2
   }
)
This operation uses the `query` field to select only those documents with `ord_date` greater than `new Date('01/01/2012')`. Then it outputs the results to a collection `map_reduce_example`. If the `map_reduce_example` collection already exists, the operation will merge the existing contents with the results of this map-reduce operation.
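The following minimal sketch mirrors this second operation in plain JavaScript (for example, Node.js), so the effect of `query` and `finalize` can be checked without a server. `runWithFinalize`, `sampleOrders2`, and the sample documents are hypothetical; the `query` option is represented by a predicate function rather than a MongoDB query document, and `emit` is temporarily replaced by a collector.

```javascript
// Hypothetical in-memory sketch of the second example: query filter, map,
// reduce, then finalize. runWithFinalize is illustrative, not a MongoDB API.
function runWithFinalize(docs, options) {
  const groups = new Map(); // sku -> emitted { count, qty } objects
  const previousEmit = globalThis.emit;
  globalThis.emit = (key, value) => {
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(value);
  };
  try {
    // Only documents that pass the query predicate reach the map function.
    for (const doc of docs.filter(options.query)) options.map.call(doc);
  } finally {
    globalThis.emit = previousEmit;
  }
  const results = {};
  for (const [key, values] of groups) {
    const reduced = values.length === 1 ? values[0] : options.reduce(key, values);
    // finalize runs once per key, after reduction for that key is complete.
    results[key] = options.finalize(key, reduced);
  }
  return results;
}

// Same behavior as mapFunction2 above, written more compactly.
var mapFunction2 = function () {
  for (var idx = 0; idx < this.items.length; idx++) {
    emit(this.items[idx].sku, { count: 1, qty: this.items[idx].qty });
  }
};

var reduceFunction2 = function (keySKU, valuesCountObjects) {
  var reducedValue = { count: 0, qty: 0 };
  for (var idx = 0; idx < valuesCountObjects.length; idx++) {
    reducedValue.count += valuesCountObjects[idx].count;
    reducedValue.qty += valuesCountObjects[idx].qty;
  }
  return reducedValue;
};

var finalizeFunction2 = function (key, reducedValue) {
  reducedValue.average = reducedValue.qty / reducedValue.count;
  return reducedValue;
};

// Made-up orders; the 2011 order is excluded by the query.
const sampleOrders2 = [
  { ord_date: new Date(2012, 9, 4), items: [{ sku: "mmm", qty: 5 }, { sku: "nnn", qty: 5 }] },
  { ord_date: new Date(2012, 10, 10), items: [{ sku: "mmm", qty: 3 }] },
  { ord_date: new Date(2011, 11, 20), items: [{ sku: "mmm", qty: 100 }] },
];

// 1 January 2012 (local time), standing in for new Date('01/01/2012').
const cutoff = new Date(2012, 0, 1);
console.log(runWithFinalize(sampleOrders2, {
  query: (doc) => doc.ord_date > cutoff,
  map: mapFunction2,
  reduce: reduceFunction2,
  finalize: finalizeFunction2,
}));
```

With the made-up data, the 2011 order is filtered out, `mmm` ends with `count` 2, `qty` 8, and `average` 4, and `nnn`, which has a single emitted value, skips `reduce` but still passes through `finalize` (`average` 5).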
Incremental Map-Reduce¶
If the map-reduce dataset is constantly growing, then rather than performing the map-reduce operation over the entire dataset each time you want to run map-reduce, you may want to perform an incremental map-reduce.
To perform incremental map-reduce:
- Run a map-reduce job over the current collection and output the result to a separate collection.
- When you have more data to process, run subsequent map-reduce jobs with:
  * the `query` parameter that specifies conditions that match only the new documents.
  * the `out` parameter that specifies the `reduce` action to merge the new results into the existing output collection.
Consider the following example where you schedule a map-reduce operation on a `sessions` collection to run at the end of each day.
Data Setup¶
The `sessions` collection contains documents that log users' sessions each day, for example:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );
db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );
Initial Map-Reduce of Current Collection¶
Run the first map-reduce operation as follows:
- Define the `map` function that maps the `userid` to an object that contains the fields `userid`, `total_time`, `count`, and `avg_time`:

var mapFunction = function() {
   var key = this.userid;
   var value = {
      userid: this.userid,
      total_time: this.length,
      count: 1,
      avg_time: 0
   };

   emit( key, value );
};
- Define the corresponding `reduce` function with two arguments `key` and `values` to calculate the total time and the count. The `key` corresponds to the `userid`, and `values` is an array whose elements correspond to the individual objects mapped to the `userid` in the `mapFunction`.

var reduceFunction = function(key, values) {
   var reducedObject = {
      userid: key,
      total_time: 0,
      count: 0,
      avg_time: 0
   };

   values.forEach( function(value) {
      reducedObject.total_time += value.total_time;
      reducedObject.count += value.count;
   } );
   return reducedObject;
};
- Define the `finalize` function with two arguments `key` and `reducedValue`. The function modifies the `reducedValue` document to set the `avg_time` field (the total time divided by the count) and returns the modified document.

var finalizeFunction = function (key, reducedValue) {
   if (reducedValue.count > 0)
      reducedValue.avg_time = reducedValue.total_time / reducedValue.count;

   return reducedValue;
};
- Perform map-reduce on the `sessions` collection using the `mapFunction`, the `reduceFunction`, and the `finalizeFunction` functions. Output the results to a collection `session_stat`. If the `session_stat` collection already exists, the operation will replace the contents:

db.sessions.mapReduce( mapFunction,
                       reduceFunction,
                       {
                         out: "session_stat",
                         finalize: finalizeFunction
                       }
                     )
Subsequent Incremental Map-Reduce¶
Later, as the `sessions` collection grows, you can run additional map-reduce operations. For example, add new documents to the `sessions` collection:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );
At the end of the day, perform incremental map-reduce on the `sessions` collection, but use the `query` field to select only the new documents. Output the results to the collection `session_stat`, but `reduce` the contents with the results of the incremental map-reduce:
db.sessions.mapReduce( mapFunction,
reduceFunction,
{
query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
out: { reduce: "session_stat" },
finalize: finalizeFunction
}
);
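The following minimal in-memory sketch shows why the incremental run ends with the same statistics as reprocessing every document. It is plain JavaScript (for example, Node.js), and `mapReduceToObject`, `reduceIntoOutput`, and the `s` helper are hypothetical names. It assumes that, for the `reduce` output action, a key already present in `session_stat` is reduced together with the new value and then passed through `finalize` again; the `avg_time` field in this example only stays correct if that holds.

```javascript
// Hypothetical in-memory sketch of incremental map-reduce with out: { reduce: ... }.
// mapReduceToObject and reduceIntoOutput are illustrative helpers, not MongoDB APIs.
function mapReduceToObject(docs, mapFn, reduceFn, finalizeFn) {
  const groups = new Map();
  const previousEmit = globalThis.emit;
  globalThis.emit = (key, value) => {
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(value);
  };
  try {
    for (const doc of docs) mapFn.call(doc);
  } finally {
    globalThis.emit = previousEmit;
  }
  const out = {};
  for (const [key, values] of groups) {
    const reduced = values.length === 1 ? values[0] : reduceFn(key, values);
    out[key] = finalizeFn(key, reduced);
  }
  return out;
}

// Assumed out: { reduce } behavior: a key already present in the output is
// reduced together with the new value and then finalized again.
function reduceIntoOutput(existing, fresh, reduceFn, finalizeFn) {
  const merged = Object.assign({}, existing);
  for (const key of Object.keys(fresh)) {
    merged[key] = key in existing
      ? finalizeFn(key, reduceFn(key, [existing[key], fresh[key]]))
      : fresh[key];
  }
  return merged;
}

var mapFunction = function () {
  emit(this.userid, { userid: this.userid, total_time: this.length, count: 1, avg_time: 0 });
};
var reduceFunction = function (key, values) {
  var reducedObject = { userid: key, total_time: 0, count: 0, avg_time: 0 };
  values.forEach(function (value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
  });
  return reducedObject;
};
var finalizeFunction = function (key, reducedValue) {
  if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
  return reducedValue;
};

// The session documents from this page, with ISODate replaced by a UTC Date.
const s = (userid, ts, length) => ({ userid, ts: new Date(ts), length });
const firstTwoDays = [
  s("a", "2011-11-03T14:17:00Z", 95), s("b", "2011-11-03T14:23:00Z", 110),
  s("c", "2011-11-03T15:02:00Z", 120), s("d", "2011-11-03T16:45:00Z", 45),
  s("a", "2011-11-04T11:05:00Z", 105), s("b", "2011-11-04T13:14:00Z", 120),
  s("c", "2011-11-04T17:00:00Z", 130), s("d", "2011-11-04T15:37:00Z", 65),
];
const thirdDay = [
  s("a", "2011-11-05T14:17:00Z", 100), s("b", "2011-11-05T14:23:00Z", 115),
  s("c", "2011-11-05T15:02:00Z", 125), s("d", "2011-11-05T16:45:00Z", 55),
];

// Initial run over the first two days, then an incremental run restricted by the query.
let sessionStat = mapReduceToObject(firstTwoDays, mapFunction, reduceFunction, finalizeFunction);
const cutoff = new Date("2011-11-05T00:00:00Z");
const allSessions = firstTwoDays.concat(thirdDay);
const newDocs = allSessions.filter((doc) => doc.ts > cutoff);
sessionStat = reduceIntoOutput(
  sessionStat,
  mapReduceToObject(newDocs, mapFunction, reduceFunction, finalizeFunction),
  reduceFunction,
  finalizeFunction
);
console.log(sessionStat.a); // total_time 300, count 3, avg_time 100
```

Comparing `sessionStat` with a single run over all twelve documents gives identical `total_time`, `count`, and `avg_time` values per user, which is the property that makes the incremental approach safe when `reduce` can consume its own output.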
Temporary Collection¶
The map-reduce operation uses a temporary collection during processing. At completion, the map-reduce operation renames the temporary collection. As a result, you can perform a map-reduce operation periodically with the same target collection name without affecting the intermediate states. Use this mode when generating statistical output collections on a regular basis.
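The temporary-collection approach can be sketched in plain JavaScript, with a `Map` standing in for the database. `publishViaTempCollection` and the `tmp.mr.` prefix are hypothetical; the point is only that readers of the target name see either the old complete output or the new complete output, never a partially written one.

```javascript
// Hypothetical sketch of the "write to a temporary collection, then rename" idea.
// A Map stands in for the database; publishViaTempCollection is not a MongoDB API,
// and the temporary name below is invented for illustration.
function publishViaTempCollection(db, targetName, computeResults) {
  const tempName = "tmp.mr." + targetName;
  // Build the complete result set under the temporary name; readers of
  // targetName keep seeing the previous output meanwhile.
  db.set(tempName, computeResults());
  // "Rename": the finished results replace the target in a single step.
  db.set(targetName, db.get(tempName));
  db.delete(tempName);
}

const db = new Map([["map_reduce_example", [{ _id: "abc123", value: 250 }]]]);
publishViaTempCollection(db, "map_reduce_example", () => [{ _id: "abc123", value: 350 }]);
console.log(db.get("map_reduce_example"));
```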
Concurrency¶
The map-reduce operation is composed of many tasks, including:
- reads from the input collection,
- executions of the `map` function,
- executions of the `reduce` function,
- writes to the output collection.
These various tasks take the following locks:
- The read phase takes a read lock. It yields every 100 documents.
- The JavaScript code (i.e. the `map`, `reduce`, and `finalize` functions) is executed in a single thread, taking a JavaScript lock; however, most JavaScript tasks in map-reduce are very short and yield the lock frequently.
- The insert into the temporary collection takes a write lock for a single write.
- If the output collection does not exist, the creation of the output collection takes a write lock.
- If the output collection exists, then the output actions (i.e. `merge`, `replace`, `reduce`) take a write lock.
Although single-threaded, the map-reduce tasks interleave and appear to run in parallel.
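The interleaving described above can be illustrated, loosely, with JavaScript generators: each hypothetical `task` gives up control after every 100 documents, and a round-robin loop resumes whichever task is next, so two tasks make progress alternately on a single thread. This is an analogy under stated assumptions (`task`, `runInterleaved`, and the document counts are invented), not a model of MongoDB's lock manager.

```javascript
// Hypothetical single-threaded sketch: each task yields after every `yieldEvery`
// documents, and a round-robin loop interleaves the tasks. Illustration only;
// this is not how the MongoDB server schedules lock yields.
function* task(name, docCount, yieldEvery, log) {
  for (let i = 1; i <= docCount; i++) {
    if (i % yieldEvery === 0) {
      log.push(name + ":" + i); // record where the task gave up control
      yield;
    }
  }
}

function runInterleaved(tasks) {
  const active = [...tasks];
  while (active.length > 0) {
    const t = active.shift();
    if (!t.next().done) active.push(t); // unfinished tasks go to the back of the queue
  }
}

const log = [];
runInterleaved([task("read", 250, 100, log), task("write", 200, 100, log)]);
console.log(log);
```

The log alternates between the two tasks (`read:100`, `write:100`, `read:200`, `write:200`), even though only one piece of code runs at any moment.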
Note
The final write lock during post-processing makes the results appear atomically. However, the output actions `merge` and `reduce` may take minutes to process. For `merge` and `reduce`, the `nonAtomic` flag is available. See the [db.collection.mapReduce()](db.collection.mapReduce.html#db.collection.mapReduce) reference for more information.
Sharded Cluster¶
Sharded Input¶
When using a sharded collection as the input for a map-reduce operation, [mongos](mongos.html#mongos) will automatically dispatch the map-reduce job to each shard in parallel. No special option is required. [mongos](mongos.html#mongos) will wait for the jobs on all shards to finish.
Sharded Output¶
By default the output collection is not sharded. The process is:
- [mongos](mongos.html#mongos) dispatches a map-reduce finish job to the shard that will store the target collection.
- The target shard pulls results from all other shards, runs a final reduce/finalize operation, and writes to the output.
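The default finish step can be pictured with a minimal plain-JavaScript sketch, assuming each shard has already reduced its own documents into `{ _id, value }` partial results. `finalReduceOnTargetShard`, `shardA`, and `shardB` are hypothetical names and the numbers are made up. It is not how `mongos` or the shards are implemented, but it shows why the final reduce relies on `reduce` accepting its own earlier output.

```javascript
// Hypothetical sketch of the default (unsharded output) finish step: every shard
// has already reduced its own documents; the target shard pulls those partial
// results and runs one final reduce per key. Not MongoDB code.
function finalReduceOnTargetShard(partialResultsPerShard, reduceFn) {
  const byKey = new Map(); // key -> partial values pulled from all shards
  for (const shardResults of partialResultsPerShard) {
    for (const { _id, value } of shardResults) {
      if (!byKey.has(_id)) byKey.set(_id, []);
      byKey.get(_id).push(value);
    }
  }
  const output = [];
  for (const [key, values] of byKey) {
    output.push({ _id: key, value: values.length === 1 ? values[0] : reduceFn(key, values) });
  }
  return output;
}

// Re-reducing partial sums is correct only because this reducer can consume its own output.
// (Array.sum from the mongo shell is swapped for Array.prototype.reduce.)
const reduceFunction1 = (keyCustId, valuesPrices) =>
  valuesPrices.reduce((sum, price) => sum + price, 0);

const shardA = [{ _id: "abc123", value: 350 }, { _id: "xyz789", value: 50 }];
const shardB = [{ _id: "abc123", value: 75 }];
console.log(finalReduceOnTargetShard([shardA, shardB], reduceFunction1));
```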
If using the `sharded` option to the `out` parameter, MongoDB shards the output using the `_id` field as the shard key.
Changed in version 2.2.
- If the output collection does not exist, MongoDB creates and shards the collection on the `_id` field.
- If the collection is empty, MongoDB creates [chunks](glossary.html#term-chunk) using the result of the first stage of the map-reduce operation.
- [mongos](mongos.html#mongos) dispatches, in parallel, a map-reduce finish job to every shard that owns a chunk.
- Each shard will pull the results it owns from all other shards, run a final reduce/finalize, and write to the output collection.
Note
- During later map-reduce jobs, MongoDB splits chunks as needed.
- Balancing of chunks for the output collection is automatically prevented during post-processing to avoid concurrency issues.
In MongoDB 2.0:
- [mongos](mongos.html#mongos) retrieves the results from each shard, performs a merge sort to order the results, and performs a reduce/finalize as needed.
- [mongos](mongos.html#mongos) then writes the result to the output collection in sharded mode.
- This model requires only a small amount of memory, even for large datasets.
- Shard chunks are not automatically split during insertion. This requires manual intervention until the chunks are granular and balanced.
Warning
For best results, only use the sharded output options for [mapReduce](mapReduce.html#mapReduce) in version 2.2 or later.
Troubleshoot Map-Reduce Operations¶
You can troubleshoot the `map` function and the `reduce` function in the [mongo](mongo-1.html#mongo) shell.
Troubleshoot the Map Function¶
You can verify the `key` and `value` pairs emitted by the `map` function by writing your own `emit` function.
Consider a collection `orders` that contains documents of the following prototype:
{
_id: ObjectId("50a8240b927d5d8b5891743c"),
cust_id: "abc123",
ord_date: new Date("Oct 04, 2012"),
status: 'A',
price: 250,
items: [ { sku: "mmm", qty: 5, price: 2.5 },
{ sku: "nnn", qty: 5, price: 2.5 } ]
}
- Define the `map` function that maps the `price` to the `cust_id` for each document and emits the `cust_id` and `price` pair:

var map = function() {
   emit(this.cust_id, this.price);
};
- Define the `emit` function to print the key and value:

var emit = function(key, value) {
   print("emit");
   print("key: " + key + " value: " + tojson(value));
};
- Invoke the `map` function with a single document from the `orders` collection:

var myDoc = db.orders.findOne( { _id: ObjectId("50a8240b927d5d8b5891743c") } );
map.apply(myDoc);
- Verify the key and value pair is as you expected.

emit
key: abc123 value: 250
- Invoke the `map` function with multiple documents from the `orders` collection:

var myCursor = db.orders.find( { cust_id: "abc123" } );

while (myCursor.hasNext()) {
   var doc = myCursor.next();
   print ("document _id= " + tojson(doc._id));
   map.apply(doc);
   print();
}
- Verify the key and value pairs are as you expected.
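If you would rather check the emitted pairs programmatically than read printed output, a variant of the custom `emit` can record each pair in an array. The sketch below is plain JavaScript (it runs in Node.js as well as the shell); the `emitted` array and the inline sample document are assumptions for illustration, standing in for the `db.orders.findOne()` result.

```javascript
// Records every emitted pair instead of printing it, so the check can be automated.
// `emitted` and the sample document below are hypothetical.
var emitted = [];
var emit = function (key, value) {
  emitted.push({ key: key, value: value });
};

var map = function () {
  emit(this.cust_id, this.price);
};

// Stand-in for db.orders.findOne(...); only the fields map reads are included.
var myDoc = { cust_id: "abc123", price: 250 };
map.apply(myDoc); // `this` inside map is myDoc

console.log(emitted);
```

For this document, `emitted` holds a single pair with key `abc123` and value `250`, matching the printed output above.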
Troubleshoot the Reduce Function¶
Confirm Output Type¶
You can test that the `reduce` function returns a value that is the same type as the value emitted from the `map` function.
- Define a `reduceFunction1` function that takes the arguments `keyCustId` and `valuesPrices`. `valuesPrices` is an array of integers:

var reduceFunction1 = function(keyCustId, valuesPrices) {
   return Array.sum(valuesPrices);
};
- Define a sample array of integers:

var myTestValues = [ 5, 5, 10 ];
- Invoke the `reduceFunction1` with `myTestValues`:

reduceFunction1('myKey', myTestValues);
- Verify that `reduceFunction1` returned an integer:

20
- Define a `reduceFunction2` function that takes the arguments `keySKU` and `valuesCountObjects`. `valuesCountObjects` is an array of documents that contain two fields, `count` and `qty`:

var reduceFunction2 = function(keySKU, valuesCountObjects) {
   var reducedValue = { count: 0, qty: 0 };

   for (var idx = 0; idx < valuesCountObjects.length; idx++) {
      reducedValue.count += valuesCountObjects[idx].count;
      reducedValue.qty += valuesCountObjects[idx].qty;
   }

   return reducedValue;
};
- Define a sample array of documents:

var myTestObjects = [
                      { count: 1, qty: 5 },
                      { count: 2, qty: 10 },
                      { count: 3, qty: 15 }
                    ];
- Invoke the `reduceFunction2` with `myTestObjects`:

reduceFunction2('myKey', myTestObjects);
- Verify that `reduceFunction2` returned a document with exactly the `count` and the `qty` fields:

{ "count" : 6, "qty" : 30 }
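To automate the type check for both reducers, a small helper can compare the shape of an emitted value with the shape of the reduced value. `sameShape` and `badReduce` are hypothetical names, and `Array.sum` (a mongo shell helper) is replaced with `Array.prototype.reduce` so the sketch runs in plain JavaScript. It compares only primitive types and top-level field names, which is enough for the examples on this page but is not a full type check.

```javascript
// Hypothetical helper (not a MongoDB API): true when `reduced` has the same
// primitive type as `emitted`, or, for documents, exactly the same field names.
function sameShape(emitted, reduced) {
  if (typeof emitted !== "object" || emitted === null) {
    return typeof reduced === typeof emitted;
  }
  if (typeof reduced !== "object" || reduced === null) return false;
  const a = Object.keys(emitted).sort();
  const b = Object.keys(reduced).sort();
  return a.length === b.length && a.every((k, i) => k === b[i]);
}

// reduceFunction1 with Array.sum swapped for Array.prototype.reduce.
var reduceFunction1 = function (keyCustId, valuesPrices) {
  return valuesPrices.reduce((sum, price) => sum + price, 0);
};

var reduceFunction2 = function (keySKU, valuesCountObjects) {
  var reducedValue = { count: 0, qty: 0 };
  for (var idx = 0; idx < valuesCountObjects.length; idx++) {
    reducedValue.count += valuesCountObjects[idx].count;
    reducedValue.qty += valuesCountObjects[idx].qty;
  }
  return reducedValue;
};

// A deliberately wrong reducer for comparison: it drops the count field.
var badReduce = function (keySKU, valuesCountObjects) {
  return { total: valuesCountObjects.reduce((sum, v) => sum + v.qty, 0) };
};

const myTestObjects = [{ count: 1, qty: 5 }, { count: 2, qty: 10 }, { count: 3, qty: 15 }];
console.log(sameShape(5, reduceFunction1("myKey", [5, 5, 10])));                   // true
console.log(sameShape(myTestObjects[0], reduceFunction2("myKey", myTestObjects))); // true
console.log(sameShape(myTestObjects[0], badReduce("myKey", myTestObjects)));       // false
```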
Ensure Insensitivity to the Order of Mapped Values¶
The `reduce` function takes a `key` and a `values` array as its arguments. You can test that the result of the `reduce` function does not depend on the order of the elements in the `values` array.
- Define a sample `values1` array and a sample `values2` array that differ only in the order of the array elements:

var values1 = [
                { count: 1, qty: 5 },
                { count: 2, qty: 10 },
                { count: 3, qty: 15 }
              ];

var values2 = [
                { count: 3, qty: 15 },
                { count: 1, qty: 5 },
                { count: 2, qty: 10 }
              ];
- Define a `reduceFunction2` function that takes the arguments `keySKU` and `valuesCountObjects`. `valuesCountObjects` is an array of documents that contain two fields, `count` and `qty`:

var reduceFunction2 = function(keySKU, valuesCountObjects) {
   var reducedValue = { count: 0, qty: 0 };

   for (var idx = 0; idx < valuesCountObjects.length; idx++) {
      reducedValue.count += valuesCountObjects[idx].count;
      reducedValue.qty += valuesCountObjects[idx].qty;
   }

   return reducedValue;
};
- Invoke the `reduceFunction2` first with `values1` and then with `values2`:

reduceFunction2('myKey', values1);
reduceFunction2('myKey', values2);
- Verify that `reduceFunction2` returned the same result:

{ "count" : 6, "qty" : 30 }
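Instead of comparing two hand-picked orderings, the check can be run over every ordering of a small `values` array. This is a minimal plain-JavaScript sketch; `permutations`, `isOrderInsensitive`, and the deliberately order-sensitive `keepFirst` reducer are hypothetical. Results are compared as JSON strings, which assumes the reducer always builds its result fields in the same order, and the number of orderings grows factorially, so keep the test array short.

```javascript
// Hypothetical helper: every ordering of a small array.
function permutations(arr) {
  if (arr.length <= 1) return [arr.slice()];
  const result = [];
  arr.forEach((item, i) => {
    const rest = arr.slice(0, i).concat(arr.slice(i + 1));
    for (const perm of permutations(rest)) result.push([item].concat(perm));
  });
  return result;
}

// Runs reduce over every ordering and checks that all results are identical.
function isOrderInsensitive(reduceFn, key, values) {
  const expected = JSON.stringify(reduceFn(key, values));
  return permutations(values).every((p) => JSON.stringify(reduceFn(key, p)) === expected);
}

var reduceFunction2 = function (keySKU, valuesCountObjects) {
  var reducedValue = { count: 0, qty: 0 };
  for (var idx = 0; idx < valuesCountObjects.length; idx++) {
    reducedValue.count += valuesCountObjects[idx].count;
    reducedValue.qty += valuesCountObjects[idx].qty;
  }
  return reducedValue;
};

// A deliberately order-sensitive reducer for contrast.
var keepFirst = function (key, values) {
  return values[0];
};

var values1 = [{ count: 1, qty: 5 }, { count: 2, qty: 10 }, { count: 3, qty: 15 }];
console.log(isOrderInsensitive(reduceFunction2, "myKey", values1)); // true
console.log(isOrderInsensitive(keepFirst, "myKey", values1));       // false
```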
Ensure Reduce Function Idempotence¶
Because the map-reduce operation may call `reduce` multiple times for the same key, the `reduce` function must return a value of the same type as the value emitted from the `map` function. You can test that the `reduce` function processes "reduced" values without affecting the final value.
- Define a `reduceFunction2` function that takes the arguments `keySKU` and `valuesCountObjects`. `valuesCountObjects` is an array of documents that contain two fields, `count` and `qty`:

var reduceFunction2 = function(keySKU, valuesCountObjects) {
   var reducedValue = { count: 0, qty: 0 };

   for (var idx = 0; idx < valuesCountObjects.length; idx++) {
      reducedValue.count += valuesCountObjects[idx].count;
      reducedValue.qty += valuesCountObjects[idx].qty;
   }

   return reducedValue;
};
- Define a sample key:

var myKey = 'myKey';
- Define a sample `valuesIdempotent` array that contains an element that is a call to the `reduceFunction2` function:

var valuesIdempotent = [
                         { count: 1, qty: 5 },
                         { count: 2, qty: 10 },
                         reduceFunction2(myKey, [ { count: 3, qty: 15 } ] )
                       ];
- Define a sample `values1` array that combines the values passed to `reduceFunction2`:

var values1 = [
                { count: 1, qty: 5 },
                { count: 2, qty: 10 },
                { count: 3, qty: 15 }
              ];
- Invoke the `reduceFunction2` first with `myKey` and `valuesIdempotent` and then with `myKey` and `values1`:

reduceFunction2(myKey, valuesIdempotent);
reduceFunction2(myKey, values1);
- Verify that `reduceFunction2` returned the same result:

{ "count" : 6, "qty" : 30 }
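The idempotence check above can be generalized: split the values at every position, reduce the first part, and feed that partial result back in with the rest, mirroring how MongoDB may reduce the same key in several stages. `isReReducible` and the flawed `countByLength` reducer are hypothetical names in this plain-JavaScript sketch, and results are compared as JSON strings.

```javascript
// Hypothetical helper: checks that feeding reduce its own partial output
// (at every split point, and on its final output) leaves the result unchanged.
function isReReducible(reduceFn, key, values) {
  const direct = JSON.stringify(reduceFn(key, values));
  for (let split = 1; split < values.length; split++) {
    const partial = reduceFn(key, values.slice(0, split));
    const staged = reduceFn(key, [partial].concat(values.slice(split)));
    if (JSON.stringify(staged) !== direct) return false;
  }
  return JSON.stringify(reduceFn(key, [reduceFn(key, values)])) === direct;
}

var reduceFunction2 = function (keySKU, valuesCountObjects) {
  var reducedValue = { count: 0, qty: 0 };
  for (var idx = 0; idx < valuesCountObjects.length; idx++) {
    reducedValue.count += valuesCountObjects[idx].count;
    reducedValue.qty += valuesCountObjects[idx].qty;
  }
  return reducedValue;
};

// A flawed reducer for contrast: it counts array elements instead of summing
// the count fields, so re-reducing an earlier result changes the answer.
var countByLength = function (keySKU, values) {
  return { count: values.length, qty: values.reduce((sum, v) => sum + v.qty, 0) };
};

var values1 = [{ count: 1, qty: 5 }, { count: 2, qty: 10 }, { count: 3, qty: 15 }];
console.log(isReReducible(reduceFunction2, "myKey", values1)); // true
console.log(isReReducible(countByLength, "myKey", values1));   // false
```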