
Inside a spark-submit job (a JAR written in Scala), I need to access an existing MongoDB database, create a new collection in it, add an index, and write data to that collection from an RDD distributed over thousands of executors.

I can't find one library that can do all of this. Right now I'm using the mongo-spark-connector to write from the RDD, and then I use Casbah to create the index.

mongo-spark-connector (where is the scaladoc for this?) - https://docs.mongodb.com/spark-connector/current/scala-api/

casbah - http://mongodb.github.io/casbah/3.1/scaladoc/#package

The process looks like this...

  • create the RDD
  • write from RDD to new collection (using mongo spark connector)
  • create index on collection after writing (using casbah)

Would this approach speed things up? Any ideas how to accomplish it? (A rough sketch of what I'm imagining follows the list.)

  • create empty collection
  • create index
  • build RDD and write to this collection
  • use one library to do it
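Something like this is what I'm imagining: an untested sketch that still uses both libraries (Casbah to create the index up front, which should also implicitly create the empty collection, then the connector to write), reusing the MyConnect object from my current code further down.

// untested sketch: create the collection and index first, then write from the RDD
import com.mongodb.casbah.Imports.{MongoClient, MongoCollection, MongoDBObject}
import com.mongodb.spark.MongoSpark
import org.apache.spark.rdd.RDD
import org.bson.Document

def writeIndexedFirst(rdd_bson: RDD[Document]): Unit = {
  // casbah: creating the index on a not-yet-existing collection creates it empty
  val coll: MongoCollection =
    MongoClient(MyConnect.casbah_db_uri)(MyConnect.db)(MyConnect.collection)
  coll.createIndex(MongoDBObject("hex" -> 1))

  // mongo-spark-connector: write the RDD into the already-indexed collection
  MongoSpark.save(rdd_bson, MyConnect.writeConfig)
}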

Here's how I go about it right now, but I suspect there's a better way.

imports

// casbah - used to create index after new collection is created
import com.mongodb.casbah.Imports.{MongoClient,MongoCollection,MongoClientURI,MongoDBObject}

// mongo-spark-connector used to write to Mongo from Spark cluster (and create new collection in process)
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.{WriteConfig,ReadConfig}
import org.bson.Document

// spark - RDD type used in the driver code below
import org.apache.spark.rdd.RDD

connection info

object MyConnect {
  // mongodb connect
  val host       = "128.128.128.128"
  val port       = 12345
  val db         = "db"
  val collection = "collection"
  val user       = "user"
  val password   = "password"

  // casbah - to create index 
  val casbah_db_uri = MongoClientURI(
    s"mongodb://${user}:${password}@${host}:${port}/${db}"
  )

  // mongodb spark connector - to write from RDD 
  val collection_uri = s"mongodb://${user}:${password}@${host}:${port}/${db}.${collection}"
  val writeConfig: WriteConfig = WriteConfig(Map("uri"->collection_uri))
}

do the work

object sparkSubmit {

  def main(args: Array[String]): Unit = {

    // dummy dataset - RDD[(id, cnt)]
    val rdd_dummy: RDD[(String, Int)] = ???

    // data as Mongo docs - as per mongo spark connector
    val rdd_bson: RDD[Document] = {
      rdd_dummy
      .map(tup => s"""{"hex":"${tup._1}", "cnt":${tup._2}}""")
      .map(str => Document.parse(str))
    }

    // save to mongo / create new collection in process - using mongo spark connector
    MongoSpark.save(rdd_bson, MyConnect.writeConfig)

    // create index on new collection - using casbah
    val new_table: MongoCollection = MongoClient(MyConnect.casbah_db_uri)(MyConnect.db)(MyConnect.collection)
    new_table.createIndex(MongoDBObject("hex" -> 1))
  }
}

1 Answer


Would this approach speed things up?

Generally, with any database (including MongoDB), building an index has a cost. If you create the index on an empty collection, that cost is paid incrementally during each insert operation. If you create the index after all the inserts, the cost is paid all at once afterwards, and the build may lock the collection until it completes.

You can choose either depending on your use case; for example, if you'd like to query the collection as soon as the writes complete, create the index on the empty collection first.

Note that MongoDB has two types of index builds: foreground and background. See MongoDB: Index Creation for more information.
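For example, with Casbah (which you already use), a background build can be requested by passing an options document. This is a minimal sketch, assuming Casbah's two-argument createIndex(keys, options) overload:

// sketch: ask MongoDB to build the index in the background so the collection
// is not locked while the index builds
import com.mongodb.casbah.Imports.{MongoCollection, MongoDBObject}

def createBackgroundIndex(coll: MongoCollection): Unit = {
  coll.createIndex(
    MongoDBObject("hex" -> 1),           // index keys
    MongoDBObject("background" -> true)  // index build options
  )
}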

where is the scaladoc for this?

There is no scaladoc for it; however, there is javadoc: https://www.javadoc.io/doc/org.mongodb.spark/mongo-spark-connector_2.11/2.2.1

This is because the MongoDB Spark Connector utilises the MongoDB Java driver jars underneath.

Instead of using Casbah, the legacy Scala driver, to create the index, you should try the official MongoDB Scala driver. See, for example, Create An Index:

collection.createIndex(ascending("i"))
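A more self-contained sketch of that, assuming the org.mongodb.scala driver, reusing the placeholder connection values from the question, and blocking on the returned Observable purely for illustration:

// sketch: create the "hex" index with the official MongoDB Scala driver
import org.mongodb.scala.{Document, MongoClient, MongoCollection}
import org.mongodb.scala.model.Indexes.ascending

import scala.concurrent.Await
import scala.concurrent.duration._

val client = MongoClient("mongodb://user:password@128.128.128.128:12345/db")
val coll: MongoCollection[Document] = client.getDatabase("db").getCollection("collection")

// createIndex returns an Observable; convert it to a Future and wait for completion
val indexName = Await.result(coll.createIndex(ascending("hex")).toFuture(), 30.seconds)
println(s"created index: $indexName")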