3
votes

I have started my ZooKeeper and Kafka servers. I started my Kafka producer, which sent 10 messages to topic 'xxx', and then stopped it. After that I started my Kafka consumer and subscribed to topic 'xxx'. The consumer consumes those 10 messages, even though the producer that sent them is no longer running. I need my Kafka consumer to consume only messages from a producer that is currently running, not older messages already stored on the Kafka server. Is there any way to achieve this? My consumer properties are as follows.

props.put("bootstrap.servers", "localhost:9092");
String consumeGroup = "cg1";
props.put("group.id", consumeGroup);
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "100");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
2
You can discard any messages that are waiting and only take ones that arrive after your program has started. Kafka is designed to persist messages (you use Kafka because you want this feature); if you only want non-durable messaging, just about any other messaging solution might be a better choice. - Peter Lawrey
Is there any flag to identify messages that have been waiting for a long time? - Prasath
Remove the property props.put("auto.offset.reset", "earliest"); it uses the same key as ConsumerConfig.AUTO_OFFSET_RESET_CONFIG. Keep only the "latest" setting. - Rambler
Yes, "auto.offset.reset" is the same as ConsumerConfig.AUTO_OFFSET_RESET_CONFIG. I removed the duplicate, and my properties now contain "auto.offset.reset" = "latest". The same issue still exists. - Prasath
My final consumer props:
bootstrap.servers=localhost:9092
group.id=cg1
enable.auto.commit=true
auto.offset.reset=latest
auto.commit.interval.ms=100
heartbeat.interval.ms=3000
session.timeout.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
- Prasath

2 Answers

2
votes

Set the following property:

consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

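It tells the consumer to start from the latest offset, that is, to read only the messages that are published after the consumer has started. Note that this setting is consulted only when the consumer group has no committed offset for the partition.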
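As a rough illustration, the setting could sit in a configuration like the one below. This is only a sketch: the class name `LatestOnlyConsumerProps` and the `build` method are hypothetical, and generating a fresh `group.id` per run and turning off `enable.auto.commit` are assumptions added here so that no committed offset exists and the reset policy is actually applied. It uses the same plain string keys as the question and only builds the `Properties` object; it does not create a consumer.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper class for illustration; not part of the Kafka API.
class LatestOnlyConsumerProps {

    // Builds consumer properties intended to start from the end of each partition.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Assumption: a fresh group id per run, so the group has no committed
        // offset and auto.offset.reset decides where reading starts.
        props.put("group.id", "cg-" + UUID.randomUUID());
        // With no committed offset, start at the end of the log.
        props.put("auto.offset.reset", "latest");
        // Assumption: offsets are not stored, so nothing is resumed later.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }
}
```

The resulting object can be passed to the `KafkaConsumer` constructor in place of the `props` shown in the question. Reusing a fixed group such as "cg1" would bring any previously committed offset back into play.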

0
votes

Please create a new topic and set the property ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest". Make sure not to commit the offset, i.e. do not call commitSync().

By default, receivers start consuming records from the last committed offset of each assigned partition. If a committed offset is not available, the offset reset strategy ConsumerConfig#AUTO_OFFSET_RESET_CONFIG configured for the KafkaConsumer is used to set the start offset to the earliest or latest offset on the partition.
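The rule described above can be modelled in a few lines. The sketch below is a simplified illustration, not Kafka client code: `StartOffsetModel`, `ResetPolicy`, and `startOffset` are hypothetical names, and the model ignores the case where a committed offset is out of range.

```java
import java.util.OptionalLong;

// Hypothetical model of the start-position rule; not Kafka client code.
class StartOffsetModel {

    enum ResetPolicy { EARLIEST, LATEST }

    // committed: the group's committed offset for the partition, if any.
    // logStart:  offset of the first message still stored in the partition.
    // logEnd:    offset that the next produced message will receive.
    static long startOffset(OptionalLong committed, ResetPolicy policy,
                            long logStart, long logEnd) {
        // A committed offset always wins; the reset policy is not consulted.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // No committed offset: fall back to the configured reset policy.
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }
}
```

With ten stored messages at offsets 0 to 9, `startOffset(OptionalLong.empty(), ResetPolicy.LATEST, 0, 10)` returns 10, so the old messages are skipped. By contrast, `startOffset(OptionalLong.of(0), ResetPolicy.LATEST, 0, 10)` returns 0, so all ten messages are delivered even though the policy is "latest".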

I think in your case you are committing the offset, or a committed offset is already available for the given topic.