I have a void listen() function that listens for server-pushed data. I need to create an Observable and an Observer so that I can work with the data using onNext, onComplete and onError.
2 Answers
1
votes
You might want to look into using a BehaviorSubject:
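// Holds the most recent value and replays it to each new subscriber.
private final BehaviorSubject<YourImmutableDataClass> mServerObservable = BehaviorSubject.create();

// Call this whenever new data arrives from the server.
private void update(YourImmutableDataClass next) {
    mServerObservable.onNext(next);
}

// Expose the subject as a plain Observable; distinctUntilChanged() drops consecutive duplicates.
public Observable<YourImmutableDataClass> observe() {
    return mServerObservable.distinctUntilChanged();
}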
0
votes
What follows is some crude guesswork.
If you are trying to pipe the data through as an indefinite stream, RxJava 1.x does not make it easy because of backpressure issues, but RxJava 2 has a better Observable.create(target -> ...), where your listen() implementation could call the target's onNext/onError/onComplete.
Of course, more code is needed to handle the subscriber unsubscribing (if it ever does) so that the listeners can be removed. But that's a start.
Demo of the elements of a possible solution:
CAREFUL: this is not bulletproof code; the listeners list is not thread-safe. I just kept it light for now.
package tests.rxjava2;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

public class TestRxjava2Basics {

    static void p(Object msg) {
        System.out.println(Thread.currentThread().getName()+"]: "+msg);
    }

    static void w(long delay) {
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Consumer<String>> listeners = new ArrayList<>(); //NOT THREADSAFE!!!!!

        Consumer<String> c1 = s -> p("consumer permanent: "+s);
        listeners.add(c1);

        Thread machinegun = new Thread(() -> {
            while(!Thread.interrupted()) {
                listeners.forEach(c -> c.accept(""+System.currentTimeMillis()));
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }, "gun");
        machinegun.start();

//        for(int i=0; i<5; i++) {
//            final int fi = i;
//            Consumer<String> c = s -> p("consumer adapter "+fi+": "+s);
//            listeners.add(c);
//            Thread.sleep(1000);
//
//            listeners.remove(c);
//            Thread.sleep(1000);
//        }

        //equivalent in RX:
        for(int i=0; i<5; i++) {
            final int fi = i;
            Disposable disp = Observable.create(tgt -> {
                    Consumer<String> c = s -> {
                        p("consumer adapter "+fi+": "+s);
                        tgt.onNext(s);
                    };
                    tgt.setCancellable(() -> {
                        p("cancelling consumer adapter "+fi);
                        listeners.remove(c);
                    });
                    listeners.add(c);
                })
                .doOnNext(s -> p("onnext "+fi+": "+s))
                .subscribe();
            Thread.sleep(1000);
            disp.dispose();
            Thread.sleep(1000);
        }

        machinegun.interrupt();
    }
}
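The demo above flags its listeners list as not thread-safe: the "gun" thread iterates over it while the main thread adds and removes entries. One possible way to harden that part, sketched here with the JDK only (no RxJava), is to hold the listeners in a CopyOnWriteArrayList and hand back a removal handle on registration. The class name SafeListenerRegistry and its methods register, publish and size are made up for this illustration; they are not part of the original demo or of any library.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical helper, not part of the original demo or of RxJava.
public class SafeListenerRegistry {

    // CopyOnWriteArrayList iterates over a snapshot of its contents, so a
    // concurrent add/remove cannot trigger a ConcurrentModificationException.
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    // Registers a listener and returns a handle that removes it again.
    // In the demo, this handle is what one could run inside setCancellable(...).
    public Runnable register(Consumer<String> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    // Delivers the message to every listener present when the call starts.
    public void publish(String message) {
        listeners.forEach(l -> l.accept(message));
    }

    public int size() {
        return listeners.size();
    }

    public static void main(String[] args) throws InterruptedException {
        SafeListenerRegistry registry = new SafeListenerRegistry();
        registry.register(s -> { });              // permanent listener

        Thread gun = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                registry.publish("tick " + i);
            }
        }, "gun");
        gun.start();

        // Add and remove listeners while the other thread keeps publishing.
        for (int i = 0; i < 1_000; i++) {
            Runnable cancel = registry.register(s -> { });
            cancel.run();
        }

        gun.join();
        System.out.println("listeners left: " + registry.size()); // prints "listeners left: 1"
    }
}
```

Copy-on-write suits this case because listeners change rarely compared with how often events are published; if registrations were very frequent, a different structure might be a better fit.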