As I understand it, in Kafka 0.9.0.1 consumer offsets are managed in a Kafka topic. Strangely, when the consumer group dies, the offsets are deleted from that topic. The next time I start a consumer with the same consumer group ID, the offsets are reset to the earliest.
Is this expected? I would want the offsets to remain even if the consumer group has died completely, so that it continues from where it left off when it comes back up.
Here are the settings in consumer.config:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class fk.el.client.mappers.kafka.KafkaDeserializer
group.id = testConsumerId
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [k1:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = false
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id = testAppId1463671497716
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1
send.buffer.bytes = 131072
auto.offset.reset = earliest
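For reference, the subset of these settings that governs offset handling can be written as a small Java sketch. The class name `ConsumerSettings` and the method `build()` are hypothetical names used only for illustration; the values are copied from the dump above, and the custom value deserializer is left out. Since `enable.auto.commit` is `false`, a consumer created from these properties only stores offsets when the application commits them explicitly, and `auto.offset.reset = earliest` applies only to partitions that have no committed offset.

```java
import java.util.Properties;

// Hypothetical helper: collects only the offset-related settings from the dump above.
public class ConsumerSettings {

    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "k1:9092");
        props.setProperty("group.id", "testConsumerId");
        // Auto commit is off, so offsets are stored only when the application commits them.
        props.setProperty("enable.auto.commit", "false");
        // Used only when a partition has no committed offset for this group.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("session.timeout.ms", "30000");
        props.setProperty("heartbeat.interval.ms", "3000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings in the same "key = value" form as the dump.
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```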
I see this in the consumer logs:
20:55:00.024 DEBUG o.a.k.c.c.i.AbstractCoordinator - Issuing leader SyncGroup (SYNC_GROUP: {group_id=testConsumerId,generation_id=1,member_id=testAppId1463671497716-d1ce3669-b451-4197-a5dd-39dd38c61102,group_assignment=[{member_id=testAppId1463671497716-d1ce3669-b451-4197-a5dd-39dd38c61102,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=36 cap=36]}]}) to coordinator 2147483647
20:55:00.379 DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful sync group response for group testConsumerId: {error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=36 cap=36]}
20:55:00.431 DEBUG o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [sampleEntity-1, sampleEntity-0]
20:55:00.432 DEBUG o.a.k.c.c.i.ConsumerCoordinator - Fetching committed offsets for partitions: [sampleEntity-1, sampleEntity-0]
20:55:00.605 DEBUG o.a.k.c.c.i.ConsumerCoordinator - No committed offset for partition sampleEntity-1
20:55:00.606 o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition sampleEntity-1 to earliest offset.
20:55:00.732 o.a.k.c.consumer.internals.Fetcher - Fetched offset 0 for partition sampleEntity-1
20:55:00.732 o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition sampleEntity-0 to the committed offset 25
In the server logs, when the consumer starts up and dies:
[2016-05-19 16:09:50,113] INFO [GroupCoordinator 0]: Preparing to restabilize group testConsumerId with old generation 0 (kafka.coordinator.GroupCoordinator)
[2016-05-19 16:09:50,114] INFO [GroupCoordinator 0]: Stabilized group testConsumerId generation 1 (kafka.coordinator.GroupCoordinator)
[2016-05-19 16:09:50,125] INFO [GroupCoordinator 0]: Assignment received from leader for group testConsumerId for generation 1 (kafka.coordinator.GroupCoordinator)
[2016-05-19 16:09:50,158] TRACE [Group Metadata Manager on Broker 0]: Getting offsets Vector([sampleEntity,1], [sampleEntity,0]) for group testConsumerId. (kafka.coordinator.GroupMetadataManager)
[2016-05-19 16:10:38,158] TRACE [GroupCoordinator 0]: Member testAppId1463674187858-ea8c9c30-4c9d-4b52-bfef-44c299442d45 in group testConsumerId has failed (kafka.coordinator.GroupCoordinator)
[2016-05-19 16:10:38,158] INFO [GroupCoordinator 0]: Preparing to restabilize group testConsumerId with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-05-19 16:10:38,158] TRACE [Group Metadata Manager on Broker 0]: Marking group testConsumerId as deleted. (kafka.coordinator.GroupMetadataManager)
[2016-05-19 16:10:38,159] INFO [GroupCoordinator 0]: Group testConsumerId generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)
After the group is dead and removed, I can't access the earlier offsets.