5
votes

Update: the spark-avro package was updated to support this scenario. https://github.com/databricks/spark-avro/releases/tag/v3.1.0

I have an Avro file, created by a third party outside my control, that I need to process using Spark. The Avro schema is a record where one of the fields is a mixed union type:

{
    "name" : "Properties",
    "type" : {
        "type" : "map",
        "values" : [ "long", "double", "string", "bytes" ]
    }
}

This is unsupported with the spark-avro reader:

In addition to the types listed above, it supports reading of three types of union types: union(int, long) union(float, double) union(something, null), where something is one of the supported Avro types listed above or is one of the supported union types.

Reading about Avro's schema evolution and resolution, I expected to be able to read the file while skipping the problematic field, by specifying a different reader schema that omits it. According to the Avro Schema Resolution docs, this should work:

if the writer's record contains a field with a name not present in the reader's record, the writer's value for that field is ignored.

So I tried reading with a modified reader schema:

 import com.databricks.spark.avro._
 val df = sqlContext.read.option("avroSchema", avroSchema).avro(path)

where avroSchema is the exact same schema the writer used, but without the problematic field.
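To illustrate, avroSchema is just the writer's schema as a JSON string with the Properties field removed. The record and field names below are made up, standing in for the real third-party schema:

// Placeholder reader schema: same record as the writer's, minus "Properties".
// "ThirdPartyRecord" and "Id" are illustrative names only.
val avroSchema = """
{
  "type" : "record",
  "name" : "ThirdPartyRecord",
  "fields" : [
    { "name" : "Id", "type" : "string" }
  ]
}
"""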

But I still get the same error about mixed union types.

Is this scenario of schema evolution supported with Avro? With spark-avro? Is there another way to achieve my goal?


Update: I have tested the same scenario (the same file, actually) with Apache Avro 1.8.1 and it works as expected, so the issue must be specific to spark-avro. Any ideas?
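For reference, the Avro 1.8.1 test looked roughly like this; the file and schema paths are placeholders, and the reader schema is the writer's schema minus the Properties field:

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Reader schema = the writer's schema without the "Properties" field (placeholder path)
val readerSchema = new Schema.Parser().parse(new File("reader.avsc"))

// Passing null as the writer schema lets DataFileReader take it from the file header
val datumReader = new GenericDatumReader[GenericRecord](null, readerSchema)
val fileReader = new DataFileReader[GenericRecord](new File("data.avro"), datumReader)

while (fileReader.hasNext) {
  // the mixed-union "Properties" field is simply absent from the resolved records
  println(fileReader.next())
}
fileReader.close()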

1
Not an answer, but it appears that spark-avro doesn't adhere to Avro schema evolution / resolution rules. See: github.com/databricks/spark-avro/issues/176 and github.com/databricks/spark-avro/blob/master/src/main/scala/com/… – itaysk
I posted a bounty, but it would be great if you post what you know so far as an answer in case nobody comes up with a solution. – user6022341
What I found was that it cannot be done using spark-avro. Instead I use Avro's standard API. I didn't post it as an answer because it doesn't solve the original question, but proposes a different solution. Do you think it's beneficial to post the code as an answer? – itaysk
A negative answer is still an answer, and if you have a workaround it may save others some time. – user6022341

1 Answer

5
votes

Update: the spark-avro package was updated to support this scenario. https://github.com/databricks/spark-avro/releases/tag/v3.1.0

This does not actually answer my question; rather, it is a different solution to the same problem.

Since spark-avro currently does not have this functionality (see my comment on the question), I have instead used Avro's org.apache.avro.mapreduce API together with Spark's newAPIHadoopFile. Here is a simple example:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}

val path = "..."
val conf = new SparkConf().setAppName("avro test")
  // Kryo is used because Avro records are not Java-serializable
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// keys are AvroKey[GenericRecord]; values are unused by AvroKeyInputFormat
val avroRdd = sc.newAPIHadoopFile(path,
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable])

Contrary to spark-avro, the official Avro libraries support mixed union types and schema evolution.
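If you also want the reader-schema trick from the question (dropping the Properties field) with this API, I haven't shown it above, but something along these lines should work: register the reader schema on the Hadoop job configuration via AvroJob, then map over the resulting GenericRecords. This reuses the imports, sc and path from the snippet above, and the "Id" field is just an example name:

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.mapreduce.Job

// Reader schema without the "Properties" field (placeholder path)
val readerSchema = new Schema.Parser().parse(new File("reader.avsc"))

// Register the reader schema so AvroKeyInputFormat resolves records against it
val job = Job.getInstance(sc.hadoopConfiguration)
AvroJob.setInputKeySchema(job, readerSchema)

val resolvedRdd = sc.newAPIHadoopFile(path,
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  job.getConfiguration)

// keys wrap GenericRecord; "Id" is a hypothetical field name used for illustration
val ids = resolvedRdd.map { case (key, _) => key.datum().get("Id").toString }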