1
votes

I am observing the lines produced by a NetworkResource, wrapping it in an Observable.create. Here is the code, missing try/catch and cancellation for simplicity:

fun linesOf(resource: NetworkResource): Observable<String> =
        Observable.create { emitter ->
            while (!emitter.isDisposed) {
                val line = resource.readLine()
                Log.i(TAG, "Emitting: $line")
                emitter.onNext(line)
            }
        }

The problem is that later I want to turn it into a Flowable using observable.toFlowable(LATEST) to add backpressure in case my consumer can't keep up, but depending on how I do it, the consumer stops receiving items after item 128.

A) this way everything works:

val resource = ...
linesOf(resource)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribe { Log.i(TAG, "Consuming: $it") }

B) here the consumer gets stuck after 128 items (but the emitting continues):

val resource = ...
linesOf(resource)
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { Log.i(TAG, "Consuming: $it") } // <-- stops after 128

In option A) everything works without any issues, and I can see the Emitting: ... log side by side with the Consuming: ... log.

In option B) I can see the Emitting: ... log message happily emitting new lines, but I stop seeing the Consuming: ... log message after item 128, even though the emitting continues.

Question: Can someone help me understand why this happens?

1

1 Answers

5
votes

First of all, you are using the wrong type and wrong operator. Using Flowable removes the need for conversion. Using Flowable.generate gets you backpressure:

Flowable.generate(emitter -> {
    String line = resource.readLine();
    if (line == null) {
        emitter.onComplete();
    } else {
        emitter.onNext(line);
    }
});

Second, the reason your version hangs is due to a same pool deadlock caused by subscribeOn. Requests from downstream are scheduled behind your eager emission loop and can not take effect, stopping the emission at the default 128 elements. Use Flowable.subscribeOn(scheduler, false) to avoid this case.