I am working on a Spring Boot app using spring-cloud-stream:1.3.0.RELEASE and spring-cloud-stream-binder-kafka:1.3.0.RELEASE. I use the Spring Integration DSL to split a file into lines and BeanIO to convert each line to JSON. The requirement is to write successfully converted JSON messages to one Kafka topic and error messages to a different Kafka topic. Below is the configuration in application.yml.
spring:
  cloud:
    stream:
      kafka:
        binder:
          autoAddPartitions: true
      bindings.webmarketbasket:
        destination: webmarketbasket
        group: usproductrecommendationsgroup
        producer:
          partitionCount: 5
          errorChannelEnabled: true
      bindings.webmarketbasket.errors:
        destination: webmarketbasketerrors
        group: usproductrecommendationsgroup
        producer:
          partitionCount: 5
      bindings.error:
        destination: errorchannel
        group: usproductrecommendationsgroup
        producer:
          partitionCount: 5
I noticed pull request https://github.com/spring-cloud/spring-cloud-stream/pull/1039 in spring-cloud-stream-binder-kafka:1.3.0.RELEASE, which creates a PublishSubscribeChannel when errorChannelEnabled is set to true. There is also a test case that checks whether a bean is created for the producer error channel.
When I check the Spring Boot Actuator URL in my app, http://localhost:8195/beans, the "errorChannel" bean for the global error channel is present, but no "webmarketbasket.errors" bean is created. When an "org.springframework.messaging.MessageHandlingException" occurs, the error message is sent to the "errorchannel" Kafka topic and processing of the remaining lines in the file stops. The Kafka topic "webmarketbasketerrors" is never created. Can you help? Please let me know if I have missed anything.
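To make the expected behaviour concrete, here is a minimal plain-Java sketch of the routing I am after. It does not use Spring or Kafka: the two lists only stand in for the "webmarketbasket" and "webmarketbasketerrors" topics, and the class, method, and converter names are hypothetical, not taken from my application. The point is that a failure on one line should be recorded and the remaining lines should still be processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class LineRoutingSketch {

    /** Holds both outputs; the lists are stand-ins for the two Kafka topics. */
    static final class Result {
        final List<String> converted = new ArrayList<>(); // success topic stand-in
        final List<String> failed = new ArrayList<>();    // error topic stand-in
    }

    /**
     * Applies the converter to every line. A failure on one line is recorded
     * and the loop continues with the next line instead of aborting.
     */
    static Result route(List<String> lines, Function<String, String> converter) {
        Result result = new Result();
        for (String line : lines) {
            try {
                result.converted.add(converter.apply(line));
            } catch (RuntimeException e) {
                result.failed.add(line + " -> " + e.getMessage());
            }
        }
        return result;
    }

    /** Hypothetical converter (BeanIO stand-in): rejects blank lines. */
    static String toJson(String line) {
        if (line.trim().isEmpty()) {
            throw new IllegalArgumentException("blank line");
        }
        return "{\"line\":\"" + line + "\"}";
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", " ", "b");
        Result r = route(lines, LineRoutingSketch::toJson);
        System.out.println("converted: " + r.converted);
        System.out.println("failed: " + r.failed);
    }
}
```

In the real application I would expect the per-destination error channel to play the role of the `failed` list here, rather than everything ending up on the global error channel and the file processing stopping.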