1
votes

I am attempting to define a simple message flow in Spring Integration that reads from one channel and then dumps the messages into a Kafka queue. To do this, I am using spring-integration-kafka. The problem is I get an EvaluationContext error I can't decipher.

Here is my configuration in XML:

<int:channel id="myStreamChannel"/>
<int:gateway id="myGateway" service-interface="com.myApp.MyGateway" >
    <int:method name="process" request-channel="myStreamChannel"/>
</int:gateway>
<int:channel id="activityOutputChannel"/>
<int:transformer input-channel="myStreamChannel" output-channel="activityOutputChannel" ref="activityTransformer"/>    
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="false"
                                    channel="activityOutputChannel"
                                    topic="my-test"
                                    message-key-expression="header.messageKey">
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>

<task:executor id="taskExecutor"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

<int-kafka:producer-context id="kafkaProducerContext" producer-properties="producerProperties">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="kafkaserver.com:9092"
                                          key-class-type="java.lang.String"
                                          value-class-type="java.lang.String"
                                          topic="my-test"
                                          key-encoder="stringEncoder"
                                          value-encoder="stringEncoder"
                                          compression-codec="snappy"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>

When I run my application via Spring Boot, I get this exception:

Error creating bean with name 'org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0': Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: [Assertion failed] - this argument is required; it must not be null

This is the offending line in the stack trace:

at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.onInit(KafkaProducerMessageHandler.java:68)

Here is what happens at Line 68:

Assert.notNull(this.evaluationContext);

So the EvaluationContext is null. I have no idea why.

By the way, when I replace the Kafka endpoint with a trivial stdout endpoint to print the message body, everything works fine.

Can you tell me what is wrong with my Kafka endpoint configuration that there is no EvaluationContext available?

1

1 Answers

3
votes

Your issue belongs to the version mismatch hell. The Spring Boot pulls that Spring Integration Core version, which doesn't support IntegrationEvaluationContextAware to populate the EvaluationContext in the KafkaProducerMessageHandler.

So, you should upgrade Spring Integration Kafka to most recent release: https://github.com/spring-projects/spring-integration-kafka/releases