I would like to parallelize execution without aggregating events at the end. I'm new to RxJava and am trying to evaluate whether it fits my needs.

I know how to do a normal non-parallel execution. For simplicity let's start with:

taskReader.stream() // returns Flowable<Task>
    .subscribe(this::processTask) // sends results to another micro service

This works nicely in an ordered, blocking way. However, I would like to parallelize this and execute all the tasks concurrently. The documentation basically tells you to use flatMap, which makes sense, but I can't wrap my head around how to run the whole thing. Let's see:

taskReader.stream() // returns Flowable<Task> 
    .flatMap(task ->
        Flowable.just(task)
            .subscribeOn(Schedulers.computation())
            .map(this::processTask)
    )

This is what I need, but of course this does not start the Flowable as nothing is subscribed to it [1]. All my tasks are unrelated so I don't need to aggregate them into a top-level stream again, and I certainly don't care about their order.

How do you start a Flowable like that? I don't want to subscribe to the top level Flowable, as there's nothing else I need to do there.

Alternatively, the docs tell you to use parallel flows:

taskReader.stream() // returns Flowable<Task> 
    .parallel()
    .runOn(Schedulers.computation())
    .map(this::processTask)
    .sequential();

And again, I don't need to sequence anything at the end, as I don't care about the order. Also, I don't want to subscribe to this, as the work in the map is all I need.

What I would really like is this:

taskReader.stream() // returns Flowable<Task> 
    .flatMap(task ->
        Flowable.just(task)
            .subscribeOn(Schedulers.computation())
            .subscribe(this::processTask)
    )

But this is not how RxJava works.

Help?

Some background: I'm receiving events from a queue (specifically AWS SQS), and the events are unrelated to one another. For each one I need to perform some I/O-intensive work, then some CPU-intensive work, and finally send the results to a different system. I would like to process all these events in parallel, so I don't need ordering, just good scheduling for computation and I/O work. I thought RxJava would help me there, but maybe I'm using the wrong tool for the job, as I'm struggling with the first step.

[1] This is what we get from the docs, unless I'm searching for it wrong. Is this really the only documentation we have - https://github.com/ReactiveX/RxJava/wiki? I would expect to have something like what Reactor has: https://projectreactor.io/docs/core/release/reference/

Maybe this gist is what you are trying to do, @Mike, if I understand your question correctly: gist.github.com/punitda/94a98ff1769d1fbbd242b37999e6a43c - PunitD

1 Answer

Change processTask() so that it returns a Flowable, something like this:

public Flowable<ProcessTask> processTask(Task task) {
    return Flowable
        .<ProcessTask>create(
            e -> {
                // Put the code that is currently inside your processTask() method in this block.
                // Pass whatever processed output you get downstream as shown below.
                e.onNext(processed); // 'processed' is the ProcessTask object produced by your code above
                e.onComplete(); // signal completion so flatMap can finish with this inner stream
            },
            BackpressureStrategy.BUFFER // choose whichever BackpressureStrategy suits your requirement
        )
        .subscribeOn(Schedulers.io());
}

So basically, processTask() now returns a Flowable as well.

This is how you can use flatMap() now:

taskReader
    .stream() // returns Flowable<Task>
    .flatMap(task -> processTask(task))
    .subscribe(
        result -> { /* processed task: the value passed to e.onNext() */ },
        error -> { /* handle the error */ }
    );
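
If nothing downstream needs the results, the same pattern can probably be expressed with flatMapCompletable, which runs one Completable per task and emits no items. The following is only a minimal, self-contained sketch under some assumptions: it uses RxJava 3 package names (for 2.x, drop the rxjava3 segment from the imports), and FireAndForgetTasks, processed and the int-based processTask are hypothetical stand-ins, not the asker's real code.

```java
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class FireAndForgetTasks {

    // Hypothetical stand-in for the side effects of processTask(): records each handled task.
    static final Set<Integer> processed = ConcurrentHashMap.newKeySet();

    static void processTask(int task) {
        processed.add(task);
    }

    // Each task becomes its own Completable scheduled on the I/O pool, so tasks
    // can run concurrently and no results are merged back into the outer stream.
    static Completable processAll(Flowable<Integer> tasks) {
        return tasks.flatMapCompletable(task ->
            Completable.fromAction(() -> processTask(task))
                .subscribeOn(Schedulers.io()));
    }

    public static void main(String[] args) {
        // Nothing runs until the Completable is subscribed. blockingAwait() subscribes
        // and waits, so this demo does not exit before the work is done. A long-running
        // service would call subscribe() with completion and error callbacks instead.
        processAll(Flowable.range(1, 20)).blockingAwait();
        System.out.println("processed " + processed.size() + " tasks");
    }
}
```

Something still has to subscribe to the outer flow, because RxJava sources are lazy, but that single subscription only needs to deal with completion and errors.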

Hope this helps.
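
For the workload described in the question (I/O-bound work, then CPU-bound work, then sending the result elsewhere), one possible arrangement is to switch schedulers between stages with observeOn. This is a rough sketch, assuming RxJava 3 imports; StagedPipeline, load, crunch, send and the sent map are hypothetical placeholders for the real SQS handling, and the concurrency limit of 4 is an arbitrary choice.

```java
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StagedPipeline {

    // Hypothetical sink standing in for "the other system"; maps event id to result.
    static final Map<Integer, Integer> sent = new ConcurrentHashMap<>();

    // Placeholder stages; real code would do blocking I/O and heavy computation here.
    static String load(int eventId) { return "payload-" + eventId; }
    static int crunch(String payload) { return payload.length(); }
    static void send(int eventId, int result) { sent.put(eventId, result); }

    static Completable handle(int eventId) {
        return Single.fromCallable(() -> load(eventId))
            .subscribeOn(Schedulers.io())            // load runs on the I/O pool
            .observeOn(Schedulers.computation())     // switch threads for the CPU-heavy step
            .map(StagedPipeline::crunch)
            .observeOn(Schedulers.io())              // back to the I/O pool for sending
            .doOnSuccess(result -> send(eventId, result))
            .ignoreElement();                        // the result itself is not needed downstream
    }

    public static void main(String[] args) {
        Flowable.range(1, 10)
            // delayErrors = false, and at most 4 events are processed at the same time
            .flatMapCompletable(StagedPipeline::handle, false, 4)
            .blockingAwait();
        System.out.println("sent " + sent.size() + " results");
    }
}
```

The maxConcurrency argument bounds how many events are in flight at once, which may matter when the I/O stage talks to a rate-limited service.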