
Below is my Spring Boot Kafka consumer application, which reads data from a Kafka topic and loads the JSON input into a database. We are planning to implement heartbeat functionality: using the @Scheduled annotation, the application should periodically POST to a URL to show that it is alive and running. The purpose of this POST request is to update the application's status in our monitoring tool.

To achieve this I placed the heartbeat code in several places in my application, but it never runs: either the @PostConstruct method or consumer.poll() seems to prevent the heartbeat code from executing.

We are using Apache Kafka 2.12. What is the right approach to implement this behaviour in my Spring Boot app? Is there any other API for sending such a POST request to a URL every few minutes for the lifetime of the application? Would writing a background thread resolve this issue? If so, please share an example. And why does @PostConstruct or poll() block other recurring code from running? Please help. Thanks in advance.

@SpringBootApplication
@EnableScheduling 
public class KafkaApp {

    @Autowired
    ConsumerService kcService;

    public static void main(String[] args) {
        SpringApplication.run(KafkaApp.class, args);
    }

    @PostConstruct
    public void init(){
        kcService.getMessagesFromKafka();
    }   
}

and the 2 @Service definitions:

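import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    // Called from KafkaApp.init(); loops until enough polls come back empty.
    public void getMessagesFromKafka() {
        // createConsumer() is not shown in this post.
        final Consumer<Long, String> consumer = createConsumer();
        final int giveUp = 100;
        int noRecordsCount = 0;

        while (true) {
            // Blocks for up to 1000 ms waiting for records.
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                // Stop once more than giveUp polls have returned nothing.
                if (noRecordsCount > giveUp) break;
                else continue;
            }
            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            });
            consumer.commitAsync();
        }
    }
}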
// Heartbeat method of the second @Service (enclosing class not shown in this post).
@Scheduled(fixedDelay = 180000)
public void heartbeat() {
    RestTemplate restTemplate = new RestTemplate();
    String url = "endpoint url";
    String requestJson = "{\"I am alive\":\"App name?\"}";
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.APPLICATION_JSON);
    HttpEntity<String> entity = new HttpEntity<String>(requestJson, headers);
    String answer = restTemplate.postForObject(url, entity, String.class);
    System.out.println(answer);
}

1 Answer


Add the @EnableScheduling annotation to your main class, like this:

@SpringBootApplication
@EnableScheduling 
public class KafkaApp {
    @Autowired
    ConsumerService kcService;
    public static void main(String[] args) {
        SpringApplication.run(KafkaApp.class, args);
    }
    @PostConstruct
    public void init(){
        kcService.getMessagesFromKafka();
    }   

}

For more detail, you can visit this link: spring-boot-task-scheduling-with-scheduled-annotation.

If you want to use a cron expression for this purpose, add this to application.properties:

# Runs every 5 seconds. Spring cron fields: second minute hour day-of-month month day-of-week
cron.expression=*/5 * * * * *

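You can build a cron expression online; here is a link: cron-expression-generator-quartz. Note that Spring's @Scheduled expects six fields, so a Quartz expression with a seventh (year) field needs that field removed.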

Then put the @Scheduled annotation above your heartbeat function, like this:

@Scheduled(cron = "${cron.expression}")
public void heartbeat() {
    //Your code here.
}
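
On the background-thread part of the question: a loop that never returns from a @PostConstruct method keeps application startup from finishing, so the scheduler may never get the chance to start the heartbeat. Below is a minimal sketch in plain Java (no Spring or Kafka) of the general idea: run the blocking loop on its own thread so a periodic task can run alongside it. The class name BackgroundPollSketch, the methods blockingPollLoop and runFor, and the 100 ms delay are all made up for illustration; the counter stands in for the real HTTP POST.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BackgroundPollSketch {

    // Counts heartbeats; a stand-in for the real POST request (assumption).
    static final AtomicInteger heartbeats = new AtomicInteger();

    // Hypothetical stand-in for the blocking consumer.poll() loop.
    static void blockingPollLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(50); // pretend to wait for records
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit when asked to stop
        }
    }

    // Runs the poll loop and the heartbeat side by side for the given time
    // and returns how many heartbeats fired.
    static int runFor(long millis) {
        heartbeats.set(0);
        ExecutorService poller = Executors.newSingleThreadExecutor();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // The blocking loop gets its own thread instead of the caller's.
            poller.execute(BackgroundPollSketch::blockingPollLoop);
            // The heartbeat fires every 100 ms (illustrative delay only).
            scheduler.scheduleWithFixedDelay(
                    heartbeats::incrementAndGet, 0, 100, TimeUnit.MILLISECONDS);
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
            poller.shutdownNow(); // interrupts the poll loop
        }
        return heartbeats.get();
    }

    public static void main(String[] args) {
        System.out.println("heartbeats sent: " + runFor(500));
    }
}
```

In the Spring app the same idea would mean that init() only starts the consumer loop on a separate thread and returns immediately, so startup can complete and @Scheduled can begin firing. How exactly to wire that up (a plain Thread, an executor bean, or something else) is a design choice this sketch does not settle.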