42
votes

I have a long history with relational databases, but I'm new to MongoDB and MapReduce, so I'm almost positive I must be doing something wrong. I'll jump right into the question. Sorry if it's long.

I have a database table in MySQL that tracks the number of member profile views for each day. For testing it has 10,000,000 rows.

CREATE TABLE `profile_views` (
  `id` int(10) unsigned NOT NULL auto_increment,
  `username` varchar(20) NOT NULL,
  `day` date NOT NULL,
  `hits` int(10) unsigned default '0',
  PRIMARY KEY  (`id`),
  UNIQUE KEY `username` (`username`,`day`),
  KEY `day` (`day`)
) ENGINE=InnoDB;

Typical data might look like this.

+--------+----------+------------+------+
| id     | username | day        | hits |
+--------+----------+------------+------+
| 650001 | Joe      | 2010-07-10 |    1 |
| 650002 | Jane     | 2010-07-10 |    2 |
| 650003 | Jack     | 2010-07-10 |    3 |
| 650004 | Jerry    | 2010-07-10 |    4 |
+--------+----------+------------+------+

I use this query to get the top 5 most viewed profiles since 2010-07-16.

SELECT username, SUM(hits) AS total_hits
FROM profile_views
WHERE day > '2010-07-16'
GROUP BY username
ORDER BY total_hits DESC
LIMIT 5\G

This query completes in under a minute. Not bad!

Now moving on to the world of MongoDB. I set up a sharded environment using 3 servers: M, S1, and S2. I used the following commands to set the rig up (note: I've obscured the IP addresses).

S1 => 127.20.90.1
./mongod --fork --shardsvr --port 10000 --dbpath=/data/db --logpath=/data/log

S2 => 127.20.90.7
./mongod --fork --shardsvr --port 10000 --dbpath=/data/db --logpath=/data/log

M => 127.20.4.1
./mongod --fork --configsvr --dbpath=/data/db --logpath=/data/log
./mongos --fork --configdb 127.20.4.1 --chunkSize 1 --logpath=/data/slog

Once those were up and running, I hopped on server M, and launched mongo. I issued the following commands:

use admin
db.runCommand( { addshard : "127.20.90.1:10000", name: "M1" } );
db.runCommand( { addshard : "127.20.90.7:10000", name: "M2" } );
db.runCommand( { enablesharding : "profiles" } );
db.runCommand( { shardcollection : "profiles.views", key : {day : 1} } );
use profiles
db.views.ensureIndex({ hits: -1 });

I then imported the same 10,000,000 rows from MySQL, which gave me documents that look like this:

{
    "_id" : ObjectId("4cb8fc285582125055295600"),
    "username" : "Joe",
    "day" : "Fri May 21 2010 00:00:00 GMT-0400 (EDT)",
    "hits" : 16
}

Now comes the real meat and potatoes: my map and reduce functions. Back on server M, in the shell, I set up the query and execute it like this.

use profiles;
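var start = new Date(2010, 6, 16); // JavaScript months are zero-based: 6 = July
var map = function() {
    // Emit one (username, hits) pair per document
    emit(this.username, this.hits);
}
var reduce = function(key, values) {
    // Sum all hit counts emitted for this username
    var sum = 0;
    for (var i = 0; i < values.length; i++) sum += values[i];
    return sum;
}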
res = db.views.mapReduce(
    map,
    reduce,
    {
        query : { day: { $gt: start }}
    }
);

And here's where I run into problems. This query took over 15 minutes to complete! The MySQL query took under a minute. Here's the output:

{
        "result" : "tmp.mr.mapreduce_1287207199_6",
        "shardCounts" : {
                "127.20.90.7:10000" : {
                        "input" : 4917653,
                        "emit" : 4917653,
                        "output" : 1105648
                },
                "127.20.90.1:10000" : {
                        "input" : 5082347,
                        "emit" : 5082347,
                        "output" : 1150547
                }
        },
        "counts" : {
                "emit" : NumberLong(10000000),
                "input" : NumberLong(10000000),
                "output" : NumberLong(2256195)
        },
        "ok" : 1,
        "timeMillis" : 811207,
        "timing" : {
                "shards" : 651467,
                "final" : 159740
        },
}

Not only did it take forever to run, but the results don't even seem to be correct.

db[res.result].find().sort({ hits: -1 }).limit(5);
{ "_id" : "Joe", "value" : 128 }
{ "_id" : "Jane", "value" : 2 }
{ "_id" : "Jerry", "value" : 2 }
{ "_id" : "Jack", "value" : 2 }
{ "_id" : "Jessy", "value" : 3 }

I know those value numbers should be much higher.
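
The output documents above keep each total under "value", so a sort on "hits" has nothing to order by. A minimal sketch in plain JavaScript, with no MongoDB involved, of what sorting on the right field does. The function name topByValue and the sample totals are made up for illustration:

```javascript
// Hypothetical sample of mapReduce output documents: totals live in "value".
var results = [
    { _id: "Joe", value: 128 },
    { _id: "Jane", value: 2 },
    { _id: "Jessy", value: 3 },
    { _id: "Ann", value: 500 }
];

// Return the n documents with the largest "value", highest first.
function topByValue(docs, n) {
    return docs
        .slice()                                              // copy so the input is not reordered
        .sort(function (a, b) { return b.value - a.value; }) // descending by total
        .slice(0, n);
}

var top = topByValue(results, 2);
console.log(top.map(function (d) { return d._id + ": " + d.value; }).join(", "));
// prints "Ann: 500, Joe: 128"
```

In the shell, the equivalent would presumably be db[res.result].find().sort({ value: -1 }).limit(5).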

My understanding of the whole MapReduce paradigm is that the task of performing this query should be split between all shard members, which should increase performance. I waited until Mongo was done distributing the documents between the two shard servers after the import. Each had almost exactly 5,000,000 documents when I started this query.

So I must be doing something wrong. Can anyone give me any pointers?

Edit: Someone on IRC mentioned adding an index on the day field, but as far as I can tell that was done automatically by MongoDB.

4
Gah.. Just realized one reason why the results are incorrect. I should have been sorting on "value" rather than "hits". - mellowsoon
One problem is that when you import your data into Mongo, the 'day' value is a giant string, but in MySQL it is a date (integer). When you put your data into Mongo, make sure to store it as a Date type. - Clint
You might also separate the date and time fields, store the date as the string "20110101" or the integer 20110101, and index on that date. - Bahadir Cambel
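
Following up on the Date-type suggestion above, here is a rough sketch of converting a MySQL DATE string before inserting it. It assumes the export yields "YYYY-MM-DD" strings and that UTC midnight is an acceptable convention; toUtcDate and the sample row are hypothetical, not part of MongoDB:

```javascript
// Convert a MySQL DATE string ("YYYY-MM-DD") into a JavaScript Date at UTC midnight.
// toUtcDate is a hypothetical helper for this sketch.
function toUtcDate(dayString) {
    var parts = dayString.split("-").map(Number);
    // Date.UTC takes a zero-based month, so July is 6, not 7.
    return new Date(Date.UTC(parts[0], parts[1] - 1, parts[2]));
}

var row = { username: "Joe", day: "2010-07-10", hits: 1 };   // hypothetical exported row
var doc = { username: row.username, day: toUtcDate(row.day), hits: row.hits };

console.log(doc.day.toISOString());              // 2010-07-10T00:00:00.000Z
console.log(doc.day > toUtcDate("2010-07-16"));  // false
```

With a real Date in "day", a range filter such as { day: { $gt: start } } compares dates with dates instead of a string with a date.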

4 Answers

53
votes

Excerpts from MongoDB: The Definitive Guide (O'Reilly):

The price of using MapReduce is speed: group is not particularly speedy, but MapReduce is slower and is not supposed to be used in “real time.” You run MapReduce as a background job, it creates a collection of results, and then you can query that collection in real time.

options for map/reduce:

"keeptemp" : boolean
If the temporary result collection should be saved when the connection is closed.

"output" : string
Name for the output collection. Setting this option implies keeptemp : true.

29
votes

Maybe I'm too late, but...

First, you are querying the collection to fill the MapReduce without an index. You should create an index on "day".

MongoDB MapReduce is single threaded on a single server, but parallelizes on shards. The data in mongo shards are kept together in contiguous chunks sorted by sharding key.
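
Since each shard reduces its own documents and the partial results are then reduced again (the "shards" and "final" timings in the question's output), the reduce function has to accept its own output as input. A rough simulation in plain JavaScript, with no MongoDB involved. The names mapReduceLocal, mergeShards, shardA, and shardB are invented for this sketch, which is not how the server is actually implemented:

```javascript
// The same reduce as in the question: sum a list of hit counts.
function reduce(key, values) {
    var sum = 0;
    for (var i = 0; i < values.length; i++) sum += values[i];
    return sum;
}

// Group values by key, then reduce each group to a single total.
function reduceGroups(groups) {
    var out = {};
    Object.keys(groups).forEach(function (k) { out[k] = reduce(k, groups[k]); });
    return out;
}

// Map + reduce over one shard's documents, returning { username: total }.
function mapReduceLocal(docs) {
    var groups = {};
    docs.forEach(function (doc) {
        // Equivalent of emit(this.username, this.hits)
        (groups[doc.username] = groups[doc.username] || []).push(doc.hits);
    });
    return reduceGroups(groups);
}

// Final pass: re-reduce the partial totals coming from every shard.
function mergeShards(partials) {
    var groups = {};
    partials.forEach(function (partial) {
        Object.keys(partial).forEach(function (k) {
            (groups[k] = groups[k] || []).push(partial[k]);
        });
    });
    return reduceGroups(groups);
}

// Hypothetical documents split across two shards.
var shardA = [{ username: "Joe", hits: 1 }, { username: "Jane", hits: 2 }];
var shardB = [{ username: "Joe", hits: 16 }, { username: "Jack", hits: 3 }];
var totals = mergeShards([mapReduceLocal(shardA), mapReduceLocal(shardB)]);
console.log(JSON.stringify(totals));
```

A sum works here because adding partial sums gives the same answer as adding the raw values; a reduce that returned, say, an average would not survive the second pass unchanged.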

As your sharding key is "day", and you are querying on it, you are probably only using one of your shards. The sharding key is only used to spread the data. MapReduce will query using the "day" index on each shard, and will be very fast.

Add something in front of the day key to spread the data. The username can be a good choice.

That way the MapReduce will be launched on all shards, hopefully dividing the run time by the number of shards.

Something like this:

use admin
db.runCommand( { addshard : "127.20.90.1:10000", name: "M1" } );
db.runCommand( { addshard : "127.20.90.7:10000", name: "M2" } );
db.runCommand( { enablesharding : "profiles" } );
db.runCommand( { shardcollection : "profiles.views", key : {username : 1,day: 1} } );
use profiles
db.views.ensureIndex({ hits: -1 });
db.views.ensureIndex({ day: -1 });

I think with those additions, you can match MySQL's speed, or even beat it.

Also, it's better not to use it in real time. If your data doesn't need to be precise to the minute, schedule a MapReduce task every now and then and use the result collection.

6
votes

You are not doing anything wrong. (Besides sorting on the wrong value as you already noticed in your comments.)

MongoDB map/reduce performance just isn't that great. This is a known issue; see for example http://jira.mongodb.org/browse/SERVER-1197 where a naive approach is ~350x faster than M/R.

One advantage though is that you can specify a permanent output collection name with the out argument of the mapReduce call. Once the M/R is completed the temporary collection will be renamed to the permanent name atomically. That way you can schedule your statistics updates and query the M/R output collection real-time.
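
A sketch of that pattern, assuming a mongo shell from a release that still ships mapReduce and accepts a collection name as a string for out. The collection name "profile_view_totals" and the helpers refreshTotals and topProfiles are hypothetical:

```javascript
// Run the map/reduce as a background job and keep the totals in a named collection.
// "profile_view_totals" and refreshTotals are hypothetical names for this sketch.
function refreshTotals(db, since) {
    var map = function () { emit(this.username, this.hits); };
    var reduce = function (key, values) {
        var sum = 0;
        for (var i = 0; i < values.length; i++) sum += values[i];
        return sum;
    };
    return db.views.mapReduce(map, reduce, {
        query: { day: { $gt: since } },
        out: "profile_view_totals"   // permanent output collection
    });
}

// Cheap real-time read against the precomputed totals.
function topProfiles(db, n) {
    return db.profile_view_totals.find().sort({ value: -1 }).limit(n);
}
```

A scheduled job could call refreshTotals(db, new Date(2010, 6, 16)), while page requests only call topProfiles(db, 5) and never wait on the map/reduce itself.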

0
votes

Have you already tried using the Hadoop connector for MongoDB?

Look at this link here: http://docs.mongodb.org/ecosystem/tutorial/getting-started-with-hadoop/

Since you are using only two shards, I don't know whether this approach would improve your case.