4 votes

I am working on a Spark Streaming application using Spark Structured Streaming, a Confluent open-source Kafka cluster, and the Spark job running in AWS EMR. We have 20+ Kafka topics, each receiving data in Avro format and partitioned into 3 to 4 partitions. I am reading all 20+ topics (as a comma-separated list of topic names) with a single Spark readStream, then filtering each message row from the resulting DataFrame, applying the correct Avro schema to each message, and writing the resulting Dataset[T] to S3 and Cassandra.

A few questions that I couldn't find answers for:

  1. Is it OK to use one readStream for all the topics? Will it be treated as one Spark consumer for all the topics and partitions, since I am only running one spark-submit job in EMR?

  2. How does the Spark application distribute processing across the partitions? Does Spark read these topics/partitions in parallel using different executors, or do I need to implement any multi-threading for each partition?

  3. Is it possible to scale to multiple consumers within a consumer group to parallelise?

Apologies for the load of questions; I think they are all related. I appreciate any feedback or pointers to where I can find documentation.

  1. MyConfig

    val kafkaParams = Map(
        "kafka.bootstrap.servers" -> "broker1:9092,broker2:9092,broker3:9092", // placeholder broker list
        "failOnDataLoss" -> param.fail_on_data_loss.toString,
        "subscribe" -> param.topics.toString,
        "startingOffsets" -> param.starting_offsets.toString,
        "kafka.security.protocol" -> param.kafka_security_protocol.toString,
        "kafka.ssl.truststore.location" -> param.kafka_ssl_truststore_location.toString,
        "kafka.ssl.truststore.password" -> param.kafka_ssl_truststore_password.toString
      )
    
  2. ReadStream code

    val df = sparkSession.readStream
      .format("kafka")
      .options(kafkaParams)
      .load()
    
  3. Then splitting the input DataFrame into multiple DataFrames using the topic column, and applying the correct Avro schema to each resulting DataFrame.

  4. Writing each Dataset[T] to different sinks like S3, Cassandra, etc. A rough sketch of steps 3 and 4 follows below.
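A minimal sketch of steps 3 and 4, assuming the spark-avro module's from_avro helper; the schema, topic, and bucket names are illustrative. Note that if the producers use Confluent's KafkaAvroSerializer, each value carries a wire-format header (magic byte plus schema id) that plain from_avro does not strip, so the payload would need trimming or a Confluent-aware deserializer.

    import org.apache.spark.sql.avro.from_avro // org.apache.spark.sql.avro.functions.from_avro in Spark 3.x
    import org.apache.spark.sql.functions.col

    // Hypothetical Avro schema (JSON) for one of the topics
    val topic1Schema =
      """{"type":"record","name":"Topic1Record","fields":[{"name":"id","type":"string"}]}"""

    // Step 3: split the multi-topic stream on the `topic` column and decode the Avro payload
    val topic1Df = df
      .filter(col("topic") === "topic1")
      .select(from_avro(col("value"), topic1Schema).as("payload"))
      .select("payload.*")

    // Step 4: each per-topic DataFrame gets its own sink, e.g. Parquet files on S3
    val topic1Query = topic1Df.writeStream
      .format("parquet")
      .option("path", "s3://my-bucket/topic1/")                       // hypothetical bucket
      .option("checkpointLocation", "s3://my-bucket/checkpoints/topic1/")
      .start()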

Did you manage to get answers for your questions? - KarthikJ

1 Answer

0 votes

Am I ok to use one ReadStream for all the topics?

Assuming all topics can use the same set of Kafka configurations, then sure. It might not be fault tolerant, though: with failOnDataLoss enabled, data loss on a single topic will fail the whole query.

Will it be considered as one Spark Consumer for all the topics and partitions... Does spark read these topic/partitions in parallel using different executors?

Yes. Each Kafka topic partition maps to a Spark task, so you can usefully scale the total number of executor cores up to the total number of partitions across all the topics (see the sketch below).
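For example, with 20+ topics at 3 to 4 partitions each (roughly 60-80 partitions in total), an executor layout like the following would let Spark read and process the partitions in parallel. The numbers are illustrative and assume dynamic allocation is disabled:

    import org.apache.spark.sql.SparkSession

    // Illustrative sizing: ~60-80 Kafka partitions, so up to ~80 tasks can run in parallel per micro-batch
    val sparkSession = SparkSession.builder()
      .appName("multi-topic-stream")
      .config("spark.executor.instances", "20") // 20 executors x 4 cores = 80 concurrent tasks
      .config("spark.executor.cores", "4")
      .getOrCreate()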

do I need to implement any multi-threading for each partition?

Spark should be handling that for you.

Is it possible to scale to multiple consumers within a consumer group to parallelise?

You can try to set a group.id for the source (see the note below), but Spark already manages a consumer group for the query and spreads the topic partitions across the executors, so you get parallel consumption without running multiple consumer instances yourself.
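If you do want to influence the group id, newer Spark versions expose options for it. This is illustrative and assumes Spark 3.0+ for kafka.group.id; older versions auto-generate a group id of the form spark-kafka-source-... and do not let you override it:

    // Hypothetical: influence the consumer group id on the Kafka source
    val dfWithGroup = sparkSession.readStream
      .format("kafka")
      .options(kafkaParams)
      .option("groupIdPrefix", "my-streaming-app")     // prefix for the generated group id
      // .option("kafka.group.id", "my-streaming-app") // fixed group id (Spark 3.0+ only)
      .load()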


Unrelated to the question - what you are trying to do is exactly what Kafka Connect is meant for: copying Kafka data into various data stores. S3 and Cassandra sink connectors already exist (an example configuration is below).
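For example, the Confluent S3 sink connector could be configured with something like this (bucket, region, and topic names are illustrative):

    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=3
    topics=topic1,topic2,topic3
    s3.bucket.name=my-bucket
    s3.region=us-east-1
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.avro.AvroFormat
    flush.size=1000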