I have been facing the exception below on the Kafka consumer side. Surprisingly, this issue is not consistent and an older version of the code (with the exact same configuration but some new unrelated features) works as expected. Could anyone help in determining what could be causing this?
[ERROR][938f3c68-f481-4224-b2c6-43af5fb27ada-0-C-1][org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mycompany.listener.KafkaBatchListener.onMessage(java.lang.Object,org.springframework.kafka.support.Acknowledgment)]
Bean [com.mycompany.listener.KafkaBatchListener@7a59780b]; nested exception is org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver, failedMessage=GenericMessage [payload=[[B@21bc784f, MyPOJO(), [B@33bb5851], headers={kafka_offset=[4046, 4047, 4048], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4871203f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[[B@295620f1, MyPOJOKey(id=0), [B@5d3d6361], kafka_batchConvertedHeaders=[{myFirstHeader=[B@1f011689, myUUIDHeader=[B@7691bce8, myMetadataHeader=[B@6e585b63, myRequestIdHeader=[B@58c81ba2, myMetricsHeader=[B@4f6aeb6c, myTargetHeader=[B@34677895}, {myUUIDHeader=[B@1848ae39, myMetadataHeader=[B@c5b399, myRequestIdHeader=[B@186c1966, myMetricsHeader=[B@1740692e, myTargetHeader=[B@4a242499}, {myUUIDHeader=[B@67d01f3f, myMetadataHeader=[B@1f0f9d8a, myRequestIdHeader=[B@b928e5c, isLastMessage=[B@6079735b, myMetricsHeader=[B@7b7b18c, myTargetHeader=[B@64378f3d}], kafka_receivedTopic=[my_topic, my_topic, my_topic], kafka_receivedTimestamp=[1623420136620, 1623420137255, 1623420137576], kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@7bc81d89, kafka_groupId=dev-consumer-grp}]
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:77) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.ContainerAwareBatchErrorHandler.handle(ContainerAwareBatchErrorHandler.java:56) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2010) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1854) [spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1720) [spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1699) [spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) [spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) [spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) [spring-kafka-2.7.1.jar:2.7.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mycompany.listener.KafkaBatchListener.onMessage(java.lang.Object,org.springframework.kafka.support.Acknowledgment)]
Bean [com.mycompany.listener.KafkaBatchListener@7a59780b]; nested exception is org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver, failedMessage=GenericMessage [payload=[[B@21bc784f, MyPOJO(), [B@33bb5851], headers={kafka_offset=[4046, 4047, 4048], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4871203f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[[B@295620f1, MyPOJOKey(id=0), [B@5d3d6361], kafka_batchConvertedHeaders=[{myFirstHeader=[B@1f011689, myUUIDHeader=[B@7691bce8, myMetadataHeader=[B@6e585b63, myRequestIdHeader=[B@58c81ba2, myMetricsHeader=[B@4f6aeb6c, myTargetHeader=[B@34677895}, {myUUIDHeader=[B@1848ae39, myMetadataHeader=[B@c5b399, myRequestIdHeader=[B@186c1966, myMetricsHeader=[B@1740692e, myTargetHeader=[B@4a242499}, {myUUIDHeader=[B@67d01f3f, myMetadataHeader=[B@1f0f9d8a, myRequestIdHeader=[B@b928e5c, isLastMessage=[B@6079735b, myMetricsHeader=[B@7b7b18c, myTargetHeader=[B@64378f3d}], kafka_receivedTopic=[my_topic, my_topic, my_topic], kafka_receivedTimestamp=[1623420136620, 1623420137255, 1623420137576], kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@7bc81d89, kafka_groupId=dev-consumer-grp}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2367) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2003) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1925) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1837) ~[spring-kafka-2.7.1.jar:2.7.1]
... 8 more
Caused by: org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:145) ~[spring-messaging-5.2.12.RELEASE.jar:5.2.12.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.12.RELEASE.jar:5.2.12.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1983) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1925) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1837) ~[spring-kafka-2.7.1.jar:2.7.1]
... 8 more
My app uses the following:
- A custom listener class
com.mycompany.listener.KafkaBatchListener<K, V>
which implementsorg.springframework.kafka.listener.BatchAcknowledgingMessageListener<K, V>
and overridesonMessage(List<ConsumerRecord<K, V>> consumerRecords, Acknowledgment acknowledgment)
with a custom marker annotation@MyKafkaListener
- A custom container factory which extends
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<K, V>
and configuressetConsumerFactory(consumerFactory)
,setBatchErrorHandler(errorHandler)
,setBatchListener(true)
andContainerProperties.setOnlyLogRecordMetadata(true)
. - A SpringBoot
@Configuration
class which implementsorg.springframework.kafka.annotation.KafkaListenerConfigurer
and is responsible for configuringorg.springframework.kafka.core.DefaultKafkaConsumerFactory<K, V>
,org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<K, V>
andorg.springframework.kafka.config.MethodKafkaListenerEndpoint<String, String>
(used by@MyKafkaListener
) - Spring Kafka 2.7.1
Additional query:
Even though ContainerProperties.setOnlyLogRecordMetadata(true)
is set, the exception stacktrace still contains the full payload which I have omitted. Any idea why?
Thanks in advance!
UPDATE:
- KafkaBatchListener
package com.mycompany.listener;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
public class KafkaBatchListener<K, V> implements BatchAcknowledgingMessageListener<K, V> {
@Override
@com.mycompany.listener.KafkaListener
public void onMessage(final List<ConsumerRecord<K, V>> consumerRecords, final Acknowledgment acknowledgment) {
// process batch using MyService<K, V>.process(consumerRecords)
acknowledgment.acknowledge();
}
}
- Custom Annotation
package com.mycompany.listener;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
@Retention(RUNTIME)
@Target(METHOD)
public @interface KafkaListener {
}
- Listener Container Factory
package com.mycompany.factory;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import com.mycompany.errorhandler.ListenerContainerRecoveringBatchErrorHandler;
public class KafkaBatchListenerContainerFactory<K, V>
extends ConcurrentKafkaListenerContainerFactory<K, V> {
public KafkaBatchListenerContainerFactory(final DefaultKafkaConsumerFactory<K, V> consumerFactory,
final ListenerContainerRecoveringBatchErrorHandler errorHandler, final int concurrency) {
super.setConsumerFactory(consumerFactory);
super.setBatchErrorHandler(errorHandler);
super.setConcurrency(concurrency);
super.setBatchListener(true);
super.setAutoStartup(true);
final ContainerProperties containerProperties = super.getContainerProperties();
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
containerProperties.setOnlyLogRecordMetadata(true);
}
}
- Batch Error Handler
package com.mycompany.errorhandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.listener.RecoveringBatchErrorHandler;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;
@Component
public class ListenerContainerRecoveringBatchErrorHandler extends RecoveringBatchErrorHandler {
public ListenerContainerRecoveringBatchErrorHandler(
@Value("${spring.kafka.consumer.properties.backOffMS:0}") final int backOffTimeMS,
@Value("${spring.kafka.consumer.properties.retries:3}") final int retries) {
super(new FixedBackOff(backOffTimeMS, retries));
}
}
- Kafka Listener Configurer
package com.mycompany.config;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import com.mycompany.errorhandler.ListenerContainerRecoveringBatchErrorHandler;
import com.mycompany.factory.KafkaBatchListenerContainerFactory;
import com.mycompany.listener.KafkaBatchListener;
@Configuration
public class KafkaBatchListenerConfigurer<K, V> implements KafkaListenerConfigurer {
private final List<KafkaBatchListener<K, V>> listeners;
private final BeanFactory beanFactory;
private final ListenerContainerRecoveringBatchErrorHandler errorHandler;
private final int concurrency;
@Autowired
public KafkaBatchListenerConfigurer(final List<KafkaBatchListener<K, V>> listeners, final BeanFactory beanFactory,
final ListenerContainerRecoveringBatchErrorHandler errorHandler,
@Value("${spring.kafka.listener.concurrency:1}") final int concurrency) {
this.listeners = listeners;
this.beanFactory = beanFactory;
this.errorHandler = errorHandler;
this.concurrency = concurrency;
}
@Override
public void configureKafkaListeners(final KafkaListenerEndpointRegistrar registrar) {
final Method listenerMethod = lookUpBatchListenerMethod();
listeners.forEach(listener -> {
registerListenerEndpoint(listener, listenerMethod, registrar);
});
}
private void registerListenerEndpoint(final KafkaBatchListener<K, V> listener, final Method listenerMethod,
final KafkaListenerEndpointRegistrar registrar) {
// final Map<String, Object> consumerConfig = get ConsumerConfig from a custom provider;
registrar.setContainerFactory(createContainerFactory(consumerConfig));
registrar.registerEndpoint(createListenerEndpoint(listener, listenerMethod, consumerConfig));
}
private KafkaBatchListenerContainerFactory<K, V> createContainerFactory(final Map<String, Object> consumerConfig) {
final DefaultKafkaConsumerFactory<K, V> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfig);
final KafkaBatchListenerContainerFactory<K, V> containerFactory = new KafkaBatchListenerContainerFactory<>(
consumerFactory, errorHandler, concurrency);
return containerFactory;
}
private MethodKafkaListenerEndpoint<String, String> createListenerEndpoint(final KafkaBatchListener<K, V> listener,
final Method listenerMethod, final Map<String, Object> consumerConfig) {
final MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setId(UUID.randomUUID().toString());
endpoint.setBean(listener);
endpoint.setMethod(listenerMethod);
endpoint.setBeanFactory(beanFactory);
endpoint.setGroupId("my-group-id");
endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
// final String topicName = get TopicName for this key-value from a custom utility;
endpoint.setTopics(topicName);
final Properties properties = new Properties();
properties.putAll(consumerConfig);
endpoint.setConsumerProperties(properties);
return endpoint;
}
private Method lookUpBatchListenerMethod() {
return Arrays.stream(com.mycompany.listener.KafkaBatchListener.class.getMethods())
.filter(m -> m.isAnnotationPresent(com.mycompany.listener.KafkaListener.class))
.findAny()
.orElseThrow(() -> new IllegalStateException(
String.format("[%s] class should have at least 1 method with [%s] annotation.",
com.mycompany.listener.KafkaBatchListener.class.getCanonicalName(),
com.mycompany.listener.KafkaListener.class.getCanonicalName())));
}
}
@KafkaListener
if your listener already implementsBatchAcknowledgingMessageListener
?@KafkaListener
is intended for invoking POJO endpoints; if you implement the interface, just pass it into the container directly; please show your listener and configuration so I can understand more what you are trying to do. – Gary Russell@KafkaListener
and create listener beans for each key-value pair. These key-value pairs are tied to a single topic and can be processed by their own implementation of a key-value generic service. Question updated with code config. – RGB314