
I'm using the spring-cloud-stream Kafka binder with Schema Registry (not Kafka Streams). What I'm trying to do is send any un-deserializable message that arrives on an input topic to a DLQ.

So I tried the configuration below, but the Spring Cloud Stream app keeps retrying infinitely and says:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
  kafka:
    binder:
      brokers: localhost:9092
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
          autoCommitOnError: true
          autoCommitOffset: true
    default:
      consumer:
        configuration:
          schema.registry.url: http://localhost:8081
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

What am I doing wrong? Please help.


1 Answer


Please don't ask the same question in multiple places, it's a waste of your time and ours; as I already answered on Gitter:

This error occurs too far down the stack for spring-cloud-stream to help with it. You need to use a ListenerContainerCustomizer @Bean to configure a SeekToCurrentErrorHandler with a DeadLetterPublishingRecoverer and configure an ErrorHandlingDeserializer.
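For the ErrorHandlingDeserializer part, the consumer configuration could look something like this (a sketch, assuming spring-kafka 2.3+ where `ErrorHandlingDeserializer` wraps the real deserializer named by `spring.deserializer.value.delegate.class`; on older versions the class is `ErrorHandlingDeserializer2`):

```yaml
spring.cloud.stream.kafka.binder.configuration:
  key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
  spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
  schema.registry.url: http://localhost:8081
```

With this in place, a record that the Avro delegate cannot deserialize no longer throws inside the poll loop; instead it reaches the listener container with the failure attached, where an error handler can deal with it.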

https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer

https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current

https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
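Putting the container side together, the customizer bean could be sketched like this (an illustration, assuming spring-kafka 2.3+; the class name `DlqConfig`, the backoff values, and the byte[] generics are illustrative, while `ListenerContainerCustomizer`, `SeekToCurrentErrorHandler`, and `DeadLetterPublishingRecoverer` are the framework types named above):

```java
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class DlqConfig {

    // Customize each binder-created listener container: install a
    // SeekToCurrentErrorHandler that retries a failed record a bounded
    // number of times, then hands it to a DeadLetterPublishingRecoverer,
    // which publishes it via the KafkaOperations (by default to
    // <original-topic>.DLT).
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(
            KafkaOperations<byte[], byte[]> template) {
        return (container, destination, group) -> container.setErrorHandler(
                new SeekToCurrentErrorHandler(
                        new DeadLetterPublishingRecoverer(template),
                        // no delay between attempts, at most 2 retries
                        new FixedBackOff(0L, 2L)));
    }
}
```

Note that with this approach the binder-level `enableDlq`/`dlqName` settings should be removed; the recoverer owns dead-lettering, and deserialization failures (which `enableDlq` cannot see) are handled the same way as listener failures.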

That said, Stack Overflow is a better medium for questions like this.