6 votes

I am currently experimenting with Spark and MongoDB, using the mongodb-hadoop connector to bridge communication between Spark and MongoDB. Here is an example: https://github.com/plaa/mongo-spark. This example works well for me.

Then, based on this example, I used a bigger dataset from https://github.com/10gen-interns/big-data-exploration, which has 6 million records of flight data. What I want to do is query the MongoDB dataset and then do some further processing.

The schema for the flight data is at https://gist.github.com/sweetieSong/6016700

Here is an example document:

{ "_id" : ObjectId( "51bf19c4ca69141e42ddd1f7" ),
  "age" : 27,
  "airTime" : 316,
  "airlineId" : 19805,
  "arrDelay" : -37,
  "arrTime" : Date( 1336304580000 ),
  "carrier" : "AA",
  "carrierId" : "AA",
  "crsArrTime" : Date( 1336306800000 ),
  "crsDepTime" : Date( 1336294800000 ),
  "crsElapsedTime" : 380,
  "date" : Date( 1336262400000 ),
  "dayOfMonth" : 6,
  "dayOfWeek" : 7,
  "depDelay" : -5,
  "depTime" : Date( 1336294500000 ),
  "destAirport" : "LAX",
  "destAirportId" : 12892,
  "destCity" : "Los Angeles, CA",
  "destCityId" : 32575,
  "destState" : "California",
  "destStateId" : "CA",
  "destWAC" : 91,
  "distance" : 2475,
  "diverted" : true,
  "elapsedTime" : 348,
  "flightNum" : 1,
  "month" : 5,
  "numDivAirportLandings" : 0,
  "numFlights" : 1,
  "origAirport" : "JFK",
  "origAirportId" : 12478,
  "origCity" : "New York, NY",
  "origCityId" : 31703,
  "origState" : "New York",
  "origStateId" : "NY",
  "origWAC" : 22,
  "quarter" : 2,
  "tailNum" : "N323AA",
  "taxiIn" : 19,
  "taxiOut" : 13,
  "wheelsOff" : Date( 1336295280000 ),
  "wheelsOn" : Date( 1336303440000 ),
  "year" : 2012 }

My Scala code is:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat

val sc = new SparkContext("local", "Scala Word Count")

// Hadoop configuration for the mongodb-hadoop connector
val config = new Configuration()
config.set("mongo.input.uri", "mongodb://xx.xx.xx.xx:27017/flying.flights")
config.set("mongo.input.query", "{destAirport: 'LAX'}")
//config.set("mongo.input.query", "{_id.destAirport: 'LAX'}")

// Load the collection as an RDD of (ObjectId, BSONObject) pairs
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject])

println("We're running scala..count " + mongoRDD.count())

For testing purposes, I just want to first get all the records whose destAirport is 'LAX'. I wasn't sure what the query should look like, so I tried two different query formats: "{destAirport: 'LAX'}" and "{_id.destAirport: 'LAX'}".
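To sanity-check which form actually matches documents, a quick count can be run directly through the MongoDB Java driver (which the mongodb-hadoop connector already depends on), bypassing the connector entirely. A minimal sketch, assuming the same host and collection as above:

import com.mongodb.{BasicDBObject, MongoClient}

// Count matches for both candidate queries directly against MongoDB,
// bypassing the Hadoop connector entirely.
val client = new MongoClient("xx.xx.xx.xx", 27017)
val flights = client.getDB("flying").getCollection("flights")

val topLevel = flights.count(new BasicDBObject("destAirport", "LAX"))
val nested   = flights.count(new BasicDBObject("_id.destAirport", "LAX"))
println(s"destAirport: $topLevel, _id.destAirport: $nested")

client.close()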

When running the application, the console outputs the following:

INFO MongoCollectionSplitter: Created split: min={ "_id" : { "$oid" : "51bf29d8ca69141e42097d7f"}}, max= { "_id" : { "$oid" : "51bf29dfca69141e420991ad"}}

14/08/05 10:30:51 INFO Executor: Running task ID 751
14/08/05 10:30:51 INFO TaskSetManager: Finished TID 750 in 109 ms on localhost (progress: 751/1192)
14/08/05 10:30:51 INFO DAGScheduler: Completed ResultTask(0, 750)
14/08/05 10:30:51 INFO BlockManager: Found block broadcast_0 locally
14/08/05 10:30:51 INFO NewHadoopRDD: Input split: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO MongoRecordReader: Read 0.0 documents from:
14/08/05 10:30:51 INFO MongoRecordReader: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO Executor: Serialized size of result for 751 is 597
14/08/05 10:30:51 INFO Executor: Sending result for 751 directly to driver
14/08/05 10:30:51 INFO Executor: Finished task ID 751

No matter what the query is (even if I don't set a query at all), Spark always executes 1191 tasks. Each task outputs similar log lines, and mongoRDD.count() always returns 0.

My first question is: what is the right query?

Moreover, I previously thought that mongodb-hadoop works by having MongoDB first query the whole collection and then send the results back to Spark for processing. But now it seems that MongoDB splits the collection into many pieces, queries each small piece, and then sends the results of that piece back to Spark. Is that correct?


1 Answer

6 votes

My first question is: what is the right query?

I don't think there's a "right" query; you need to query based on the data you would like to process.
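For what it's worth, the sample document in the question shows destAirport as a top-level field, so a query against it would use the first form. A minimal sketch, reusing the configuration keys from the question (note that the log output shows "_id.destAirport" : "LAX " with a trailing space, which would not match any document):

import org.apache.hadoop.conf.Configuration

// Sketch: query the top-level destAirport field, matching the posted schema.
// No "_id." prefix and no trailing space in the value.
val config = new Configuration()
config.set("mongo.input.uri", "mongodb://xx.xx.xx.xx:27017/flying.flights")
config.set("mongo.input.query", "{destAirport: 'LAX'}")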

Moreover, I previously thought that mongodb-hadoop works by having MongoDB first query the whole collection and then send the results back to Spark for processing. But now it seems that MongoDB splits the collection into many pieces, queries each small piece, and then sends the results of that piece back to Spark. Is that correct?

I encountered the same issue.

I believe that newAPIHadoopRDD, given MongoInputSplit.class, does not account for the query when calculating the splits. The query is only applied after the splits are calculated. This means that no matter how selective your query is, the number of splits will remain the same and will be proportional to the size of the collection.
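To see this in practice, you can also apply the filter on the Spark side after the documents have been read; the splits are computed the same way regardless. A sketch reusing mongoRDD from the question:

// Filter after reading: every split is still scanned, but only
// documents whose destAirport is "LAX" end up in laxFlights.
val laxFlights = mongoRDD.filter { case (_, doc) =>
  "LAX" == doc.get("destAirport")
}
println("LAX flights: " + laxFlights.count())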

newAPIHadoopRDD uses the StandaloneMongoSplitter. Note that this class does not use the query to calculate the split boundaries. It just uses mongo's internal "splitVector" command; from the documentation here - http://api.mongodb.org/internal/current/commands.html - it also looks like that command does not account for the query.

I don't have a good solution, though. A better approach would be to split the mongo collection only after applying the query, but that requires another implementation of the splitter. Here's some good reading about the issue: http://www.ikanow.com/how-well-does-mongodb-integrate-with-hadoop/
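If the filtered result is small enough to pull through a single driver connection, one crude workaround is to bypass the splitter entirely: run the query with the plain Java driver and hand the documents to Spark with parallelize. A sketch, not the connector's intended usage (it reuses sc from the question):

import scala.collection.JavaConverters._
import com.mongodb.{BasicDBObject, MongoClient}

// Run the query directly and distribute the (small) result set.
val client  = new MongoClient("xx.xx.xx.xx", 27017)
val flights = client.getDB("flying").getCollection("flights")
val docs    = flights.find(new BasicDBObject("destAirport", "LAX"))
                     .toArray.asScala   // materializes everything on the driver
client.close()

val laxRDD = sc.parallelize(docs)
println("LAX flights from parallelize: " + laxRDD.count())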