I am observing the lines produced by a NetworkResource by wrapping it in Observable.create. Here is the code, with try/catch and cancellation omitted for simplicity:
fun linesOf(resource: NetworkResource): Observable<String> =
    Observable.create { emitter ->
        // Blocking read loop: runs on whichever thread subscribes to the Observable.
        while (!emitter.isDisposed) {
            val line = resource.readLine()
            Log.i(TAG, "Emitting: $line")
            emitter.onNext(line)
        }
    }
Later, I want to turn it into a Flowable with toFlowable(BackpressureStrategy.LATEST) to add backpressure in case my consumer can't keep up. The problem is that, depending on where I put that call in the chain, the consumer stops receiving items after item 128.
A) This way everything works:
val resource = ...
linesOf(resource)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribe { Log.i(TAG, "Consuming: $it") }
B) Here the consumer gets stuck after 128 items (but the emitting continues):
val resource = ...
linesOf(resource)
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { Log.i(TAG, "Consuming: $it") } // <-- stops after 128
In option A) everything works without any issues, and I can see the Emitting: ... log side by side with the Consuming: ... log.
In option B) the Emitting: ... messages keep appearing as new lines are read, but the Consuming: ... messages stop after item 128.
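In case it helps to reproduce option B) off-device, here is a minimal sketch rather than my actual code. It assumes RxJava 2 (the io.reactivex packages) on a plain JVM. fakeLines and consumedByOptionB are made-up names: the first replaces the NetworkResource with a counter emitted in a tight loop, and Schedulers.single() stands in for AndroidSchedulers.mainThread().

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for linesOf(resource): emits a counter in a tight,
// blocking loop until the downstream disposes.
fun fakeLines(): Observable<String> =
    Observable.create<String> { emitter ->
        var n = 0L
        while (!emitter.isDisposed) {
            emitter.onNext("line ${n++}")
        }
    }

// Runs the option B) chain for the given time, then disposes it and
// returns how many items reached the consumer.
fun consumedByOptionB(millis: Long): Int {
    val consumed = AtomicInteger()
    val disposable = fakeLines()
        .toFlowable(BackpressureStrategy.LATEST)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.single()) // assumption: stand-in for the Android main thread
        .subscribe { consumed.incrementAndGet() }
    Thread.sleep(millis)
    disposable.dispose()
    return consumed.get()
}

fun main() {
    println("Consumed: ${consumedByOptionB(1000)}")
}
```

If the stand-ins behave like my real setup, the count should stall at the same 128 items no matter how long the chain is left running.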
Question: Can someone help me understand why this happens?