I am using the Logstash Kafka input plugin to read messages from a topic. Previously, I was able to start new consumers belonging to new consumer groups and, by setting auto_offset_reset to "earliest", consume messages from the start of the topic.
Plugin configuration:
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["test_topic"]
    group_id => "new_consumer"
    client_id => "new_consumer"
    consumer_threads => 1
    auto_offset_reset => "earliest"
  }
}
But now I notice a strange behaviour. Even though this is a new consumer belonging to a new consumer group and auto_offset_reset is set to "earliest", I am unable to consume any messages.
I enabled debug logging, and the following is the behaviour. The logs clearly show that the consumer has no previously committed offset; the partition offset is then fetched, and the consumer adopts it as its new offset. (Please note: 36387 messages were read from the topic earlier, hence the number in the logs below.)
[2016-12-22T16:45:13,454][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Successfully joined group new_consumer with generation 1
[2016-12-22T16:45:13,455][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Setting newly assigned partitions [test_topic-0] for group new_consumer
[2016-12-22T16:45:13,456][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group new_consumer fetching committed offsets for partitions: [test_topic-0]
[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group new_consumer has no committed offset for partition test_topic-0
[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] Resetting offset for partition test_topic-0 to earliest offset.
[2016-12-22T16:45:13,546][DEBUG][org.apache.kafka.clients.NetworkClient] Initiating connection to node 0 at localhost:9092.
[2016-12-22T16:45:13,657][DEBUG][logstash.instrument.collector] Collector: Sending snapshot to observers {:created_at=>2016-12-22 16:45:13 -0800}
[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.bytes-sent
[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.bytes-received
[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.latency
[2016-12-22T16:45:13,742][DEBUG][org.apache.kafka.clients.NetworkClient] Completed connection to node 0
[2016-12-22T16:45:13,901][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] Fetched offset 36387 for partition test_topic-0
[2016-12-22T16:45:18,050][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group newconsumer committed offset 36387 for partition test_topic-0
[2016-12-22T16:45:18,563][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group newconsumer committed offset 36387 for partition test_topic-0
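To make my reading of these logs concrete, here is a minimal Python sketch of how I understand the consumer chooses its starting position. This is not Kafka's actual code: the function resolve_start_offset and its parameters are hypothetical names used only for illustration. The sketch assumes that "earliest" means the earliest offset still available in the partition, which is not necessarily 0. Whether the earliest available offset of test_topic-0 really is 36387 (for example, because older messages were already removed) is an assumption I have not verified.

```python
from typing import Optional


def resolve_start_offset(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str,
) -> int:
    """Toy model of where a consumer starts reading (hypothetical helper).

    committed: last committed offset for the group, or None if there is none.
    log_start: earliest offset still available in the partition.
    log_end:   offset of the next message to be written.
    """
    # A valid committed offset always wins; the reset policy is not consulted.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No committed offset (or it is out of range): apply the reset policy.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")


# Assumed scenario: nothing committed, and the earliest available offset
# has already advanced to 36387, so "earliest" and "latest" coincide.
print(resolve_start_offset(None, 36387, 36387, "earliest"))
```

If that assumption holds, a brand-new group would start at 36387 even with "earliest", which would match the "Fetched offset 36387" line above. If the earliest available offset were still 0, the same call would return 0 instead.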
Can anyone tell me why we are seeing this behaviour?