I have an integration flow which needs to run one transformer or the other based on some condition and then post an http request with an outbound gateway.
@Bean
public IntegrationFlow messageFromKafka() {
return flow -> flow
.publishSubscribeChannel(s -> s
.subscribe(f1 -> f1
.<AttachmentEvent>filter(validator::isCondition1)
.transform(transformer1)
)
.subscribe(fl -> fl
.<AttachmentEvent>filter(validator::isCondition2)
.transform(transformer2)
.split()
)
)
.publishSubscribeChannel(s -> s
.subscribe(fl1 -> fl1
.transform(httpTransformer)
.<String, String>route(transformedMessage -> getFlowType(transformedMessage), mapping -> mapping
.subFlowMapping("operation1", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test"))
)
.subFlowMapping("operation2", sf -> sf
.<String>filter(message -> isVendorStatusDescNotCancelled(message))
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test2"))
)
.subFlowMapping("operation3", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test3"))
)
)
)
.subscribe(fl2 -> fl2
.handle(getKafkaHandler())
)
);
}
This is my attempt, however I am getting this error message "no output-channel or replyChannel header available" which I think I understand the why, but not sure how to achieve what I need.
Thanks.