1
votes

I am working on spring boot app using spring-cloud-stream:1.3.0.RELEASE, spring-cloud-stream-binder-kafka:1.3.0.RELEASE. I am using spring integration dsl to split the lines in a file and beanio to convert lines to json, requirement is to write successful json messages to a kafka topic and write error messages to different kafka topic . Below is the configuration in application.yml.

spring:
  cloud:
    stream:
      kafka:
        binder:
          autoAddPartitions: true
      bindings.webmarketbasket:
        destination: webmarketbasket
        group: usproductrecommendationsgroup
        producer:
          partitionCount: 5
          errorChannelEnabled: true
      bindings.webmarketbasket.errors:
        destination: webmarketbasketerrors
        group: usproductrecommendationsgroup
        producer:
          partitionCount: 5
      bindings.error:
        destination: errorchannel
        group: usproductrecommendationsgroup
        producer:
          partitionCount: 5

I noticed a pull request https://github.com/spring-cloud/spring-cloud-stream/pull/1039 in spring-cloud-stream-binder-kafka:1.3.0.RELEASE, which creates PublishSubscribeChannel when errorChannelEnabled is set to true, also there is a testcase which checks whether bean is created for producer error channel.

when i check spring actuator url in my app http://localhost:8195/beans, "errorChannel" bean for global error channel is created, but "webmarketbasket.errors" bean is not created. when there is "org.springframework.messaging.MessageHandlingException", error message is sent to "errorchannel" kafka topic and stops processing remaining lines from the file. Kafka topic "webmarketbasketerrors" is never created. Can you help, please let me know if I have missed anything.

1

1 Answers

0
votes

You seem to be confusing two things.

Spring Integration Error Channel Support was for when you want to publish something to the global errorChannel which is bound to a destination via spring.cloud.stream.bindings.error.destination=myErrors.

The new support in 1.3 creates an error channel for each listener; it is pub/sub and it is bridged to the global errorChannel. Hence the message will also be published to ...bindings.error.destination (if configured; in your case, the destination is called errorChannel). The bean name for the dedicated error channel is webmarketbasket. usproductrecommendationsgroup.errors. There is no kafka topic bound to that channel by the framework.

You can consume from either the dedicated or global error channel if you want to handle the error yourself.