I have built a very long RxJava chain (with Retrofit requests) that uses a lot of operators: doOnNext, doOnError, switchIfEmpty, onErrorResumeNext, flatMap. On some devices (Android 4.1, for example) it throws a StackOverflowError when the chain takes its longest route.
Are there any techniques or best practices for optimizing such chains or preventing the StackOverflowError?
So far I see only one way: break the chain and start the second part from the first part's onCompleted (or onNext), but I don't think that is the reactive way.
Another way is to switch threads with the .subscribeOn(Schedulers.newThread()) operator, but that doesn't seem like the best solution either.
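To make the thread-switching idea concrete, here is a minimal plain-Java sketch of why handing work to another thread relieves the stack. It does not use RxJava; the class StackDepthSketch and its methods are hypothetical names made up for illustration, and the "stages" are only a stand-in for nested synchronous subscriptions. The assumption is that each synchronously nested stage adds frames to the same stack, while a stage scheduled on an executor starts from a fresh, shallow stack.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StackDepthSketch {

    /** Number of frames currently on the calling thread's stack. */
    static int currentDepth() {
        return Thread.currentThread().getStackTrace().length;
    }

    /** Each stage calls the next one directly, so frames pile up on one stack. */
    static int nestedDepth(int stagesLeft) {
        if (stagesLeft == 0) {
            return currentDepth();
        }
        return nestedDepth(stagesLeft - 1);
    }

    /** Each stage schedules the next one on the executor and returns immediately. */
    static void handOff(int stagesLeft, ExecutorService executor,
                        CompletableFuture<Integer> result) {
        if (stagesLeft == 0) {
            result.complete(currentDepth());
            return;
        }
        executor.execute(() -> handOff(stagesLeft - 1, executor, result));
    }

    /** Runs the hand-off variant and reports the stack depth seen by the last stage. */
    static int handedOffDepth(int stages) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Integer> result = new CompletableFuture<>();
            handOff(stages, executor, result);
            return result.join(); // blocks until the last stage has run
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int stages = 200;
        System.out.println("nested depth:     " + nestedDepth(stages));
        System.out.println("handed-off depth: " + handedOffDepth(stages));
    }
}
```

In this sketch the nested variant's depth grows with the number of stages, while the handed-off variant stays at a handful of frames no matter how many stages there are. That is the effect I would be relying on with subscribeOn, at the cost of an extra thread hop.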
My code:
1) Code that subscribes
    fastSearch(keyphrase)
        .onErrorResumeNext(throwable -> {
            return correctKeyphraseAndSearch(keyphrase);
        })
        .doOnNext(resultsDao -> {...})
        .subscribe(...)
2) Helper methods
    public static Observable<SearchResultsDao> fastSearch(final String keyphrase) {
        String SRD = "true";
        final HttpQueryParams params = new HttpQueryParams();

        // read-from-cache chain
        Observable<SearchResultsDao> cacheChain = getCache().fastSearch(keyphrase, SRD)
            .doOnNext(...)
            .doOnError(...)
            // saves some data to "params" and returns Observable.empty
            .onErrorResumeNext(new HandleNoCacheEntry<SearchResultsDao>(params));

        // network request chain
        Observable<SearchResultsDao> networkChain = getApi().fastSearch(keyphrase, SRD)
            .retryWhen(new OnNewSessionRequired())
            // save to cache
            .doOnNext(new WriteToCacheAction<SearchResultsDao>(params));

        // combine cache + network chains
        return cacheChain
            .switchIfEmpty(networkChain)
            .doOnNext(resultsDao -> resultsDao.setKeyphrase(keyphrase));
    }

    public static Observable<SearchResultsDao> correctKeyphraseAndSearch(final String keyphrase) {
        return mainDiv()
            .flatMap(str -> syntax(str, keyphrase, true))
            .flatMap(syntaxDao -> {
                // get corrected keyphrase from server
                StringBuilder newKeyphrase = ... ;
                // repeat search request with new keyphrase
                return fastSearch(newKeyphrase.toString());
            });
    }
3) Some comments:
The mainDiv() and syntax() methods are structured identically to fastSearch(), but send different requests to the server.
getCache().fastSearch() creates an Observable that reads data from my own cache (Retrofit-like: getCache() implements the Retrofit API methods interface).
Stack trace:
Uncaught Exception
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:62)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:442)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:305)
at java.util.concurrent.FutureTask.run(FutureTask.java:137)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1076)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:569)
at java.lang.Thread.run(Thread.java:856)
Caused by: java.lang.StackOverflowError
okhttp3.HttpUrl.newBuilder (HttpUrl.java:633)
retrofit2.RequestBuilder.addQueryParam (RequestBuilder.java:144)
retrofit2.ParameterHandler$Query.apply (ParameterHandler.java:109)
retrofit2.ServiceMethod.toRequest (ServiceMethod.java:108)
retrofit2.OkHttpCall.createRawCall (OkHttpCall.java:178)
retrofit2.OkHttpCall.execute (OkHttpCall.java:162)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request (RxJavaCallAdapterFactory.java:171)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OnSubscribeRedo$2$1.setProducer (OnSubscribeRedo.java:272)
rx.Subscriber.setProducer (Subscriber.java:205)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:152)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:138)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OnSubscribeRedo$2.call (OnSubscribeRedo.java:278)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue (TrampolineScheduler.java:80)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule (TrampolineScheduler.java:59)
rx.internal.operators.OnSubscribeRedo$5.request (OnSubscribeRedo.java:366)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.setProducer (OperatorSwitchIfEmpty.java:106)
rx.Subscriber.setProducer (Subscriber.java:205)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:358)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:55)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate (OperatorSwitchIfEmpty.java:78)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted (OperatorSwitchIfEmpty.java:71)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4$1.onCompleted (OperatorOnErrorResumeNextViaFunction.java:125)
rx.Observable$EmptyHolder$1.call (Observable.java:1123)
rx.Observable$EmptyHolder$1.call (Observable.java:1120)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:56)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar (OperatorMerge.java:368)
rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit (OperatorMerge.java:330)
rx.internal.operators.OperatorMerge$InnerSubscriber.onNext (OperatorMerge.java:807)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onNext (OperatorSwitchIfEmpty.java:89)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext (OperatorOnErrorResumeNextViaFunction.java:153)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:53)
com.testrx.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onNext (OperatorSwitchIfEmpty.java:89)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext (OperatorOnErrorResumeNextViaFunction.java:153)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:53)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.onError (OperatorSwitchIfEmpty.java:116)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OnSubscribeRedo$4$1.onError (OnSubscribeRedo.java:331)
rx.internal.operators.OperatorMerge$MergeSubscriber.reportError (OperatorMerge.java:243)
rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate (OperatorMerge.java:779)
rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop (OperatorMerge.java:540)
rx.internal.operators.OperatorMerge$MergeSubscriber.emit (OperatorMerge.java:529)
rx.internal.operators.OperatorMerge$InnerSubscriber.onError (OperatorMerge.java:813)
rx.Observable$ThrowObservable$1.call (Observable.java:10200)
rx.Observable$ThrowObservable$1.call (Observable.java:10190)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OnSubscribeRedo$3$1.onNext (OnSubscribeRedo.java:307)
rx.internal.operators.OnSubscribeRedo$3$1.onNext (OnSubscribeRedo.java:289)
rx.internal.operators.NotificationLite.accept (NotificationLite.java:150)
rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitNext (SubjectSubscriptionManager.java:253)
rx.subjects.BehaviorSubject.onNext (BehaviorSubject.java:160)
rx.internal.operators.OnSubscribeRedo$2$1.onError (OnSubscribeRedo.java:242)
retrofit2.adapter.rxjava.OperatorMapResponseToBodyOrError$1.onNext (OperatorMapResponseToBodyOrError.java:43)
retrofit2.adapter.rxjava.OperatorMapResponseToBodyOrError$1.onNext (OperatorMapResponseToBodyOrError.java:38)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request (RxJavaCallAdapterFactory.java:173)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OnSubscribeRedo$2$1.setProducer (OnSubscribeRedo.java:272)
rx.Subscriber.setProducer (Subscriber.java:205)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:152)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:138)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OnSubscribeRedo$2.call (OnSubscribeRedo.java:278)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue (TrampolineScheduler.java:80)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule (TrampolineScheduler.java:59)
rx.internal.operators.OnSubscribeRedo$5.request (OnSubscribeRedo.java:366)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.setProducer (OperatorSwitchIfEmpty.java:106)
rx.Subscriber.setProducer (Subscriber.java:205)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:358)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:55)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate (OperatorSwitchIfEmpty.java:78)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted (OperatorSwitchIfEmpty.java:71)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4$1.onCompleted (OperatorOnErrorResumeNextViaFunction.java:125)
rx.Observable$EmptyHolder$1.call (Observable.java:1123)
rx.Observable$EmptyHolder$1.call (Observable.java:1120)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:56)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSubscribeOn$1.call (OperatorSubscribeOn.java:94)
rx.internal.schedulers.ScheduledAction.run (ScheduledAction.java:55)
java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:442)