0
votes

We have a use-case where we are trying to consumer multiple Kafka topics (AVRO messages) integrating with Schema registry. We are using Spark Structured streaming (Spark version : 2.4.4) , Confluent Kafka (Library version: 5.4.1) for the same:

val kafkaDF: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap.server>:9092")
  .option("subscribe", [List of Topics]) // multi topic subscribe
  .load()

We are selecting value from the above DF and using Schema to de-serialize the AVRO message

val tableDF = kafkaDF.select(from_avro(col("value"), jsonSchema).as("table")).select("*")

The Roadblock here is , since we are using multiple Kafka topics , we have have integrated all our JSON schema into MAP with key being the topic name and values being the respective schema. How can we do a lookup using the same map in the select above? we tried UDF but it returns a "col" type but jsonSchema has to be of String type. Also it Schema is different for different topics

Couple of added questions:

  1. Is the above approach correct for consuming multiple topics at once?
  2. Should we use single consumer per topic?
  3. If we have more number of topics how can we achieve parallel topic processing, cos sequential might take substantial amount of time.
1
Are you transforming the data? Storing to where?thebluephantom
there is tad bit of transformation required.Ajith Sasidharan
OK, otherwise KAFKA Connect would have been a better optionthebluephantom
you can map the topics indeed, but it depends on volume. Are you writing to different output areas?thebluephantom
yes we are, mapping topics is fine, but what about schema , that's one per topic right? how can we combine them.Ajith Sasidharan

1 Answers

1
votes

Without checking it all, you appear to be OK on the basics with from_avro, etc. from_json, etc.

  1. https://sparkbyexamples.com/spark/spark-streaming-consume-and-produce-kafka-messages-in-avro-format/ can guide you on first part. This also very good https://www.waitingforcode.com/apache-spark-structured-streaming/two-topics-two-schemas-one-subscription-apache-spark-structured-streaming/read#filter_topic.

  2. I would do table.*

  3. Multiple, multiple schemas --> read multiple such versions from .avsc or code yourself.

  4. For consuming multiple topics in a Spark Streaming App the question is how many per App? No hard rule except obvious ones like large consumption vs. smaller consumption and if order is important. Executor resources can be relinquished.

  5. Then you need to process all the Topics separately like this imho - Filter on Topic - you can fill in the details as I am a little rushed - using the foreachBatch paradigm.

  6. Not sure how you are writing the data out at rest, but the question is not about that.

Similar to this, then to process multiple Topics:

...
... // Need to get Topic first
stream.toDS()
      .select($"topic", $"value")
      .writeStream
      .foreachBatch((dataset, _) => {
         dataset.persist() // need this

         dataset.filter($"topic" === topicA)
                .select(from_avro(col("value"), jsonSchemaTA)
                .as("tA"))
                .select("tA.*")
                .write.format(...).save(...)

         dataset.filter($"topic" === topicB)
                .select(from_avro(col("value"), jsonSchemaTB)
                .as("tB"))
                .select("tB.*")
                .write.format(...).save(...)
          ...
          ...
         dataset.unpersist()
         ...
         })

         .start().awaitTermination()

but blend with this excellent answer: Integrating Spark Structured Streaming with the Confluent Schema Registry