2
votes

I was wondering how I could concurrently process lines of text with RxJava. Right now, what I have is an observable from an entry set, and a subscriber that on each entry, processes the entry in the onNext call. The subscriber subscribes to the observable with this line of code

obs.observeOn(Schedulers.io()).subscribe(sub);

But when I run it, it runs as slowly as the sequential version, and seems to be processing it sequentially. How would I make this concurrent?

1

1 Answers

3
votes

Your observeOn(Schedulers.io()) call means that all emissions will be observed on that one thread. You want to get them onto their own threads.

Here I use flatMap to create a new observable for each item emitted from the source. Inside the mapping function I have to defer the processing work until subscription, else the entire chain is blocked while processing completes. I also have to ensure that subscription happens on a new thread via subscribeOn.

Random r = new Random();
Observable.from(new String[]{"First", "Second", "Third", "Fourth", "Fifth"})
    .flatMap(new Func1<String, Observable<String>>() {
        public Observable<String> call(final String s) {
            return Observable.defer(new Func0<Observable<String>>() {
                public Observable<String> call() {
                    Thread.sleep(r.nextInt(1000));
                    return Observable.just(s);
                }
            }).subscribeOn(Schedulers.newThread());
        }
    })
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println("Observed " + s + " on thread " + Thread.currentThread().getId());
        }
    });

This gives me output like (note out-of-order and on different threads - ie, processed in parallel):

Observed Fourth on thread 17
Observed Second on thread 15
Observed Fifth on thread 18
Observed First on thread 14
Observed Third on thread 16