0
votes

I am getting the following issues when trying to get Message Driven Channel Adaptor working with Spring-Kafka 2.3+. Does anyone have any example code which would help me?

1. org.springframework.kafka.listener.config.ContainerProperties does not actually exist.

2. org.springframework.kafka.listener.ContainerProperties does exist but produces the below issue when trying to run.

Description:

An attempt was made to call a method that does not exist. The attempt was made from the following location:

org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.onInit(KafkaMessageDrivenChannelAdapter.java:318)

The following method did not exist:

org.springframework.kafka.listener.ContainerProperties.isDeliveryAttemptHeader()Z

3. This issue goes if you use kafka version 2.5 and above but is instead replaced by 2021-03-22 13:56:05.102-0400 org{local_sparta} WARN [data-pipeline,,,] [DP-ACCOUNT] [DPA] [] AnnotationConfigServletWebServerApplicationContext:main Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration': Bean instantiation via constructor failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration]: Constructor threw exception; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaTemplate' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}

Tried both with a Java Version and an XML version below both give same error.

Java Version

@Configuration
@Slf4j
public class KafkaChannelConsumer {

    @Autowired
    MessageChannel preRouterLOB;

    @Value("${spring.kafka.bootstrap-servers:localhost9092}")
    private String bootstrapServers;

    @Value("${spring.kafka.topic:55iptest}")
    private String springIntegrationKafkaTopic;

    @Bean
    public KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter() {
        KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter(
                kafkaListenerContainer());
        kafkaMessageDrivenChannelAdapter.setOutputChannel(preRouterLOB);
        return kafkaMessageDrivenChannelAdapter;
    }

    @SuppressWarnings("unchecked")
    @Bean
    public ConcurrentMessageListenerContainer kafkaListenerContainer() {
        ContainerProperties containerProps = new ContainerProperties(springIntegrationKafkaTopic);

        return (ConcurrentMessageListenerContainer) new ConcurrentMessageListenerContainer(
                consumerFactory(), containerProps);
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerConfigs());
    }

    @Bean
    public Map consumerConfigs() {
        Map properties = new HashMap();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dummy");
        return properties;
    }
}

XML Version

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
  xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jms="http://www.springframework.org/schema/integration/jms"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:int="http://www.springframework.org/schema/integration"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">


<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        channel="someChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="foo" />
        </bean>
    </constructor-arg>

</bean>

POM for issue 1 and 2

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>5.4.5</version>
        </dependency>

This includes version Spring-Kafka 2.3.6

POM for issue 3

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>5.4.5</version>
    <exclusions>
       <exclusion>
           <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
       </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.7</version>
</dependency>
       
2
Thanks all this is resolved now the issues were to do with the spring boot version we were using under the covers which was not compatible with the spring-integration-kafka I was trying to use.Sankster

2 Answers

0
votes

<version>5.4.5</version>

This includes version Spring-Kafka 2.3.6

No it does not; the 5.4.x versions of spring-integration-kafka require 2.6.x; that method was added to the properties in 2.5.

See the project page for compatible versions.

https://spring.io/projects/spring-kafka

If you are using Spring Boot, it will bring in all of the right versions, and you should not specify versions at all in your pom.

For problem 3, it looks like you have a producer factory declared somewhere that is not compatible with the kafka template bean.

0
votes

It looks like you are messing around with different versions.

Since your project is based on the Spring Boot, you definitely have to rely on versions for dependencies it provides. Some version combinations are indeed not going to be compatible. For example that deliveryAttemptHeader property for the ContainerProperties has been introduced starting with 2.5:

/**
 * Set to true to populate the
 * {@link org.springframework.kafka.support.KafkaHeaders#DELIVERY_ATTEMPT} header when
 * the error handler or after rollback processor implements
 * {@code DeliveryAttemptAware}. There is a small overhead so this is false by
 * default.
 * @param deliveryAttemptHeader true to populate
 * @since 2.5
 */
public void setDeliveryAttemptHeader(boolean deliveryAttemptHeader) {

Just make sure that you rely on Spring Boot plugin and its dependency management. All the deps in Spring Boot are tested together. Only the problem you are going to have when you try to change deps int your own versions.