I am using Spring Data MongoDB with the Reactive Streams driver, with code that does something like this:

reactiveMongoOperations.changeStream(changeStreamOptions, MyObject.class)
    .parallel()
    .runOn(Schedulers.newParallel("my-scheduler", 4))
    .map(ChangeStreamEvent::getBody)
    .flatMap(o -> reactiveMongoOperations.findAndModify(query, update, options, MyObject.class))
    .subscribe(this::process);

I would expect everything to execute on my-scheduler. What actually happens is that the flatMap operation does execute on my-scheduler, while the code in my process() method does not.

Can someone please explain why this is so - is this a bug or am I doing something wrong? How can I get all the operations defined in the Flux to execute on the same scheduler?

What thread does process() execute on? Could it be that the Mongo driver changes threads in findAndModify? - Simon Baslé
@SimonBaslé, the process method executes on a generic thread ("Thread-nn"). If I add another runOn() after the flatMap() call, then process will execute on my-scheduler. I don't think I should need to do that, though. - user1585916

1 Answer

runOn() specifies the scheduler used to run each "rail" of the ParallelFlux. It doesn't affect subscribers.

If you want to specify a scheduler for subscribers, you should do so with subscribeOn() on the original Flux (before the parallel() call).
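
To see the thread hand-off from the question without a MongoDB instance, here is a minimal sketch using only reactor-core (assumed to be on the classpath). The asyncLookup helper and the "fake-driver" scheduler are hypothetical stand-ins for findAndModify and for whatever thread the Mongo driver emits on; the real driver may behave differently. It compares the thread seen after flatMap with and without the second runOn() mentioned in the comments.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class RunOnAfterFlatMap {

    // Hypothetical stand-in for findAndModify(...): the result is produced
    // and emitted on a thread owned by the given "driver" scheduler.
    static Mono<Integer> asyncLookup(int i, Scheduler driver) {
        return Mono.fromSupplier(() -> i * 10).subscribeOn(driver);
    }

    // Returns the names of the threads on which the step after flatMap ran.
    static List<String> consumerThreads(boolean runOnAfterFlatMap) {
        Scheduler rails = Schedulers.newParallel("my-scheduler", 4);
        Scheduler driver = Schedulers.newSingle("fake-driver"); // assumption, not the real driver
        List<String> seen = new CopyOnWriteArrayList<>();
        try {
            ParallelFlux<Integer> flux = Flux.range(1, 8)
                    .parallel()
                    .runOn(rails)
                    .flatMap(i -> asyncLookup(i, driver));
            if (runOnAfterFlatMap) {
                // Move each rail back onto my-scheduler after the inner publisher emitted.
                flux = flux.runOn(rails);
            }
            flux.doOnNext(i -> seen.add(Thread.currentThread().getName()))
                    .sequential()
                    .blockLast();
            return seen;
        } finally {
            // The schedulers use non-daemon threads, so dispose them to let the JVM exit.
            rails.dispose();
            driver.dispose();
        }
    }

    public static void main(String[] args) {
        // Without the extra runOn, names typically start with "fake-driver".
        System.out.println("no extra runOn:   " + consumerThreads(false));
        // With the extra runOn, names start with "my-scheduler".
        System.out.println("runOn after flatMap: " + consumerThreads(true));
    }
}
```

In this sketch the step after flatMap runs on whichever thread the inner publisher emitted on, which matches the observation in the comments that a second runOn() after flatMap() brings process() back onto my-scheduler.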