2
votes

I have an integration flow like this:

@Bean
public IntegrationFlow inboundRequestFlow()
{
    return IntegrationFlows.from( inboundRequestGateway() )
        .log( ... )
        .filter(  requestValidator,
            spec -> spec.discardChannel( invalidRequestChannel() ) )
        .bridge( spec -> spec.requiresReply( false ) )
        .channel( asyncFlowChannel )
        .wireTap(
            // sends a NO_CONTENT reply if the request is ok
            flow -> flow
                .enrichHeaders( spec -> spec
                    .header( HttpHeaders.STATUS_CODE, HttpStatus.NO_CONTENT )
                    .defaultOverwrite( true ) )
                .transform( payload -> "" )
                .channel( inboundGatewayReplyChannel() )
        ).get();

}

It receives a request on an HTTP gateway and validates it. If the request is valid, the flow sends it to 'asyncFlowChannel' and replies to the inbound gateway with a 204.

'asyncFlowChannel' is the starting point for another IntegrationFlow that runs on an executor channel:

@Bean
public IntegrationFlow outboundFlow()
{
    return IntegrationFlows.from( asyncFlowChannel )
        .log( ... )
        .transform( ... )
        .transform( ... )
        .split( ... )
        .resequence( ... )
        .enrichHeaders( ... )
        .log( ... )
        .transform( ... )
        .handle( this.outboundSOAPGateway() )
        .log( ... )
        .handle( ... )
        .bridge( spec -> spec.requiresReply( false ) )
        .channel( anotherAsyncFlowChannel )
        .get();
}

If an exception occurs on my outbound gateway (because of a network-related I/O error or an error response), I want to log the error and take appropriate measures. But I can't set an error channel on the outboundSOAPGateway, and the inboundRequestGateway in the starting flow has already received its reply.

The only clue to the error that I get is this log entry:

10:19:53.002 WARN [outbound-flow-0] org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel - Reply message received but the receiving thread has already received a reply:ErrorMessage [payload=..., headers=...]

My question is: what is the proper way to handle errors on an outbound gateway in an asynchronous flow where the inbound gateway that starts the flow has already received its reply?


2 Answers

3
votes

Any MessageHandler endpoint can be supplied with an AbstractRequestHandlerAdvice. One implementation is ExpressionEvaluatingRequestHandlerAdvice, which lets you catch an exception and send it to a failureChannel: https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#expression-advice.

For that purpose, .handle( this.outboundSOAPGateway() ) accepts a second argument, for example:

.handle((GenericHandler<?>) (p, h) -> {
        throw new RuntimeException("intentional");
    }, e -> e.advice(retryAdvice()))

In this case I use a RequestHandlerRetryAdvice:

@Bean
public RequestHandlerRetryAdvice retryAdvice() {
    RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
    requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
    return requestHandlerRetryAdvice;
}

The same applies to ExpressionEvaluatingRequestHandlerAdvice.
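
As a rough sketch of the ExpressionEvaluatingRequestHandlerAdvice variant: the bean below evaluates an expression when the advised handler throws and sends an ErrorMessage to a failure channel. The names SoapFailureAdviceConfig, soapFailureChannel and soapFailureAdvice are made up for illustration, and the sketch assumes the Spring Integration 5.0 API from the linked documentation.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.messaging.MessageChannel;

@Configuration
public class SoapFailureAdviceConfig {

    // Hypothetical channel that receives an ErrorMessage for each failed call.
    @Bean
    public MessageChannel soapFailureChannel() {
        return new DirectChannel();
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice soapFailureAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice =
                new ExpressionEvaluatingRequestHandlerAdvice();
        // Evaluated against the failed request message; the result is carried
        // in the exception that becomes the ErrorMessage payload.
        advice.setOnFailureExpressionString("payload");
        advice.setFailureChannel(soapFailureChannel());
        // Assumption: the failure is fully handled on the failure channel,
        // so the exception is not rethrown to the caller.
        advice.setTrapException(true);
        return advice;
    }
}
```

It would then be attached the same way as the retry advice: .handle( this.outboundSOAPGateway(), e -> e.advice( soapFailureAdvice() ) ). Leave trapException at its default of false if the exception should still propagate after the failure message is sent.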

BTW, that retryAdvice() may do the trick for you as well. See its ErrorMessageSendingRecoverer.
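
To make the recovery side concrete, here is a minimal sketch of a flow that consumes what ErrorMessageSendingRecoverer sends to recoveryChannel(). The answer does not show that channel's definition, so the DirectChannel bean, the RecoveryFlowConfig class and the System.err logging are assumptions. Replace the logging with whatever measures fit your case.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;

@Configuration
public class RecoveryFlowConfig {

    // Assumed definition of the channel passed to ErrorMessageSendingRecoverer.
    @Bean
    public MessageChannel recoveryChannel() {
        return new DirectChannel();
    }

    // Receives the ErrorMessage published once the retries are exhausted.
    @Bean
    public IntegrationFlow recoveryFlow() {
        return IntegrationFlows.from(recoveryChannel())
            .handle(message -> {
                Object payload = message.getPayload();
                if (payload instanceof MessagingException) {
                    // A MessagingException can carry the message that failed.
                    MessagingException failure = (MessagingException) payload;
                    System.err.println("Outbound call failed for "
                        + failure.getFailedMessage()
                        + ", cause: " + failure.getCause());
                } else {
                    System.err.println("Outbound call failed: " + payload);
                }
            })
            .get();
    }
}
```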

1
votes

Instead of .channel( asyncFlowChannel ), use

.gateway(asyncFlowChannel, e -> e.errorChannel(...))