0
votes

pom.xml

<dependencies>
         <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
         </dependency>
         <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        </dependencies>
        <dependencyManagement>
        <dependencies>
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Edgware.SR3</version>
                <type>pom</type>
                <scope>import</scope>
          </dependency>
          </dependencies>
          </dependencyManagement>

@Component
public class QueueConsumer {

    /** The Constant LOG. */
    public static final Logger LOG = LoggerFactory.getLogger(QueueConsumer.class);

    /** The processor. */
    @Autowired
    private IMessageProcessor processor;

    /**
     * Consume.
     *
     * @param message the message
     */
    @StreamListener(value = OrderEventSink.ORDER_EVENT)
    public void consume(Message<String> message) {
        try {
            processor.process(message);
        } catch (MessageProcessingFailedException e) {
            LOG.error("Error Code "+ e.getCode().getCode() + " " + e.getCode().getDescription(), e);
            throw e;
        }
    }
}
  1. I am using spring cloud stream to read messages from a kafka topic. The message is being read from queue, processed and if it fails while processing, the message should got to error queue configured but gives the following error.
  2. The exception comes when extracting headers from the message, what could be the best possible way to fix this?
  3. Kafka version is 1.0 and kafka client is 2.11-1.0

application.properties

     spring.cloud.stream.bindings.orderEvent.destination=orderEvents
     spring.cloud.stream.bindings.orderEvent.content- 
     type=application/json
     spring.cloud.stream.bindings.orderEvent.group=orderEvents-consumer
     spring.cloud.stream.bindings.orderEvent.consumer.back-off- 
     multiplier=5
     spring.cloud.stream.bindings.orderEvent.consumer.back-off-initial- 
     interval=60000
     spring.cloud.stream.bindings.orderEvent.consumer.max-attempts=1
     spring.cloud.stream.bindings.orderEvent.consumer.headerMode=raw
     spring.cloud.stream.bindings.kafka.binder.brokers=localhost
     spring.cloud.stream.bindings.kafka.binder.defaultBrokerPort=9092
     spring.cloud.stream.bindings.kafka.binder.zkNodes=localhost
     spring.cloud.stream.bindings.kafka.binder.defaultZkPort=2181
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     enableDlq=true
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     dlqName=dead-queue
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     dlqProducerProperties.configuration.key.
     serializer=org.apache.kafka.common.serialization.StringSerializer
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     dlqProducerProperties.configuration.value.
     serializer=org.apache.kafka.common.serialization.StringSerializer

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'scm-orderEvents.scm-orderEvents-consumer.errors'; nested exception is java.lang.RuntimeException: java.lang.StringIndexOutOfBoundsException: String index out of range: 4297 at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:451) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:207) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:191) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$200(KafkaMessageDrivenChannelAdapter.java:63) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na] at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:372) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na] at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:352) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.1.6.RELEASE.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.1.6.RELEASE.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2200(KafkaMessageListenerContainer.java:245) [spring-kafka-1.1.6.RELEASE.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-1.1.6.RELEASE.jar:na] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162] Caused by: java.lang.RuntimeException: java.lang.StringIndexOutOfBoundsException: String index out of range: 4297 at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.handleMessage(KafkaMessageChannelBinder.java:380) ~[spring-cloud-stream-binder-kafka-1.3.2.RELEASE.jar:1.3.2.RELEASE] at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] ... 16 common frames omitted Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 4297 at java.lang.String.checkBounds(String.java:385) ~[na:1.8.0_162] at java.lang.String.(String.java:425) ~[na:1.8.0_162] at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.oldExtractHeaders(EmbeddedHeaderUtils.java:154) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE] at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.extractHeaders(EmbeddedHeaderUtils.java:115) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE] at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.extractHeaders(EmbeddedHeaderUtils.java:107) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE] at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.handleMessage(KafkaMessageChannelBinder.java:368) ~[spring-cloud-stream-binder-kafka-1.3.2.RELEASE.jar:1.3.2.RELEASE] ... 20 common frames omitted

1

1 Answers

1
votes

This is a bug in the 1.3.2.RELEASE of the kafka binder; it is fixed on master (1.3.3.BUILD-SNAPSHOT).

BTW, the best solution is to use Spring Boot 2.0.1 and SCSt Emlhurst.RELEASE (pulled in by cloud FINCHLEY - currently at M9 milestone).

These versions have native support for Kafka 1.0.

You might also have some success moving to the kafka11 binder artifact (1.3.0) which is compatible with SCSt 1.3.x, as discussed on the Wiki.