
I am wondering whether the $group stage of the aggregation framework in MongoDB 2.2 is multithreaded.

To find out, I ran some small tests. The data set stores about 4 million emails, and each email document has the following format:

shard1:PRIMARY> db.spams.findOne()
{
"IP" : "113.162.134.245",
"_id" : ObjectId("4ebe8c84466e8b1a56000028"),
"attach" : [ ],
"bot" : "Lethic",
"charset" : "iso-8859-1",
"city" : "",
"classA" : "113",
"classB" : "113.162",
"classC" : "113.162.134",
"content_type" : [ ],
"country" : "Vietnam",
"cte" : "7bit",
"date" : ISODate("2011-11-11T00:07:12Z"),
"day" : "2011-11-11",
"from_domain_a" : "domain157939.com",
"geo" : "VN",
"host" : "",
"lang" : "unknown",
"lat" : 16,
"long" : 106,
"sequenceID" : "user648",
"size" : 1060,
"smtp-mail-from_a" : "[email protected]",
"smtp-rcpt-to_a" : "[email protected]",
"subject_ta" : "nxsy8",
"uri" : [ ],
"uri_domain" : [ ],
"x_p0f_detail" : "2000 SP4, XP SP1+",
"x_p0f_genre" : "Windows",
"x_p0f_signature" : "65535:105:1:48:M1402,N,N,S:."
}

I designed queries that find all emails within one day, one week, one month, half a year and one year, and then group the results by the "bot" field.
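To make the semantics of that pipeline concrete, here is a minimal in-memory sketch in plain Java (no MongoDB involved) of what a date-range $match followed by a $group on "bot" with {$sum: 1} computes. The class GroupByBotSketch, the Spam holder and the sample documents are hypothetical; only the field names bot and date come from the document format above.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of [$match {date: {$gte: from}}, $match {date: {$lte: to}},
// $group {_id: "$bot", sum: {$sum: 1}}]. Plain Java only; no MongoDB involved.
public class GroupByBotSketch {

    // Stand-in for a spams document, keeping only the two fields the pipeline reads.
    static final class Spam {
        final String bot;
        final Instant date;

        Spam(String bot, Instant date) {
            this.bot = bot;
            this.date = date;
        }
    }

    // Keeps documents with from <= date <= to, then counts them per bot value.
    static Map<String, Integer> countByBot(List<Spam> spams, Instant from, Instant to) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys for stable printing
        for (Spam s : spams) {
            // The two $match stages: drop anything before `from` or after `to`.
            if (s.date.isBefore(from) || s.date.isAfter(to)) {
                continue;
            }
            // The $group stage: one bucket per bot, adding 1 per document ($sum: 1).
            counts.merge(s.bot, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Instant from = Instant.parse("2011-01-01T00:00:00Z");
        Instant to = Instant.parse("2011-01-02T00:00:00Z");
        // Hypothetical sample documents.
        List<Spam> spams = Arrays.asList(
                new Spam("Lethic", Instant.parse("2011-01-01T10:00:00Z")),
                new Spam("Lethic", Instant.parse("2011-01-01T23:59:59Z")),
                new Spam("Rustock", Instant.parse("2011-01-01T12:00:00Z")),
                new Spam("Lethic", Instant.parse("2011-03-05T08:00:00Z"))); // outside the range
        System.out.println(countByBot(spams, from, to)); // prints {Lethic=2, Rustock=1}
    }
}
```

Each output entry corresponds to one result document of the real pipeline, with the map key playing the role of _id and the value playing the role of sum.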

I used the aggregation framework through the Java driver. The Java code is below:

import java.net.UnknownHostException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

import com.mongodb.AggregationOutput;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBAddress;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class RangeQuery {
    private final String mongoUrl = "172.16.10.61:30000";
    private final String databaseName = "test";
    private final String collecName = "spams";
    private DBCollection collection = null;
    private DB db = null;

    public void init() {
        Mongo mongo;
        try {
            mongo = new Mongo(new DBAddress(mongoUrl));
        } catch (UnknownHostException e) {
            // Without a connection there is nothing to query, so fail fast
            // instead of hitting a NullPointerException on mongo.getDB() below.
            throw new IllegalStateException("Cannot resolve " + mongoUrl, e);
        }
        db = mongo.getDB(databaseName);
        db.requestStart();
        collection = db.getCollection(collecName);
    }

    public void queryRange_GroupBot(boolean printResult) {
        // HH is the 24-hour field (hh is 1-12). The quoted 'Z' is only literal text,
        // so set UTC explicitly instead of parsing in the local time zone.
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));

        String[] toDateStr = {
            "2011-01-02T00:00:00Z",
            "2011-01-07T00:00:00Z",
            "2011-02-01T00:00:00Z",
            "2011-06-01T00:00:00Z",
            "2012-01-01T00:00:00Z"
        };
        String[] toPrint = {
            "Within One day",
            "Within One week",
            "Within One month",
            "Within half year",
            "Within One year"
        };

        try {
            Date fromDate = formatter.parse("2011-01-01T00:00:00Z");
            System.out.println("\n------Query Time Range Group by Bot------");
            for (int i = 0; i < toDateStr.length; i++) {
                System.out.println("    ---" + toPrint[i] + "---");
                Date toDate = formatter.parse(toDateStr[i]);

                // {$group: {_id: "$bot", sum: {$sum: 1}}}
                DBObject groupFields = new BasicDBObject("_id", "$bot");
                groupFields.put("sum", new BasicDBObject("$sum", 1));
                DBObject group = new BasicDBObject("$group", groupFields);

                // {$match: {date: {$gte: fromDate}}} then {$match: {date: {$lte: toDate}}}
                DBObject match1 = new BasicDBObject("$match",
                        new BasicDBObject("date", new BasicDBObject("$gte", fromDate)));
                DBObject match2 = new BasicDBObject("$match",
                        new BasicDBObject("date", new BasicDBObject("$lte", toDate)));

                // Time a single run of the whole pipeline.
                long runBefore = System.currentTimeMillis();
                AggregationOutput aggOutput = collection.aggregate(match1, match2, group);
                long runAfter = System.currentTimeMillis();
                if (printResult) {
                    System.out.println(aggOutput.getCommandResult());
                }
                System.out.println("[Query Range + Group by Bot]: " + (runAfter - runBefore) + " ms.");
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        RangeQuery rangeQuery = new RangeQuery();
        rangeQuery.init();
        rangeQuery.queryRange_GroupBot(true);
    }
}
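On Java 8 or later, the same range bounds can be parsed without a hand-written SimpleDateFormat pattern: Instant.parse reads the trailing Z as UTC, and Date.from converts the result to the java.util.Date values the query above passes in. This is a small sketch under that Java 8+ assumption; the class RangeBounds and its toDate helper are hypothetical names, while the date strings are the ones from the question.

```java
import java.time.Instant;
import java.util.Date;

// Sketch (assumes Java 8+): parse ISO-8601 UTC strings with java.time, then
// convert to java.util.Date for use in the query objects.
public class RangeBounds {
    static final String FROM = "2011-01-01T00:00:00Z";
    static final String[] TO = {
        "2011-01-02T00:00:00Z", "2011-01-07T00:00:00Z", "2011-02-01T00:00:00Z",
        "2011-06-01T00:00:00Z", "2012-01-01T00:00:00Z"
    };

    // Instant.parse treats the trailing 'Z' as UTC, so no time zone setup is needed.
    static Date toDate(String isoUtc) {
        return Date.from(Instant.parse(isoUtc));
    }

    public static void main(String[] args) {
        Date from = toDate(FROM);
        for (String s : TO) {
            Date to = toDate(s);
            long hours = (to.getTime() - from.getTime()) / 3_600_000L; // span of the range
            System.out.println(FROM + " -> " + s + ": " + hours + " hours");
        }
    }
}
```

Running main prints the span of each range, from 24 hours for the one-day range up to 8760 hours for the one-year range.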

The results are:

  Within One day(2011-01-01 -> 2011-01-02)      54173 ms
  Within One week(2011-01-01 -> 2011-01-07)     54277 ms
  Within One month(2011-01-01 -> 2011-02-01)    54387 ms
  Within half year(2011-01-01 -> 2011-06-01)    53035 ms
  Within One year(2011-01-01 -> 2012-01-01)     54116 ms

What surprises me is that grouping over one year should normally be slower than grouping over one day, since the year contains many more records (the records in the data set are uniformly distributed over time), yet the timings are almost identical.

If I just run db.spams.find({"date": {$gt: ISODate(xxx), $lt: ISODate(xxx)}}).count(), I can see that querying a year takes longer than querying a day.

So why does the $group pipeline take nearly the same time no matter how much I enlarge the time range?

I know the aggregation framework is implemented in C++. In MongoDB 2.2, does it use multiple threads, or some other technique, to improve performance?


1 Answer


According to this discussion: https://groups.google.com/forum/?fromgroups=#!topic/mongodb-user/xCSww5spXPc

Each pipeline is currently single-threaded, but different pipelines can run in parallel. So if you have 100 connections each running an aggregation command, those commands may run in parallel, but each individual command runs on one thread.
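A client could try to exploit this by splitting one large date range into several sub-ranges, issuing one aggregation command per sub-range concurrently, and summing the per-bot counts afterwards. The sketch below assumes Java 8+; ParallelRangeSketch, countInParallel and the aggregate callback are hypothetical names, and the callback only stands in for a real driver call (for example one collection.aggregate(...) whose $match uses $gte lo and $lt hi, followed by the $group). The sub-ranges are half-open so that no document on a boundary is counted twice. Whether this actually speeds anything up depends on the server's cores and load, which the discussion above does not measure.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

public class ParallelRangeSketch {

    // Splits [from, to) into `parts` contiguous half-open sub-ranges, runs `aggregate`
    // on each one in its own task, and merges the per-bot counts by summing them.
    // `aggregate` is a hypothetical stand-in for one single-threaded aggregation command.
    static Map<String, Long> countInParallel(Instant from, Instant to, int parts,
            BiFunction<Instant, Instant, Map<String, Long>> aggregate) throws Exception {
        long totalMillis = from.until(to, ChronoUnit.MILLIS);
        ExecutorService pool = Executors.newFixedThreadPool(parts);
        try {
            List<Future<Map<String, Long>>> futures = new ArrayList<>();
            for (int i = 0; i < parts; i++) {
                Instant lo = from.plusMillis(totalMillis * i / parts);
                Instant hi = from.plusMillis(totalMillis * (i + 1) / parts); // last hi == to
                futures.add(pool.submit(() -> aggregate.apply(lo, hi)));
            }
            Map<String, Long> merged = new HashMap<>();
            for (Future<Map<String, Long>> f : futures) {
                f.get().forEach((bot, n) -> merged.merge(bot, n, Long::sum));
            }
            return merged;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical data: one "Lethic" document per day of 2011.
        Instant start = Instant.parse("2011-01-01T00:00:00Z");
        List<Instant> lethicDates = new ArrayList<>();
        for (int d = 0; d < 365; d++) {
            lethicDates.add(start.plus(d, ChronoUnit.DAYS));
        }
        // Fake "aggregation" over [lo, hi): counts matching dates in memory.
        BiFunction<Instant, Instant, Map<String, Long>> fakeAggregate = (lo, hi) -> {
            long n = lethicDates.stream().filter(t -> !t.isBefore(lo) && t.isBefore(hi)).count();
            Map<String, Long> m = new HashMap<>();
            if (n > 0) {
                m.put("Lethic", n);
            }
            return m;
        };
        Map<String, Long> result =
                countInParallel(start, Instant.parse("2012-01-01T00:00:00Z"), 4, fakeAggregate);
        System.out.println(result); // prints {Lethic=365}
    }
}
```

The merge step works because $sum: 1 counts are additive across disjoint ranges; an aggregation such as an average would need more care when combining partial results.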