6
votes

This questions is related to Return immediately in spring web flux but I don't think it's the same (at least the answer there is not satisfactory for me).

I have a function returning a Mono that when invoked starts a long-running job. This function is invoked when a call is made to a Spring Webflux HTTP API. Here's an example:

@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
    val longRunningJob : Mono<Job> = startNewJob(jobId)
    longRunningJob.map { job ->
        val jobUri = generateJobUri(request, job.id)
        ResponseEntity.created(jobURI).build<Unit>()
    }
}

The problem with the code above is that "201 Created" is created after the long running job is completed. I want to kick-off the longRunningJob in the background and return "201 Created" immediately.

I could perhaps do something like this:

@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {

    startNewJob(jobId)
        .subscribeOn(Schedulers.newSingle("thread"))
        .subscribe()

    val jobUri = generateJobUri(request, job.id)
    val response = ResponseEntity.created(jobURI).build<Unit>()
    Mono.just(response)
}

But it doesn't seem very idiomatic to me to have to call subscribe() manually (e.g. intellij is complaining that I call subscribe() in non-blocking scope). Isn't there a better way to compose the two "streams" without using an explicit subscribe? If so how do I modify the startNewJob function above to achieve this?

2

2 Answers

5
votes

AFAIK, using one of the subscribe methods is the only way to really start a job in the background with its own lifecycle (not tied to the returned publisher).

If you were to use one of the operators to combine the job publisher and the response publisher (e.g. zip or merge), then the lifecycle of the job publisher would be tied to the response publisher, which is not what you want for a background job.

One thing you might want to consider is kicking off the background job within the response publisher stream, rather than directly in the method body. e.g. via doOnSubscibe or from an operator upstream of the response.

This would tie the start of the background job to the onSubscribe events of the response publisher, but still allow it to complete in the background.

Also note, that if you want to be able to cancel the background job (e.g. maybe during application shutdown), you'll need to save the Disposable returned from subscribe so you can later call dispose on it. This might be better done from some type of BackgroundJobManager that could keep track of all the jobs running.

-1
votes
private static final Scheduler backgroundTaskScheduler = Schedulers.newParallel("backgroundTaskScheduler", 2);
backgroundTaskScheduler.schedule(() -> doBackgroundJob());