
What I'm trying to achieve here is to insert several documents into a given collection, which can live in any of several databases.

The target database depends on a given attribute of each object, so every record should pick, from a small cache Map of (key, MongoCollection) entries (each one pointing to its respective DB, which is not always the same), the collection it needs and call .insertOne with its Document.
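Roughly, the cache I have in mind looks like the sketch below (the database names and the attribute values are just placeholders):

import org.mongodb.scala.{Document, MongoClient, MongoCollection}

// placeholder names: one client, one cached collection per attribute value
val client = MongoClient("mongodb://localhost:27017")

val collectionsByKey: Map[String, MongoCollection[Document]] = Map(
  "typeA" -> client.getDatabase("database_a").getCollection("sample_collection"),
  "typeB" -> client.getDatabase("database_b").getCollection("sample_collection")
)

// each record would then do something like:
//   collectionsByKey(attributeOf(record)).insertOne(documentOf(record))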

The problem I'm finding with this is that MongoCollectionImpl does not seem to be serializable when performing the following action (simplified, but it replicates the error):

.map(
  MongoClient("mongodb://localhost:27017") // built on the driver...
    .getDatabase("sample")
    .getCollection("sample_collection")
    .insertOne) // ...and captured by the closure, so Spark has to serialize it

I know that there is a connector made specifically for Spark by MongoDB, but I can't figure out how to use more than one DB with it, since I would need several WriteConfigs applied conditionally to each element so that records aren't inserted into a DB they shouldn't be in.

https://docs.mongodb.com/spark-connector/master/scala/write-to-mongodb/

Is there any workaround for this?


1 Answer


Just use the MongoSpark connector and split your original DataFrame based on a condition.

// assumes a SparkSession called `spark` is in scope
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.SaveMode
import spark.implicits._ // for the 'myColumn column syntax

myDataFrame.cache() // recommended to prevent repeating the calculation

val df1 = myDataFrame.filter('myColumn < 0)
val df2 = myDataFrame.filter('myColumn >= 0 && 'myColumn < 50)
val df3 = myDataFrame.filter('myColumn >= 50)

MongoSpark.save(df1.write.mode(SaveMode.Overwrite), WriteConfig(databaseName = ???, collectionName = ???, connectionString = Some(???)))

MongoSpark.save(df2.write.mode(SaveMode.Overwrite), WriteConfig(databaseName = ???, collectionName = ???, connectionString = Some(???)))

MongoSpark.save(df3.write.mode(SaveMode.Overwrite), WriteConfig(databaseName = ???, collectionName = ???, connectionString = Some(???)))
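If the split is driven by the values of a column rather than a fixed set of ranges, the same idea can be written as a loop over a routing map. A rough sketch, reusing the imports above (the column name, database names and connection string are placeholders):

import org.apache.spark.sql.functions.col

// placeholder routing: attribute value -> target database
val routing = Map("typeA" -> "database_a", "typeB" -> "database_b")

routing.foreach { case (key, db) =>
  MongoSpark.save(
    myDataFrame.filter(col("myKeyColumn") === key).write.mode(SaveMode.Overwrite),
    WriteConfig(
      databaseName = db,
      collectionName = "sample_collection",
      connectionString = Some("mongodb://localhost:27017")))
}

Since the WriteConfig is chosen per entry of the routing map, each subset of rows only ever goes to its own database.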