
I have been trying to read Kafka's Avro-serialized messages from Spark Structured Streaming (2.4.4) with Scala 2.11. For this purpose I have used spark-avro (dependency below). I generate the Kafka messages from Python using the confluent-kafka library. Spark Structured Streaming is able to consume the messages with the schema, but it doesn't read the values of the fields correctly. I have prepared a simple example to show the problem; the code is available here: https://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala

I create the records in Python; the schema of the records is:

{
    "type": "record",
    "namespace": "example",
    "name": "RawRecord",
    "fields": [
        {"name": "int_field","type": "int"},
        {"name": "string_field","type": "string"}
    ]
}

And they are generated like this:

from time import sleep
from confluent_kafka.avro import AvroProducer, load, loads

def generate_records():
    avro_producer_settings = {
        'bootstrap.servers': "localhost:19092",
        'group.id': 'groupid',
        'schema.registry.url': "http://127.0.0.1:8081"
    }
    producer = AvroProducer(avro_producer_settings)
    key_schema = loads('"string"')      # keys are plain Avro strings
    value_schema = load("schema.avsc")  # the record schema shown above
    i = 1
    while True:
        row = {"int_field": int(i), "string_field": str(i)}
        # produce() serializes key and value with the schemas given here
        producer.produce(topic="avro_topic", key="key-{}".format(i),
                         value=row, key_schema=key_schema, value_schema=value_schema)
        print(row)
        sleep(1)
        i += 1

if __name__ == "__main__":
    generate_records()

The consumption from Spark Structured Streaming (in Scala) is done like this:

import java.nio.file.{Files, Paths}

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.avro._
...
        try {

            log.info("----- reading schema")
            // from_avro expects the Avro schema as a JSON string
            val jsonFormatSchema = new String(Files.readAllBytes(
                                                    Paths.get("./src/main/resources/schema.avsc")))

            val ds: Dataset[Row] = sparkSession
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", kafkaServers)
                .option("subscribe", topic)
                .load()

            // Deserialize the Kafka value column and flatten the resulting record
            val output: Dataset[Row] = ds
                .select(from_avro(ds.col("value"), jsonFormatSchema) as "record")
                .select("record.*")

            output.printSchema()

            val query: StreamingQuery = output.writeStream.format("console")
                .option("truncate", "false").outputMode(OutputMode.Append()).start()

            query.awaitTermination()

        } catch {
            case e: Exception => log.error("onApplicationEvent error: ", e)
        }
...

When printing the schema in Spark, it is strange that the fields are nullable, even though the Avro schema does not allow null values. Spark shows this:

root
 |-- int_field: integer (nullable = true)
 |-- string_field: string (nullable = true)

I have checked the messages with another consumer in Python and they are fine, but regardless of the message content, Spark shows this:

+---------+------------+
|int_field|string_field|
+---------+------------+
|0        |            |
+---------+------------+
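One way to narrow this down is to dump the raw bytes Spark receives before any Avro decoding. This is a minimal debugging sketch; the toHex helper is an ad-hoc UDF of mine, not part of the example, and ds is the Dataset from the snippet above:

import org.apache.spark.sql.functions.udf

// Render the raw Kafka value as hex so the payload Spark receives can be
// compared byte-for-byte with what the producer claims to have sent.
val toHex = udf((bytes: Array[Byte]) => bytes.map("%02x".format(_)).mkString(" "))

ds.select(toHex(ds.col("value")) as "value_hex")
    .writeStream
    .format("console")
    .option("truncate", "false")
    .start()
    .awaitTermination()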

The main dependencies used are:

<properties>
    <spark.version>2.4.4</spark.version>
    <scala.version>2.11</scala.version>
</properties>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

Does anyone know why this might be happening?

Thanks in advance. The code to reproduce the error is here:

https://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala


Comments:

"spark-avro does not support Confluent Schema Registry encoded messages." – OneCricketeer

"If you have the opportunity, please create an answer for your question rather than update the question itself. It will be more valuable for users who might look at this in the future to see that this question had an answer." – Tedinoz

1 Answer


SOLUTION

The problem was that I was producing the messages with the confluent_kafka library in Python and reading them in Spark Structured Streaming with the spark-avro library.

The confluent_kafka library writes Confluent's Avro wire format, while spark-avro reads the standard Avro format.

The difference is that, in order to use the Schema Registry, Confluent's format prepends each message with a magic byte and a four-byte schema ID that indicate which registered schema should be used.

Source: https://www.confluent.io/blog/kafka-connect-tutorial-transfer-avro-schemas-across-schema-registry-clusters/
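Before switching libraries, one workaround with plain spark-avro is to skip those five header bytes and decode the rest. This is only a sketch: it assumes every message on the topic was written with the single schema passed to from_avro, since the embedded schema ID is discarded (ds and jsonFormatSchema are the ones from the question):

import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.avro.from_avro

// Confluent wire format: 1 magic byte (0x00) + 4-byte schema ID + Avro body.
// substring is 1-based, so the Avro body starts at position 6.
val avroBody = expr("substring(value, 6, length(value) - 5)")

val decoded = ds
    .select(from_avro(avroBody, jsonFormatSchema) as "record")
    .select("record.*")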

To be able to use Confluent Avro and read it from Spark Structured Streaming, I replaced the spark-avro library with ABRiS (ABRiS makes it possible to work with both standard and Confluent Avro in Spark). https://github.com/AbsaOSS/ABRiS

My dependencies changed like this:

<properties>
    <spark.version>2.4.4</spark.version>
    <scala.version>2.11</scala.version>
</properties>

<!-- SPARK-AVRO -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- SPARK-AVRO AND CONFLUENT-AVRO -->
<dependency>
    <groupId>za.co.absa</groupId>
    <artifactId>abris_2.11</artifactId>
    <version>3.1.1</version>
</dependency>

And here you can see a simple example that gets the messages and deserializes their values both as standard Avro and as Confluent Avro.

import java.nio.file.{Files, Paths}

import org.apache.spark.sql.{Dataset, Row, functions}
import org.apache.spark.sql.avro.from_avro
import za.co.absa.abris.avro.functions.from_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager

val input: Dataset[Row] = sparkSession.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaServers)
    .option("subscribe", topicConsumer)
    .option("failOnDataLoss", "false")
    // .option("startingOffsets", "latest")
    // .option("startingOffsets", "earliest")
    .load()

// READ WITH the spark-avro library (standard Avro)

val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./src/main/resources/schema.avsc")))

val inputAvroDeserialized: Dataset[Row] = input
    .select(from_avro(functions.col("value"), jsonFormatSchema) as "record")
    .select("record.*")

// READ WITH the ABRiS library (Confluent Avro)

val schemaRegistryConfig = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> topicConsumer,
    SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME, // subject name strategy
    SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest" // use the latest schema version
)

val inputConfluentAvroDeserialized: Dataset[Row] = input
    .select(from_confluent_avro(functions.col("value"), schemaRegistryConfig) as "record")
    .select("record.*")