I have Kafka deployed in OpenShift, and a topic is created automatically when my application sends a message via spring-kafka. Everything worked fine until I switched to using Kafka transactions; now I cannot get messages from Kafka due to this error:

2020-10-05 09:20:44.589 ERROR --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 1121:o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer : Transaction rolled back

org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId tx-authorization-group-ms-c8f01388-4c6d-41ed-bce5-c382da07fd65.topic.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId tx-authorization-group-ms-c8f01388-4c6d-41ed-bce5-c382da07fd65.topic.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
    at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
    at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:137)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1102)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1080)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:911)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:727)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId tx-authorization-group-ms-c8f01388-4c6d-41ed-bce5-c382da07fd65.topic.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
    at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:150)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378)
    at org.springframework.data.transaction.MultiTransactionStatus.registerTransactionManager(MultiTransactionStatus.java:69)
    at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:106)
    ... 10 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: TransactionalId tx-authorization-group-ms-c8f01388-4c6d-41ed-bce5-c382da07fd65.topic.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:758)
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:216)
    at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:459)
    at brave.kafka.clients.TracingProducer.beginTransaction(TracingProducer.java:50)
    at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:63)
    at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:135)
    ... 13 common frames omitted

I also can't get messages from the Kafka topic using kafka-console-consumer; it just hangs (I checked, and the topic really does contain messages).
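
The command I use looks roughly like this (broker address and topic name are placeholders):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic --from-beginning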

The Kafka pods have the following settings:

--override log.dir=/var/lib/kafka/log
--override log.dirs=/var/lib/kafka/topics
--override auto.create.topics.enable=true
--override auto.leader.rebalance.enable=true
--override background.threads=10
--override compression.type=producer
--override delete.topic.enable=true
--override leader.imbalance.check.interval.seconds=300
--override leader.imbalance.per.broker.percentage=10
--override log.flush.interval.messages=9223372036854775807
--override log.flush.offset.checkpoint.interval.ms=60000
--override log.flush.scheduler.interval.ms=9223372036854775807
--override log.retention.bytes=-1
--override log.retention.hours=168
--override log.roll.hours=168
--override log.roll.jitter.hours=0
--override log.segment.bytes=1073741824
--override log.segment.delete.delay.ms=60000
--override message.max.bytes=1000012
--override min.insync.replicas=1
--override num.io.threads=8
--override num.network.threads=3
--override num.recovery.threads.per.data.dir=1
--override num.replica.fetchers=1
--override offset.metadata.max.bytes=4096
--override offsets.commit.required.acks=-1
--override offsets.commit.timeout.ms=5000
--override offsets.load.buffer.size=5242880
--override offsets.retention.check.interval.ms=600000
--override offsets.retention.minutes=1440
--override offsets.topic.compression.codec=0
--override offsets.topic.num.partitions=50
--override offsets.topic.replication.factor=3
--override offsets.topic.segment.bytes=104857600
--override queued.max.requests=500
--override quota.consumer.default=9223372036854775807
--override quota.producer.default=9223372036854775807
--override replica.fetch.min.bytes=1
--override replica.fetch.wait.max.ms=500
--override replica.high.watermark.checkpoint.interval.ms=5000
--override replica.lag.time.max.ms=10000
--override replica.socket.receive.buffer.bytes=65536
--override replica.socket.timeout.ms=30000
--override request.timeout.ms=30000
--override socket.receive.buffer.bytes=102400
--override socket.request.max.bytes=104857600
--override socket.send.buffer.bytes=102400
--override unclean.leader.election.enable=true
--override zookeeper.session.timeout.ms=6000
--override zookeeper.set.acl=false
--override broker.id.generation.enable=true
--override connections.max.idle.ms=600000
--override controlled.shutdown.enable=true
--override controlled.shutdown.max.retries=3
--override controlled.shutdown.retry.backoff.ms=5000
--override controller.socket.timeout.ms=30000
--override default.replication.factor=1
--override fetch.purgatory.purge.interval.requests=1000
--override group.max.session.timeout.ms=300000
--override group.min.session.timeout.ms=6000
--override log.cleaner.backoff.ms=15000
--override log.cleaner.dedupe.buffer.size=134217728
--override log.cleaner.delete.retention.ms=86400000
--override log.cleaner.enable=true
--override log.cleaner.io.buffer.load.factor=0.9
--override log.cleaner.io.buffer.size=524288
--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308
--override log.cleaner.min.cleanable.ratio=0.5
--override log.cleaner.min.compaction.lag.ms=0
--override log.cleaner.threads=1
--override log.cleanup.policy=delete
--override log.index.interval.bytes=4096
--override log.index.size.max.bytes=10485760
--override log.message.timestamp.difference.max.ms=9223372036854775807
--override log.message.timestamp.type=CreateTime
--override log.preallocate=false
--override log.retention.check.interval.ms=300000
--override max.connections.per.ip=2147483647
--override num.partitions=1
--override producer.purgatory.purge.interval.requests=1000
--override replica.fetch.backoff.ms=1000
--override replica.fetch.max.bytes=1048576
--override replica.fetch.response.max.bytes=10485760
--override reserved.broker.max.id=1000

I suspect there is some misconfiguration related to ISR. Can somebody tell me what's wrong?

P.S. Since I use spring-kafka, I have the following in my application.yml:

kafka:
  consumer:
    group-id: my-site
  producer:
    transaction-id-prefix: tx-
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: my.site.serializer.JacksonSerializer
    properties:
      interceptor.classes: my.site.interceptors.TokenInterceptor

and I send it using an autowired KafkaTemplate.
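
The sending code is essentially the following (class and topic names here are illustrative, not the real ones):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageSender {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // With transaction-id-prefix set, the producer factory is transactional,
    // so this send must run inside a Kafka transaction (here, the one the
    // listener container starts before invoking the listener).
    public void send(Object payload) {
        kafkaTemplate.send("topic", payload);
    }
}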

1 Answer

I am not completely sure about the transaction API in Spring.

Reading the exception stack trace, it looks like there is a problem with starting a transaction on the producer side. This happens when the transactional id is not unique. Therefore, you need to change the setting transaction-id-prefix: tx- to something else, as another producer may be using the same prefix.
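
For example, something along these lines in the application configuration (the exact value is illustrative; it just needs to be unique per application):

kafka:
  producer:
    transaction-id-prefix: tx-my-unique-service-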

To be able to read transactional data with the kafka-console-consumer, you need to make sure to set --isolation-level read_committed. Otherwise, the kafka-console-consumer uses read_uncommitted by default and returns all messages, including those from uncommitted transactions.
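
For example (bootstrap server and topic name are placeholders):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic --from-beginning --isolation-level read_committed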