I have a DSL-based flow that uses split to iterate over a list of objects and send out a Kafka message for each one:
.transform(...)
.split()
.channel(KAFKA_OUT_CHANNEL)
After all the messages have been sent, I need to call a service and log how many messages were processed.
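To make the requirement concrete, here is a rough plain-Java sketch of the behavior I'm after. It does not use the integration DSL at all. The names `sendAllThenNotify`, `kafkaSender`, and `afterAll` are made up for illustration, and the two `Consumer`s are only stand-ins for the Kafka outbound channel and the service call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only: models "send every item, then run one
// follow-up step with the total count" without any messaging framework.
final class SplitThenAggregateSketch {

    /**
     * Passes every item to kafkaSender, then invokes afterAll exactly once
     * with the number of items processed. Returns that count.
     */
    static <T> int sendAllThenNotify(List<T> items,
                                     Consumer<T> kafkaSender,
                                     Consumer<Integer> afterAll) {
        int processed = 0;
        for (T item : items) {         // "split": one message per list element
            kafkaSender.accept(item);  // stand-in for the Kafka outbound channel
            processed++;
        }
        afterAll.accept(processed);    // "aggregate": runs only after the last send
        return processed;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        int count = sendAllThenNotify(
                List.of("a", "b", "c"),
                sent::add,
                n -> System.out.println("processed " + n + " messages, calling service"));
        System.out.println(sent + " " + count);
    }
}
```

The point is that the follow-up step must fire once per original list, after the last item has been sent, and must know how many items there were.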
I understand that one approach is to use a publishSubscribeChannel in which the first subscriber does the actual Kafka sending and a second subscriber aggregates the messages and then executes the service call:
.transform(...)
.split()
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(f -> f.channel(KAFKA_OUT_CHANNEL)))
I'm having problems figuring out how to actually do the .aggregate part within a pubSubChannel using the DSL. So far I have tried:
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(f -> f.channel(KAFKA_OUT_CHANNEL))
.subscribe(f -> f.aggregate(c -> c.processor( ?? ))))
Any pointers?