I have been trying to read Kafka's Avro-serialized messages from Spark Structured Streaming (2.4.4) with Scala 2.11. For this purpose I have used spark-avro (dependency below). I generate the Kafka messages from Python using the confluent-kafka library. Spark Structured Streaming is able to consume the messages together with the schema, but it does not read the values of the fields correctly. I have prepared a simple example to show the problem; the code is available here: https://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala
I create the records in Python; the schema of the records is:
{
    "type": "record",
    "namespace": "example",
    "name": "RawRecord",
    "fields": [
        {"name": "int_field", "type": "int"},
        {"name": "string_field", "type": "string"}
    ]
}
And they are generated like this:
from time import sleep
from confluent_kafka.avro import AvroProducer, load, loads

def generate_records():
    avro_producer_settings = {
        'bootstrap.servers': "localhost:19092",
        'group.id': 'groupid',
        'schema.registry.url': "http://127.0.0.1:8081"
    }
    producer = AvroProducer(avro_producer_settings)
    key_schema = loads('"string"')
    value_schema = load("schema.avsc")

    i = 1
    while True:
        row = {"int_field": int(i), "string_field": str(i)}
        producer.produce(topic="avro_topic", key="key-{}".format(i),
                         value=row, key_schema=key_schema, value_schema=value_schema)
        print(row)
        sleep(1)
        i += 1
The consumption from Spark Structured Streaming (in Scala) is done like this:
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.avro._
...
try {
    log.info("----- reading schema")
    val jsonFormatSchema = new String(Files.readAllBytes(
        Paths.get("./src/main/resources/schema.avsc")))

    val ds: Dataset[Row] = sparkSession
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaServers)
        .option("subscribe", topic)
        .load()

    val output: Dataset[Row] = ds
        .select(from_avro(ds.col("value"), jsonFormatSchema) as "record")
        .select("record.*")

    output.printSchema()

    val query: StreamingQuery = output.writeStream.format("console")
        .option("truncate", "false").outputMode(OutputMode.Append()).start()
    query.awaitTermination()
} catch {
    case e: Exception => log.error("onApplicationEvent error: ", e)
}
...
When printing the schema in Spark, it is strange that the fields are nullable even though the Avro schema does not allow that. Spark shows this:
root
|-- int_field: integer (nullable = true)
|-- string_field: string (nullable = true)
I have checked the messages with another consumer in Python and they are fine, but regardless of the message content, Spark shows this:
+---------+------------+
|int_field|string_field|
+---------+------------+
|0        |            |
+---------+------------+
The main dependencies used are:
<properties>
    <spark.version>2.4.4</spark.version>
    <scala.version>2.11</scala.version>
</properties>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
Does anyone know why this might be happening?
Thanks in advance. The code to reproduce the error is here:
https://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala
SOLUTION
The problem was that I was producing the messages from Python with the confluent_kafka library and reading them in Spark Structured Streaming with the spark-avro library.
The confluent_kafka library writes Confluent's Avro wire format, while spark-avro expects plain Avro.
The difference is that, in order to work with the Schema Registry, Confluent Avro prepends each message with a magic byte followed by a four-byte schema ID that indicates which schema should be used.
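Because of that header, spark-avro's from_avro starts decoding at the wrong byte offset, which is why every row comes out as 0 and an empty string. If the topic only ever uses one schema, a possible workaround (just a sketch, not what I ended up doing) is to skip the 5-byte Confluent header before passing the payload to from_avro:

import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.expr

// Drop the Confluent header (1 magic byte + 4-byte schema id) and decode the rest as plain Avro.
// This bypasses the Schema Registry, so it only works if every message was written with this exact schema.
val output: Dataset[Row] = ds
    .select(from_avro(expr("substring(value, 6, length(value) - 5)"), jsonFormatSchema) as "record")
    .select("record.*")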
To be able to use Confluent Avro and read it from Spark Structured Streaming, I replaced the spark-avro library with ABRiS (ABRiS makes it possible to integrate both plain Avro and Confluent Avro with Spark): https://github.com/AbsaOSS/ABRiS
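Roughly, the reading side then looks like this (a sketch based on the ABRiS 3.x API, which targets Spark 2.4 / Scala 2.11; the spark-avro dependency is replaced by the za.co.absa:abris_2.11 artifact, and the exact configuration keys should be checked against the ABRiS README for the version you use):

import za.co.absa.abris.avro.functions.from_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager

// Tell ABRiS where the Schema Registry is and which value schema to fetch for the topic.
val schemaRegistryConfig = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://127.0.0.1:8081",
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> topic,
    SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.name",
    SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest"
)

// from_confluent_avro understands the magic byte + schema id header,
// fetches the writer schema from the registry and decodes the payload.
val output: Dataset[Row] = ds
    .select(from_confluent_avro(ds.col("value"), schemaRegistryConfig) as "record")
    .select("record.*")

With this in place, the streaming query shows the int_field and string_field values correctly.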