
After calculating the distance matrix for a set of points stored in a file on HDFS, I need to store the calculated distance matrix, which is in distributed form (CoordinateMatrix/RowMatrix), in MongoDB through the MongoDB Connector for Apache Spark. Is there a recommended way to do this, or even a better connector for such an operation?

Here is the part of my code:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD

val data = sc.textFile("hdfs://localhost:54310/usrp/copy_sample_data.txt")
val points = data.map(s => Vectors.dense(s.split(',').map(_.toDouble)))
val indexed = points.zipWithIndex()
val indexedData = indexed.map { case (value, index) => (index, value) }
val pairedSamples = indexedData.cartesian(indexedData)
// distance and covariance are helper functions defined elsewhere
val dist = pairedSamples.map { case (x, y) => ((x, y), distance(x._2, y._2)) }
  .map { case ((x, y), z) => ((x, y), z, covariance(z)) }
val entries: RDD[MatrixEntry] = dist.map { case ((x, y), z, cov) => MatrixEntry(x._1, y._1, cov) }
val coomat: CoordinateMatrix = new CoordinateMatrix(entries)
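
For reference, distance and covariance are helpers defined elsewhere in my code. A minimal sketch of a Euclidean distance on plain arrays (hypothetical; the real helper takes mllib Vectors, whose toArray would adapt them to this signature):

```scala
// Hypothetical Euclidean distance on plain arrays; the actual helper above
// operates on org.apache.spark.mllib.linalg.Vector (use v.toArray to adapt).
def distance(a: Array[Double], b: Array[Double]): Double =
  math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

println(distance(Array(0.0, 0.0), Array(3.0, 4.0)))  // 5.0
```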

To further note, I created this matrix in Spark from an RDD. Would it be better, or even possible, to save the data directly from the RDD to MongoDB?

1 Answer

CoordinateMatrix and RowMatrix are basically wrappers around RDD[MatrixEntry] and RDD[Vector] respectively, and both can be saved to MongoDB relatively easily. For a coordinate matrix:

val spark: SparkSession = ???
import spark.implicits._

// For 1.x
// val sqlContext: SQLContext = ???
// import sqlContext.implicits._

val options = Map(
  "uri" -> ???,
  "database" -> ???
)

val coordMat = new CoordinateMatrix(sc.parallelize(Seq(
  MatrixEntry(1, 3, 1.4), MatrixEntry(3, 6, 2.8))
))

coordMat.entries.toDF().write
  .options(options)
  .option("collection", "coordinates")    
  .format("com.mongodb.spark.sql")
  .save()

you'll get documents of this shape:

{'_id': ObjectId('...'), 'i': 3, 'j': 6, 'value': 2.8}

which can be easily cast back to the original form:

val entries = spark.read
  .options(options)
  .option("collection", "coordinates")
  .schema(...)  // the schema must be set on the reader, before load()
  .format("com.mongodb.spark.sql")
  .load()
  .drop("_id")
  .as[MatrixEntry]

new CoordinateMatrix(entries.rdd)

Pretty much the same thing can be done for RowMatrix, but you'll need a little more work (represent the Vectors either as dense arrays or as sparse (size, indices, values) tuples).
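
As a minimal sketch of the sparse (size, indices, values) encoding mentioned above (toSparse is a hypothetical helper shown on plain arrays; you'd apply the same idea to each row's Vector before calling toDF()):

```scala
// Hypothetical helper: encode a dense vector as a (size, indices, values)
// triple, keeping only the non-zero entries.
def toSparse(v: Array[Double]): (Int, Array[Int], Array[Double]) = {
  val nz = v.zipWithIndex.filter { case (x, _) => x != 0.0 }
  (v.length, nz.map { case (_, i) => i }, nz.map { case (x, _) => x })
}

val (size, indices, values) = toSparse(Array(0.0, 1.5, 0.0, 2.0))
println(s"$size | ${indices.mkString(",")} | ${values.mkString(",")}")  // 4 | 1,3 | 1.5,2.0
```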

Unfortunately, in both cases (CoordinateMatrix, RowMatrix) you'll lose information about the matrix shape.
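
One way around the shape loss (a sketch, not connector-specific): store numRows and numCols yourself, for example in a small metadata document, and pass them to the CoordinateMatrix constructor overload that takes explicit dimensions (new CoordinateMatrix(entries, nRows, nCols)). Recomputing the shape from the entries alone only recovers the occupied extent (max index + 1), which undercounts trailing empty rows or columns:

```scala
// Hypothetical: recover a lower-bound (numRows, numCols) from (i, j, value)
// entries; trailing all-zero rows/columns are not recoverable this way.
case class Entry(i: Long, j: Long, value: Double)

def occupiedShape(entries: Seq[Entry]): (Long, Long) =
  (entries.map(_.i).max + 1, entries.map(_.j).max + 1)

println(occupiedShape(Seq(Entry(1, 3, 1.4), Entry(3, 6, 2.8))))  // (4,7)
```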