2
votes

I have an aggregator configured via the Java DSL in a Spring Integration flow, and I want to throw an exception that goes to the global error channel when a group timeout occurs.

The discard channel is no good for me because discard messages are at the group member level, rather than for the whole group.

I've tried using the application event publisher as follows, but the publisher object doesn't seem to get invoked:

            .aggregate(new Consumer<AggregatorSpec>() {
                @Override
                public void accept(AggregatorSpec aggregatorSpec) {
                    try {
                        aggregatorSpec
                        .outputProcessor(groupPublishStrategy())
                        .correlationStrategy(groupPublishStrategy())
                        .releaseStrategy(groupPublishStrategy())
                        .groupTimeout(groupTimeout)
                        .get().getT2().setApplicationEventPublisher(myGroupExpirationPublisher());
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            })

Is there a recommended way to get notification for this use case? Any ideas why the above doesn't seem to work?

I suppose I could extend the AggregatorSpec class to get the message handler configured the way I want, but I wanted to see if I could do this with stock SI classes.

1

1 Answers

2
votes

Have just tested and ApplicationEventPublisher is populated properly on the bean initialization phase.

The expireGroup(Object correlationKey, MessageGroup group) is enough big to demonstrate it here, but you can find its code on GitHub. So, MessageGroupExpiredEvent is always published when we reach this method. Of course if discardMessage(message); doesn't throw exception.

OTOH the exprireGroup() is reachable only in this case:

if (this.releaseStrategy.canRelease(groupNow)) {
    completeGroup(correlationKey, groupNow);
}
else {
    expireGroup(correlationKey, groupNow);
}

So, please, be sure that your groupPublishStrategy() has proper logic and doesn't return true when your group isn't completed yet.

Well, it really would be better if you debug AbstractCorrelatingMessageHandler for your use-case. If you are sure that your group isn't completed during some groupTimeout, the forceComplete(MessageGroup group) is a good place for you to start debugging.

Otherwise, please, share DEBUG logs for the org.springframework.integration category, when you think that an event has to be emitted.