1
votes

Using Spring's reactive WebClient, I consume an API, and when a response comes back with a 500 status I need to retry with exponential backoff. But in the Mono class, I don't see any retryBackoff overload that takes a Predicate as an input parameter.

This is the kind of function I am looking for:

public final Mono<T> retryBackoff(Predicate<? super Throwable> retryMatcher, long numRetries, Duration firstBackoff)

Right now my implementation is as follows (it retries immediately, with no backoff mechanism):

client.sendRequest()
    .retry(e -> e instanceof RestClientException && ((RestClientException) e).getStatus() == 500)
    .subscribe();

3 Answers

4
votes

You might want to have a look at the reactor-extra module in the reactor-addons project. In Maven you can do:

<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
    <version>3.2.3.RELEASE</version>
</dependency>

And then use it like this:

client.post()
    .syncBody("test")
    .retrieve()
    .bodyToMono(String.class)
    .retryWhen(Retry.onlyIf(ctx -> ctx.exception() instanceof RestClientException)
                    .exponentialBackoff(firstBackoff, maxBackoff)
                    .retryMax(maxRetries))
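
To make the effect of firstBackoff and maxBackoff concrete, here is a minimal plain-Java sketch of the delay schedule an exponential backoff produces. It is not reactor-extra code: the class name BackoffSchedule and the doubling factor are assumptions made for illustration, and the library may apply a different factor or add jitter.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of reactor-extra.
final class BackoffSchedule {

    // Returns the wait before each retry: firstBackoff, then doubled each time
    // (assumed factor of 2), never exceeding maxBackoff.
    static List<Duration> delays(Duration firstBackoff, Duration maxBackoff, int maxRetries) {
        List<Duration> result = new ArrayList<>();
        // Cap the very first delay too, in case firstBackoff > maxBackoff.
        Duration next = firstBackoff.compareTo(maxBackoff) > 0 ? maxBackoff : firstBackoff;
        for (int i = 0; i < maxRetries; i++) {
            result.add(next);
            Duration doubled = next.multipliedBy(2);
            next = doubled.compareTo(maxBackoff) > 0 ? maxBackoff : doubled;
        }
        return result;
    }
}
```

For example, BackoffSchedule.delays(Duration.ofMillis(100), Duration.ofSeconds(1), 5) yields waits of 100 ms, 200 ms, 400 ms, 800 ms, and 1 s under these assumptions.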
0
votes

I'm not sure which Spring version you are using; in 2.1.4 I have this:

client.post()
    .syncBody("test")
    .retrieve()
    .bodyToMono(String.class)
    .retryBackoff(numretries, firstBackoff, maxBackoff, jitterFactor);

... so that's exactly what you want, right?
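
One caveat: this overload takes no predicate, so it retries on any error, whereas the question asks to retry only on matching ones. As a rough illustration of the requested semantics, here is a blocking plain-Java sketch modeled on the signature from the question. It is not Reactor code; the class BlockingRetry, the method callWithBackoff, and the doubling of the delay are all assumptions made for this example.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical blocking analogue of a predicate-filtered retryBackoff.
final class BlockingRetry {

    // Calls the task; on failure, retries up to numRetries times, but only
    // while retryMatcher accepts the exception. The wait doubles after each
    // failed attempt (assumed factor of 2, no jitter, no upper bound).
    static <T> T callWithBackoff(Callable<T> task,
                                 Predicate<? super Throwable> retryMatcher,
                                 long numRetries,
                                 Duration firstBackoff) throws Exception {
        Duration delay = firstBackoff;
        for (long attempt = 0; ; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                // Give up when retries are exhausted or the error does not match.
                if (attempt >= numRetries || !retryMatcher.test(e)) {
                    throw e;
                }
                Thread.sleep(delay.toMillis());
                delay = delay.multipliedBy(2);
            }
        }
    }
}
```

A non-matching exception propagates immediately on the first failure, which is the behavior the predicate-less overload above cannot express.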

0
votes

I'm currently trying this with Kotlin coroutines and Spring WebFlux, but the following does not seem to work:

suspend fun ClientResponse.asResponse(): ServerResponse =
    status(statusCode())
        .headers { headerConsumer -> headerConsumer.addAll(headers().asHttpHeaders()) }
        .body(bodyToMono(DataBuffer::class.java), DataBuffer::class.java)
        .retryWhen(
            Retry.onlyIf { ctx: RetryContext<Throwable> -> (ctx.exception() as? WebClientResponseException)?.statusCode in retryableErrorCodes }
                .exponentialBackoff(ofSeconds(1), ofSeconds(5))
                .retryMax(3)
                .doOnRetry { log.error("Retry for {}", it.exception()) }
        )
        .awaitSingle()