0
votes

I need to write a Java Code in order to start the kafka consumer from consuming the messages. Once the consumer is started from the command line. But not sure about the standard way to stop the consumer from processing.

In my local windows machine, I have written simple standalone producer and consumer. Now do I stop the standalone consumer from further processing using different code/script. ` private static final String TOPIC = "conftest"; private final static String BOOTSTRAP_SERVERS = "localhost:9092";

private static Consumer<String, String> createConsumer() {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    // Create the consumer using props.
    consumer = new KafkaConsumer<>(props);
    // Subscribe to the topic.
    consumer.subscribe(Collections.singletonList(TOPIC));
    return consumer;
}

public static void main(String args[]) throws Exception {
    runConsumer();
}

private static void runConsumer(){
    Consumer<String, String> consumer = createConsumer();
    try{
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), record.value()); 
            }
            consumer.commitAsync();
        }
    }catch(WakeupException wue){
        System.out.println("Wake Up Exception Occured");
        wue.printStackTrace();
    }
    catch(Exception e){
        e.printStackTrace();
    }finally {
        consumer.close();
    }
}`
1
No not to start. But to stop the consumer which is started.Sachin KB
I have written a standalone java producer to start. My question is how do I stop that using a different Java process.Sachin KB
can you post the code here it's not clear for me sorryDeadpool
Above the sample standalone code I used to start the consumer in Eclipse.Sachin KB
are you looking for close() method, that is only the way to stop consumerDeadpool

1 Answers

1
votes

Use close method to close the KafkaConsumer Just call consumer.close();

public void close()

Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. If auto-commit is enabled, this will commit the current offsets if possible within the default timeout. See close(long, TimeUnit) for details. Note that wakeup() cannot be used to interrupt close.

public void close(long timeout, java.util.concurrent.TimeUnit timeUnit) With specified time

Tries to close the consumer cleanly within the specified timeout. This method waits up to timeout for the consumer to complete pending commits and leave the group. If auto-commit is enabled, this will commit the current offsets if possible within the timeout. If the consumer is unable to complete offset commits and gracefully leave the group before the timeout expires, the consumer is force closed. Note that wakeup() cannot be used to interrupt close.