I'm trying the code below in Spark 2.4.3 to read Avro messages from Kafka. The schema is stored in the Confluent Schema Registry when the data is published to Kafka.
I have already tried a few solutions discussed here (Integrating Spark Structured Streaming with the Confluent Schema Registry / Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)), but I couldn't make them work, or could not find a proper way of doing this, especially when the schema is stored in a Schema Registry.
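For context, the Confluent serializer frames every Kafka value as a magic byte, a 4-byte schema ID, and then the Avro binary payload, which is why a plain Avro decoder cannot read these values without consulting the registry. Some of the linked answers decode a value by hand, roughly like the sketch below (sketch only; the getById(schemaId) lookup on the registry client is my assumption, I have not verified this exact code):

import java.nio.ByteBuffer
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Hand-decode one Confluent-framed value: 1 magic byte, 4-byte schema id, Avro binary payload.
def decodeConfluentAvro(bytes: Array[Byte], registryUrl: String): GenericRecord = {
  val buffer = ByteBuffer.wrap(bytes)
  require(buffer.get() == 0, "unexpected magic byte")
  val schemaId = buffer.getInt()                              // schema id registered in the registry
  val client = new CachedSchemaRegistryClient(registryUrl, 128)
  val schema = client.getById(schemaId)                       // writer schema fetched from the registry (assumed API)
  val payload = new Array[Byte](buffer.remaining())
  buffer.get(payload)
  val reader = new GenericDatumReader[GenericRecord](schema)
  reader.read(null, DecoderFactory.get().binaryDecoder(payload, null))
}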
Here is the current code I'm trying. With it I at least get some result, but all the records come out as null values, even though the topic does have data. Could someone please help me out with this?
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.avro.SchemaConverters

object ScalaSparkAvroConsumer {

  private val topic = "customer.v1"
  private val kafkaUrl = "localhost:9092"
  private val schemaRegistryUrl = "http://127.0.0.1:8081"

  private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
  private val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

  // Latest value schema for the topic, converted to a Spark SQL type for from_json
  private val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
  private var sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

  def main(args: Array[String]): Unit = {
    val spark = getSparkSession()
    spark.sparkContext.setLogLevel("ERROR")

    // UDF that turns the Confluent-framed Avro bytes into a JSON string
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      DeserializerWrapper.deserializer.deserialize(bytes)
    )

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaUrl)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

    val valueDataFrame = df.selectExpr("""deserialize(value) AS message""")

    import org.apache.spark.sql.functions._

    // Parse the JSON string using the schema converted from the registry
    val formattedDataFrame = valueDataFrame.select(
        from_json(col("message"), sparkSchema.dataType).alias("parsed_value"))
      .select("parsed_value.*")

    formattedDataFrame
      .writeStream
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }

  object DeserializerWrapper {
    val deserializer = kafkaAvroDeserializer
  }

  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
      genericRecord.toString
    }
  }
}
I'm getting the following output:
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-------+
|header|control|
+------+-------+
|null |null |
|null |null |
|null |null |
|null |null |
+------+-------+
only showing top 20 rows
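As far as I understand, from_json just gives nulls when the string can't be parsed against the given schema, so the nulls above could come either from the deserialize UDF or from the JSON parsing step. A quick sanity check I can run inside main (sketch; the sample field values are made up to match the column names above, and it relies on the functions import already in scope):

    // Sanity check (sketch): feed from_json a hand-written sample shaped like the record.
    // If this parses, the converted schema is fine and the problem is upstream in the UDF.
    import spark.implicits._
    Seq("""{"header": {"Id": "123"}, "control": {"subject": "EOD"}}""").toDF("message")
      .select(from_json(col("message"), sparkSchema.dataType).alias("parsed_value"))
      .select("parsed_value.*")
      .show(false)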
Comments:

… .asInstanceOf[GenericRecord], for example – OneCricketeer

… valueDataFrame? Can you do valueDataFrame.writeStream.format("console")? And, just to make it easier to debug, use read (Spark SQL), not readStream (Structured Streaming), until it gives you proper values. – Jacek Laskowski

… read and it gave me the actual message as this: {"header": {"Id": "123"},"control": {"subject": "EOD"}} – Leibnitz
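Following that suggestion, this is roughly the batch-mode check I ran (sketch; it reuses the deserialize UDF, topic, and Kafka options registered above):

    // Batch read of the same topic (Spark SQL, not Structured Streaming), as suggested
    // in the comments, to see what the deserialize UDF returns before from_json runs.
    val batchDf = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaUrl)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
      .selectExpr("deserialize(value) AS message")

    batchDf.show(truncate = false)
    // This is where I see the actual message, e.g. {"header": {"Id": "123"},"control": {"subject": "EOD"}}

    // Then check whether from_json parses that same string with the converted schema:
    batchDf
      .select(from_json(col("message"), sparkSchema.dataType).alias("parsed_value"))
      .select("parsed_value.*")
      .show(truncate = false)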