
I have created a multithreaded Kafka consumer in which one thread is assigned to each partition (I have 100 partitions in total). I followed this example: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

Below is the init method of my consumer.

    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    System.out.println("Kafka Consumer initialized.");
    // Request 100 streams for the topic, one per partition
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topicName, 100);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicName);

    // One pool thread per stream
    executor = Executors.newFixedThreadPool(100);

In the init method above, I get a list of 100 Kafka streams, each connected to one partition (this part works as expected).

Then I submit each stream to a different thread using the snippet below.

    public Object call() {
        for (final KafkaStream stream : streams) {
            executor.execute(new StreamWiseConsumer(stream));
        }
        return true;
    }
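To check, independently of Kafka, that one task per stream on a fixed pool really does give overlapping processing, here is a stdlib-only sketch. Each `LinkedBlockingQueue` is a hypothetical stand-in for one `KafkaStream`, the pool is sized to the number of streams as with `Executors.newFixedThreadPool(100)`, and a short sleep stands in for the processing code. `StreamPerThreadDemo` and `peakConcurrency` are illustrative names; the method returns the largest number of workers that were processing a message at the same moment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class StreamPerThreadDemo {

    /** Submits one task per simulated stream and returns the peak number of
     *  tasks that were inside their processing step simultaneously. */
    static int peakConcurrency(int streamCount, int messagesPerStream) {
        // Hypothetical stand-ins for the streams returned by createMessageStreams.
        List<BlockingQueue<String>> streams = new ArrayList<>();
        for (int p = 0; p < streamCount; p++) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            for (int m = 0; m < messagesPerStream; m++) {
                queue.add("partition-" + p + "-msg-" + m);
            }
            streams.add(queue);
        }

        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch startGate = new CountDownLatch(1);
        // One pool thread per stream, mirroring the question's setup.
        ExecutorService executor = Executors.newFixedThreadPool(streamCount);

        for (BlockingQueue<String> stream : streams) {
            executor.execute(() -> {
                try {
                    startGate.await(); // release all workers together
                    while (stream.poll() != null) {
                        peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                        Thread.sleep(20); // simulated "Some Processing code"
                        active.decrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        startGate.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }
}
```

With independent streams that all have data waiting, the reported peak is typically close to `streamCount`. If a real consumer with the same executor setup still looks sequential, the pool itself is an unlikely cause, and the difference more likely lies in how events arrive on the partitions or in what the workers share.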

Below is the StreamWiseConsumer class.

    public class StreamWiseConsumer extends Thread {

        ConsumerIterator<byte[], byte[]> consumerIterator;
        private KafkaStream m_stream;
        // Stop flag checked by the loop in run() (it was referenced but never declared)
        private volatile boolean interrupted = false;

        public StreamWiseConsumer(ConsumerIterator<byte[], byte[]> consumerIterator) {
            this.consumerIterator = consumerIterator;
        }

        public StreamWiseConsumer(KafkaStream kafkaStream) {
            this.m_stream = kafkaStream;
        }

        @Override
        public void run() {
            ConsumerIterator<byte[], byte[]> consumerIterator = m_stream.iterator();

            while (!Thread.currentThread().isInterrupted() && !interrupted) {
                try {
                    // By default, hasNext() blocks until this stream has a message
                    if (consumerIterator.hasNext()) {
                        String reqId = UUID.randomUUID().toString();
                        System.out.println(reqId + " : Event received by threadId : " + Thread.currentThread().getId());
                        MessageAndMetadata<byte[], byte[]> messageAndMetaData = consumerIterator.next();
                        byte[] keyBytes = messageAndMetaData.key();
                        String key = null;
                        if (keyBytes != null) {
                            key = new String(keyBytes);
                        }
                        byte[] eventBytes = messageAndMetaData.message();
                        if (eventBytes == null) {
                            System.out.println("Topic: No event fetched for transaction Id:" + key);
                            continue;
                        }
                        String event = new String(eventBytes).trim();
                        // Some Processing code
                        System.out.println(reqId + " : Processing completed for threadId = " + Thread.currentThread().getId());
                        // `consumer` is the connector created in the init method
                        consumer.commitOffsets();
                    }
                } catch (Exception ex) {
                    ex.printStackTrace(); // don't swallow failures silently
                }
            }
        }
    }
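For comparison, here is a stdlib-only sketch of the same per-stream loop written as a `Runnable` (an executor only needs a `Runnable`, so extending `Thread` is not required), with a declared `volatile` stop flag, a timed poll so the flag is checked regularly, and exceptions logged instead of swallowed. The `BlockingQueue<byte[]>` is a hypothetical stand-in for `KafkaStream`/`ConsumerIterator`, and `QueueWorker`, `stop()` and `processed()` are illustrative names, not Kafka API.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

/** Hypothetical worker: a BlockingQueue stands in for the KafkaStream. */
final class QueueWorker implements Runnable {
    private final BlockingQueue<byte[]> stream;
    private final List<String> processed = new CopyOnWriteArrayList<>();
    private volatile boolean running = true; // volatile so stop() from another thread is seen

    QueueWorker(BlockingQueue<byte[]> stream) {
        this.stream = stream;
    }

    @Override
    public void run() {
        while (running && !Thread.currentThread().isInterrupted()) {
            try {
                // Wait briefly for the next event so a stop() call is noticed promptly.
                byte[] eventBytes = stream.poll(50, TimeUnit.MILLISECONDS);
                if (eventBytes == null) {
                    continue;
                }
                String event = new String(eventBytes, StandardCharsets.UTF_8).trim();
                processed.add(event); // placeholder for the real processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag; the loop then exits
            } catch (RuntimeException e) {
                System.err.println("Processing failed: " + e); // log instead of swallowing
            }
        }
    }

    void stop() {
        running = false;
    }

    List<String> processed() {
        return processed;
    }
}
```

An instance can be handed to `executor.execute(...)` exactly like `StreamWiseConsumer`; calling `stop()` ends the loop within one poll timeout.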

Ideally, it should process all 100 partitions in parallel. Instead, one thread picks up a seemingly random number of events and processes them, and then another thread starts processing events from another partition. It looks like sequential processing, just spread across different threads. I expected all 100 threads to be processing at the same time. Am I missing something here?

The logs are available here: https://drive.google.com/file/d/14b7gqPmwUrzUWewsdhnW8q01T_cQ30ES/view?usp=sharing https://drive.google.com/file/d/1PO_IEsOJFQuerW0y-M9wRUB-1YJuewhF/view?usp=sharing

You are using an old version of the Kafka client (<= 0.11.x.x). `kafka.consumer.Consumer` is deprecated; you should use `org.apache.kafka.clients.consumer.Consumer`. - Bartosz Wardziński
What difference does that make? Even if it's an older version, it should still work. I am using Kafka version `0.8.2.1`. - Vivek Garg

1 Answer


I doubt this is the right approach for vertically scaling Kafka Streams.

Kafka Streams inherently supports multi-threaded consumption.

Increase the number of processing threads with the `num.stream.threads` configuration.

If you want 100 threads to process the 100 partitions, set `num.stream.threads` to 100.
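A minimal sketch of that configuration, assuming the `kafka-streams` library (which the question does not currently use; Kafka Streams first shipped with Kafka 0.10, so this route would mean moving off 0.8.2.1). Only `java.util.Properties` is used so the snippet compiles on its own; the string keys are the names behind `StreamsConfig.APPLICATION_ID_CONFIG`, `StreamsConfig.BOOTSTRAP_SERVERS_CONFIG` and `StreamsConfig.NUM_STREAM_THREADS_CONFIG`, while `StreamsConfigSketch`, the application id and the broker address are hypothetical.

```java
import java.util.Properties;

final class StreamsConfigSketch {

    /** Builds Kafka Streams properties with one stream thread per input partition. */
    static Properties streamsProps(String applicationId, String bootstrapServers, int partitions) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);       // StreamsConfig.APPLICATION_ID_CONFIG
        props.setProperty("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        props.setProperty("num.stream.threads", String.valueOf(partitions)); // StreamsConfig.NUM_STREAM_THREADS_CONFIG
        return props;
    }
}
```

The resulting `Properties` would then be passed to `new KafkaStreams(topology, props)`, for example with `streamsProps("my-app", "localhost:9092", 100)`. Threads beyond the number of input partitions would sit idle, because each partition is handled by a single stream task.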