I've implemented a Kafka consumer using the spring-kafka library. I have a Kafka topic with 2 partitions, and I also use a ConcurrentKafkaListenerContainerFactory with the concurrency level set to 2. As a result, each container instance should consume from a single partition, in accordance with the spring-kafka documentation:

The KafkaMessageListenerContainer receives all messages from all topics/partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
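For context, the container factory is configured roughly like this (a minimal sketch, not my exact setup; the consumerFactory bean and the String key/value types are assumptions):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // with a 2-partition topic, concurrency 2 creates one
        // KafkaMessageListenerContainer (and one thread) per partition
        factory.setConcurrency(2);
        return factory;
    }
}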

Here is my consumer class:

import java.util.HashMap;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {
    // shared by all listener threads, but HashMap is not thread-safe
    private final HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>();

    @KafkaListener(topics = "${kafka.topic}", groupId = "events_group")
    public void receive(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {
        String message = record.value().toString();
        Event event = EventFactory.createEvent(message);
        String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
        // add the event to the queue for this customer, creating the queue
        // on first use; this check-then-act sequence is not atomic
        LinkedBlockingQueue<Event> queue = hashMap.get(customerId);
        if (queue == null) {
            queue = new LinkedBlockingQueue<>();
            queue.add(event);
            hashMap.put(customerId, queue);
        } else {
            queue.add(event);
        }
    }
}

As you can see, I have a 'hashMap' collection into which I put my events, each routed to the corresponding queue based on the message's 'customer_id' attribute. Such functionality requires additional synchronization when multiple threads access the map, and as far as I can tell, spring-kafka creates only one bean instance for all containers, instead of a separate bean instance per container, which would avoid the concurrency problem. A sketch of the synchronization I mean follows below.
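For illustration, the shared-state variant would need something like this (a sketch only; 'SynchronizedEventStore' and 'addEvent' are hypothetical names, and I'd rather avoid shared state altogether):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class SynchronizedEventStore {
    // a concurrent map replaces the plain HashMap shared by the listener threads
    private final ConcurrentHashMap<String, LinkedBlockingQueue<Event>> map =
            new ConcurrentHashMap<>();

    public void addEvent(String customerId, Event event) {
        // computeIfAbsent performs the get-or-create step atomically, so two
        // threads cannot race to create separate queues for the same customer
        map.computeIfAbsent(customerId, id -> new LinkedBlockingQueue<>()).add(event);
    }
}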

How can I change this logic programmatically?

The only workaround I see, and it's an awkward one, is to run two JVMs, each a separate application with a single-threaded consumer inside it. That way, access to the KafkaConsumer class and its #receive method would be single-threaded.


1 Answer


That's correct; that's how it works. The framework doesn't really rely on the bean itself, only on its method, as the target to deliver messages to.

You may consider having a separate @KafkaListener method for each partition in your topic. It's true that records from one partition are delivered to the @KafkaListener in a single thread. So, if you really can't live with shared state, you may use a separate HashMap for each thread, as sketched below.
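Something along these lines (a sketch only; the class, field, and method names, and the hard-coded partition numbers, are assumptions based on your 2-partition topic):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class PartitionedKafkaConsumer {
    // one map per listener method; each is touched by exactly one thread
    private final Map<String, LinkedBlockingQueue<Event>> mapForPartition0 = new HashMap<>();
    private final Map<String, LinkedBlockingQueue<Event>> mapForPartition1 = new HashMap<>();

    @KafkaListener(groupId = "events_group",
            topicPartitions = @TopicPartition(topic = "${kafka.topic}", partitions = "0"))
    public void receiveFromPartition0(ConsumerRecord<?, ?> record) {
        store(mapForPartition0, record);
    }

    @KafkaListener(groupId = "events_group",
            topicPartitions = @TopicPartition(topic = "${kafka.topic}", partitions = "1"))
    public void receiveFromPartition1(ConsumerRecord<?, ?> record) {
        store(mapForPartition1, record);
    }

    private static void store(Map<String, LinkedBlockingQueue<Event>> map,
            ConsumerRecord<?, ?> record) {
        Event event = EventFactory.createEvent(record.value().toString());
        String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
        // each map is confined to a single listener thread, so the plain
        // HashMap needs no synchronization here
        map.computeIfAbsent(customerId, id -> new LinkedBlockingQueue<>()).add(event);
    }
}

Keep in mind that pinning partitions via topicPartitions uses manual assignment instead of consumer group management, so automatic rebalancing no longer applies to these listeners.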

The general idea behind that listener abstraction is exactly stateless behavior. That KafkaConsumer is a regular Spring singleton bean. You have to live with that fact and redesign your solution according to this situation.