5
votes

I'm moving data from one collection to another in other cluster using Spark. the data's schema is not consistent(I mean that has few schema's in a single collection with different data types with little variations). When I try to read data from spark, the sampling is unable to get all the schema's of the data and throwing the below error.(I have a complex schema which I can't explicitly mention instead of spark gets by sampling.)

com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast ARRAY into a NullType (value: BsonArray{values=[{ "type" : "GUEST_FEE", "appliesPer" : "GUEST_PER_NIGHT", "description" : null, "minAmount" : 33, "maxAmount" : 33 }]})

I tried reading the collection as an RDD and write as an RDD still the issue persists.

Any help on this.!

Thanks.

2

2 Answers

6
votes

All these com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast SOME_TYPE into a NullType come from incorrect schema inference. For schema-less data sources such as JSON file or mongodb, Spark does a scan of small fraction of the data to determine the types. If some particular field has lots of NULL's you can get unlucky and type will be set as NullType.

One thing you can do is increase the number of entries scanned for schema inference.

Another - get the inferred schema first, fix it, and reload dataframe with fixed schema:

def fix_spark_schema(schema):
  if schema.__class__ == pyspark.sql.types.StructType:
    return pyspark.sql.types.StructType([fix_spark_schema(f) for f in schema.fields])
  if schema.__class__ == pyspark.sql.types.StructField:
    return pyspark.sql.types.StructField(schema.name, fix_spark_schema(schema.dataType), schema.nullable)
  if schema.__class__ == pyspark.sql.types.NullType:
    return pyspark.sql.types.StringType()
  return schema

collection_schema = sqlContext.read \
    .format("com.mongodb.spark.sql") \
    .options(...) \
    .load() \
    .schema

collection = sqlContext.read \
    .format("com.mongodb.spark.sql") \
    .options(...) \
    .load(schema=fix_spark_schema(collection_schema))

In my case all problematic fields could be represented with StringType, you might make the logic more complex if needed.

0
votes

As far as I understood your problem: * either Spark incorrectly detected your schema and considered some fields as required (nullable = false) - in such case, you can still define it explicitly and set nullable to true. It would work if your schema was evolving and in some time in the past you added or removed a field but still keeping column type (e.g. String will be always a String and not a Struct or other completely different type) * or your schemas are completely inconsistent, i.e. your String field transformed at some time to a Struct or other completely different type. In such case I don't see other solution than use RDD abstraction and work with very permissive types as Any in Scala (Object in Java) and using isInstanceOf tests to normalize all fields into a 1 common format

Actually I see also another possible solution, but only if you know what data has which schema. For instance, if you know that for data between 2018-01-01 and 2018-02-01 you use schema#1 and for the others schema#2, you can write a pipeline that will transform schema#1 to schema#2. Later you could simply union both datasets and apply your transformations on consistently structured values.


Edit:

I've just tried similar code you give and it worked correctly on my local MongoDB instance:

val sc = getSparkContext(Array("mongodb://localhost:27017/test.init_data")) 

// Load sample data
import com.mongodb.spark._

val docFees =
  """
    | {"fees": null}
    | {"fees": { "main" : [ { "type" : "misc", "appliesPer" : "trip", "description" : null, "minAmount" : 175, "maxAmount" : 175 } ]} }
  """.stripMargin.trim.stripMargin.split("[\\r\\n]+").toSeq
MongoSpark.save(sc.parallelize(docFees.map(Document.parse)))

val rdd = MongoSpark.load(sc)
rdd.saveToMongoDB(WriteConfig(Map("uri"->"mongodb://localhost:27017/test.new_coll_data", "replaceDocument"->"true")))

And when I checked the result in MongoDB shell I got:

> coll = db.init_data; 
test.init_data
> coll.find();
{ "_id" : ObjectId("5b33d415ea78632ff8452c60"), "fees" : { "main" : [ { "type" : "misc", "appliesPer" : "trip", "description" : null, "minAmount" : 175, "maxAmount" : 175 } ] } }
{ "_id" : ObjectId("5b33d415ea78632ff8452c61"), "fees" : null }
> coll = db.new_coll_data;
test.new_coll_data
> coll.find();
{ "_id" : ObjectId("5b33d415ea78632ff8452c60"), "fees" : { "main" : [ { "type" : "misc", "appliesPer" : "trip", "description" : null, "minAmount" : 175, "maxAmount" : 175 } ] } }
{ "_id" : ObjectId("5b33d415ea78632ff8452c61"), "fees" : null }