Below is my spring boot kafka consumer application to read data from kafka topic. In this application we are planning to implement heartbeat funtionally to post its heartbeat to url using @schduling annotaion to know its alive and running(which loads my json input data to db). purpose of this post request is to update the status on application monitoring tool.
to achive this i placed my heartbeat code to in manyplaces of my application but I could'not able to achive this becuase @postconstuct or consumer.poll() is not allowing to run the heartbeat code piece.
we are using apache kafka 2.12, What could be the right approach to implment this behaviour in my spring boot app? Is their any other api to do such post request to url, every few miuntes through out the application.? Writing background thread will resolve this issue, please share any? why postconstuct() or poll() is blocking other recurresive code to run. Please help me. Thanks in advance.
@SpringBootApplication
@EnableScheduling
public class KafkaApp {
@Autowired
ConsumerService kcService;
public static void main(String[] args) {
SpringApplication.run(KafkaApp.class, args);
}
@PostConstruct
public void init(){
kcService.getMessagesFromKafka();
}
}
and 2 @Service Definitions:
import org.apache.kafka.clients.consumer.Consumer; @Service public class ConsumerService { final Consumer<Long, String> consumer = createConsumer(); final int giveUp = 100; int noRecordsCount = 0; while (true) { final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000); if (consumerRecords.count()==0) { noRecordsCount++; if (noRecordsCount > giveUp) break; else continue; } consumerRecords.forEach(record -> { System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset()); }); consumer.commitAsync(); } }
@Scheduled(fixedDelay = 180000)
public void heartbeat() {
RestTemplate restTemplate = new RestTemplate();
String url = "endpoint url";
String requestJson = "{\"I am alive\":\"App name?\"}";
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> entity = new HttpEntity<String>(requestJson,headers);
String answer = restTemplate.postForObject(url, entity, String.class);
System.out.println(answer);
}