I am experimenting with Spark and MongoDB, using the mongodb-hadoop connector to bridge communication between Spark and MongoDB. https://github.com/plaa/mongo-spark is an example of this setup, and it works well for me.
Then, based on that example, I switched to a bigger dataset from https://github.com/10gen-interns/big-data-exploration, which has about 6 million records of flight data. What I want to do is query the MongoDB collection and then do some further processing on the results.
The schema for the flights data is in https://gist.github.com/sweetieSong/6016700
A sample document:
{ "_id" : ObjectId( "51bf19c4ca69141e42ddd1f7" ),
"age" : 27,
"airTime" : 316,
"airlineId" : 19805,
"arrDelay" : -37,
"arrTime" : Date( 1336304580000 ),
"carrier" : "AA",
"carrierId" : "AA",
"crsArrTime" : Date( 1336306800000 ),
"crsDepTime" : Date( 1336294800000 ),
"crsElapsedTime" : 380,
"date" : Date( 1336262400000 ),
"dayOfMonth" : 6,
"dayOfWeek" : 7,
"depDelay" : -5,
"depTime" : Date( 1336294500000 ),
"destAirport" : "LAX",
"destAirportId" : 12892,
"destCity" : "Los Angeles, CA",
"destCityId" : 32575,
"destState" : "California",
"destStateId" : "CA",
"destWAC" : 91,
"distance" : 2475,
"diverted" : true,
"elapsedTime" : 348,
"flightNum" : 1,
"month" : 5,
"numDivAirportLandings" : 0,
"numFlights" : 1,
"origAirport" : "JFK",
"origAirportId" : 12478,
"origCity" : "New York, NY",
"origCityId" : 31703,
"origState" : "New York",
"origStateId" : "NY",
"origWAC" : 22,
"quarter" : 2,
"tailNum" : "N323AA",
"taxiIn" : 19,
"taxiOut" : 13,
"wheelsOff" : Date( 1336295280000 ),
"wheelsOn" : Date( 1336303440000 ),
"year" : 2012 }
My Scala code is:
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.bson.BSONObject

val sc = new SparkContext("local", "Scala Word Count")
// configuration for the mongo-hadoop connector
val config = new Configuration()
config.set("mongo.input.uri", "mongodb://xx.xx.xx.xx:27017/flying.flights")
config.set("mongo.input.query", "{destAirport: 'LAX'}")
//config.set("mongo.input.query", "{_id.destAirport: 'LAX'}")
// each record comes back as a (key, BSON document) pair
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
println("We're running scala.. count = " + mongoRDD.count())
For testing purposes, I just want to get all the records where destAirport is 'LAX'. I don't know what the query should look like, so I tried two different formats: "{destAirport: 'LAX'}" and "{_id.destAirport: 'LAX'}".
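(For reference, since destAirport is a top-level field in the sample document rather than nested under _id, I would also expect the strict-JSON form below to be acceptable; quoting the field name this way is my own guess, not something I have confirmed against the connector docs.)

config.set("mongo.input.query", "{ \"destAirport\" : \"LAX\" }")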
When running the application, the console outputs information like this:
INFO MongoCollectionSplitter: Created split: min={ "_id" : { "$oid" : "51bf29d8ca69141e42097d7f"}}, max= { "_id" : { "$oid" : "51bf29dfca69141e420991ad"}}
14/08/05 10:30:51 INFO Executor: Running task ID 751
14/08/05 10:30:51 INFO TaskSetManager: Finished TID 750 in 109 ms on localhost (progress: 751/1192)
14/08/05 10:30:51 INFO DAGScheduler: Completed ResultTask(0, 750)
14/08/05 10:30:51 INFO BlockManager: Found block broadcast_0 locally
14/08/05 10:30:51 INFO NewHadoopRDD: Input split: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO MongoRecordReader: Read 0.0 documents from:
14/08/05 10:30:51 INFO MongoRecordReader: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO Executor: Serialized size of result for 751 is 597
14/08/05 10:30:51 INFO Executor: Sending result for 751 directly to driver
14/08/05 10:30:51 INFO Executor: Finished task ID 751
No matter what the query is (even if I don't set a query at all), Spark always executes 1191 tasks. Each task prints similar output, and mongoRDD.count() always returns 0.
My first question is: what is the right query?
Moreover, I previously thought that what mongodb-hadoop does is have MongoDB first run the query over the whole collection and then send the results back to Spark for processing. But now it seems to me that MongoDB splits the collection into many pieces, runs the query against each small piece, and then sends the results of that piece to Spark. Is that right?
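As a sanity check, I am also thinking of dropping mongo.input.query entirely and filtering on the Spark side instead, to see whether the documents reach the RDD at all. This is only a sketch of the idea, assuming each value is a BSONObject whose destAirport field can be read with get():

// same config as above but without mongo.input.query, so every document should come through
val plainConfig = new Configuration()
plainConfig.set("mongo.input.uri", "mongodb://xx.xx.xx.xx:27017/flying.flights")
val allFlights = sc.newAPIHadoopRDD(plainConfig, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
val laxFlights = allFlights.filter { case (_, doc) => doc.get("destAirport") == "LAX" }
println("LAX flights counted on the Spark side: " + laxFlights.count())

If that count comes back nonzero, the data is being read and the problem would seem to be only with how mongo.input.query is applied to each split.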