Using spring-kafka 1.0.5, I am consuming from a busy topic with 10 partitions with a concurrency of 10.
My current code adds each message to a queue chosen by its partition ID; the queues are kept in a HashMap keyed by partition ID.
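@KafkaListener(topics = "${kafka.topic}")
public void onMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) {
    // Simplified from my real code. "queues" is a placeholder name for the
    // HashMap<Integer, Queue<ConsumerRecord>> keyed by partition ID.
    queues.computeIfAbsent(consumerRecord.partition(), p -> new ConcurrentLinkedQueue<>()).add(consumerRecord);
}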
Unfortunately, with that design, processing takes about twice as long as simply consuming the messages.
My requirement is to process partitions separately, but how can I avoid maintaining a HashMap keyed by partition ID inside the @KafkaListener method?
Is there a more efficient way of going about this? Ideally, each thread from the listener annotation would manage its own list. Is there a way to do that without having a cross reference such as the hashmap mentioned above based on the partition ID?
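To make the idea concrete, this is roughly the shape I have in mind. It is only a minimal sketch in plain Java, not spring-kafka code: `PerThreadBuffer`, `onMessage(String)` and `drain()` are hypothetical names, a `String` stands in for the `ConsumerRecord`, and it assumes each listener thread keeps the same partition(s), which a consumer rebalance could change.

```java
import java.util.ArrayList;
import java.util.List;

final class PerThreadBuffer {
    // Each thread lazily gets its own list, so no lookup keyed by partition ID is needed.
    private static final ThreadLocal<List<String>> BUFFER =
            ThreadLocal.withInitial(ArrayList::new);

    // Stand-in for the listener method body (hypothetical: takes a String, not a ConsumerRecord).
    static void onMessage(String record) {
        BUFFER.get().add(record);
    }

    // Returns a copy of the calling thread's records and clears that thread's list.
    static List<String> drain() {
        List<String> own = BUFFER.get();
        List<String> copy = new ArrayList<>(own);
        own.clear();
        return copy;
    }

    public static void main(String[] args) throws InterruptedException {
        // Two threads simulate two listener threads; each only ever sees its own records.
        Runnable worker = () -> {
            String name = Thread.currentThread().getName();
            for (int i = 0; i < 3; i++) {
                onMessage(name + "-record-" + i);
            }
            System.out.println(name + " drained " + drain());
        };
        Thread t1 = new Thread(worker, "listener-0");
        Thread t2 = new Thread(worker, "listener-1");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}
```

If a thread ends up owning more than one partition, a per-thread list like this would mix their records, so I am not sure whether this is safe with the listener container.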