
I am new to Kafka and am using the Apache Kafka consumer to read messages from a producer. When I stop the consumer and start it again after some time, all the messages produced in between are lost. How should I handle this scenario? I am using the properties "auto.offset.reset" = "latest" and "enable.auto.commit" = "false".

This is the code I am using. Any help is appreciated.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "service");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", KAFKA_DESERIALIER_STRING_KEYVALUE);
props.put("value.deserializer", KAFKA_DESERIALIER_STRING_KEYVALUE);

@SuppressWarnings("resource")
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicname));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        JSONObject jsonObj = new JSONObject(record.value());
        JdbcUtilToUdm.insertdataintodb(args, jsonObj);
    }
}

1 Answer


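Since you disabled auto commit, you have to call consumer.commitSync() or consumer.commitAsync() explicitly. This is how the consumer group's position in the log is persisted. Without a committed offset the group has no saved position, so on every restart "auto.offset.reset" = "latest" applies and the consumer starts from the end of the log, skipping everything produced while it was down. Whether you commit synchronously or asynchronously is up to you: commitSync() blocks until the commit succeeds or fails, while commitAsync() returns immediately. Commit after the records have been processed, so in your case after all inserts for the batch have finished and before you poll again.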
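As a rough sketch of the loop described above, something like the following should work. It assumes a Kafka client version that has poll(Duration) (2.0 or later) and uses the standard StringDeserializer for keys and values. The class name CommitAfterProcessing, the helper consumerProps, the process method (a stand-in for your JDBC insert), and the fallback topic name "my-topic" are all made up for illustration.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitAfterProcessing {

    // Builds the consumer configuration from the question, with auto commit disabled.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        // Only consulted when the group has no committed offset yet.
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Hypothetical stand-in for the database insert in the question.
    static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.offset() + ": " + record.value());
    }

    public static void main(String[] args) {
        // Assumed topic name; pass the real one as the first argument.
        String topic = args.length > 0 ? args[0] : "my-topic";
        try (KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(consumerProps("localhost:9092", "service"))) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);
                }
                if (!records.isEmpty()) {
                    // Persist the group's position only after the whole batch
                    // returned by the last poll has been processed.
                    consumer.commitSync();
                }
            }
        }
    }
}
```

Committing after processing gives at-least-once delivery: if the consumer dies between the inserts and the commit, the same batch is delivered again on restart, so the inserts should tolerate duplicates.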