2
votes

I am trying to steer away from using blocking thread model and use reactive model for achieving high throughput. The use case I have is this: There is a huge number of incoming messages. For every message I need to do some I/O without blocking that thread. Here I am processing each message in a separate thread. If the application is killed, I need to finish the ongoing tasks and shutdown gracefully. I am using Thread.sleep below to mimic the intensive I/O operation. The code sample is below:

public class TestReactor {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Disposable task = Flux.range(1, 100).parallel().runOn(Schedulers.fromExecutor(executor)).doOnNext(message -> {
            System.out.println(Thread.currentThread().getName()+": processing " + message);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+": Done");
        })
        .sequential()
        .doOnError(e->e.printStackTrace())
        .doOnCancel(()->{
            System.out.println("disposing.....");
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        })
        .subscribe();

        Thread.sleep(4000);
        
        task.dispose();
        
        System.out.println("Disposed. Waiting for sometime before exit.");
        Thread.sleep(20000);
    }

}

When I run this, the flux seems to ignore the executor.shutdown() and errors out with interrupted exceptions. Is it possible to achieve my use case using flux?

Adding output: enter image description here

1
I've executed your code with RxJava `Flowable qnd I didn't recieve any arrorbubbles
@bubbles Please see the attached screenshot of my run. I updated the code by reducing the wait timesfalcon

1 Answers

0
votes

You made a great mistake: never use any thread manipulations when you are working with Reactive programming. It's a dirty hack. The clearest markers of bad design and code smell when you try to code on FRP are:

  • calling try-catch blocks inside the operators and side-effects functions(doOn). If you have a case with an exception - ok, it's fine. Call the onError operators and handle in it behaviour of your pipeline.
  • Using Thread.sleep and other concurrency API inside Mono/Flux. Weblfux API also have a great API to work with concurrency and multi-threading.

So, regarding the problem and its solution: you made bad design :) You don't need to handle any disposables or thread interrupting. You just have to remove it and add killswitch.

So, working code, that does what you want bellow:

public class TestReactor {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        Flux.generate(() -> 0,
                (state, sink) -> {
                    sink.next("current state = " + state);
                    if (state == 100) sink.complete();
                    return state + 1;
                }
        )
        .parallel()
        .runOn(Schedulers.fromExecutor(executor))
        .doOnNext(message -> {
            System.out.println(Thread.currentThread().getName() + ": processing " + message);
            System.out.println(Thread.currentThread().getName() + ": Done");
        })
        .sequential()
        .doOnError(e -> e.printStackTrace())
        .doOnCancel(() -> {
            System.out.println("disposing.....");
        })
        .subscribe();
    }

}