1
votes

I have built a very long RxJava chain (with Retrofit requests) that uses a lot of operators: doOnNext, doOnError, switchIfEmpty, onErrorResumeNext, flatMap. On some devices (Android 4.1, for example) it throws a StackOverflowError when the chain takes the longest route.

Are there any ways or best practices to optimize chains or prevent a StackOverflowError?

So far I see only one way: break the chain and call the second part from the first part's onCompleted (or onNext). But that is not the reactive way, I think.

Another way is to change threads with the .subscribeOn(Schedulers.newThread()) operator. That doesn't seem like the best solution either.

My code:

1) code with subscribing

fastSearch(keyphrase)
    .onErrorResumeNext(throwable -> {
        return correctKeyphraseAndSearch(keyphrase);
    })
    .doOnNext(resultsDao -> {...})
    .subscribe(...)

2) helper methods

public static Observable<SearchResultsDao> fastSearch(final String keyphrase) {
    String SRD = "true";
    final HttpQueryParams params = new HttpQueryParams();
    //read from cache chain
    Observable<SearchResultsDao> cacheChain = getCache().fastSearch(keyphrase, SRD)
            .doOnNext(...)
            .doOnError(...)
            .onErrorResumeNext(new HandleNoCacheEntry<SearchResultsDao>(params)); //save some data to "params", and return Observable.empty
    //network request chain
    Observable<SearchResultsDao> networkChain = getApi().fastSearch(keyphrase, SRD)
            .retryWhen(new OnNewSessionRequired())
            .doOnNext(new WriteToCacheAction<SearchResultsDao>(params)); //save to cache
    //combine cache+network chains
    return cacheChain
            .switchIfEmpty(networkChain)
            .doOnNext(resultsDao -> resultsDao.setKeyphrase(keyphrase));
}

public static Observable<SearchResultsDao> correctKeyphraseAndSearch(final String keyphrase) {
    return mainDiv()
            .flatMap(str -> syntax(str, keyphrase, true))
            .flatMap(syntaxDao -> {
                //get corrected keyphrase from server
                StringBuilder newKeyphrase = ... ;
                //repeat search request with new keyphrase
                return fastSearch(newKeyphrase.toString());
            });
}

3) some comments:

The mainDiv() and syntax() methods are identical to fastSearch(), but they make different requests to the server.

getCache().fastSearch() creates an observable that reads data from our own cache (Retrofit-like: getCache() implements the Retrofit API methods interface).

Stacktrace:

Uncaught Exception
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:62)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:442)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:305)
at java.util.concurrent.FutureTask.run(FutureTask.java:137)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1076)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:569)
at java.lang.Thread.run(Thread.java:856)
Caused by: java.lang.StackOverflowError
okhttp3.HttpUrl.newBuilder (HttpUrl.java:633)
retrofit2.RequestBuilder.addQueryParam (RequestBuilder.java:144)
retrofit2.ParameterHandler$Query.apply (ParameterHandler.java:109)
retrofit2.ServiceMethod.toRequest (ServiceMethod.java:108)
retrofit2.OkHttpCall.createRawCall (OkHttpCall.java:178)
retrofit2.OkHttpCall.execute (OkHttpCall.java:162)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request (RxJavaCallAdapterFactory.java:171)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OnSubscribeRedo$2$1.setProducer (OnSubscribeRedo.java:272)
rx.Subscriber.setProducer (Subscriber.java:205)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:152)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:138)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OnSubscribeRedo$2.call (OnSubscribeRedo.java:278)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue (TrampolineScheduler.java:80)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule (TrampolineScheduler.java:59)
rx.internal.operators.OnSubscribeRedo$5.request (OnSubscribeRedo.java:366)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.setProducer (OperatorSwitchIfEmpty.java:106)
rx.Subscriber.setProducer (Subscriber.java:205)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:358)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:55)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate (OperatorSwitchIfEmpty.java:78)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted (OperatorSwitchIfEmpty.java:71)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4$1.onCompleted (OperatorOnErrorResumeNextViaFunction.java:125)
rx.Observable$EmptyHolder$1.call (Observable.java:1123)
rx.Observable$EmptyHolder$1.call (Observable.java:1120)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:56)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar (OperatorMerge.java:368)
rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit (OperatorMerge.java:330)
rx.internal.operators.OperatorMerge$InnerSubscriber.onNext (OperatorMerge.java:807)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onNext (OperatorSwitchIfEmpty.java:89)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext (OperatorOnErrorResumeNextViaFunction.java:153)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:53)
com.testrx.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onNext (OperatorSwitchIfEmpty.java:89)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext (OperatorOnErrorResumeNextViaFunction.java:153)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:53)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.onError (OperatorSwitchIfEmpty.java:116)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OnSubscribeRedo$4$1.onError (OnSubscribeRedo.java:331)
rx.internal.operators.OperatorMerge$MergeSubscriber.reportError (OperatorMerge.java:243)
rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate (OperatorMerge.java:779)
rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop (OperatorMerge.java:540)
rx.internal.operators.OperatorMerge$MergeSubscriber.emit (OperatorMerge.java:529)
rx.internal.operators.OperatorMerge$InnerSubscriber.onError (OperatorMerge.java:813)
rx.Observable$ThrowObservable$1.call (Observable.java:10200)
rx.Observable$ThrowObservable$1.call (Observable.java:10190)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OnSubscribeRedo$3$1.onNext (OnSubscribeRedo.java:307)
rx.internal.operators.OnSubscribeRedo$3$1.onNext (OnSubscribeRedo.java:289)
rx.internal.operators.NotificationLite.accept (NotificationLite.java:150)
rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitNext (SubjectSubscriptionManager.java:253)
rx.subjects.BehaviorSubject.onNext (BehaviorSubject.java:160)
rx.internal.operators.OnSubscribeRedo$2$1.onError (OnSubscribeRedo.java:242)
retrofit2.adapter.rxjava.OperatorMapResponseToBodyOrError$1.onNext (OperatorMapResponseToBodyOrError.java:43)
retrofit2.adapter.rxjava.OperatorMapResponseToBodyOrError$1.onNext (OperatorMapResponseToBodyOrError.java:38)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request (RxJavaCallAdapterFactory.java:173)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OnSubscribeRedo$2$1.setProducer (OnSubscribeRedo.java:272)
rx.Subscriber.setProducer (Subscriber.java:205)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:152)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:138)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OnSubscribeRedo$2.call (OnSubscribeRedo.java:278)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue (TrampolineScheduler.java:80)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule (TrampolineScheduler.java:59)
rx.internal.operators.OnSubscribeRedo$5.request (OnSubscribeRedo.java:366)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.setProducer (OperatorSwitchIfEmpty.java:106)
rx.Subscriber.setProducer (Subscriber.java:205)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:358)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:55)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate (OperatorSwitchIfEmpty.java:78)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted (OperatorSwitchIfEmpty.java:71)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4$1.onCompleted (OperatorOnErrorResumeNextViaFunction.java:125)
rx.Observable$EmptyHolder$1.call (Observable.java:1123)
rx.Observable$EmptyHolder$1.call (Observable.java:1120)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:56)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSubscribeOn$1.call (OperatorSubscribeOn.java:94)
rx.internal.schedulers.ScheduledAction.run (ScheduledAction.java:55)
java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:442)
2
Kindly post your code. - pkgrover
99.99% that your code is just wrong. Please post it. - Divers
I added code to my question. Hope it helps. - Dmitry
Thanks for the source. It doesn't look too deep to me; maybe the stack trace itself could highlight which call structure causes the SO. Could you post the stack trace in a gist? - akarnokd
Added the full stack trace. - Dmitry

2 Answers

6
votes

Stack overflows rarely happen with typical streams; without code I can't be sure what's wrong.

In earlier versions of RxJava, reentrancy problems in some operators could result in stack overflows, but we haven't received any bug reports like that in some time now. Please make sure you are using the latest RxJava from the 1.x line (the RxAndroid plugin's dependency may need overriding, as it is infrequently updated in this regard).

There are a few things you can do:

  • compact subsequent doOnNext and doOnError calls into doOnEach
  • add observeOn(Schedulers.computation()) occasionally
  • avoid chaining too many mergeWiths, collect up sources into a List and use merge(Iterable)
  • upgrade to RxJava 2.x which should have less stack depth (have not verified)
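
To illustrate the mergeWith point, here is a toy model in plain Java. It is not RxJava code: Source, just, mergeWith, merge and the other names are hypothetical stand-ins, and the only assumption is that each pairwise merge adds its own forwarding layer. Under that assumption, the stack depth at delivery grows with the number of chained merges, while a single merge over a list stays flat.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Toy model only; these types are hypothetical and are not part of RxJava. */
final class MergeDepthDemo {

    /** Hypothetical stand-in for an observable that emits ints synchronously. */
    interface Source {
        void subscribe(IntConsumer sink);
    }

    static Source just(int value) {
        return sink -> sink.accept(value);
    }

    /** Pairwise merge: wraps both inputs in one more forwarding layer. */
    static Source mergeWith(Source left, Source right) {
        return sink -> {
            left.subscribe(v -> sink.accept(v));
            right.subscribe(v -> sink.accept(v));
        };
    }

    /** Flat merge: a single forwarding layer, however many sources there are. */
    static Source merge(List<Source> sources) {
        return sink -> {
            for (Source s : sources) {
                s.subscribe(v -> sink.accept(v));
            }
        };
    }

    /** Builds a.mergeWith(b).mergeWith(c)... from a non-empty list. */
    static Source chained(List<Source> sources) {
        Source result = sources.get(0);
        for (int i = 1; i < sources.size(); i++) {
            result = mergeWith(result, sources.get(i));
        }
        return result;
    }

    static List<Source> sources(int count) {
        List<Source> list = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            list.add(just(i));
        }
        return list;
    }

    /** Deepest call stack (in frames) seen by the final consumer. */
    static int deepestDelivery(Source source) {
        int[] deepest = {0};
        source.subscribe(v ->
                deepest[0] = Math.max(deepest[0], new Throwable().getStackTrace().length));
        return deepest[0];
    }

    public static void main(String[] args) {
        List<Source> s = sources(100);
        System.out.println("chained mergeWith: " + deepestDelivery(chained(s)) + " frames");
        System.out.println("merge(list):       " + deepestDelivery(merge(s)) + " frames");
    }
}
```

The exact frame counts depend on the JVM, so only the comparison between the two numbers is meaningful.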

Edit:

Based on the stack trace, my understanding is that you have too many empty sequences, and switchIfEmpty keeps deepening the stack when switching from one empty sequence to the next. As you found out, subscribeOn helps "restart" the stack. I'll investigate possible resolutions within RxJava itself.
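
The "restart" effect can be sketched in plain Java without RxJava. In this toy model (all names are hypothetical), each stage synchronously starts the next one, the way one empty sequence hands over to its fallback, and every hopEvery stages the work is handed to an executor instead. The assumption is only that a task submitted to another thread begins on a fresh, shallow stack, which is what a scheduler hop gives you.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model only; it imitates the call pattern, not RxJava's operators. */
final class StackHopDemo {

    /** Depth of the current thread's call stack, in frames. */
    static int currentDepth() {
        return new Throwable().getStackTrace().length;
    }

    /**
     * Runs `stages` stages and returns the deepest stack seen.
     * hopEvery == 0 means every stage calls the next one directly.
     */
    static int maxDepth(int stages, int hopEvery) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            AtomicInteger deepest = new AtomicInteger();
            CompletableFuture<Integer> done = new CompletableFuture<>();
            runStage(stages, hopEvery, executor, deepest, done);
            return done.join(); // wait until the last stage has run
        } finally {
            executor.shutdown();
        }
    }

    private static void runStage(int remaining, int hopEvery, ExecutorService executor,
                                 AtomicInteger deepest, CompletableFuture<Integer> done) {
        deepest.accumulateAndGet(currentDepth(), Math::max);
        if (remaining == 0) {
            done.complete(deepest.get());
        } else if (hopEvery > 0 && remaining % hopEvery == 0) {
            // Hand over to the executor: the current frames unwind before
            // the next stage runs, so its stack starts shallow again.
            executor.execute(() -> runStage(remaining - 1, hopEvery, executor, deepest, done));
        } else {
            // Direct call: one more frame on top of the current stack.
            runStage(remaining - 1, hopEvery, executor, deepest, done);
        }
    }

    public static void main(String[] args) {
        System.out.println("no hops:      " + maxDepth(200, 0) + " frames");
        System.out.println("hop every 20: " + maxDepth(200, 20) + " frames");
    }
}
```

Without hops the deepest stack grows with the number of stages; with hops it is bounded by roughly hopEvery plus the executor's own frames. The cost is the thread handoff itself, which is why this reads as a workaround rather than a fix.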

0
votes

The cause of this problem was the Jack compiler (https://source.android.com/source/jack.html), which I used in the project for lambda support. I switched it off and now everything works fine.