My working environment is JDK 1.6 and RxJava 2.
I want to create an Observable that emits each line of a file as a String, read via a BufferedReader, as follows:
...
Observable<String> fileLineObservable = Observable.defer(new Callable<ObservableSource<String>>() {
    public ObservableSource<String> call() throws Exception {
        return new ObservableSource<String>() {
            public void subscribe(Observer<? super String> observer) {
                BufferedReader reader = null;
                try {
                    reader = new BufferedReader(new FileReader(filePath));
                    String line = null;
                    while ((line = reader.readLine()) != null) {
                        observer.onNext(line);
                    }
                    observer.onComplete();
                ... catching exception and close reader
            }
        };
    }
});
I also want to subscribe to the above Observable through a take(count) operator, as follows:
fileLineObservable.take(2)
    .subscribe(new Consumer<String>() {
        public void accept(String line) throws Exception {
            ... do something with the file line string
        }
    });
I get a NullPointerException when I run the code above, and I know why. The second call to onNext makes the TakeObserver instance call onComplete, and inside onComplete it calls upstream.dispose() while upstream is still null. The upstream field of TakeObserver is supposed to be set through onSubscribe(Disposable disposable) when it subscribes to an Observable.
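To make my understanding of the failure concrete, here is a minimal, self-contained sketch of the mechanism. It does not use RxJava at all: the Disposable interface and the TakeLikeObserver class are hypothetical stand-ins I wrote for illustration, not the real library source, and the real TakeObserver may differ in detail.

```java
public class TakeUpstreamSketch {

    /** Hypothetical stand-in for RxJava's Disposable (assumption, not the real type). */
    interface Disposable {
        void dispose();
    }

    /** Simplified model of a take(n)-style observer; NOT the real RxJava source. */
    static class TakeLikeObserver {
        Disposable upstream;   // stays null unless onSubscribe is called
        long remaining;
        boolean done;

        TakeLikeObserver(long limit) {
            this.remaining = limit;
        }

        void onSubscribe(Disposable d) {
            upstream = d;
        }

        void onNext(String item) {
            if (done) {
                return;
            }
            // Once the limit is reached, complete and cancel the upstream.
            if (--remaining == 0) {
                onComplete();
            }
        }

        void onComplete() {
            if (done) {
                return;
            }
            done = true;
            upstream.dispose();   // throws NullPointerException if onSubscribe was skipped
        }
    }

    /** Emits two lines to the observer; returns true if a NullPointerException was thrown. */
    static boolean emitTwoLines(TakeLikeObserver observer, boolean callOnSubscribe) {
        if (callOnSubscribe) {
            observer.onSubscribe(new Disposable() {
                public void dispose() {
                    // nothing to release in this sketch
                }
            });
        }
        try {
            observer.onNext("line 1");
            observer.onNext("line 2");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Source that skips onSubscribe, like my ObservableSource above.
        System.out.println(emitTwoLines(new TakeLikeObserver(2), false)); // prints "true"
        // Source that calls onSubscribe before emitting.
        System.out.println(emitTwoLines(new TakeLikeObserver(2), true));  // prints "false"
    }
}
```

In this model the failure disappears as soon as the source hands some Disposable to onSubscribe before the first onNext, which is what prompts my question.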
How can I solve this problem? Should I implement my own Disposable class and pass it to onSubscribe so that the upstream of TakeObserver is set?