
I tried googling this, but the results are all about what changed between RxJava 1 and 2, which is less than helpful, lol.

I'm new to RxJava and am working through it. I can create an observable pretty easily, and all subscribers are getting the right calls. I just have one question:

Object myObject = something1;

I create the observable and all subscribers and they get myObject in the onNext() method.

But, somewhere down the line, we do "myObject = something2".

How do I notify the subscribers that myObject has changed?

Here's how I'm creating the observable:

Observable<MyObject> myObservable = Observable.create(new ObservableOnSubscribe<MyObject>() {
    @Override
    public void subscribe(ObservableEmitter<MyObject> e) throws Exception {
        MyObject myObject = someObject1;

        e.onNext(myObject);
    }
});

Here's how I'm subscribing:

@Override
public void onCreate(@Nullable Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    myObservable.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(mDisposableObserver);
}

private DisposableObserver<MyObject> mDisposableObserver = new DisposableObserver<MyObject>() {
    @Override
    public void onNext(MyObject myObject) {
        // Do UI stuff with 'myObject' variables
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};

Side note: if anybody has links or suggestions for really good RxJava 2 tutorials (like... super beginner through god-status), I'd greatly appreciate it.


1 Answer


You can't be notified the moment your object changes unless something observes the object's "setters" and signals that a change has happened. The observables from the Android Architecture Components library address this.

Alternatively, you can poll your object for changes and notify the subscribers when it has changed. Combining repeatWhen with distinctUntilChanged makes sure the data is not emitted again while it is unchanged. Keep in mind that distinctUntilChanged compares consecutive items with equals(). An (untested) example might be:

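// fromCallable re-reads yourObject (assumed here to be a field) on every repeat;
// Observable.just(yourObject) would capture the reference once and keep re-emitting it.
Observable<MyObject> myObservable = Observable.fromCallable(() -> yourObject)
        // Resubscribe one second after each completion, i.e. poll once per second.
        .repeatWhen(o -> o.concatMap(v -> Observable.timer(1000, TimeUnit.MILLISECONDS)))
        // Drop a value that equals() the previously emitted one.
        .distinctUntilChanged();

myObservable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(d -> Log.d("Data", "Got Object " + d),
                   e -> Log.e("Error", "Received Error: " + e));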
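
One thing to watch with the polling approach: the "has it changed?" check is an equals() comparison against the previously emitted value. The dependency-free sketch below isolates just that comparison step so you can see what it can and cannot detect. ChangeDetector is a hypothetical helper made up for illustration, not an RxJava class, and StringBuilder stands in for a model class that does not override equals().

```java
import java.util.Objects;

// Hypothetical helper (not part of RxJava): remembers the last value it saw
// and reports whether a newly polled value differs from it.
final class ChangeDetector<T> {
    private T last;
    private boolean hasLast;

    // Returns true on the first call, and afterwards only when 'current'
    // is not equal (by equals()) to the previously seen value.
    boolean changed(T current) {
        boolean differs = !hasLast || !Objects.equals(last, current);
        last = current;
        hasLast = true;
        return differs;
    }
}

class ChangeDetectorDemo {
    public static void main(String[] args) {
        ChangeDetector<StringBuilder> detector = new ChangeDetector<>();
        StringBuilder value = new StringBuilder("a");

        System.out.println(detector.changed(value)); // first poll: true
        value.append("b");                           // mutate the same instance in place
        System.out.println(detector.changed(value)); // same instance equals itself: false
        value = new StringBuilder("ab");             // replace the reference
        System.out.println(detector.changed(value)); // different instance: true
    }
}
```

So polling like this notices "myObject = something2", but it will not notice a field being changed inside the same instance. That second case is where notifying from the setters comes in.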

A better approach (similar to the observables from Architecture Components) is to modify your object/model so that it can be observed for changes.

That means creating a PublishSubject inside your model. Every setter calls onNext(this) on it, so the model emits itself as the value whenever it changes. Everyone subscribed to that subject is notified with the changed object as soon as a setter has been called.

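public static class ModelClass {

    // Emits this model every time one of its setters is called.
    private final PublishSubject<ModelClass> changeObservable = PublishSubject.create();
    private String field1;

    public String getField1() {
        return field1;
    }

    public void setField1(String field1) {
        this.field1 = field1;
        // Notify subscribers that the model has changed.
        changeObservable.onNext(this);
    }

    // Expose the subject as a plain Observable so callers can only subscribe.
    public Observable<ModelClass> getModelChanges() {
        return changeObservable;
    }
}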




@Override
public void onCreate(@Nullable Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    ModelClass myModel = new ModelClass();
    // Log.d takes a tag and a message.
    myModel.getModelChanges().subscribe(c -> Log.d("Change", "Model changed: " + c));

    // Each call below triggers one onNext on the subscriber above.
    myModel.setField1("1");
    myModel.setField1("2");
    myModel.setField1("3");
}
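
If what changes is the reference itself (the "myObject = something2" case from the question) rather than a field, the same idea applies one level up: route every assignment through a holder that notifies its listeners. Below is a minimal plain-Java sketch of that mechanism with no RxJava dependency. ValueHolder is a hypothetical name made up for illustration. In RxJava 2, BehaviorSubject plays a similar role, since it emits the most recent item and all subsequent items to each subscriber. The sketch is single-threaded and does not handle unsubscription or errors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical holder (not an RxJava type): keeps the current value and
// pushes every replacement to all registered listeners.
final class ValueHolder<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();
    private T value;

    ValueHolder(T initial) {
        this.value = initial;
    }

    // Registers a listener and immediately hands it the current value.
    void subscribe(Consumer<T> listener) {
        listeners.add(listener);
        listener.accept(value);
    }

    // Replaces the value and notifies every registered listener.
    void set(T newValue) {
        value = newValue;
        for (Consumer<T> listener : listeners) {
            listener.accept(newValue);
        }
    }

    T get() {
        return value;
    }
}

class ValueHolderDemo {
    public static void main(String[] args) {
        ValueHolder<String> holder = new ValueHolder<>("something1");
        List<String> seen = new ArrayList<>();

        holder.subscribe(seen::add);  // receives "something1" right away
        holder.set("something2");     // receives "something2" on assignment

        System.out.println(seen);     // [something1, something2]
    }
}
```

The point of the design is that nobody assigns the variable directly any more: every write goes through set(), so there is exactly one place that knows a change happened.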