I am using Spring Boot 2.2 together with a Kafka cluster (Bitnami Helm chart) and I am seeing some pretty strange behaviour.
The Spring Boot app has several consumers on several topics.
Calling kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-app gives:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-app event.topic-a 0 2365079 2365090 11 consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47 consumer-4
my-app event.topic-a 1 2365080 2365091 11 consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47 consumer-4
my-app batch.topic-a 0 278363 278363 0 consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47 consumer-3
my-app batch.topic-a 1 278362 278362 0 consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47 consumer-3
my-app batch.topic-b 0 1434 1434 0 consumer-5-a2f940c8-75e6-43d2-8d79-77d03e1ad640 /10.244.3.47 consumer-5
my-app event.topic-b 0 2530 2530 0 consumer-6-45a32d6d-eac9-4abe-b14f-47173338e62c /10.244.3.47 consumer-6
my-app batch.topic-c 0 1779 1779 0 consumer-1-d935a29f-ad3c-4292-9ace-5efdfff864d6 /10.244.3.47 consumer-1
my-app event.topic-c 0 12308 13502 1194 - - -
Calling it again gives:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-app event.topic-a 0 2365230 2365245 15 consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47 consumer-4
my-app event.topic-a 1 2365231 2365246 15 consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47 consumer-4
my-app batch.topic-a 0 278363 278363 0 consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47 consumer-3
my-app batch.topic-a 1 278362 278362 0 consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47 consumer-3
my-app batch.topic-b 0 1434 1434 0 consumer-5-a2f940c8-75e6-43d2-8d79-77d03e1ad640 /10.244.3.47 consumer-5
my-app event.topic-b 0 2530 2530 0 consumer-6-45a32d6d-eac9-4abe-b14f-47173338e62c /10.244.3.47 consumer-6
my-app batch.topic-c 0 1779 1779 0 consumer-1-d935a29f-ad3c-4292-9ace-5efdfff864d6 /10.244.3.47 consumer-1
my-app event.topic-c 0 12308 13505 1197 consumer-2-d52e2b96-f08c-4247-b827-4464a305cb20 /10.244.3.47 consumer-2
As you can see, the consumer for event.topic-c is now there, but it lags 1197 entries. The app itself reads from the topic, but it keeps reading the same events (roughly the amount of the lag) and the offset never advances. I get no errors or log entries, either on Kafka or in Spring Boot. All I see is that, for this specific topic, the same events are processed again and again; all the other topics in the app work correctly.
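To double-check whether the commits for that partition actually stick, I wrote a small throwaway diagnostic (sketch only; it assumes the same bootstrap address and group id as above, and it needs a running broker, so treat it as illustrative):

```kotlin
// Diagnostic sketch: compare the group's committed offset with the
// log-end offset for event.topic-c, outside the Spring listener.
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

fun main() {
    val props = mapOf<String, Any>(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "kafka:9092",   // assumption: same broker as the app
        ConsumerConfig.GROUP_ID_CONFIG to "my-app",                // assumption: same group id
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
    )
    // No subscribe() here, so this consumer does not join the group
    // and does not trigger a rebalance of the running app.
    KafkaConsumer<String, String>(props).use { consumer ->
        val tp = TopicPartition("event.topic-c", 0)
        val committed = consumer.committed(tp)                 // last committed offset for the group
        val end = consumer.endOffsets(listOf(tp))[tp]          // current log-end offset
        println("committed=${committed?.offset()} end=$end")
    }
}
```

If `committed` stays frozen between runs while `end` keeps growing, the commits from the app are not reaching the broker for this partition.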
Here is the client config:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [kafka:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = sap-integration
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
Any idea? I am a little bit lost.
Edit: The Spring config is pretty standard:
configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
configProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
configProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = MyJsonDeserializer::class.java
configProps[JsonDeserializer.TRUSTED_PACKAGES] = "*"
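For context, those four lines sit in a consumer factory roughly like this (sketch; everything except the four `configProps` lines above is the usual boilerplate, and the class and property names are illustrative):

```kotlin
// Minimal sketch of the surrounding Spring Kafka configuration,
// assuming the standard ConcurrentKafkaListenerContainerFactory setup.
@EnableKafka
@Configuration
class KafkaConsumerConfig(
    @Value("\${kafka.bootstrap-address}") private val bootstrapAddress: String  // illustrative property name
) {

    @Bean
    fun consumerFactory(): ConsumerFactory<String, Any> {
        val configProps = mutableMapOf<String, Any>()
        configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
        configProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        configProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = MyJsonDeserializer::class.java
        configProps[JsonDeserializer.TRUSTED_PACKAGES] = "*"
        return DefaultKafkaConsumerFactory(configProps)
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory()
        return factory
    }
}
```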
Here are some examples from the logs:
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=..., headers={kafka_offset=37603361, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6ca11277, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-c, kafka_receivedTimestamp=1572633584589, kafka_groupId=my-app}]]
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=..., headers={kafka_offset=37603362, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6ca11277, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-c, kafka_receivedTimestamp=1572633584635, kafka_groupId=my-app}]]
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {topic-c-0=OffsetAndMetadata{offset=37603363, leaderEpoch=null, metadata=''}}
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {topic-c-0=OffsetAndMetadata{offset=37603363, leaderEpoch=null, metadata=''}}
....
2019-11-01 18:39:51.475 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2019-11-01 18:39:51.475 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}
while the consumer is lagging:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-app topic-c 0 37603363 37720873 117510 consumer-3-2b8499c0-7304-4906-97f8-9c0f6088c469 /10.244.3.64 consumer-3
No error, no warning ... just no more messages.
Thanks!