I have an integration flow that needs to run one of two transformers, depending on a condition, and then post an HTTP request through an outbound gateway.

@Bean
public IntegrationFlow messageFromKafka() {
    return flow -> flow
            .publishSubscribeChannel(s -> s
                    .subscribe(f1 -> f1
                            .<AttachmentEvent>filter(validator::isCondition1)
                            .transform(transformer1)
                    )
                    .subscribe(fl -> fl
                            .<AttachmentEvent>filter(validator::isCondition2)
                            .transform(transformer2)
                            .split()
                    )
            )
            .publishSubscribeChannel(s -> s
                    .subscribe(fl1 -> fl1
                            .transform(httpTransformer)
                            .<String, String>route(transformedMessage -> getFlowType(transformedMessage), mapping -> mapping
                                    .subFlowMapping("operation1", sf -> sf
                                            .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test"))
                                    )
                                    .subFlowMapping("operation2", sf -> sf
                                            .<String>filter(message -> isVendorStatusDescNotCancelled(message))
                                            .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test2"))
                                    )
                                    .subFlowMapping("operation3", sf -> sf
                                            .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test3"))
                                    )
                            )
                    )
                    .subscribe(fl2 -> fl2
                            .handle(getKafkaHandler())
                    )
            );
}  

This is my attempt. However, I am getting the error "no output-channel or replyChannel header available". I think I understand why it happens, but I am not sure how to achieve what I need.

Thanks.

1 Answer

In Spring Integration, conditional flows are handled with the router pattern: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/message-routing.html#messaging-routing-chapter
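As a rough sketch of the router pattern applied to the two transformers, the first publishSubscribeChannel(...) in the question could be replaced by a single route(...) step. This is only a fragment of the flow definition, not a complete class. It reuses validator, transformer1 and transformer2 from the question and assumes that isCondition2 is simply the opposite of isCondition1, which the question does not state. With subFlowMapping, the output of the selected sub-flow should continue to the next step of the main flow.

```java
// Fragment: replaces the first publishSubscribeChannel(...) in the question's flow.
// Assumption: every message satisfies either condition 1 or condition 2.
.<AttachmentEvent, Boolean>route(validator::isCondition1, mapping -> mapping
        // Condition 1 holds: apply the first transformer.
        .subFlowMapping(true, sf -> sf
                .transform(transformer1)
        )
        // Otherwise: apply the second transformer and split its result.
        .subFlowMapping(false, sf -> sf
                .transform(transformer2)
                .split()
        )
)
```

If the two conditions are not complementary, a router function that returns a String key per case (as getFlowType does in the question) would be the safer shape.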

However, it looks like your problem is not related to condition resolution at all.

I think each of your handle(getOAuth2Handler(...)) calls returns a value that is not handled as a reply in those sub-flows. If you are not interested in that reply, consider configuring a nullChannel after handle() in those sub-flows.
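A minimal sketch of that suggestion, shown as a fragment of the route(...) step from the question rather than a complete class. getFlowType, getOAuth2Handler and isVendorStatusDescNotCancelled are the question's own helpers. The sketch assumes the HTTP replies are not needed anywhere downstream.

```java
// Fragment: the route(...) step from the question, with each HTTP reply discarded.
.<String, String>route(transformedMessage -> getFlowType(transformedMessage), mapping -> mapping
        .subFlowMapping("operation1", sf -> sf
                .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test"))
                // Send the gateway reply to the null channel so no output channel is required.
                .nullChannel()
        )
        .subFlowMapping("operation2", sf -> sf
                .<String>filter(message -> isVendorStatusDescNotCancelled(message))
                .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test2"))
                .nullChannel()
        )
        .subFlowMapping("operation3", sf -> sf
                .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test3"))
                .nullChannel()
        )
)
```

nullChannel() ends the sub-flow, so nothing can be chained after it. Writing .channel("nullChannel") instead should have the same effect, since that is the name of the framework's built-in null channel bean.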