I'm learning RxJava and came across a small problem. I'm trying to get a list of objects and pass it to Observable.from() so I can work on each item.

Problem: this list has to be fetched on another thread (it makes HTTP calls internally), so Observable.from(getList()) gives me nothing.

I've tried things like :

Observable.create(new Observable.OnSubscribe<Iterable<Object>>() {
  @Override
  public void call(Subscriber<? super Iterable<Object>> subscriber) {
    subscriber.onNext(getList());
    subscriber.onCompleted();
  }
}).subscribe ...

But this subscribes to an Iterable: onNext passes the full list in one go instead of every object in the list. What am I missing? How do I do that? Thanks

Update

Here is what I'm trying to do (and how I use toSortedList) :

Observable.from(getList())
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .toSortedList(new Func2<Object, Object, Integer>() {
      @Override
      public Integer call(Object left, Object right) {
        if (left.equals(right))
          return 0;
        return left.getLabel(pm).compareToIgnoreCase(right.getLabel(pm));
      }
    })
    .subscribe(new Observer<List<Object>>() {
      // onNext receives the sorted List of objects I'm using
    });

The problem is that getList() needs to be called on another thread (and it returns an Iterable, not an Observable). Maybe I'm not using your first solution correctly, dwursteisen.

UPDATE 2

Here's something I don't understand. Why isn't this working?

Observable.just(0)
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(new Func1<Integer, Observable<Object>>() {
      @Override
      public Observable<Object> call(Integer integer) {
        return Observable.from(getList());
      }
    })
    .toSortedList(new Func2<Object, Object, Integer>() {
      @Override
      public Integer call(Object left, Object right) {
        if (left.equals(right))
          return 0;
        return left.getLabel(pm).compareToIgnoreCase(right.getLabel(pm));
      }
    })
    .subscribe( ... )

Even if I replace the flatMap call with a subscribe and do everything in onNext, getList() is still called on the main thread ...

2 Answers

2
votes

Since you want the work to run on a specific thread, you can use the subscribeOn method to specify which "thread" (scheduler) to use.

sourceObservable.subscribeOn(httpScheduler)
                .flatMap(Observable::from)
                .subscribe();

If that doesn't work, you can still use your own solution; just iterate over the list:

Observable.create(new Observable.OnSubscribe<CategorizedActivityInfoWrapper>() {
  @Override
  public void call(Subscriber<? super CategorizedActivityInfoWrapper> subscriber) {
     for (CategorizedActivityInfoWrapper o : getList()) {
         subscriber.onNext(o);
     }

     subscriber.onCompleted();
  }
}).subscribe ...

UPDATE

Observable.from(getList()).subscribe();

can be written like this too :

    List objs = getList();
    Observable.from(objs).subscribe();

So getList() is called on the current thread, before the Observable even exists. Written this way, you won't be able to control which thread getList() is called on.
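To make the point above concrete, here is a minimal sketch in plain Java, with no RxJava involved. It assumes a hypothetical getList() that simply reports the name of the thread it ran on, and a single-thread executor named "worker" standing in for the scheduler; neither name comes from the original question. It shows that an argument written as getList() is evaluated by the caller, while a deferred reference to getList() can be evaluated on the worker.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class EagerVsDeferred {

    // Hypothetical stand-in for getList(): returns the name of the thread it ran on.
    static List<String> getList() {
        return Collections.singletonList(Thread.currentThread().getName());
    }

    // Eager: the argument was already evaluated by the caller before this method started.
    static String eager(ExecutorService worker, List<String> alreadyComputed) {
        Callable<String> task = () -> alreadyComputed.get(0);
        return run(worker, task);
    }

    // Deferred: the supplier is only invoked inside the worker task.
    static String deferred(ExecutorService worker, Supplier<List<String>> source) {
        Callable<String> task = () -> source.get().get(0);
        return run(worker, task);
    }

    private static String run(ExecutorService worker, Callable<String> task) {
        try {
            return worker.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns {caller thread, thread that ran getList() eagerly, thread that ran it deferred}.
    static String[] demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            String caller = Thread.currentThread().getName();
            return new String[] {
                caller,
                eager(worker, getList()),                    // getList() runs here, on the caller
                deferred(worker, EagerVsDeferred::getList)   // getList() runs later, on "worker"
            };
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] r = demo();
        System.out.println("caller:   " + r[0]);
        System.out.println("eager:    " + r[1]);
        System.out.println("deferred: " + r[2]);
    }
}
```

In RxJava 1 the deferred form would presumably be expressed by wrapping the call, for example with Observable.defer(() -> Observable.from(getList())) followed by subscribeOn, so that getList() only runs once something subscribes.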

0
votes

You need to place your async computation before observeOn (and optionally after subscribeOn):

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

import rx.Observable;
import rx.schedulers.Schedulers;

public class AsyncStartOtherThread {
    public static void main(String[] args) throws Exception {
        // Keeps main alive until the asynchronous chain completes
        CountDownLatch cdl = new CountDownLatch(1);
        Observable.just(1).subscribeOn(Schedulers.newThread())
        .map(e -> {
            // Runs on the new thread chosen by subscribeOn
            System.out.println(Thread.currentThread());
            return Arrays.asList(1, 2, 3, 4, 5);
        })
        .observeOn(Schedulers.computation())
        // Everything below observeOn runs on a computation thread
        .doOnNext(e -> System.out.println(Thread.currentThread()))
        .flatMap(Observable::from)
        .toSortedList((a, b) -> Integer.compare(b, a))
        .subscribe(System.out::println, Throwable::printStackTrace, () -> { System.out.println("Done"); cdl.countDown(); });
        cdl.await();
    }
}