As far as I understand your problem:
* either Spark incorrectly inferred your schema and treated some fields as required (`nullable = false`). In that case you can still define the schema explicitly and set `nullable` to `true`. This works if your schema evolved over time, e.g. at some point you added or removed a field but kept each column's type (a String always stays a String and never becomes a Struct or another completely different type)
* or your schemas are completely inconsistent, i.e. at some point your String field turned into a Struct or another completely different type. In that case I don't see any solution other than using the RDD abstraction, working with very permissive types such as `Any` in Scala (`Object` in Java), and using `isInstanceOf` tests to normalize all fields into one common format
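A minimal sketch of the second option, assuming the inconsistent field is `fees` and that every variant should be reduced to a list of fees. `Fee`, `FeesNormalizer` and the "plain string" variant are hypothetical names/shapes for illustration. Type patterns in `match` are Scala's idiomatic form of `isInstanceOf` checks. Arrays and embedded documents are matched as `java.util.List` and `java.util.Map`, since that is typically how BSON arrays and `org.bson.Document` (which implements `java.util.Map`) show up when you work with `Any`.

```scala
// Hypothetical common format that every variant of the "fees" field is mapped to
final case class Fee(feeType: String, minAmount: Option[Double], maxAmount: Option[Double])

object FeesNormalizer {
  // Accepts numbers or numeric strings; anything else becomes None
  private def toDouble(v: Any): Option[Double] = v match {
    case n: java.lang.Number => Some(n.doubleValue)
    case s: String           => scala.util.Try(s.trim.toDouble).toOption
    case _                   => None
  }

  // Normalizes a loosely typed value (e.g. the result of Document#get) into one format
  def normalize(value: Any): List[Fee] = value match {
    case null => Nil
    // Assumed old variant: the whole field was a plain string such as "misc"
    case s: String => List(Fee(s, None, None))
    // BSON arrays: normalize every element and flatten the results
    case xs: java.util.List[_] => xs.toArray.toList.flatMap(normalize)
    // Embedded documents: either a {"main": [...]} wrapper or a single fee struct
    case m: java.util.Map[_, _] =>
      val main = m.get("main")
      if (main != null) normalize(main)
      else {
        val feeType = Option(m.get("type")).map(_.toString).getOrElse("unknown")
        List(Fee(feeType, toDouble(m.get("minAmount")), toDouble(m.get("maxAmount"))))
      }
    // Any other type is dropped; adapt this to your own rules
    case _ => Nil
  }
}
```

On the RDD returned by `MongoSpark.load(sc)` (an RDD of `org.bson.Document`) you could apply it as `rdd.map(doc => FeesNormalizer.normalize(doc.get("fees")))`, and from then on work with a single, consistent type.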
Actually, I see one more possible solution, but only if you know which data has which schema. For instance, if you know that data between 2018-01-01 and 2018-02-01 uses schema#1 and all other data uses schema#2, you can write a pipeline that transforms schema#1 into schema#2. Then you can simply `union` both datasets and apply your transformations to consistently structured values.
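A minimal sketch of that pipeline, with plain Scala collections standing in for Spark Datasets (`++` plays the role of `union`). The record shapes (`RecordV1` with a plain amount, `RecordV2` with a structured fee), the `"unknown"` fee type and the half-open date window are all assumptions for illustration.

```scala
import java.time.LocalDate

// Hypothetical shapes: schema#1 stored a plain amount, schema#2 a structured fee
final case class RecordV1(date: LocalDate, amount: Double)
final case class FeeV2(feeType: String, minAmount: Double, maxAmount: Double)
final case class RecordV2(date: LocalDate, fee: FeeV2)

object SchemaMigration {
  // Assumed window for schema#1: [2018-01-01, 2018-02-01)
  val schema1Start: LocalDate = LocalDate.of(2018, 1, 1)
  val schema1End: LocalDate = LocalDate.of(2018, 2, 1)

  // Predicate you would use to split the raw input into the two datasets
  def usesSchema1(date: LocalDate): Boolean =
    !date.isBefore(schema1Start) && date.isBefore(schema1End)

  // The schema#1 -> schema#2 transformation ("unknown" type is an assumed default)
  def toV2(r: RecordV1): RecordV2 =
    RecordV2(r.date, FeeV2("unknown", r.amount, r.amount))

  // Convert the old records, then combine them with the already-new ones
  def unify(v1: Seq[RecordV1], v2: Seq[RecordV2]): Seq[RecordV2] =
    v1.map(toV2) ++ v2
}
```

With Spark Datasets the last step would roughly correspond to `ds1.map(toV2).union(ds2)`. Keep in mind that `Dataset.union` matches columns by position, so `unionByName` (Spark 2.3+) is safer if the column order might differ.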
Edit:
I've just tried code similar to yours, and it worked correctly on my local MongoDB instance:
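```scala
import com.mongodb.spark._
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.Document

// Assumed equivalent of my getSparkContext helper: read from and write to test.init_data by default
val uri = "mongodb://localhost:27017/test.init_data"
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("fees-test")
  .set("spark.mongodb.input.uri", uri).set("spark.mongodb.output.uri", uri))

// Load sample data: one document with null fees, one with a nested fees struct
val docFees =
  """
    | {"fees": null}
    | {"fees": { "main" : [ { "type" : "misc", "appliesPer" : "trip", "description" : null, "minAmount" : 175, "maxAmount" : 175 } ]} }
  """.stripMargin.trim.split("[\\r\\n]+").toSeq
MongoSpark.save(sc.parallelize(docFees.map(Document.parse)))

// Read everything back and copy it (replacing documents) into test.new_coll_data
val rdd = MongoSpark.load(sc)
rdd.saveToMongoDB(WriteConfig(Map("uri" -> "mongodb://localhost:27017/test.new_coll_data", "replaceDocument" -> "true")))
```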
When I checked the result in the MongoDB shell, both collections contained the same two documents, including the one with `"fees" : null`:
```
> coll = db.init_data;
test.init_data
> coll.find();
{ "_id" : ObjectId("5b33d415ea78632ff8452c60"), "fees" : { "main" : [ { "type" : "misc", "appliesPer" : "trip", "description" : null, "minAmount" : 175, "maxAmount" : 175 } ] } }
{ "_id" : ObjectId("5b33d415ea78632ff8452c61"), "fees" : null }
> coll = db.new_coll_data;
test.new_coll_data
> coll.find();
{ "_id" : ObjectId("5b33d415ea78632ff8452c60"), "fees" : { "main" : [ { "type" : "misc", "appliesPer" : "trip", "description" : null, "minAmount" : 175, "maxAmount" : 175 } ] } }
{ "_id" : ObjectId("5b33d415ea78632ff8452c61"), "fees" : null }
```