I am trying to learn about how to build IntegrationFlows as units, and join them up.
I set up a very simple processing integration flow:
IntegrationFlow processingFlow = f -> f
.<String>handle((p, h) -> process(p))
.log();
flowContext.registration(processingFlow)
.id("testProcessing")
.autoStartup(false)
.register();
Processing is very simple:
public String process(String process) {
return process + " has been processed";
}
Then I compose a flow from a source, using .gateway()
to join the source to the processing:
MessageChannel beginningChannel = MessageChannels.direct("beginning").get();
StandardIntegrationFlow composedFlow = IntegrationFlows
.from(beginningChannel)
.gateway(processingFlow)
.log()
.get();
flowContext.registration(composedFlow)
.id("testComposed")
.autoStartup(false)
.addBean(processingFlow)
.register();
Then I start the flow and send a couple of messages:
composedFlow.start();
beginningChannel.send(MessageBuilder.withPayload(new String("first string")).build());
beginningChannel.send(MessageBuilder.withPayload(new String("second string")).build());
The logging handler confirms the handle method has been called for the first message, but the main thread then sits idle, and the second message is never processed.
Is this not the correct way to compose integration flow from building blocks? Doing so with channels requires registering the channel as a bean, and I'm trying to do all this dynamically.
@Bean
forIntegrationFlow
doesn’t work for you? Any chances to share with me a simple project to play with? – Artem Bilan