I'm getting the following exception when processing fails in a @StreamListener method and the Spring Cloud Stream Kafka binder tries to re-route the message to the DLQ. I'm using Spring Cloud Edgware.SR5.

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'my.message.destination.my.message.group.errors'; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:451) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.19.RELEASE.jar:4.3.19.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.19.RELEASE.jar:4.3.19.RELEASE]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.19.RELEASE.jar:4.3.19.RELEASE]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95) ~[spring-messaging-4.3.19.RELEASE.jar:4.3.19.RELEASE]
        at org.springframework.integration.support.ErrorMessagePublisher.publish(ErrorMessagePublisher.java:155) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer.recover(ErrorMessageSendingRecoverer.java:83) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:512) ~[spring-retry-1.2.2.RELEASE.jar:?]
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:351) ~[spring-retry-1.2.2.RELEASE.jar:?]
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.2.RELEASE.jar:?]
        at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.1.8.RELEASE.jar:?]
        at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:39) ~[spring-kafka-1.1.8.RELEASE.jar:?]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.8.RELEASE.jar:?]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.8.RELEASE.jar:?]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:246) [spring-kafka-1.1.8.RELEASE.jar:?]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1025) [spring-kafka-1.1.8.RELEASE.jar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_192]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_192]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.handleMessage(KafkaMessageChannelBinder.java:360) ~[spring-cloud-stream-binder-kafka-1.3.3.RELEASE.jar:1.3.3.RELEASE]
        at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        ... 19 more

I tried producing messages with kafka-console-producer and found that this only happens when a Kafka message key is set.

The relevant code snippets follow:

MyMessageConsumer.java:

  @StreamListener(MyMessageSink.MY_MESSAGE_INPUT)
  @Transactional
  public void consumeMyMessage(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String myMessageId, @Payload MyMessage myMessage) {
    if (true) {
      throw new RuntimeException("MockRuntimeException");
    }
  }

application.properties (for consumer):

spring.kafka.producer.keySerializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.bindings.my-message-in.destination=my.message.destination
spring.cloud.stream.bindings.my-message-in.group=my.message.group
spring.cloud.stream.bindings.my-message-in.content-type=application/json
spring.cloud.stream.bindings.my-message-in.consumer.headerMode=raw
spring.cloud.stream.bindings.my-message-in.consumer.partitioned=true
spring.cloud.stream.kafka.bindings.my-message-in.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.my-message-in.consumer.dlqName=my.message.destination.dlq
spring.cloud.stream.kafka.bindings.my-message-in.consumer.dlqProducerProperties.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.my-message-in.consumer.dlqProducerProperties.configuration.value.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.my-message-in.consumer.maxAttempts=3

application.properties (for producer):

spring.kafka.producer.keySerializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.bindings.my-message-out.destination=my.message.destination
spring.cloud.stream.bindings.my-message-out.content-type=application/json
spring.cloud.stream.bindings.my-message-out.producer.headerMode=raw
spring.cloud.stream.bindings.my-message-out.producer.partitionKeyExtractorClass=com.example.message.TransactionKeyExtractor
spring.cloud.stream.bindings.my-message-out.producer.partitionCount=80

Is there any way to get the DLQ re-routing to work with a message key?

1 Answer

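Dead-lettering in the 1.3.x Kafka binder only supports Spring Cloud Stream's default key/value types (byte[]/byte[]). With StringDeserializer configured for the key, the DLQ handler receives a String where it expects a byte[], hence the ClassCastException.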

Try upgrading to a more recent version.
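
If upgrading is not an option, one possible workaround (a sketch, not verified against 1.3.3) is to leave the key as byte[] so the DLQ handler gets the type it expects, and decode it inside the listener. This assumes you remove the StringDeserializer setting for the key and the StringSerializer overrides under dlqProducerProperties, and that the producer writes keys as UTF-8 (the default for StringSerializer). KeyDecoder below is a hypothetical helper, not part of Spring Cloud Stream:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not a Spring Cloud Stream class.
final class KeyDecoder {

    private KeyDecoder() {
    }

    // Converts a raw Kafka record key to a String.
    // Assumption: the key was written as UTF-8 text (e.g. by StringSerializer).
    // Returns null for records that were produced without a key.
    static String decodeKey(byte[] rawKey) {
        return rawKey == null ? null : new String(rawKey, StandardCharsets.UTF_8);
    }
}
```

The listener would then declare the header as @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] rawKey and call KeyDecoder.decodeKey(rawKey) to obtain myMessageId.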