
I want to create a single Kafka consumer for several topics. The consumer's subscribe() method accepts a list of topics, so I do this:

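import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

private Consumer<String, byte[]> createConsumer() {
    Properties props = getConsumerProps();
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    // Build the topic names as "<systemID>.<topic>" for every configured topic
    List<String> topicMISL = new ArrayList<>();
    for (String s : Connect2Redshift.kafkaTopics) {
        topicMISL.add(systemID + "." + s);
    }
    // One subscribe() call with the full list (a later call replaces it, it does not add to it)
    consumer.subscribe(topicMISL);
    return consumer;
}

private boolean consumeMessages(Duration duration, Consumer<String, byte[]> consumer) {
    long start = System.currentTimeMillis(); // start time, e.g. for timing the poll
    // Blocks for at most `duration`; the result can contain records from any subscribed topic
    ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(duration);
    // ... process consumerRecords ...
    return !consumerRecords.isEmpty(); // assumed: report whether anything was polled
}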

Afterwards, I want to poll records from Kafka into a stream every 3 seconds and process them, but I wonder how this consumer works internally. How are records from different topics polled: first one topic, then another, or in parallel? Could a topic with a large number of messages be processed all the time while another topic with only a few messages keeps waiting?


2 Answers


In general, it depends on your topic settings. Kafka scales by using multiple partitions per topic.

  • If one topic has 3 partitions, Kafka can read them in parallel (for example, with up to 3 consumers in the same consumer group)
  • The same is true for multiple topics: their partitions can be read in parallel
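Within a consumer group, each partition is read by exactly one consumer, so the partition count caps how many consumers can work in parallel. The following stdlib-only Java sketch mimics round-robin assignment (the idea behind Kafka's RoundRobinAssignor) to make that concrete. The class, method, partition and consumer names are hypothetical, and this is a simplified illustration, not Kafka's actual assignor code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssignmentSketch {

    // Simplified round-robin: partition i goes to consumer (i % consumerCount).
    // Illustration only; this is not Kafka's assignor implementation.
    static Map<String, List<String>> assign(List<String> partitions, List<String> consumers) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            assignment.get(consumer).add(partitions.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("topicA-0", "topicA-1", "topicA-2", "topicB-0");
        // Four partitions, three consumers: one consumer gets two partitions
        System.out.println(assign(partitions, List.of("c1", "c2", "c3")));
        // More consumers than partitions: the extra consumer gets nothing and stays idle
        System.out.println(assign(partitions, List.of("c1", "c2", "c3", "c4", "c5")));
    }
}
```

With a single consumer (as in the question), every partition of every subscribed topic is assigned to that one consumer, and parallelism comes only from adding consumers to the group.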

If one partition receives many more messages than the others, you may run into consumer lag on that particular partition. Tuning the batch size and other consumer settings may help, as may compressing messages. Ideally, distribute the load evenly across partitions so this does not happen in the first place.
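As a sketch of the settings mentioned above, the following Java snippet builds the relevant `Properties`. The keys are real Kafka client configuration names: `max.poll.records` caps how many records one `poll()` returns, and `max.partition.fetch.bytes` caps how much data is fetched per partition per request, which can keep one busy partition from filling a whole fetch. Compression is configured on the producer (`compression.type`). The numeric values are arbitrary placeholders, not tuned recommendations, and the class and method names are hypothetical; in the question's code these entries would be merged into whatever `getConsumerProps()` returns.

```java
import java.util.Properties;

public class FetchTuningSketch {

    // Consumer-side limits on how much data one poll()/fetch can return.
    // Values are placeholders for illustration only.
    static Properties consumerTuning() {
        Properties props = new Properties();
        props.put("max.poll.records", "200");             // max records returned by one poll()
        props.put("max.partition.fetch.bytes", "262144"); // max bytes per partition per fetch
        props.put("fetch.max.bytes", "5242880");          // max bytes per fetch request overall
        return props;
    }

    // Compression and batching are producer-side settings.
    static Properties producerTuning() {
        Properties props = new Properties();
        props.put("compression.type", "lz4"); // compress record batches
        props.put("batch.size", "65536");     // producer batch size in bytes
        props.put("linger.ms", "5");          // wait briefly so batches can fill up
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerTuning());
        System.out.println(producerTuning());
    }
}
```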

Have a look at this blog article; it gave me a good understanding of the internals: https://www.confluent.io/blog/configure-kafka-to-minimize-latency/

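One way to process the result of poll() is partition by partition:

ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, byte[]>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, byte[]> record : partitionRecords) {
        // process the record; record.topic() tells you which topic it came from
    }
}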

You also need to commit offsets: find the offset of the last processed record in each partition and commit that offset plus one using consumer.commitSync.
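The offset arithmetic can be sketched without a broker: Kafka expects the committed offset to be the next offset to read, so for each partition you commit the last processed offset plus one. This stdlib-only Java sketch uses plain strings for partitions and a hypothetical helper name; the comment shows how the result would map onto the real `commitSync(Map<TopicPartition, OffsetAndMetadata>)` call.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CommitOffsetSketch {

    // For each partition, compute the offset to commit: last processed offset + 1.
    // With the real client this would become, per partition:
    //   consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
    static Map<String, Long> offsetsToCommit(Map<String, List<Long>> processed) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Long>> entry : processed.entrySet()) {
            List<Long> offsets = entry.getValue();
            if (offsets.isEmpty()) {
                continue; // nothing processed for this partition, nothing to commit
            }
            long last = offsets.get(offsets.size() - 1);
            result.put(entry.getKey(), last + 1);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> processed = new LinkedHashMap<>();
        processed.put("topicA-0", List.of(5L, 6L, 7L));
        processed.put("topicB-0", List.of(42L));
        processed.put("topicB-1", List.of());
        System.out.println(offsetsToCommit(processed));
    }
}
```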