
I want to write a DataFrame in Avro format using a provided Avro schema rather than Spark's auto-generated schema. How can I tell Spark to use my custom schema on write? Here is the schema:

{
  "type" : "record",
  "name" : "name1",
  "namespace" : "com.data",
  "fields" : [
    {
      "name" : "id",
      "type" : "string"
    },
    {
      "name" : "count",
      "type" : "int"
    },
    {
      "name" : "val_type",
      "type" : {
        "type" : "enum",
        "name" : "ValType",
        "symbols" : [ "s1", "s2" ]
      }
    }
  ]
}
  1. Reading the Avro data using the avroSchema option. This step works fine.

    Dataset<Row> d1 = spark.read()
        .option("avroSchema", String.valueOf(inAvroSchema))
        .format("com.databricks.spark.avro")
        .load("s3_path");

  2. I run some spark.sql queries on that data and store the result in a DataFrame.

  3. When I try to write the Avro data to S3 based on the Avro schema, it fails.

The DataFrame's schema:

root
 |-- id: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- val_type: string (nullable = true)

FinalDF.write()
    .option("avroSchema", String.valueOf(inAvroSchema))
    .format("com.databricks.spark.avro")
    .mode("overwrite")
    .save("target_s3_path");

I got the error:

User class threw exception: org.apache.spark.SparkException: Job aborted.
    ......
    Caused by: org.apache.avro.AvroRuntimeException: Not a union: "string"
        at org.apache.avro.Schema.getTypes(Schema.java:299)
        at org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$resolveNullableType(AvroSerializer.scala:229)

Is there a way to use an Avro schema when writing Avro data? And if option("avroSchema", String.valueOf(inAvroSchema)) is the right approach, what am I doing wrong? The "forceSchema" option doesn't work in my case.

Thanks in advance.

I think the problem has more to do with "type" : "enum". Can you change it to string and check if it works? – Sai Kiran KrishnaMurthy
Thanks @SaiKiranKrishnaMurthy. Yeah, I know; it works with string. But I have to follow the provided Avro schema (I have more fields there, this is just a part, and most of them are enums). I found issues.apache.org/jira/browse/SPARK-24855, but it's in progress and targets Spark 3.0, and we are using 2.x. Maybe I need to use a DatumWriter. But how do I write a Spark DataFrame using GenericDatumWriter? That's the question :) – Sergii Chukhno
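
For reference, here is a minimal driver-side sketch of the GenericDatumWriter route mentioned in the comment above. It is a hypothetical workaround, not the answer's method: it assumes the FinalDF and inAvroSchema from the question, writes to an illustrative local path, and collects to the driver, so it is only viable for small results:

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}

// Parse the provided schema and grab the enum field's schema,
// needed to build proper EnumSymbol values.
val schema = new Schema.Parser().parse(String.valueOf(inAvroSchema))
val enumSchema = schema.getField("val_type").schema()

// Plain Avro writer, bypassing spark-avro's serializer entirely.
val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
writer.create(schema, new File("/tmp/name1.avro"))  // hypothetical output path
FinalDF.collect().foreach { row =>
  val rec = new GenericData.Record(schema)
  rec.put("id", row.getAs[String]("id"))
  rec.put("count", row.getAs[Int]("count"))
  rec.put("val_type", new GenericData.EnumSymbol(enumSchema, row.getAs[String]("val_type")))
  writer.append(rec)
}
writer.close()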

2 Answers


I did some digging around and found something interesting:

import org.apache.spark.sql.SaveMode
import spark.implicits._  // `spark` is the SparkSession (e.g., in spark-shell); needed for .toDF()

case class Name1(id: String, count: Int, val_type: String)

val schema = """{
                   |  "type" : "record",
                   |  "name" : "name1",
                   |  "namespace" : "com.data",
                   |  "fields" : [
                   |  {
                   |    "name" : "id",
                   |    "type" : "string"
                   |  },
                   |  {
                   |    "name" : "count",
                   |    "type" : "int"
                   |  },
                   |  {
                   |    "name" : "val_type",
                   |    "type" : {
                   |      "type" : "enum",
                   |      "name" : "ValType",
                   |      "symbols" : [ "s1", "s2" ]
                   |    }
                   |  }
                   |  ]
                   |}""".stripMargin


val d = Seq(Name1("1",2,"s1"),Name1("1",3,"s2"),Name1("1",4,"s2"),Name1("11",2,"s1")).toDF()

d.write.mode(SaveMode.Overwrite).format("avro").option("avroSchema",schema).save("data/tes2/")

The code above fails when I execute it with Spark 2.4.x; when I run the same code with the new Spark 3.0.0, however, it succeeds and the data is written successfully.

val df = spark.read.format("avro").load("data/tes2/")
df.printSchema()
df.show(10)

root
 |-- id: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- val_type: string (nullable = true)

+---+-----+--------+
| id|count|val_type|
+---+-----+--------+
| 11|    2|      s1|
|  1|    4|      s2|
|  1|    3|      s2|
|  1|    2|      s1|
+---+-----+--------+

I guess the best option would be to upgrade your Spark version or change the Avro schema definition.
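
If upgrading isn't possible, the schema change that makes the 2.4.x write succeed (as confirmed in the comments above) is declaring the enum field as a plain string:

{
  "name" : "val_type",
  "type" : "string"
}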


You can use the org.apache.spark:spark-avro package and pass your Avro schema to the to_avro function. Here is the doc: https://spark.apache.org/docs/latest/sql-data-sources-avro.html#to_avro-and-from_avro
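
A minimal sketch of that idea, assuming Spark 3.x (where to_avro accepts a JSON-format schema string) and the FinalDF and inAvroSchema from the question. Note that to_avro produces a binary column, which is typically sent to Kafka rather than saved as .avro files:

import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.functions.struct

// Pack the columns into a struct and serialize it with the provided schema.
val payload = FinalDF.select(
  to_avro(struct(FinalDF.col("id"), FinalDF.col("count"), FinalDF.col("val_type")),
          String.valueOf(inAvroSchema)).as("value"))

// payload can then be written out, e.g. with .write.format("kafka").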