I'm having trouble understanding how subscribeOn/observeOn work in RxJava. I've created a simple app with an observable that emits the names of the solar system planets, does some mapping and filtering, and prints the results.
As I understand it, scheduling work on a background thread is done via the subscribeOn operator (and it seems to work fine).
Observing on a background thread also works fine with the observeOn operator.
But I have trouble understanding how to observe on the calling thread (whether it is the main thread or any other). This is easily done on Android with the AndroidSchedulers.mainThread() scheduler, but I don't know how to achieve it in pure Java.
Here's my code:
    import java.util.Arrays;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import rx.Observable;
    import rx.schedulers.Schedulers;

    public class Main {

        public static void main(String[] args) throws InterruptedException {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

            System.out.println("Main thread: " + getCurrentThreadInfo());

            // map and filter run on whichever thread the subscription is scheduled on
            Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
                    .map(in -> {
                        System.out.println("map on: " + getCurrentThreadInfo());
                        return in.toUpperCase();
                    })
                    .filter(in -> {
                        System.out.println("filter on: " + getCurrentThreadInfo());
                        return in.contains("A");
                    })
                    .subscribeOn(Schedulers.from(executor));

            // subscribe from five separately created threads
            for (int i = 0; i < 5; i++) {
                Thread thread = new Thread("Thread-" + i) {
                    @Override
                    public void run() {
                        stringObservable
                                .buffer(5)
                                .subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
                    }
                };
                thread.start();
            }
        }

        // returns the current thread's name and id, e.g. "main(1)"
        private static String getCurrentThreadInfo() {
            return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
        }
    }
The Observable is created and the work is subscribed on one of the three threads from the executor. This works as expected. But how can I observe the results on the threads created dynamically in the for loop? Is there a way to create a Scheduler from the current thread?
Also, I've found that this code never terminates after it runs, and I don't know why. :(
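To make the "Scheduler from the current thread" part concrete, here is a rough sketch in plain Java (no RxJava) of the behaviour I'm after: a background thread produces the results, and the calling thread consumes them by draining a queue. QueueExecutor, drain and CallingThreadSketch are names I made up for illustration. Since Schedulers.from(...) accepts an Executor, I assume something like this could be passed to observeOn, but I haven't verified that and I'm not sure it is the right approach.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class CallingThreadSketch {

    /** Hypothetical helper: tasks are only queued here and run when a thread drains them. */
    static final class QueueExecutor implements Executor {
        private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

        @Override
        public void execute(Runnable task) {
            tasks.add(task); // may be called from any thread
        }

        /** Runs exactly 'count' queued tasks on the thread that calls this method. */
        void drain(int count) throws InterruptedException {
            for (int i = 0; i < count; i++) {
                tasks.take().run(); // blocks until a task is available
            }
        }
    }

    /** Produces values in the background; returns the names of the threads that consumed them. */
    static List<String> run() throws InterruptedException {
        QueueExecutor callerQueue = new QueueExecutor();
        ExecutorService background = Executors.newSingleThreadExecutor();
        List<String> consumerThreads = new ArrayList<>();
        String[] planets = {"MARS", "SATURN", "URAN"};

        background.execute(() -> {
            for (String planet : planets) {
                // hand each result back to the caller's queue instead of printing it here
                callerQueue.execute(() -> {
                    consumerThreads.add(Thread.currentThread().getName());
                    System.out.println("Result " + planet + " on: " + Thread.currentThread().getName());
                });
            }
        });

        callerQueue.drain(planets.length); // the calling thread processes the results
        background.shutdown();             // release the pool thread when done
        return consumerThreads;
    }

    public static void main(String[] args) throws InterruptedException {
        run();
    }
}
```

The obvious drawback is that the calling thread has to block in drain and needs to know how many results to expect, which is why I'm hoping RxJava already has something for this.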