
I am trying to learn how to build IntegrationFlows as self-contained units and then join them together.

I set up a very simple processing integration flow:


    IntegrationFlow processingFlow = f -> f
            .<String>handle((p, h) -> process(p))
            .log();

    flowContext.registration(processingFlow)
            .id("testProcessing")
            .autoStartup(false)
            .register();

Processing is very simple:

    public String process(String process) {
        return process + " has been processed";
    }

Then I compose a flow from a source, using .gateway() to join the source to the processing:

    MessageChannel beginningChannel = MessageChannels.direct("beginning").get();

    StandardIntegrationFlow composedFlow = IntegrationFlows
            .from(beginningChannel)
            .gateway(processingFlow)
            .log()
            .get();

    flowContext.registration(composedFlow)
            .id("testComposed")
            .autoStartup(false)
            .addBean(processingFlow)
            .register();

Then I start the flow and send a couple of messages:

    composedFlow.start();

    beginningChannel.send(MessageBuilder.withPayload("first string").build());
    beginningChannel.send(MessageBuilder.withPayload("second string").build());

The logging handler confirms the handle method has been called for the first message, but the main thread then sits idle, and the second message is never processed.

Is this not the correct way to compose an integration flow from building blocks? Joining the flows with channels instead would require registering each channel as a bean, and I'm trying to do all of this dynamically.

Is there a reason to do that programmatically? Why doesn't a regular @Bean for the IntegrationFlow work for you? Any chance you could share a simple project for me to play with? - Artem Bilan
I'm exploring the DSL and dynamic flow creation. There might be workarounds, but I prefer the syntax (it's certainly easier to follow for those on the team who are unfamiliar with Spring). - bmarcj

1 Answer


It must be logAndReply() in the processingFlow: replace the final log() with logAndReply(). See their JavaDocs for the difference. A log() at the end of a flow makes it one-way. That is why you are blocked: the gateway waits for a reply, but your current flow definition never produces one. Unfortunately, we can't detect that at the framework level, because there are legitimate cases where a flow does not reply, depending on some routing or filtering logic. The gateway can be configured with a reply timeout; by default it is infinite.
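
To make the blocking behaviour concrete, here is a minimal plain-Java model of it. This is only a sketch and not Spring Integration code: `GatewayReplyModel`, `sendAndReceive`, `ONE_WAY` and `LOG_AND_REPLY` are hypothetical names invented for illustration, and a `BlockingQueue` stands in for the reply channel. It only shows that a caller waiting for a reply gets one from a sub-flow that replies, and gets nothing from a one-way sub-flow.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical model of a request/reply gateway; not Spring Integration's implementation.
class GatewayReplyModel {

    // Hands the payload and a reply queue to the sub-flow, then waits for a reply.
    // Returns null if no reply arrives within timeoutMillis.
    static String sendAndReceive(String payload,
                                 BiConsumer<String, BlockingQueue<String>> subFlow,
                                 long timeoutMillis) {
        BlockingQueue<String> replies = new ArrayBlockingQueue<>(1);
        subFlow.accept(payload, replies);
        try {
            return replies.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Same processing step as in the question.
    static String process(String payload) {
        return payload + " has been processed";
    }

    // Models a sub-flow whose last step is one-way: it logs and never replies.
    static final BiConsumer<String, BlockingQueue<String>> ONE_WAY =
            (payload, replies) -> System.out.println("log: " + process(payload));

    // Models a sub-flow that logs and then hands the result back to the caller.
    static final BiConsumer<String, BlockingQueue<String>> LOG_AND_REPLY =
            (payload, replies) -> {
                String result = process(payload);
                System.out.println("log: " + result);
                replies.offer(result);
            };

    public static void main(String[] args) {
        // The replying sub-flow returns the processed payload to the caller.
        System.out.println(sendAndReceive("first string", LOG_AND_REPLY, 200));
        // The one-way sub-flow logs, but the caller times out and receives null.
        System.out.println(sendAndReceive("first string", ONE_WAY, 200));
    }
}
```

In this model the one-way case gives up after 200 ms. With an unbounded wait, which is what the default infinite reply timeout amounts to, the call would never return, which matches the sender thread sitting idle after the first message.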