
If an exception occurs in a Spring Integration WebFlux flow, the exception itself (with stack trace) is sent back to the caller as the payload by MessagePublishingErrorHandler, which uses the error channel from the "errorChannel" header, not the default error channel.

How can I set up an error handler similar to WebExceptionHandler? I want to produce an HTTP status code and possibly a DefaultErrorAttributes object as the response.

Simply defining a flow that starts from the errorChannel doesn't work: the error message never ends up there. I also tried to define my own fluxErrorChannel, but it does not appear to be used as the error channel either, and the errors do not end up in my errorFlow:

@Bean
public IntegrationFlow fooRestFlow() {
    return IntegrationFlows.from(
            WebFlux.inboundGateway("/foo")
                    .requestMapping(r -> r.methods(HttpMethod.POST))
                    .requestPayloadType(Map.class)
                    .errorChannel(fluxErrorChannel()))
            .channel(bazFlow().getInputChannel())
            .get();
}

@Bean
public MessageChannel fluxErrorChannel() {
    return MessageChannels.flux().get();
}

@Bean
public IntegrationFlow errorFlow() {
    return IntegrationFlows.from(fluxErrorChannel())
            .transform(source -> source)
            .enrichHeaders(h -> h.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_GATEWAY))
            .get();
}

@Bean
public IntegrationFlow bazFlow() {
    return f -> f.split(Map.class, map -> map.get("items"))
            .channel(MessageChannels.flux())
            .<String>handle((p, h) -> { throw new RuntimeException(); })
            .aggregate();
}

UPDATE

In MessagingGatewaySupport.doSendAndReceiveMessageReactive, the error channel I defined on the WebFlux.inboundGateway is never used to set the error channel header. Instead, the error channel is always the replyChannel that is created here:

FutureReplyChannel replyChannel = new FutureReplyChannel();

Message<?> requestMessage = MutableMessageBuilder.fromMessage(message)
    .setReplyChannel(replyChannel)
    .setHeader(this.messagingTemplate.getSendTimeoutHeader(), null)
    .setHeader(this.messagingTemplate.getReceiveTimeoutHeader(), null)
    .setErrorChannel(replyChannel)
    .build();

The error channel is ultimately reset to the originalErrorChannelHeader in the map step after Mono.fromFuture, but that header is null in my case. Also, the onErrorResume lambda is never invoked:

return Mono.fromFuture(replyChannel.messageFuture)
    .doOnSubscribe(s -> {
        if (!error && this.countsEnabled) {
            this.messageCount.incrementAndGet();
        }
    })
    .<Message<?>>map(replyMessage ->
        MessageBuilder.fromMessage(replyMessage)
            .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
            .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
            .build())
    .onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));

How is this intended to work?

In the error handling section of the reference (docs.spring.io/spring-integration/docs/current/reference/html/…), WebFlux errors are not mentioned. Are they different from sync/async? - dschulten

1 Answer


It's a bug. The ErrorMessage that the error handler creates for the exception is sent to the channel in the errorChannel header, which has to be the replyChannel so that the gateway gets the result. The gateway should then invoke the error flow (if present) and return the result of that.

https://jira.spring.io/browse/INT-4541
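
To make the mechanism easier to follow, here is a minimal plain-Java model of it. This is only a sketch and not the framework's implementation: ErrorChannelSketch, ReplyChannel, runFlow, gatewayAsReported and gatewayAsIntended are hypothetical names invented for illustration, and a bare Throwable stands in for the ErrorMessage. It assumes, as in the snippets quoted in the question, that the reply channel simply completes a future with whatever is sent to it. That would also explain why onErrorResume never fires: the error arrives as a normal completion of the future, not as a failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Simplified model only; none of these types are Spring Integration classes.
class ErrorChannelSketch {

    // Hypothetical stand-in for FutureReplyChannel:
    // anything sent to it completes the future normally.
    static final class ReplyChannel {
        final CompletableFuture<Object> messageFuture = new CompletableFuture<>();

        void send(Object payload) {
            messageFuture.complete(payload);
        }
    }

    // Hypothetical stand-in for the downstream flow plus the error handler:
    // a failure is published to the channel taken from the "errorChannel" header.
    static void runFlow(Function<String, String> handler, String request,
                        ReplyChannel replyChannel, ReplyChannel errorChannelHeader) {
        try {
            replyChannel.send(handler.apply(request));
        } catch (RuntimeException e) {
            errorChannelHeader.send(e);
        }
    }

    // Behaviour reported in the question: the errorChannel header is the
    // replyChannel, so the exception comes back as an ordinary reply.
    static Object gatewayAsReported(Function<String, String> handler, String request) {
        ReplyChannel replyChannel = new ReplyChannel();
        runFlow(handler, request, replyChannel, replyChannel);
        return replyChannel.messageFuture.join();
    }

    // Behaviour the answer describes as intended: the gateway passes the error
    // to the configured error flow (if present) and returns that flow's result.
    static Object gatewayAsIntended(Function<String, String> handler, String request,
                                    Function<Throwable, Object> errorFlow) {
        Object reply = gatewayAsReported(handler, request);
        if (reply instanceof Throwable && errorFlow != null) {
            return errorFlow.apply((Throwable) reply);
        }
        return reply;
    }

    public static void main(String[] args) {
        Function<String, String> failing = p -> {
            throw new IllegalStateException("boom");
        };
        // The caller receives the exception object itself as the reply.
        System.out.println(gatewayAsReported(failing, "foo"));
        // The caller receives whatever the error flow produced.
        System.out.println(gatewayAsIntended(failing, "foo", t -> "502 Bad Gateway"));
    }
}
```

In this model the only difference between the two gateways is the extra step after the future completes, which is where an error flow such as the errorFlow bean from the question would be invoked.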