
I'm trying out Spring Integration using the Java DSL. This is only a test so far, and the flow is simple:

  • create some messages
  • process (log) them in parallel
  • aggregate them
  • log the aggregate

Apart from the aggregator, it is working fine:

@Bean
public IntegrationFlow integrationFlow() {
    return IntegrationFlows
            .from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1, TimeUnit.SECONDS)))
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .handle((GenericHandler<Integer>) (payload, headers) -> {
                System.out.println("\t delaying message:" + payload + " on thread "
                        + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    System.err.println(e.getMessage());
                }
                return payload;
            })
            .handle(this::logMessage)
            .aggregate(a ->
                    a.releaseStrategy(g -> g.size() > 10)
                     .outputProcessor(g ->
                             g.getMessages()
                                     .stream()
                                     .map(e -> e.getPayload().toString())
                                     .collect(Collectors.joining(","))))
            .handle(this::logMessage)
            .get();

}

If I leave out the .aggregate(..) part, the sample works.

With the aggregator, I get the following exception:

Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.faboo.test.ParallelIntegrationApplication$$Lambda$9/1341404543@6fe1b4fb) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.

As far as I understand, it complains that there is no output from the aggregator?

The full source can be found here: GitHub


1 Answer


The problem is the handle() before the aggregator: it produces no result, so there is nothing to aggregate.

        .handle(this::logMessage)
        .aggregate(a ->

Presumably logMessage(Message<?>) has a void return type.

If you want to log before the aggregator, use a wireTap, or change logMessage to return the Message<?> after logging.

        .wireTap(sf -> sf.handle(this::logMessage))
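To make the distinction concrete, here is a minimal plain-Java sketch of why a one-way step has to be the last one, and why a tap-style step does not end the flow. MiniFlow and its methods (replying, oneWay, tap, send) are hypothetical names invented for this illustration; this is not Spring Integration's API or implementation, only a model of the behaviour described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for a flow builder; NOT Spring Integration's API.
public class MiniFlow {
    private final List<Function<Object, Object>> steps = new ArrayList<>();
    private boolean endedByOneWayStep = false;

    // Step that returns a reply, so further steps may follow it.
    public MiniFlow replying(Function<Object, Object> step) {
        requireOpen();
        steps.add(step);
        return this;
    }

    // Step with no reply (like a void logMessage); it closes the flow.
    public MiniFlow oneWay(Consumer<Object> step) {
        requireOpen();
        steps.add(payload -> { step.accept(payload); return null; });
        endedByOneWayStep = true;
        return this;
    }

    // Tap-style step: observes the payload and forwards it unchanged.
    public MiniFlow tap(Consumer<Object> observer) {
        return replying(payload -> { observer.accept(payload); return payload; });
    }

    // Rejects any step added after a one-way step, at build time.
    private void requireOpen() {
        if (endedByOneWayStep) {
            throw new IllegalStateException("previous step is one-way; nothing can follow it");
        }
    }

    // Runs one payload through all steps and returns the last result
    // (null if the flow ends with a one-way step).
    public Object send(Object payload) {
        Object current = payload;
        for (Function<Object, Object> step : steps) {
            current = step.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<Object> log = new ArrayList<>();

        // Logging via tap: the payload still reaches the next step.
        Object out = new MiniFlow()
                .tap(log::add)
                .replying(p -> "aggregated:" + p)
                .send(7);
        System.out.println(out + " logged=" + log);

        // Logging via a one-way step: adding anything after it is rejected.
        try {
            new MiniFlow().oneWay(log::add).replying(p -> p);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model, tap plays the role of the wireTap suggested above, and the IllegalStateException stands in for the BeanCreationException from the question: the builder refuses to attach another step to something that never produces output.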