Inside a spark-submit job (a JAR written in Scala), I need to connect to an existing MongoDB database, create a new collection in it, add an index, and write data to that collection from an RDD distributed across thousands of executors.
I can't find a single library that does all of this. Right now I use the mongo-spark-connector to write from the RDD, and then casbah to create the index.
- mongo-spark-connector (where is the Scaladoc for this?): https://docs.mongodb.com/spark-connector/current/scala-api/
- casbah: http://mongodb.github.io/casbah/3.1/scaladoc/#package
The current process looks like this:
- create the RDD
- write from the RDD to a new collection (using the mongo-spark-connector)
- create an index on the collection after writing (using casbah)
Would the following approach speed things up? And any ideas on how to accomplish it?
- create an empty collection
- create the index
- build the RDD and write it to this collection
- use one library for all of it (see the sketch after this list)
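For the first three points, here is a rough sketch of the reordered flow using the same two libraries I already have: create the collection and its index up front with casbah, then write with the connector. The connection values are the same placeholders as in MyConnect further down, and I haven't verified that writing into a pre-indexed collection is actually faster.

import com.mongodb.casbah.Imports.{MongoClient, MongoClientURI, MongoDBObject}
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.rdd.RDD
import org.bson.Document

object IndexFirstSketch {
  def main(args: Array[String]): Unit = {
    val uri = "mongodb://user:password@128.128.128.128:12345/db"

    // 1) create the index up front with casbah; MongoDB creates the
    //    collection implicitly when the first index is built on it
    val collection = MongoClient(MongoClientURI(uri))("db")("collection")
    collection.createIndex(MongoDBObject("hex" -> 1))

    // 2) build the RDD and write into the already-indexed collection
    //    using the mongo-spark-connector
    val rdd_bson: RDD[Document] = ???
    val writeConfig = WriteConfig(Map("uri" -> s"${uri}.collection"))
    MongoSpark.save(rdd_bson, writeConfig)
  }
}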
Here's how I go about it right now, but I suspect there's a better way.
imports
// casbah - used to create the index after the new collection exists
import com.mongodb.casbah.Imports.{MongoClient, MongoClientURI, MongoCollection, MongoDBObject}
// mongo-spark-connector - used to write to Mongo from the Spark cluster (and create the new collection in the process)
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
// Spark - for the RDD type used below
import org.apache.spark.rdd.RDD
import org.bson.Document
connection info
object MyConnect {
  // mongodb connection details
  val host = "128.128.128.128"
  val port = 12345
  val db = "db"
  val collection = "collection"
  val user = "user"
  val password = "password"

  // casbah - used to create the index
  val casbah_db_uri = MongoClientURI(
    s"mongodb://${user}:${password}@${host}:${port}/${db}"
  )

  // mongo-spark-connector - used to write from the RDD
  val collection_uri = s"mongodb://${user}:${password}@${host}:${port}/${db}.${collection}"
  val writeConfig: WriteConfig = WriteConfig(Map("uri" -> collection_uri))
}
do the work
object sparkSubmit {
  def main(args: Array[String]): Unit = {
    // dummy dataset - RDD[(hex id, count)]
    val rdd_dummy: RDD[(String, Int)] = ???

    // convert to Mongo documents, as expected by the mongo-spark-connector
    val rdd_bson: RDD[Document] =
      rdd_dummy.map { case (hex, cnt) =>
        Document.parse(s"""{"hex":"$hex", "cnt":$cnt}""")
      }

    // save to Mongo (the new collection is created in the process) - mongo-spark-connector
    MongoSpark.save(rdd_bson, MyConnect.writeConfig)

    // create an ascending index on "hex" on the new collection - casbah
    val new_table: MongoCollection = MongoClient(MyConnect.casbah_db_uri)(MyConnect.db)(MyConnect.collection)
    new_table.createIndex(MongoDBObject("hex" -> 1))
  }
}
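On the "one library" point, my current thinking is that the connector's own MongoConnector might be able to hand back the underlying Java-driver collection, so the index could be created without pulling in casbah at all, e.g. by replacing the casbah part at the end of main with something like the snippet below. I haven't run this; the withCollectionDo and asOptions usage is just my reading of the connector's API, so treat it as a sketch rather than something known to work.

import com.mongodb.client.MongoCollection
import com.mongodb.spark.MongoConnector
import org.bson.Document

// assumption: WriteConfig.asOptions and MongoConnector.withCollectionDo
// behave the way I expect - not verified
MongoConnector(MyConnect.writeConfig.asOptions)
  .withCollectionDo(MyConnect.writeConfig, { collection: MongoCollection[Document] =>
    // ascending index on "hex", created through the Java driver collection
    collection.createIndex(new Document("hex", 1))
  })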