i am planning to do batch processing using spring kafka batch listener. I am looking for few samples for these 2 scenarios.
- How do we implement filter record strategy with batch processing? UPDATE : From the document - " In addition, a FilteringBatchMessageListenerAdapter is provided, for when you use a batch message listener." is not clear. I did not see any container factory method to set this filterbatchmessagelisteneradapter object or filter implementation.
HERE is my code for batch listener filter strategy :
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setBatchListener(true);
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {
@Override
public boolean filter(ConsumerRecord<Object, Object> consumerRecords) {
//log.info("Retrieved the record {} from the partition {} with offset {}", consumerRecord.value(), consumerRecord.partition(), consumerRecord.offset());
return true;
}
});
return factory;
}
- How can we do a manual offset commit, once we retrieve the batch of messages in the consumer and all got processed. During batch process if any failure comes, just want to push that message to error topic.But finally I would like to commit entire batch at a time .
Now other question I came to mind is how the above scenario works with a single consumer and with multiple consumers.
Let’s say case 1 : single consumer
Let’s say we have a topic with 5 partitions . When we subscribe to that topic, we assume we got 100 messages from the topic in which each partition has 20 messages. If we want to commit these message offset, does the acknowledgment object hold each partition and last offset of the last message?
Case2: multiple consumers
With the same input as mentioned in case1, If we enable the equal no of consumers with partition count, does the ack object hold partition and last message offset?
Can you please help me on this?