0
votes

My working environment is JDK 1.6 and RxJava 2

I want to make an Observable which emits an item that is a file line string read via BufferedReader as follows:

...
Observable<String> fileLineObservable = Observable.defer(new Callable<String>(){
    return new ObservableSource<String> call() throws Exception {
        return new ObservableSource<String>() {
            public void subscribe(Observer<String> observer) {
                BufferedReader reader = null;
                try {
                    reader = new BufferedReader(new FileReader(filePath));
                    String line = null;
                    while ((line = reader.readLine()) != null) {
                        observer.onNext(line);
                    }
                    observer.onComplete();

                    ... catching exception and close reader
                }
            }
        }
    }
});

I also want to make an Observer that observes the above Observable with one take(count) operator as follows:

fileLineObservable.take(2)
                  .subscribe(new Consumer<String>() {
                       public void onNext(String line) {
                           ... do something with the file line string
                       }
                  });

I meet NullPointerException when executing the above code and I know why. The NPE is caused by that the second call of onNext leads to execute onComplete on the TakeObserver instance and inside the onComplete method, upstream.dispose that is not set(null) is called. The upstream variable of TakeObserver is supposed to be set with onSubscribe(Disposable disposable) when it subscribes an Observable.

How can I solve this problem? Should I implement my own Disposable class to set the upstream of TakeObserver?

1
What do you want to achieve? Read two lines of the file and then stop?skywall
@skywall I have data files that consists of first two lines with metadata and the rest of the file is actual data so I need to read the first two lines before processing the actual data.ParkCheolu

1 Answers

0
votes

What about this solution?

Observable<String> observableFile2(Path path) {
        return Observable.using(
                () -> Files.newBufferedReader(path),
                reader -> {
                    return Observable.fromIterable(() -> {
                        return new Iterator<>() {
                            private String nextLine = null;

                            @Override
                            public boolean hasNext() {
                                try {
                                    nextLine = reader.readLine();
                                    return nextLine != null;
                                } catch (Exception ex) {
                                    return false;
                                }
                            }

                            @Override
                            public String next() {
                                if (nextLine != null) {
                                    return nextLine;
                                }
                                throw new IllegalStateException("nextLine can not be null.");
                            }
                        };
                    });
                },
                BufferedReader::close
        );
    }
  • Observable#using makes sure, that the BufferedReader is closed properly on disposable / onError
  • Observable#fromIterable wraps the readLine calls and handles onComplete for us.

Testing

testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.2")
testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.6.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.2")
testRuntimeOnly("org.junit.vintage:junit-vintage-engine:5.6.2")
testImplementation("com.google.jimfs:jimfs:1.1")

Tests

@Test
void name() {
    observableFile2(hello).take(2)
            .test()
            .assertValues("line0", "line1")
            .assertComplete();
}

@Test
void name2() {
    observableFile2(hello).take(10)
            .test()
            .assertValues("line0", "line1", "line2", "line3")
            .assertComplete();
}

@Test
void name3() {
    observableFile2(hello2)
            .test()
            .assertComplete();
}