I've implemented a Kafka consumer using the spring-kafka library.
I have a Kafka topic with two partitions, and I use ConcurrentKafkaListenerContainerFactory
with the concurrency level set to 2. As a result, each container instance should consume from a single partition, according to the spring-kafka documentation:
The KafkaMessageListenerContainer receives all messages from all topics/partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
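For context, the factory configuration I'm describing looks roughly like this (a sketch only; the String key/value types and bean name are illustrative, not my exact setup):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Two child KafkaMessageListenerContainers, one per partition of the topic
        factory.setConcurrency(2);
        return factory;
    }
}
```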
Here is my consumer class:
import java.util.HashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>();

    @KafkaListener(topics = "${kafka.topic}", groupId = "events_group")
    public void receive(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) throws InterruptedException {
        String message = record.value().toString();
        Event event = EventFactory.createEvent(message);
        String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
        // add the event to the queue for this customer
        LinkedBlockingQueue<Event> queue = hashMap.get(customerId);
        if (queue == null) {
            queue = new LinkedBlockingQueue<>();
            queue.add(event);
            hashMap.put(customerId, queue);
        } else {
            queue.add(event);
        }
    }
}
As you can see, I have a 'hashMap' collection into which I put my events, each into the queue corresponding to the message's 'customer_id' attribute. Such functionality requires additional synchronization when accessed from multiple threads, and as far as I can see spring-kafka creates only one bean instance shared by all containers, instead of a separate instance per container, which would avoid the concurrency problem.
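One thing I considered is making the map updates themselves thread-safe via ConcurrentHashMap.computeIfAbsent, which is atomic, so two listener threads racing on the same customer still get a single shared queue. A minimal sketch (the EventBuckets class is hypothetical, and String stands in for my Event type), though this only fixes the data structure, not the fact that both container threads share one listener bean:

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class EventBuckets {

    private final Map<String, Queue<String>> buckets = new ConcurrentHashMap<>();

    public void add(String customerId, String event) {
        // computeIfAbsent is atomic on ConcurrentHashMap: even if two listener
        // threads hit the same new customerId at once, only one queue is created
        buckets.computeIfAbsent(customerId, id -> new LinkedBlockingQueue<>()).add(event);
    }

    public int size(String customerId) {
        Queue<String> queue = buckets.get(customerId);
        return queue == null ? 0 : queue.size();
    }
}
```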
How can I change this logic programmatically?
The only (admittedly weird) way I see to fix this problem is to run two JVMs, each running a separate application with a single-threaded consumer inside it; as a result, access to the KafkaConsumer class's #receive method would be single-threaded.