I saw this note in the Kafka consumer documentation:

Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances than partitions.

I have 50 partitions for a single topic. If I set a_numThreads to 50, will one message be fetched from each partition? Does the note above mean that, in my case, I can never create more than 50 threads?

public void run(int a_numThreads) {
    // Ask the connector for a_numThreads streams on this topic
    // (consumer, topic and executor are fields of the enclosing class)
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, a_numThreads); // autoboxed; no need for new Integer(...)
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads: one pool thread per stream
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages of each stream
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}

1 Answer

Since you set a_numThreads = 50 and then call Executors.newFixedThreadPool(a_numThreads), yes: you cannot have more than 50 threads running at any point in time, at least not with that executor. Any extra task you submit to it waits in the executor's queue until one of the 50 threads becomes free.
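A minimal, Kafka-free sketch of that cap: it submits more tasks than the pool has threads and records the highest number of tasks that were ever running at once. The class name FixedPoolCapSketch, the 10 ms sleep standing in for "consuming a stream", and the numbers 50/60 are illustrative assumptions, not part of the original code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedPoolCapSketch {

    // Submits `tasks` short tasks to a fixed pool of `poolSize` threads and
    // returns the highest number of tasks observed running at the same time.
    static int maxConcurrent(int poolSize, int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the maximum seen
                try {
                    Thread.sleep(10); // hypothetical stand-in for consuming a stream
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        executor.shutdown(); // no new tasks; queued ones still run
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // 60 tasks on 50 threads: the extra 10 wait in the queue, so the peak never exceeds 50.
        System.out.println("peak concurrency: " + maxConcurrent(50, 60));
    }
}
```

In the Kafka code above the difference matters more than in this sketch: with the old consumer's default settings, a stream's iterator blocks waiting for new messages, so each ConsumerTest task typically never finishes. A 51st task submitted to a 50-thread pool would therefore sit in the queue indefinitely rather than just running a little later.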

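What the documentation is saying is that, within a consumer group, a partition can only be assigned to one stream. With 50 partitions and 50 streams in a single consumer, each stream gets exactly one partition; if you create 51 streams instead of 50, the extra stream will get nothing, as explained here.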
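To make the "extra stream gets nothing" point concrete, here is a simplified sketch of range-style assignment, modelled loosely on how the old high-level consumer spreads partitions over streams. It is an illustration under that assumption, not Kafka's actual assignor code; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeAssignmentSketch {

    // Spreads partition ids 0..partitions-1 over `streams` streams of one group.
    // Returns, for each stream index, the partition ids it owns.
    // Every partition is owned by exactly one stream.
    static List<List<Integer>> assign(int partitions, int streams) {
        int perStream = partitions / streams; // base share for every stream
        int withExtra = partitions % streams; // the first `withExtra` streams get one more
        List<List<Integer>> result = new ArrayList<>();
        for (int s = 0; s < streams; s++) {
            int start = perStream * s + Math.min(s, withExtra);
            int count = perStream + (s < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("50 streams, stream 0 owns:  " + assign(50, 50).get(0));  // [0]
        System.out.println("51 streams, stream 50 owns: " + assign(50, 51).get(50)); // []
        System.out.println("10 streams, stream 0 owns:  " + assign(50, 10).get(0));  // [0, 1, 2, 3, 4]
    }
}
```

The third case shows the reverse situation. With fewer streams than partitions, nothing is left idle; each stream simply reads from several partitions.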