I need to write a Java Code in order to start the kafka consumer from consuming the messages. Once the consumer is started from the command line. But not sure about the standard way to stop the consumer from processing.
In my local windows machine, I have written simple standalone producer and consumer. Now do I stop the standalone consumer from further processing using different code/script. ` private static final String TOPIC = "conftest"; private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private static Consumer<String, String> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
// Create the consumer using props.
consumer = new KafkaConsumer<>(props);
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
}
public static void main(String args[]) throws Exception {
runConsumer();
}
private static void runConsumer(){
Consumer<String, String> consumer = createConsumer();
try{
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
}catch(WakeupException wue){
System.out.println("Wake Up Exception Occured");
wue.printStackTrace();
}
catch(Exception e){
e.printStackTrace();
}finally {
consumer.close();
}
}`
close()
method, that is only the way to stop consumer – Deadpool