2
votes

I am using Spring Boot 2.2 together with a Kafka cluster (Bitnami Helm chart), and I am seeing some pretty strange behaviour.

I have a Spring Boot app with several consumers on several topics.

Calling kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-app gives:

GROUP           TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-app event.topic-a                 0          2365079         2365090         11              consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47    consumer-4
my-app event.topic-a                 1          2365080         2365091         11              consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47    consumer-4
my-app batch.topic-a                 0          278363          278363          0               consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47    consumer-3
my-app batch.topic-a                 1          278362          278362          0               consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47    consumer-3
my-app batch.topic-b                 0          1434            1434            0               consumer-5-a2f940c8-75e6-43d2-8d79-77d03e1ad640 /10.244.3.47    consumer-5
my-app event.topic-b                 0          2530            2530            0               consumer-6-45a32d6d-eac9-4abe-b14f-47173338e62c /10.244.3.47    consumer-6
my-app batch.topic-c                 0          1779            1779            0               consumer-1-d935a29f-ad3c-4292-9ace-5efdfff864d6 /10.244.3.47    consumer-1
my-app event.topic-c                 0          12308           13502           1194            -                                               -               - 

Calling it again gives:

GROUP           TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-app event.topic-a                 0          2365230         2365245         15              consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47    consumer-4
my-app event.topic-a                 1          2365231         2365246         15              consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47    consumer-4
my-app batch.topic-a                 0          278363          278363          0               consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47    consumer-3
my-app batch.topic-a                 1          278362          278362          0               consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47    consumer-3
my-app batch.topic-b                 0          1434            1434            0               consumer-5-a2f940c8-75e6-43d2-8d79-77d03e1ad640 /10.244.3.47    consumer-5
my-app event.topic-b                 0          2530            2530            0               consumer-6-45a32d6d-eac9-4abe-b14f-47173338e62c /10.244.3.47    consumer-6
my-app batch.topic-c                 0          1779            1779            0               consumer-1-d935a29f-ad3c-4292-9ace-5efdfff864d6 /10.244.3.47    consumer-1
my-app event.topic-c                 0          12308           13505           1197            consumer-2-d52e2b96-f08c-4247-b827-4464a305cb20 /10.244.3.47    consumer-2

As you can see, the consumer for event.topic-c is now there, but it lags 1197 entries. The app itself reads from the topic, but it always processes the same events (roughly the amount of the lag) and the offset never changes. I get no errors or log entries, either from Kafka or from Spring Boot. All I see is that, for this specific topic, the same events are processed again and again. All the other topics in the app are working correctly.

Here is the client config:

allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [kafka:9092]
check.crcs = true
client.dns.lookup = default
client.id = 
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = sap-integration
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

Any idea? I am a little bit lost.

Edit: Spring config is pretty standard:

configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
configProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
configProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = MyJsonDeserializer::class.java
configProps[JsonDeserializer.TRUSTED_PACKAGES] = "*"
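
For context, the surrounding factory wiring is the usual boilerplate. This is a minimal sketch of it, assuming the standard Spring Kafka setup; `bootstrapAddress` and `MyJsonDeserializer` come from my project, everything else is stock:

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer

@EnableKafka
@Configuration
class KafkaConsumerConfig {

    // Resolved from application properties in the real app
    private val bootstrapAddress = "kafka:9092"

    @Bean
    fun consumerFactory(): ConsumerFactory<String, Any> {
        val configProps = mutableMapOf<String, Any>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapAddress,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            // MyJsonDeserializer is our own JsonDeserializer subclass
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to MyJsonDeserializer::class.java,
            JsonDeserializer.TRUSTED_PACKAGES to "*"
        )
        return DefaultKafkaConsumerFactory(configProps)
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory()
        return factory
    }
}
```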

Here are some examples from the logs:

2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=..., headers={kafka_offset=37603361, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6ca11277, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-c, kafka_receivedTimestamp=1572633584589, kafka_groupId=my-app}]]
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=..., headers={kafka_offset=37603362, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6ca11277, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-c, kafka_receivedTimestamp=1572633584635, kafka_groupId=my-app}]]
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {topic-c-0=OffsetAndMetadata{offset=37603363, leaderEpoch=null, metadata=''}}
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {topic-c-0=OffsetAndMetadata{offset=37603363, leaderEpoch=null, metadata=''}}
....
2019-11-01 18:39:51.475 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2019-11-01 18:39:51.475 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}

while the consumer is lagging:

GROUP           TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-app          topic-c              0          37603363        37720873       117510          consumer-3-2b8499c0-7304-4906-97f8-9c0f6088c469 /10.244.3.64    consumer-3

No error, no warning, just no more messages.

Thanks!

2
You need to show your Spring configuration. Also turn on DEBUG logging to observe polling and offset committing. – Gary Russell
DEBUG on the client side or on the Kafka side? – IEE1394
Client side, initially. – Gary Russell
Okay, hopefully this is correct: "org.springframework.kafka" – IEE1394
You should start with that - we log the records received and when offsets are committed. If that doesn't help your investigation, you will then need to enable org.apache.kafka too - but I would start with just Spring, because the Kafka client logging is quite verbose. – Gary Russell

2 Answers

1
votes

You need to look for logs like this...

2019-11-01 16:33:31.825 INFO 35182 --- [ kgh1231-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=kgh1231] (Re-)joining group

...

2019-11-01 16:33:31.872 INFO 35182 --- [ kgh1231-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [kgh1231-0, kgh1231-2, kgh1231-1, kgh1231-4, kgh1231-3]

...

2019-11-01 16:33:31.897 DEBUG 35182 --- [ kgh1231-0-C-1] essageListenerContainer$ListenerConsumer : Received: 10 records

...

2019-11-01 16:33:31.902 DEBUG 35182 --- [ kgh1231-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=foo1, headers={kafka_offset=80, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3d00c543, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kgh1231, kafka_receivedTimestamp=1572640411869}]]

...

2019-11-01 16:33:31.906 DEBUG 35182 --- [ kgh1231-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=foo5, headers={kafka_offset=61, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3d00c543, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=3, kafka_receivedTopic=kgh1231, kafka_receivedTimestamp=1572640411870}]]

2019-11-01 16:33:31.907 DEBUG 35182 --- [ kgh1231-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {kgh1231-0=OffsetAndMetadata{offset=82, metadata=''}, kgh1231-2=OffsetAndMetadata{offset=62, metadata=''}, kgh1231-1=OffsetAndMetadata{offset=62, metadata=''}, kgh1231-4=OffsetAndMetadata{offset=62, metadata=''}, kgh1231-3=OffsetAndMetadata{offset=62, metadata=''}}

2019-11-01 16:33:31.908 DEBUG 35182 --- [ kgh1231-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {kgh1231-0=OffsetAndMetadata{offset=82, metadata=''}, kgh1231-2=OffsetAndMetadata{offset=62, metadata=''}, kgh1231-1=OffsetAndMetadata{offset=62, metadata=''}, kgh1231-4=OffsetAndMetadata{offset=62, metadata=''}, kgh1231-3=OffsetAndMetadata{offset=62, metadata=''}}

If you don't see anything like that then your consumer is not configured properly.

If you can't figure it out, post your log someplace like PasteBin.
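
As a side note, DEBUG for the Spring Kafka classes (and, if needed, for the Kafka clients) can be switched on in application.properties, for example:

```properties
# Container/listener logging: records received, commit lists, committing
logging.level.org.springframework.kafka=DEBUG
# Only if the above is not enough - the client logging is very verbose
#logging.level.org.apache.kafka=DEBUG
```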

0
votes

Mysteriously fixed this issue by doing all of the following:

  • Upgrade Kafka to a newer version
  • Upgrade Spring Boot to a newer version
  • Improve the performance of the software
  • Switch to batch processing
  • Add a health check and combine it with a liveness probe

It has now run for more than a week without that error occurring again.
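
For reference, the health check was wired into Kubernetes roughly like this (a sketch only; path, port and timings are assumptions and depend on your Actuator and deployment configuration):

```yaml
livenessProbe:
  httpGet:
    path: /actuator/health   # Spring Boot Actuator health endpoint
    port: 8080               # HTTP port the app listens on
  initialDelaySeconds: 60    # give the app time to start and join the consumer group
  periodSeconds: 30
  failureThreshold: 3        # restart the pod after three consecutive failed checks
```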