In a Spring Boot (version 2.1.4) application, there is a requirement to migrate from apache-kafka to spring-kafka. The current Kafka consumer: 1) initializes a KafkaConsumer bean at application startup, 2) is assigned topic partition "0", 3) polls data with the Apache KafkaConsumer into ConsumerRecords, and 4) implements its own retry mechanism to wait and poll again up to max_retry.

The legacy code looks like this:

    while (!done.get()) {
        ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(<MAX_VALUE>);
        if (records.isEmpty()) {
            retryCount++;
            Thread.sleep(<some_time>);
        } else {
            // Process records
        }
        if (retryCount > <max_retry_count>) {
            done.set(true);
        }
    }

I tried the approaches below: 1) Using the spring-kafka annotation (@KafkaListener), but it does not give us control over polling. 2) Creating a ConcurrentMessageListenerContainer whose setupMessageListener adds records into a queue for polling. This gives us control over the consumer.
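For reference, the second approach can be sketched roughly as below. This is only an illustration, not my exact code: the topic name "my-topic", and the consumerFactory and queue beans, are assumptions.

```java
import java.util.concurrent.BlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.TopicPartitionInitialOffset;

public class ContainerConfig {

    @Bean
    public ConcurrentMessageListenerContainer<byte[], byte[]> container(
            ConsumerFactory<byte[], byte[]> consumerFactory,
            BlockingQueue<ConsumerRecord<byte[], byte[]>> queue) {

        // Assign partition 0 explicitly, matching the legacy consumer.
        ContainerProperties props =
                new ContainerProperties(new TopicPartitionInitialOffset("my-topic", 0));

        // The listener just hands each record to a queue that our own loop polls.
        props.setMessageListener((MessageListener<byte[], byte[]>) queue::offer);

        return new ConcurrentMessageListenerContainer<>(consumerFactory, props);
    }
}
```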

I wanted to know: am I heading in the right direction? What would be a better solution to achieve the above requirement using spring-kafka?


1 Answer

It's not clear what you mean by "control on consumer". Creating a container is the same as using a @KafkaListener (a container is created under the covers).

Spring uses a "message-driven" approach.
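In the message-driven style the container owns the poll() loop and simply calls your method for each record. A minimal sketch (the topic name and listener id here are placeholders, not anything from your setup):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ByteArrayListener {

    @KafkaListener(id = "legacyMigration", topics = "my-topic")
    public void listen(ConsumerRecord<byte[], byte[]> record) {
        // Process the record here; the container performs poll() for you.
    }
}
```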

You can set the idleEventInterval and the container will publish a ListenerContainerIdleEvent if no records are received during that time. You can listen for these events with an ApplicationListener bean, or an @EventListener method.
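That event mechanism can replicate your legacy retry counter. A sketch under these assumptions: idleEventInterval is set on the container properties (e.g. `factory.getContainerProperties().setIdleEventInterval(30_000L)`), and MAX_RETRY stands in for your `<max_retry_count>`:

```java
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class IdleEventHandler {

    private static final int MAX_RETRY = 5; // assumed value of <max_retry_count>
    private final AtomicInteger idleCount = new AtomicInteger();

    @EventListener
    public void onIdle(ListenerContainerIdleEvent event) {
        // Each event means no records arrived for one idleEventInterval.
        if (idleCount.incrementAndGet() > MAX_RETRY) {
            // Equivalent of done.set(true) in the legacy loop. Note: idle events
            // are published on the consumer thread, so a real implementation
            // should hand the stop() off to another thread (e.g. a TaskExecutor).
            ((MessageListenerContainer) event.getSource()).stop();
        }
    }
}
```

Processing a record would reset `idleCount`, just as a non-empty poll() avoided incrementing `retryCount` in the legacy code.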