0
votes

I'm using Spark Structured Streaming as described on this page.

I get the correct messages from the Kafka topic, but the value is in Avro format. Is there some way to deserialize the Avro records (something like the KafkaAvroDeserializer approach)?

Is the schema in the message? Or is it generated by Confluent serializers with only a schema ID? - OneCricketeer
I'm using the schema registry. - bajky
Is there a specific reason you're not using Kafka Streams instead? Also, as the Spark documentation says, you need to deserialize the values with DataFrame operations, since the ByteArrayDeserializer is always used: spark.apache.org/docs/latest/… - OneCricketeer
Several posts I've found just use normal Spark Streaming. stackoverflow.com/questions/41193764/… - OneCricketeer
Check out this answer: stackoverflow.com/questions/48882723/… Hope it helps. - tstites

1 Answer

1
vote

Spark >= 2.4

You can use the from_avro function from the spark-avro library:

import org.apache.spark.sql.avro._

val schema: String = ??? // the Avro schema, as a JSON string
df.withColumn("value", from_avro($"value", schema))
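
Adapted to the question, a minimal end-to-end sketch might look like the following. It assumes the org.apache.spark:spark-avro package is on the classpath; the schema, bootstrap servers, and topic name are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro._

val spark = SparkSession.builder().appName("kafka-avro").getOrCreate()
import spark.implicits._

// Placeholder Avro schema, as a JSON string.
val schema: String =
  """{"type":"record","name":"User","fields":[
    |  {"name":"name","type":"string"},
    |  {"name":"age","type":"int"}
    |]}""".stripMargin

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "my-topic")                     // placeholder
  .load()

// "value" arrives as binary; from_avro parses it into a struct column.
val parsed = df.withColumn("value", from_avro($"value", schema))

Note that from_avro expects plain Avro bytes; if the messages were produced with Confluent's schema-registry serializer (as the comments suggest), each value carries a 5-byte header that from_avro does not strip.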

Spark < 2.4

  • Define a function that takes an Array[Byte] (the serialized object):

    import scala.reflect.runtime.universe.TypeTag
    
    def decode[T : TypeTag](bytes: Array[Byte]): T = ???
    

    which will deserialize the Avro data and create an object that can be stored in a Dataset (a sketch of one possible implementation follows these steps).

  • Create a udf based on the function:

    import org.apache.spark.sql.functions.udf

    val decodeUdf = udf(decode _) // bind T to a concrete type, e.g. udf(decode[MyRecord] _)

  • Call the udf on the value column:

    val df = spark
      .readStream
      .format("kafka")
      ...
      .load()
    
    df.withColumn("value", decodeUdf($"value"))
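
For completeness, here is one possible shape for decode, simplified to return a JSON string rather than a generic T. It is only a sketch, under the assumption (from the comments) that the records were produced with Confluent's serializer, whose 5-byte header (magic byte plus 4-byte schema id) must be skipped before decoding; the schema JSON is a placeholder:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Placeholder reader schema; replace with the schema of your topic.
val schemaJson =
  """{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}"""

def decode(bytes: Array[Byte]): String = {
  val schema  = new Schema.Parser().parse(schemaJson) // cache this in real code
  val reader  = new GenericDatumReader[GenericRecord](schema)
  // Skip the 5-byte Confluent header to reach the raw Avro payload.
  val decoder = DecoderFactory.get().binaryDecoder(bytes, 5, bytes.length - 5, null)
  // GenericRecord.toString renders the record as JSON, which fits in a
  // String column without needing a custom Encoder.
  reader.read(null, decoder).toString
}

With this concrete Array[Byte] => String signature, udf(decode _) resolves without a type parameter.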