1
votes

I am using spring webflux to call one service from another via Schedulers.elastic()

Mono<Integer> anaNotificationCountObservable = wrapWithRetryForFlux(wrapWithTimeoutForFlux(
                notificationServiceMediatorFlux.getANANotificationCountForUser(userId).subscribeOn(reactor.core.scheduler.Schedulers.elastic())
        )).onErrorReturn(0);

In main thread i am setting one InhertitableThreadLocal variable and in the child thread I am trying to access it and it is working fine.

This is my class for storing threadlocal

@Component
public class RequestCorrelation {

    public static final String CORRELATION_ID = "correlation-id";

    private InheritableThreadLocal<String> id = new InheritableThreadLocal<>();

    public String getId() { 
       return id.get(); 
    }

    public void setId(final String correlationId) { 
       id.set(correlationId); 
    }

    public void removeCorrelationId() {
       id.remove();
    }
}

Now the issue is first time its working fine meaning the value i am setting in threadlocal is passed to other services.

But second time also, it is using old id(generated in last request).

I tried using Schedulers.newSingle() instead of elastic(), then its working fine. So think since elastic() is re-using threads, thats why it is not able to clear / or it is re-using.

How should i resolve issue. I am setting thread local in my filter and clearing the same in myfiler

requestCorrelation.setId(UUID.randomUUID().toString());
chain.doFilter(req,res)
requestCorrelation.removeCorrelationId();
1

1 Answers

1
votes

You should never tie resources or information to a particular thread when leveraging a reactor pipeline. Reactor is itself scheduling agnostic; developers using your library can choose to schedule work on another scheduler - if you decide to force a scheduling model you might lose performance benefits.

Instead you can store data inside the reactor context. This is a map-like structure that’s tied to the subscriber and independent of the scheduling arrangement.

This is how projects like spring security and micrometer store information that usually belongs in a threadlocal.