3 votes

I have a Kafka topic with 3 partitions and I'm consuming that data using Spark Structured Streaming. I have 3 consumers (let's say consumer group A), each reading from a single partition, and everything is working fine up to this point.

I have a new requirement to read from the same topic, and I want to parallelize it by again creating 3 consumers (say consumer group B), each reading from a single partition. As I'm using Structured Streaming, I can't set group.id explicitly.

Will consumers from different groups pointing to the same partition each read all the data?

3
I don't know how Spark handles this, but if the question is whether reads are independent between groups: yes. You'll have two consumers for each partition, each with its own group id, reading all the messages independently. – aran

3 Answers

2 votes

From the Spark 3.0.1 documentation:

By default, each query generates a unique group id for reading data. This ensures that each Kafka source has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics.

So, if you are using the assign option and specifying which partition to read, it will read all data from that partition, because by default each query runs under its own consumer group (group.id). The assign option takes a JSON string as its value and can cover multiple partitions from different topics as well, e.g. {"topicA":[0,1],"topicB":[2,4]}.

val df = spark
  .read // batch read; use readStream for a streaming query
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("assign", """{"topic-name":[0]}""")
  .load()
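
To match the setup in the question (consumer group B, one query per partition), you could start three streaming queries, each assigned a single partition. This is only a minimal sketch: the topic name "topic-name", the broker address host:port, and the console sink are placeholder assumptions. Each query still gets its own auto-generated group id, so together they re-read the whole topic independently of consumer group A.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("group-b-readers").getOrCreate()

// One streaming query per partition; Spark generates a distinct
// consumer group id for each query automatically.
val queries = (0 to 2).map { partition =>
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host:port")
    .option("assign", s"""{"topic-name":[$partition]}""")
    .load()
    .writeStream
    .format("console") // placeholder sink for illustration
    .start()
}

spark.streams.awaitAnyTermination()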
0 votes

You can set the consumer group id for streaming as below; note that the option needs the kafka. prefix and is only supported from Spark 3.0:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

String processingGroup = "processingGroupA";

Dataset<Row> raw_df = sparkSession
                      .readStream()
                      .format("kafka")
                      .option("kafka.bootstrap.servers", consumerAppProperties.getProperty(BOOTSTRAP_SERVERS_CONFIG))
                      .option("subscribe", topicName)
                      .option("startingOffsets", "latest")
                      // Kafka consumer properties need the "kafka." prefix;
                      // a plain "group.id" option is not picked up by the source.
                      .option("kafka.group.id", processingGroup)
                      .load();
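
Note that if two concurrently running queries were given the same group id, Kafka would split the topic's partitions between them and each would see only part of the data, which is exactly why Spark generates a unique group id per query by default (see the source comment quoted in the next answer). So give every independent job its own group id.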
0 votes

Unless you are using Spark 3.x or higher, you will not be able to set group.id in your Kafka input stream. With Spark 3.x you can, as you have mentioned, run two different Structured Streaming jobs, each providing its own group.id, to ensure that each job reads all messages of the topic independently of the other job.
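
As a minimal Scala sketch of that Spark 3.x approach (the topic name, broker address, and the two group ids here are assumptions, and each job would normally be a separate application):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("two-groups").getOrCreate()

// Helper that subscribes to the full topic under an explicit group id.
// kafka.group.id is only honored on Spark 3.0 and above.
def readAll(groupId: String) =
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host:port")
    .option("subscribe", "topic-name")
    .option("kafka.group.id", groupId)
    .load()

// Two distinct group ids, so each query reads every message of the topic.
val jobA = readAll("consumer-group-A").writeStream.format("console").start()
val jobB = readAll("consumer-group-B").writeStream.format("console").start()

spark.streams.awaitAnyTermination()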

For Spark versions <= 2.4.x, Spark itself creates a unique consumer group for you, as you can see in the code on GitHub:

// Each running query should use its own group id. Otherwise, the query may be only 
// assigned partial data since Kafka will assign partitions to multiple consumers having
// the same group id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

So, in that case too, running two different streaming jobs ensures that you have two different consumer groups, which allows both jobs to read all messages from the topic independently of each other.
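
For Spark <= 2.4.x the same effect therefore needs no configuration at all; a rough sketch (topic and broker are placeholders) is just two subscriptions to the same topic:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("independent-streams").getOrCreate()

def fullTopicStream() =
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host:port")
    .option("subscribe", "topic-name")
    .load()

// No group id is set anywhere: each start() creates a query whose source
// generates its own spark-kafka-source-<uuid>-<hash> group, so both
// streams receive every message without stealing partitions from each other.
val q1 = fullTopicStream().writeStream.format("console").start()
val q2 = fullTopicStream().writeStream.format("console").start()

spark.streams.awaitAnyTermination()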