0
votes

I am using the Logstash Kafka input plugin to read messages from a topic. I was earlier able to start new consumers-belonging to new consumer groups and by setting auto_offset_reset=earliest was able to consume messages from the start of the topic.

Plugin configuration:

input {     
    kafka {         
    bootstrap_servers => "localhost:9092"
        topics => ["test_topic"]
        group_id => "new_consumer"
        client_id => "new_consumer"
        consumer_threads => 1
        auto_offset_reset => "earliest"   
  } 
}

But now I notice a strange behaviour. Even though this is a new consumer belonging to a new consumer group and auto_offset_reset is set to 'earliest', I am unable to consumer any messages.

Enabled debug logs are following is the behaviour: It clearly shows that the consumer has no previous offset and suddenly the partition offset is fetched and the consumer uses this and sets its new offset (Please note: 36387 messages were read earlier from the topic and hence the number in the logs below)

[2016-12-22T16:45:13,454][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Successfully joined group new_consumer with generation 1

[2016-12-22T16:45:13,455][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Setting newly assigned partitions [test_topic-0] for group new_consumer

[2016-12-22T16:45:13,456][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group new_consumer fetching committed offsets for partitions: [test_topic-0]

[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group new_consumer has no committed offset for partition test_topic-0

[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] Resetting offset for partition test_topic-0 to earliest offset.

[2016-12-22T16:45:13,546][DEBUG][org.apache.kafka.clients.NetworkClient] Initiating connection to node 0 at localhost:9092.

[2016-12-22T16:45:13,657][DEBUG][logstash.instrument.collector] Collector: Sending snapshot to observers {:created_at=>2016-12-22 16:45:13 -0800}

[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.bytes-sent

[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.bytes-received

[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.latency

[2016-12-22T16:45:13,742][DEBUG][org.apache.kafka.clients.NetworkClient] Completed connection to node 0

[2016-12-22T16:45:13,901][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] Fetched offset 36387 for partition test_topic-0

[2016-12-22T16:45:18,050][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group newconsumer committed offset 36387 for partition test_topic-0

[2016-12-22T16:45:18,563][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group newconsumer committed offset 36387 for partition test_topic-0

Can anyone tell me why we are seeing this behaviour?

1
Which Logstash and Kafka versions are you using?Val
Sorry for not adding this earlier. Logstash 5.0.2 and Kafka 2.0.10minion
2.10 is the Scala version. Currently, Kafka is at the 0.10.1.0 version, is it the one you're using?Val
Yes you are right. Scala 2.10 and kafka v0.10.0.1minion

1 Answers

1
votes

Could the old messages have been deleted based on the configured retention period? It could be that offset 36387 is the earliest offset and all earlier messages have been expired. The default retention period is 7 days.