2
votes

I am using spring Kafka with spring boot 2.1.9 and confluentinc/cp-kafka:5.3.0(3 brokers clusterized) as Broker.

sometimes in the listener, if any message gets failed due to some exception the message keeps on repeating, The listener will receive again and again the same message and will throw exceptions.

And also due to this issue the other messages for this topic, will not be processed, the newer message itself paused, it's not listened by the listener

The below logs are coming after every time. (it came nearly 30 times same error for the same message)

I tried restarting the Kafka and also spring boot application still the same issue.

Logs:


2019-11-06 16:04:49.176  WARN [xxxxx-component-workflow-handler,47fb7bf746423fae,83d15cb4c9f92635,false] 10 --- [_response-4-C-1] o.s.k.core.DefaultKafkaProducerFactory   : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@658b4494, txId=xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2]
2019-11-06 16:04:49.178 ERROR [xxxxx-component-workflow-handler,47fb7bf746423fae,83d15cb4c9f92635,false] 10 --- [_response-4-C-1] .s.i.ComponentWorkflowHandlerServiceImpl : failed to completeNormalFulfillmentItem 124

org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:475)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$EnhancerBySpringCGLIB$$63013f17.createAuditForCompleteFulfilmentFail(<generated>)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeDefaultFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:1214)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeNormalFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:1312)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:994)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$FastClassBySpringCGLIB$$f9512a9d.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$EnhancerBySpringCGLIB$$63013f17.completeFulfillmentItem(<generated>)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener.handleFulfillmentComplete(WorkflowAsynHandlerListener.java:100)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$FastClassBySpringCGLIB$$f84eaee9.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$EnhancerBySpringCGLIB$$2ab8db7b.handleFulfillmentComplete(<generated>)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$FastClassBySpringCGLIB$$a98718f8.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.cloud.sleuth.instrument.messaging.MessageListenerMethodInterceptor.invoke(TraceMessagingAutoConfiguration.java:283)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$EnhancerBySpringCGLIB$$a252ca0f.onMessage(<generated>)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1308)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1291)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1252)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer.java:387)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1177)
        at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1167)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1145)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:958)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:765)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:703)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:176)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.handleExistingTransaction(AbstractPlatformTransactionManager.java:430)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:354)
        at org.springframework.data.transaction.MultiTransactionStatus.registerTransactionManager(MultiTransactionStatus.java:69)
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:106)
        ... 67 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:758)
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751)
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:216)
        at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:499)
        at brave.kafka.clients.TracingProducer.beginTransaction(TracingProducer.java:50)
        at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:103)
        at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:160)
        ... 71 common frames omitted

2019-11-06 16:04:49.178 ERROR [xxxxx-component-workflow-handler,47fb7bf746423fae,83d15cb4c9f92635,false] 10 --- [_response-4-C-1] .s.i.ComponentWorkflowHandlerServiceImpl : failed to complete fulfillment item (completeFulfillmentItem) :124

com.xxxxx.model.exception.ProcessException: failed to completeNormalFulfillmentItem 124
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeNormalFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:1316)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:994)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$FastClassBySpringCGLIB$$f9512a9d.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$EnhancerBySpringCGLIB$$63013f17.completeFulfillmentItem(<generated>)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener.handleFulfillmentComplete(WorkflowAsynHandlerListener.java:100)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$FastClassBySpringCGLIB$$f84eaee9.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$EnhancerBySpringCGLIB$$2ab8db7b.handleFulfillmentComplete(<generated>)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$FastClassBySpringCGLIB$$a98718f8.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.cloud.sleuth.instrument.messaging.MessageListenerMethodInterceptor.invoke(TraceMessagingAutoConfiguration.java:283)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$EnhancerBySpringCGLIB$$a252ca0f.onMessage(<generated>)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1308)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1291)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1252)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer.java:387)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1177)
        at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1167)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1145)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:958)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:765)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:703)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:475)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
        at 

consumer config

@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    // Kafka Server Configuration
    @Value("${kafka.servers}")
    private String kafkaServers;

    // Group Identifier
    @Value("${kafka.groupId}")
    private String groupId;

    // Kafka Max Retry Attempts
    @Value("${kafka.retry.maxAttempts:5}")
    private Integer retryMaxAttempts;

    // Kafka Max Retry Interval
    @Value("${kafka.retry.interval:180000}")
    private Long retryInterval;

    // Kafka Concurrency
    @Value("${kafka.concurrency:10}")
    private Integer concurrency;

    // Kafka Concurrency
    @Value("${kafka.poll.timeout:100}")
    private Integer pollTimeout;

    // Kafka Consumer Offset
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset = "earliest";

    @Value("${kafka.max.records:100}")
    private Integer maxPollRecords;

    @Value("${kafka.max.poll.interval.time:500000}")
    private Integer maxPollIntervalMs;

    @Value("${kafka.max.session.timeout:200000}")
    private Integer sessionTimoutMs;

    // Logger
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);


    @Bean
    public RetryPolicy retryPolicy() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
        return simpleRetryPolicy;
    }

    @Bean
    public BackOffPolicy backOffPolicy() {
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(retryInterval);
        return backOffPolicy;
    }

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());
        return retryTemplate;
    }


    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);

        factory.getContainerProperties().setSyncCommits(true);
        factory.setRetryTemplate(retryTemplate());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        factory.setStatefulRetry(true);
        // NOTE: retryMaxAttempts should always +1 due to spring kafka bug
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            log.warn("failed to process kafka message (retries are exausted). topic name:" + record.topic() + " value:"
                    + record.value());
            messageProducer.saveFailedMessage(record, exception);
        }, retryMaxAttempts + 1);

        factory.setErrorHandler(errorHandler);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Disable the Auto Commit if required for testing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimoutMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }

}

listener

@KafkaListener(id = TOPIC_FULFILLMENT_CREATE, topics = TOPIC_FULFILLMENT_CREATE)
    @Transactional(readOnly = false)
    public void processCreateRequest(@Payload String message) throws IOException {
        ComponentWorkflowModel componentWorkflowModel = JsonUtil.toObject(message, ComponentWorkflowModel.class);
        componentWorkflowStarter.processCreateRequest(componentWorkflowModel);
    }
  1. Is there any solution to stop listening to the error message? I used seekErrorHandler It works sometimes only, is there any config issue?

  2. is there any problem with Kafka with spring Kafka?

  3. How to resolve this issue?

1

1 Answers

1
votes

Looks like incorrect configuration.

org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:475)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
...
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:176)

You are trying to start a new transaction when one already exists.

You have injected the chained transaction manager into the listener container so there is already a transaction.

It looks like you have a @Transactional annotation that also references the same transaction manager, perhaps with propagation REQUIRES_NEW. You cannot use that with Kafka unless you have a different transactional.id - but that is probably not what you want anyway.