
I'm trying the code below in Spark 2.4.3 to read Avro messages from Kafka.

The schema is stored in the Confluent Schema Registry when the data is published to Kafka. I have already tried a few solutions discussed here (Integrating Spark Structured Streaming with the Confluent Schema Registry / Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)) but couldn't make them work, and I could not find a proper way of doing this when the schema is stored in a Schema Registry.

Here is the code I'm currently trying. I at least get some output, but all the records come out as null values, even though the topic definitely has data. Could someone please help me out with this?

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.SchemaConverters

object ScalaSparkAvroConsumer {

    private val topic = "customer.v1"
    private val kafkaUrl = "localhost:9092"
    private val schemaRegistryUrl = "http://127.0.0.1:8081"

    private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    private val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

    // Fetch the latest value schema for the topic's subject and convert it to a Spark SQL type
    private val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
    private val sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

    def main(args: Array[String]): Unit = {
      val spark = getSparkSession()

      spark.sparkContext.setLogLevel("ERROR")

      // Register a UDF that decodes the Confluent-framed Avro bytes into a JSON string
      spark.udf.register("deserialize", (bytes: Array[Byte]) =>
        DeserializerWrapper.deserializer.deserialize(bytes)
      )

      // Stream raw records from Kafka; the Avro payload arrives in the binary "value" column
      val df = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaUrl)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .load()

      val valueDataFrame = df.selectExpr("""deserialize(value) AS message""")

      import org.apache.spark.sql.functions._

      // Parse the JSON strings using the schema converted from the registry's Avro schema;
      // from_json returns nulls when this schema doesn't match the JSON structure
      val formattedDataFrame = valueDataFrame.select(
        from_json(col("message"), sparkSchema.dataType).alias("parsed_value"))
        .select("parsed_value.*")

      formattedDataFrame
        .writeStream
        .format("console")
        .option("truncate", false)
        .start()
        .awaitTermination()
    }

    // Not shown in the original post; assumed minimal local setup
    private def getSparkSession(): SparkSession =
      SparkSession.builder()
        .appName("ScalaSparkAvroConsumer")
        .master("local[*]")
        .getOrCreate()

    // The object is initialized lazily on each JVM, so the non-serializable
    // deserializer is not dragged through the UDF's closure
    object DeserializerWrapper {
      val deserializer = kafkaAvroDeserializer
    }

    class AvroDeserializer extends AbstractKafkaAvroDeserializer {
      def this(client: SchemaRegistryClient) {
        this()
        this.schemaRegistry = client
      }

      // Decode the Confluent wire format into a GenericRecord, then render it as JSON text
      override def deserialize(bytes: Array[Byte]): String = {
        val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
        genericRecord.toString
      }
    }
}

Getting the output as below:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-------+
|header|control|
+------+-------+
|null  |null   |
|null  |null   |
|null  |null   |
|null  |null   |
+------+-------+
only showing top 20 rows        
Comments:

I already tried those, as stated in the description, and couldn't make them work. Can you please advise me on this? – Leibnitz

I wrote the answer there and can verify it worked for me. If you get null, it's likely that the generated schema didn't align with the record content. The answer there didn't use .asInstanceOf[GenericRecord], for example. – OneCricketeer

Can you check what's inside valueDataFrame? Can you do valueDataFrame.writeStream.format("console")? And just to make it easier to debug, use read (Spark SQL) rather than readStream (Structured Streaming) until it gives you proper values. – Jacek Laskowski

Yes, I used read and it gave me the actual message, e.g. {"header": {"Id": "123"},"control": {"subject": "EOD"}} – Leibnitz
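
To illustrate the suggestion above, a minimal batch-mode sketch (assuming the spark session and the registered deserialize UDF from the question are in scope; spark.read.format("kafka") is the batch counterpart of readStream and takes a bounded offset range):

// Batch read of the same topic; easier to inspect than a streaming query
val batchDf = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaUrl)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")  // batch reads are bounded, unlike streams
  .load()

// Inspect the raw JSON strings before applying from_json; if these look right
// but from_json still yields nulls, the converted schema doesn't match the JSON
batchDf.selectExpr("deserialize(value) AS message")
  .show(truncate = false)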

1 Answer


ABRiS integrates Avro serialization, the Confluent Schema Registry, and Spark Structured Streaming; its from_confluent_avro() function will make your life easier. You can find it here:

https://github.com/AbsaOSS/ABRiS
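
For Spark 2.4 that would be the ABRiS 3.x line. A minimal sketch based on that version's README (the configuration keys and naming-strategy constant are taken from the 3.x API and may differ in later releases; df is the Kafka DataFrame from the question):

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager

// Point ABRiS at the registry and the subject to resolve ("customer.v1-value")
val schemaRegistryConfig = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL          -> "http://127.0.0.1:8081",
  SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC        -> "customer.v1",
  SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME,
  SchemaManager.PARAM_VALUE_SCHEMA_ID              -> "latest"
)

// from_confluent_avro understands the Confluent wire format
// (magic byte + 4-byte schema id + Avro payload), unlike Spark's plain from_avro
val parsed = df.select(from_confluent_avro(col("value"), schemaRegistryConfig) as "data")
  .select("data.*")

This replaces the custom UDF and the from_json round-trip entirely, since ABRiS produces typed Spark columns directly from the Avro bytes.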