42
votes

I have some trouble understanding how subscribeOn/observeOn work in RxJava. I've created a simple app with an observable that emits solar system planet names, does some mapping and filtering, and prints the results.

As I understand it, scheduling work on a background thread is done via the subscribeOn operator (and it seems to work fine).

Observing on a background thread also works fine with the observeOn operator.

But I have trouble understanding how to observe on the calling thread (whether it is the main thread or any other). It is easily done on Android with the AndroidSchedulers.mainThread() scheduler, but I don't know how to achieve this in pure Java.

Here's my code:

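import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.schedulers.Schedulers;

public class Main {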

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        System.out.println("Main thread: " + getCurrentThreadInfo());

        Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
                .map(in -> {
                    System.out.println("map on: " + getCurrentThreadInfo());
                    return in.toUpperCase();
                })
                .filter(in -> {
                    System.out.println("filter on: " + getCurrentThreadInfo());
                    return in.contains("A");
                })
                .subscribeOn(Schedulers.from(executor));

        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread("Thread-" + i) {
                @Override
                public void run() {
                    stringObservable
                            .buffer(5)
                            .subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
                }
            };
            thread.start();
        }

    }

    private static String getCurrentThreadInfo() {
        return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
    }
}

The Observable is created and the work is subscribed on one of the three threads from the executor. This works as expected. But how do I observe the results on the threads created dynamically in the for loop? Is there a way to create a Scheduler from the current thread?

Also, I've found that this code never terminates after it runs, and I don't know why.

2
The main thread in Android is not necessarily the calling thread. That's an important distinction here. It's more equivalent to the EDT in Java Swing. - Tom G
Of course it is not always the calling thread, but it is usually used as the thread on which results should be delivered. I should have been more precise. - Filip Zymek

2 Answers

88
votes

To answer your question, let me start from the beginning; this lets other people understand what you already know.

Schedulers

Schedulers play the same role in RxJava as Executors do in Java. Briefly, they decide on which thread actions are executed.

Usually an Observable and its operators execute on the current thread. Sometimes you can pass a Scheduler to an Observable or operator as a parameter (e.g. Observable.timer()).
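To make the Executor analogy concrete, here is a minimal sketch that uses only java.util.concurrent (no RxJava). The class name ExecutorAnalogy, the helper threadUsedBy and the thread name "worker" are made up for illustration. It only shows that the executor, not the task, decides which thread runs the work:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of RxJava.
class ExecutorAnalogy {

    // Runs a task through the given executor and reports which thread executed it.
    static String threadUsedBy(Executor executor) throws InterruptedException {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        done.await(); // wait until the task has actually run
        return name.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // A "direct" executor runs the task on whichever thread calls execute().
        Executor direct = Runnable::run;
        System.out.println("direct: " + threadUsedBy(direct));

        // A single-thread pool hands the task to its own worker thread.
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            System.out.println("pool: " + threadUsedBy(pool));
        } finally {
            pool.shutdown(); // let the JVM exit once the worker is idle
        }
    }
}
```

The direct executor corresponds roughly to the default "no scheduler" behaviour, and the pool to handing work to a Scheduler backed by other threads.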

Additionally, RxJava provides 2 operators to specify a Scheduler:

  • subscribeOn - specify the Scheduler on which an Observable will operate
  • observeOn - specify the Scheduler on which an observer will observe this Observable

To understand them quickly, I use the example code below.

In all samples, I use the helper createObservable, which emits the name of the thread on which the Observable operates:

 public static Observable<String> createObservable(){
        return Observable.create((Subscriber<? super String> subscriber) -> {
                subscriber.onNext(Thread.currentThread().getName());
                subscriber.onCompleted();
            }
        );
    }

Without schedulers:

createObservable().subscribe(message -> {
        System.out.println("Case 1 Observable thread " + message);
        System.out.println("Case 1 Observer thread " + Thread.currentThread().getName());
    });
    //will print:
    //Case 1 Observable thread main
    //Case 1 Observer thread main

SubscribeOn:

createObservable()
            .subscribeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 2 Observable thread " + message);
                System.out.println("Case 2 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 2 Observable thread RxNewThreadScheduler-1
            //Case 2 Observer thread RxNewThreadScheduler-1

SubscribeOn and ObserveOn:

createObservable()
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 3 Observable thread " + message);
                System.out.println("Case 3 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 3 Observable thread RxNewThreadScheduler-2
            //Case 3 Observer thread RxNewThreadScheduler-1

ObserveOn:

createObservable()
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 4 Observable thread " + message);
                System.out.println("Case 4 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 4 Observable thread main
            //Case 4 Observer thread RxNewThreadScheduler-1

Answer:

AndroidSchedulers.mainThread() returns a scheduler which delegates work to the MessageQueue associated with the main thread.
For this purpose it uses android.os.Looper.getMainLooper() and android.os.Handler.

In other words, if you would like to specify a particular thread, you must provide a means to schedule and perform tasks on that thread.

Underneath, it may use any kind of message queue for storing tasks, plus logic which loops over the queue and executes the tasks.

In Java, we have Executor, which is designed for such tasks. RxJava can easily create a Scheduler from such an Executor.

Below is an example which shows how you can observe on the main thread (not particularly useful, but it shows all the required parts).

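import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class RunCurrentThread implements Executor {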

    private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws InterruptedException {
        RunCurrentThread sample = new RunCurrentThread();
        sample.observerOnMain();
        sample.runLoop();
    }

    private void observerOnMain() {
        createObservable()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.from(this))
                .subscribe(message -> {
                    System.out.println("Observable thread " + message);
                    System.out.println("Observer thread " + Thread.currentThread().getName());
                });
    }

    public Observable<String> createObservable() {
        return Observable.create((Subscriber<? super String> subscriber) -> {
                    subscriber.onNext(Thread.currentThread().getName());
                    subscriber.onCompleted();
                }
        );
    }

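    // Drains the queue on the calling thread (here: main), so observer callbacks run there.
    // Note: take() blocks, so this loop keeps running until the thread is interrupted.
    private void runLoop() throws InterruptedException {
        while(!Thread.interrupted()){
            tasks.take().run();
        }
    }

    // Called by the Scheduler from other threads; it only enqueues the task.
    @Override
    public void execute(Runnable command) {
        tasks.add(command);
    }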
}
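The loop in the example above never returns on its own. As a possible variation, here is a sketch of the same queue-based idea with an explicit stop signal, written with plain java.util.concurrent so it runs without RxJava. The class CallerThreadExecutor and its methods stop() and drainUntilStopped() are hypothetical names, not library API:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: tasks run on whichever thread calls drainUntilStopped().
class CallerThreadExecutor implements Executor {

    // Sentinel task that tells the draining loop to exit.
    private static final Runnable STOP = () -> { };

    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    @Override
    public void execute(Runnable command) {
        tasks.add(command); // only enqueue; the draining thread does the work
    }

    // Asks the draining thread to return once it reaches this point in the queue.
    void stop() {
        tasks.add(STOP);
    }

    // Blocks the calling thread and runs queued tasks in order until stop() is seen.
    void drainUntilStopped() throws InterruptedException {
        for (Runnable task = tasks.take(); task != STOP; task = tasks.take()) {
            task.run();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CallerThreadExecutor executor = new CallerThreadExecutor();

        // A background thread produces work, then signals that nothing more is coming.
        Thread producer = new Thread(() -> {
            executor.execute(() ->
                    System.out.println("Task ran on: " + Thread.currentThread().getName()));
            executor.stop();
        }, "producer");
        producer.start();

        executor.drainUntilStopped(); // the task executes here, on the main thread
    }
}
```

Since Schedulers.from accepts any Executor, an instance like this could presumably be passed to observeOn in the same way as RunCurrentThread, with stop() called once the stream has completed.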

And the last question, why your code does not terminate:

ThreadPoolExecutor uses non-daemon threads by default, so your program does not end as long as they exist. You should call the executor's shutdown method to stop the threads.
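A minimal sketch of that shutdown step, using the same ThreadPoolExecutor settings as in the question but plain tasks instead of an Observable. The class name ShutdownDemo and the 5-second timeout are arbitrary choices for illustration:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class showing an orderly executor shutdown.
class ShutdownDemo {

    // Submits a few tasks, then shuts the pool down; returns true if it terminated in time.
    static boolean runAndShutDown() throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        for (int i = 0; i < 3; i++) {
            int id = i;
            executor.execute(() ->
                    System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }

        // shutdown() rejects new tasks but lets already submitted ones finish.
        executor.shutdown();
        // Wait for the worker threads to exit; the timeout value is arbitrary.
        return executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("terminated: " + runAndShutDown());
    }
}
```

In the question's program, shutdown() would need to be called only after all subscriptions have been made, because the executor rejects work submitted after it.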

7
votes

Here's a simplified example updated for RxJava 2. It's the same concept as Marek's answer: an Executor that adds the runnables to a BlockingQueue that's being consumed on the caller's thread.

public class ThreadTest {

    @Test
    public void test() throws InterruptedException {

        final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

        System.out.println("Caller thread: " + Thread.currentThread().getName());

        Observable.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Observable thread: " + Thread.currentThread().getName());
                return 1;
            }
        })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.from(new Executor() {
                @Override
                public void execute(@NonNull Runnable runnable) {
                    tasks.add(runnable);
                }
            }))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    System.out.println("Observer thread: " + Thread.currentThread().getName());
                }
            });
        tasks.take().run();
    }

}

// Output:
// Caller thread: main
// Observable thread: RxCachedThreadScheduler-1
// Observer thread: main