0
votes

I have this piece of a Java method I'm working on right now:

Observable<Map.Entry<String, ConstituentInfo>> obs = Observable.from(constituents.entrySet());

Subscriber<Map.Entry<String, ConstituentInfo>> sub = new Subscriber<Map.Entry<String, ConstituentInfo>>(){
    String securityBySymbol, company, symbol, companyName, marketPlace, countryName, tier, tierId;
    ConstituentInfo constituent;
    Integer compId;

    @Override
    public void onNext(Map.Entry<String, ConstituentInfo> entry) {
         logger.info("blah blah test");
    }

    @Override
    public void onCompleted() {
         logger.info("completed successfully");
    }

    @Override
    public void onError(Throwable throwable) {
         logger.error(throwable.getMessage());
         throwable.printStackTrace();
    }
};

obs.observeOn(Schedulers.io()).subscribe(sub);

The method essentially processes each entry in the Map.Entry, but this seems to be processing it sequentially (same thread). How would I go about making this process asynchronous without using the "parallel" operator (i.e. process entries concurrently)? I tried to run the code above and I am missing some results (some are not processed properly).

1

1 Answers

1
votes

Your observable is run on the main thread as it is reported and the Subscriber methods will be called by one worker given by Schedulers.io() so that is why it appears on one thread. Your code does not indicate any real work being done beyond logging by the subscriber so there is nothing to do asynchronously here.

Perhaps you meant to do this?

obs.subscribeOn(Schedulers.io()).subscribe(sub);

In terms of parallel processing if your last line was this:

obs.doOnNext(entry -> doWork(entry)).observeOn(Schedulers.io()).subscribe(sub);

Then you could have the doWork bit done asynchronously like this:

int numProcessors = Runtime.getRuntime().availableProcessors();
obs.buffer(Math.max(1, constituents.size()/numProcessors))
   .flatMap(list -> Observable.from(list)
                   .doOnNext(entry -> doWork(entry)
                   .subscribeOn(Schedulers.computation())
   .observeOn(Schedulers.io())
   .subscribe(sub);

You need to buffer up the work by processor otherwise there could be a lot of thread context switching going on.

I'm not sure why you are missing results. If you have a repeatable test case then paste the code here or report it to RxJava on github as an issue.