I'm trying to commit a message just after reading it from the topic. I've followed this link (https://www.confluent.io/blog/apache-kafka-spring-boot-application) to create a Kafka consumer with spring. Normally it works perfect and the consumer gets the message and waits till anotherone enters in the queue. But the problem is that when I process this messages it takes a lot of time (circa 10 minutes) the kafka queue thinks that the message is not consumed (commited) and the consumers reads it again and again. I have to say that when my process time is less than 5 minutes it works well but when it lastas longer it doesn't commit the message.
I've looked for some answers around but it doesn't help me because I'm not using the same source code (and of course a different structure). I've tried to send asynchronous methods and also to commit asynchronously the message but I've failed. Some of the sources are:
Spring Boot Kafka: Commit cannot be completed since the group has already rebalanced
https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o
Kafka 0.10 Java consumer not reading message from topic
https://github.com/confluentinc/confluent-kafka-dotnet/issues/470
The main class is here :
@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {
public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaApp .class, args);
}
The consumer class (where I need to commit my message)
@Service
public class Consumer {
@Autowired
AppPropert prop;
Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
Properties props=prope.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();
List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
try {
CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method
/*This works fine when the processLaunch method takes less than 5 minutes,
if it takes longer the consumer will get the same message from the topic and start again with this operation
*/
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of consumer method ");
}
}
How can I commit the message just after I read it from the queue.
I want to be sure that when I receive the message I commit the message immediately. Right now the message is commited when I finish to execute the method just after the (System.out.println). So can anybody tell me how to do this?
----- update -------
Sorry for the late reply but as @GirishB suggested I've been looking to the configuration of GirishB but I don't see where I can define the topic I want to read/listen from my configuration file (applications.yml). All the examples that I see use a structure similar to this (http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html). Is there any option that I can read a topic that is declared in other server? Using something similar to this @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
=========== SOLUTION 1 ========================================
I followed @victor gallet advice and included the declaration of the confumer porperties in oder to accomodate the "Acknowledgment" object in the consume method. I've also followed this link (https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main/java/org/s1p/CommonConfiguration.java) to get all the methods that I've used to declare and set all the properties (consumerProperties, consumerFactory, kafkaListenerContainerFactory). The only problem I found is the "new SeekToCurrentErrorHandler() " declaration because I'm getting an error and for the moment I'm not able to resolve it (would be great if someone explain it to me).
@Service
public class Consumer {
@Autowired
AppPropert prop;
Consumer cons;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
//factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}
@Bean
public Map<String, Object> consumerProperties() {
Map<String, Object> props = new HashMap<>();
Properties propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)
//props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));
//props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
//props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));
//props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer"));
return props;
}
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
acknowledgment.acknowledge();// commit immediately
Properties props=prop.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();
List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
try {
CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method
/*This works fine when the processLaunch method takes less than 5 minutes,
if it takes longer the consumer will get the same message from the topic and start again with this operation
*/
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of consumer method ");
}
}
``````````````````````````````````````````````````````````
max.poll.interval.ms
,max.poll.interval.ms
andmax.poll.records
see kafka.apache.org/documentation – Paizo