
I have a Spark application that needs to read from two different topics with one consumer, using Spark Java. The Kafka message key and value schemas are the same for both topics.

Below is the workflow:

1. Read messages from both topics, with the same group id, using JavaInputDStream<ConsumerRecord<String, String>> and iterate with foreachRDD.
2. Inside the loop, read the offsets, filter messages based on the message key, and create a JavaRDD<String>.
3. Iterate over the JavaRDD<String> using mapPartitions.
4. Inside the mapPartitions loop, iterate over the records using forEachRemaining.
5. Perform data enrichment, transformation, etc. on the rows inside the forEachRemaining loop.
6. Commit the offsets (a simplified sketch of this workflow follows below).
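
A simplified sketch of the workflow (the topic names, key filter, and enrichment step are placeholders; jssc is the JavaStreamingContext and kafkaParams the consumer config, both set up elsewhere):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.*;

// 1. One direct stream subscribed to both topics
Collection<String> topics = Arrays.asList("topic1", "topic2");
JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

stream.foreachRDD(rdd -> {
  // 2. Read the offsets of this batch, then filter on the message key
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  JavaRDD<String> rows = rdd
    .filter(record -> "someKey".equals(record.key()))       // placeholder predicate
    .map(ConsumerRecord::value);

  // 3-5. Enrich/transform the rows per partition
  rows.mapPartitions(it -> {
    List<String> out = new ArrayList<>();
    it.forEachRemaining(row -> out.add(row.toUpperCase())); // enrichment placeholder
    return out.iterator();
  }).count();                                               // action to force evaluation

  // 6. Commit the consumed offsets once the batch is done
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});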

I want to understand the questions below. Please share your answers or any documentation that can help me find them.

1. How are messages received/consumed from two topics (one common group id, the same key/value schema for both) in one consumer?
Say the consumer reads data every second, Producer1 produces 50 messages to Topic1, and Producer2 produces 1000 messages to Topic2.
2. Is it going to read all messages (1000 + 50) in one batch and process them together in the workflow, or is it going to read the 50 messages first, process them, and then read and process the 1000?
3. What parameter should I use to control the number of messages read in one batch per second?
4. Will the same group id create any issue while consuming?

2 Answers

0 votes

The official Spark Streaming documentation already explains how to consume multiple topics with one group id: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.*;

// kafkaParams is the usual Map<String, Object> of Kafka consumer properties
Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );
1. One group id, and the same schema is followed for both topics.
2. Not sure about this, but from my understanding it would consume all the messages, depending on the batch size.
3. Set "spark.streaming.backpressure.enabled" to true and "spark.streaming.kafka.maxRatePerPartition" to a numeric value; based on these, Spark limits the number of messages to consume from Kafka per batch. Also set the batch duration accordingly (see the sketch after this list). https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html
4. This totally depends on your application usage.
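
A minimal sketch of how those settings could be wired up (the app name and the rate value are illustrative, not from the answer):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf()
  .setAppName("two-topic-consumer")                          // placeholder name
  .set("spark.streaming.backpressure.enabled", "true")       // adapt rate to processing speed
  .set("spark.streaming.kafka.maxRatePerPartition", "100");  // max records/partition/second

// With a 1-second batch duration, each batch then holds at most
// maxRatePerPartition * totalPartitions * 1s records.
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));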
0 votes

1. How are messages received/consumed from two topics (one common group id, the same key/value schema for both) in one consumer? Say the consumer reads data every second, Producer1 produces 50 messages to Topic1, and Producer2 produces 1000 messages to Topic2.
Any Kafka consumer can subscribe to a list of topics, so there is no constraint here.
So if you have one consumer, it will be responsible for all the partitions of both Topic1 and Topic2.
2. Is it going to read all messages (1000 + 50) in one batch and process them together in the workflow, or is it going to read the 50 messages first, process them, and then read and process the 1000?
3. What parameter should I use to control the number of messages read in one batch per second?
Answer to both questions 2 and 3:
It will receive all the messages together (1050), or even more, depending on your configuration.
To allow the consumer to receive batches of 1050 or greater, raise max.poll.records (default 500) to 1050 or more, as sketched below; another setting may still become a bottleneck, but the defaults should be fine for the rest.
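
For example, a kafkaParams map with that override (the broker address and group name are placeholders):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "broker1:9092");    // placeholder address
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-consumer-group");        // keep it stable across restarts (see 4.)
kafkaParams.put("enable.auto.commit", false);            // commit manually after processing
kafkaParams.put("max.poll.records", 1050);               // default is 500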
4. Will the same group id create any issue while consuming?
The same group id will only affect you if you create more than one consumer; the consumers will then split the partitions of both topics among themselves.
Moreover, if your consumer dies or stops for some reason, you should bring it back up with the same group id; that way the consumer "remembers" the last offset committed and resumes from the point where it stopped.

If you have any more problems regarding your consumer, I suggest reading chapter 4 of Kafka: The Definitive Guide, which explains consumers in depth and should answer further questions.
If you want to explore the configuration options, the documentation is always helpful.