I am running into the following issues when trying to get a message-driven channel adapter (KafkaMessageDrivenChannelAdapter) working with spring-kafka 2.3+. Does anyone have example code that would help?
1. org.springframework.kafka.listener.config.ContainerProperties does not exist.
2. org.springframework.kafka.listener.ContainerProperties does exist, but produces the error below when I try to run the application.
Description:
An attempt was made to call a method that does not exist. The attempt was made from the following location:
org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.onInit(KafkaMessageDrivenChannelAdapter.java:318)
The following method did not exist:
org.springframework.kafka.listener.ContainerProperties.isDeliveryAttemptHeader()Z
3. The error above goes away with spring-kafka 2.5 or later, but is replaced by this one:
2021-03-22 13:56:05.102-0400 org{local_sparta} WARN [data-pipeline,,,] [DP-ACCOUNT] [DPA] [] AnnotationConfigServletWebServerApplicationContext:main Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration': Bean instantiation via constructor failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration]: Constructor threw exception; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaTemplate' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
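In case it helps with diagnosing issue 2, below is a minimal sketch of a check for which jar ContainerProperties is loaded from and whether it has the missing method. The names `ClasspathCheck`, `hasMethod`, and `locationOf` are hypothetical (made up for this post, not part of Spring), and it uses only JDK reflection. It assumes the error comes from a mismatch between the spring-integration-kafka and spring-kafka jars, which I have not confirmed.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of Spring or Kafka.
final class ClasspathCheck {

    private ClasspathCheck() {
    }

    // True if the class can be loaded and has a public no-arg method with this name.
    static boolean hasMethod(String className, String methodName) {
        try {
            Class.forName(className).getMethod(methodName);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    // Jar or directory the class was loaded from, or a placeholder if that is unknown.
    static String locationOf(String className) {
        try {
            CodeSource source = Class.forName(className).getProtectionDomain().getCodeSource();
            return source == null ? "(bootstrap or unknown)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        String type = "org.springframework.kafka.listener.ContainerProperties";
        System.out.println("loaded from: " + locationOf(type));
        System.out.println("has isDeliveryAttemptHeader(): " + hasMethod(type, "isDeliveryAttemptHeader"));
    }
}
```

Running `main` with the application's classpath should show which spring-kafka jar is picked up and whether it contains the method that the adapter calls.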
I tried both a Java configuration and an XML configuration (below); both give the same error.
Java Version
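import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.messaging.MessageChannel;

@Configuration
@Slf4j
public class KafkaChannelConsumer {

    // Channel that receives the records consumed from Kafka
    @Autowired
    MessageChannel preRouterLOB;

    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String bootstrapServers;

    @Value("${spring.kafka.topic:55iptest}")
    private String springIntegrationKafkaTopic;

    // Inbound adapter: forwards each record from the container to the output channel
    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
        KafkaMessageDrivenChannelAdapter<String, String> adapter =
                new KafkaMessageDrivenChannelAdapter<>(kafkaListenerContainer());
        adapter.setOutputChannel(preRouterLOB);
        return adapter;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer() {
        ContainerProperties containerProps = new ContainerProperties(springIntegrationKafkaTopic);
        return new ConcurrentMessageListenerContainer<>(consumerFactory(), containerProps);
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dummy");
        return properties;
    }
}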
XML Version
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:jms="http://www.springframework.org/schema/integration/jms"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        channel="someChannel"
        error-channel="errorChannel" />

    <bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="localhost:9092" />
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.config.ContainerProperties">
                <constructor-arg name="topics" value="foo" />
            </bean>
        </constructor-arg>
    </bean>

</beans>
POM for issue 1 and 2
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>5.4.5</version>
</dependency>
With this dependency, spring-kafka 2.3.6 ends up on the classpath.
POM for issue 3
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>5.4.5</version>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.7</version>
</dependency>