I have a DSL-based flow that uses split to iterate over a list of objects and send out a Kafka message:

.transform(...)
.split()
.channel(KAFKA_OUT_CHANNEL)

After all the messages have been sent, I need to call a service and log how many messages were processed. I understand that one approach is to use a publishSubscribeChannel in which the first subscriber does the actual Kafka sending and a second subscriber aggregates the messages and then executes the service call:

.transform(...)
.split()
.publishSubscribeChannel(pubSub -> pubSub
        .subscribe(f -> f.channel(KAFKA_OUT_CHANNEL)))

I'm having problems figuring out how to actually do the .aggregate part within a pubSubChannel using the DSL. So far I have tried:

.subscribe(f -> f.channel(KAFKA_OUT_CHANNEL))
.subscribe(f -> f.aggregate(c -> c.processor( ?? )))

Any pointers?

2 Answers

0
votes

The AbstractMessageSplitter has applySequence set to true by default:

/**
 * Set the applySequence flag to the specified value. Defaults to true.
 * @param applySequence true to apply sequence information.
 */
public void setApplySequence(boolean applySequence) {

With that flag set, every split message carries the sequence headers:

if (this.applySequence) {
    builder.pushSequenceDetails(correlationId, sequenceNumber, sequenceSize);
}

The aggregator's default correlation strategy is based on the IntegrationMessageHeaderAccessor.CORRELATION_ID header, so it gathers messages with the same correlation key into the same MessageGroup. The default ReleaseStrategy compares the size of the MessageGroup with the sequenceSize header and releases the group once they match. Finally, the default MessageGroupProcessor collects the payloads of all the messages in the group into a single message with a Collection as its payload. In other words, the default behavior of the aggregator is the exact inverse of the splitter.
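The contract described above can be illustrated with a small Spring-free model. This is only a sketch of the idea, not Spring Integration's implementation: the class SplitAggregateModel, its nested Msg type, and the split and accept methods are hypothetical names invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the default split/aggregate contract (illustrative only).
class SplitAggregateModel {

    // Hypothetical stand-in for a message carrying the three sequence headers.
    static final class Msg {
        final String payload;
        final String correlationId;
        final int sequenceNumber;
        final int sequenceSize;

        Msg(String payload, String correlationId, int sequenceNumber, int sequenceSize) {
            this.payload = payload;
            this.correlationId = correlationId;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
        }
    }

    // Splitter side: one message per item, all sharing the correlation id and total size.
    static List<Msg> split(String correlationId, List<String> items) {
        List<Msg> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            out.add(new Msg(items.get(i), correlationId, i + 1, items.size()));
        }
        return out;
    }

    // Aggregator side: message groups keyed by correlation id.
    private final Map<String, List<Msg>> groups = new LinkedHashMap<>();

    // Returns the collected payloads once the group is complete, otherwise null.
    List<String> accept(Msg msg) {
        List<Msg> group = groups.computeIfAbsent(msg.correlationId, k -> new ArrayList<>());
        group.add(msg);
        if (group.size() < msg.sequenceSize) {
            return null; // release condition not met yet
        }
        groups.remove(msg.correlationId);
        List<String> payloads = new ArrayList<>();
        for (Msg m : group) {
            payloads.add(m.payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        SplitAggregateModel aggregator = new SplitAggregateModel();
        for (Msg m : split("batch-1", Arrays.asList("a", "b", "c"))) {
            System.out.println(m.payload + " -> " + aggregator.accept(m));
        }
    }
}
```

Running main prints null for the first two messages and [a, b, c] for the third, which is the point at which the group size reaches sequenceSize.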

I don't know what output you expect from the aggregator, but you don't need to configure any extra logic there: the default correlation and release strategies are sufficient.

You can find enough info in the Reference Manual.

0
votes

It depends on what you want after the aggregation. If you just want a list of payloads, use aggregate() with no arguments:

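import java.util.Arrays;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
public class So51059703Application {

    public static void main(String[] args) {
        SpringApplication.run(So51059703Application.class, args);
    }

    // Sends a three-element list to the flow's input channel at startup.
    @Bean
    public ApplicationRunner runner(ApplicationContext context) {
        return args -> {
            context.getBean("flow.input", MessageChannel.class).send(new GenericMessage<>(
                    Arrays.asList("a", "b", "c")));
        };
    }

    @Bean
    public IntegrationFlow flow() {
        return f -> f
                .split()
                .publishSubscribeChannel(p -> p
                        // First subscriber: handles each split message (stands in for the Kafka send).
                        .subscribe(f1 -> f1.handle(System.out::println))
                        // Second subscriber: aggregates with the default strategies, then handles the result.
                        .subscribe(f2 -> f2
                                .aggregate()
                                .handle(System.out::println)));
    }

}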

If you just want the count:

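import java.util.Arrays;
import java.util.List;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
public class So51059703Application {

    public static void main(String[] args) {
        SpringApplication.run(So51059703Application.class, args);
    }

    // Sends a three-element list to the flow's input channel at startup.
    @Bean
    public ApplicationRunner runner(ApplicationContext context) {
        return args -> {
            context.getBean("flow.input", MessageChannel.class).send(new GenericMessage<>(
                    Arrays.asList("a", "b", "c")));
        };
    }

    @Bean
    public IntegrationFlow flow() {
        return f -> f
                .split()
                .publishSubscribeChannel(p -> p
                        // First subscriber: handles each split message (stands in for the Kafka send).
                        .subscribe(f1 -> f1.handle(System.out::println))
                        // Second subscriber: aggregates, reducing the released group to its size.
                        .subscribe(f2 -> f2
                                .aggregate(c -> c
                                        .processor(processor(), "reduce"))
                                .handle(System.out::println)));
    }

    // POJO processor; its "reduce" method is invoked with the messages of the released group.
    @Bean
    public Object processor() {
        return new Object() {

            public int reduce(List<Message<?>> messages) {
                return messages.size();
            }

        };
    }

}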
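Since the question also asks for a service call once the batch is done, the reduce method is one place where the count can be logged and a callback triggered. The following is a minimal sketch under stated assumptions: CountingProcessor and its afterBatch callback are hypothetical names, the callback merely stands in for the real service, and the parameter is a wildcard List only to keep the example free of Spring types (in the flow above you would keep List<Message<?>>).

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical POJO processor: logs the batch size and notifies a callback.
class CountingProcessor {

    // Stand-in for the service that must be called after all messages were sent (assumption).
    private final IntConsumer afterBatch;

    CountingProcessor(IntConsumer afterBatch) {
        this.afterBatch = afterBatch;
    }

    // Same shape as the reduce method in the answer: takes the group, returns its size.
    public int reduce(List<?> messages) {
        int count = messages.size();
        System.out.println("Processed " + count + " messages");
        afterBatch.accept(count);
        return count;
    }

    public static void main(String[] args) {
        CountingProcessor processor = new CountingProcessor(
                n -> System.out.println("Service called after " + n + " sends"));
        processor.reduce(Arrays.asList("a", "b", "c"));
    }
}
```

An alternative design is to leave reduce as a pure count and put the service call in the .handle(...) step that follows the aggregator, which keeps the processor free of side effects.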