1
votes

When running the code snippet below, a failure occurs inside the CompositeException. The underlying cause of the failure is

Caused by: java.lang.RuntimeException: Duplicate found in causal chain so cropping to prevent loop ...

It would seem that the same CompositeException instance is being reused by each invocation of flatMap, which is why it sees the same exception twice.

The expected behaviour is that the first element emitted from the Flux is transformed into an error signal, the remaining elements are cancelled, and the error signal is propagated down the chain.

    Flux.just(1, 2, 3)
            .flatMap(i -> RxReactiveStreams.toPublisher(Observable.error(new NullPointerException())))
            .subscribe();

Can anyone explain why this behaviour is occurring and what can be done to mitigate it?

RxJava Version: 1.3.8

Full stack trace

rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError

    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:187)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
    at rx.internal.operators.OperatorSerialize$1.onError(OperatorSerialize.java:52)
    at rx.observers.SerializedObserver.onError(SerializedObserver.java:152)
    at rx.observers.SerializedSubscriber.onError(SerializedSubscriber.java:78)
    at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:44)
    at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:28)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.subscribe(Observable.java:10423)
    at rx.Observable.subscribe(Observable.java:10390)
    at rx.internal.reactivestreams.PublisherAdapter.subscribe(PublisherAdapter.java:35)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:418)
    at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:126)
    at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8264)
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:8428)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8235)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8162)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8080)
    at uk.sky.ovp.mvpdb.service.service.strategy.common.executors.DownstreamExecutorIntegrationTest.nam2e(DownstreamExecutorIntegrationTest.java:76)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:675)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:117)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:184)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:180)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:127)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:229)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:197)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:211)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:191)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: rx.exceptions.CompositeException: 2 exceptions occurred. 
    ... 75 more
Caused by: rx.exceptions.CompositeException$CompositeExceptionCausalChain: Chain of Causes for CompositeException In Order Received =>
    at rx.exceptions.CompositeException.getCause(CompositeException.java:129)
    at java.base/java.lang.Throwable.printEnclosedStackTrace(Throwable.java:711)
    at java.base/java.lang.Throwable.printStackTrace(Throwable.java:671)
    at java.base/java.lang.Throwable.printStackTrace(Throwable.java:725)
    at com.intellij.junit5.JUnit5TestExecutionListener.getTrace(JUnit5TestExecutionListener.java:304)
    at com.intellij.junit5.JUnit5TestExecutionListener.testFailure(JUnit5TestExecutionListener.java:289)
    at com.intellij.junit5.JUnit5TestExecutionListener.testFailure(JUnit5TestExecutionListener.java:245)
    at com.intellij.junit5.JUnit5TestExecutionListener.executionFinished(JUnit5TestExecutionListener.java:183)
    at com.intellij.junit5.JUnit5TestExecutionListener.executionFinished(JUnit5TestExecutionListener.java:171)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry$CompositeTestExecutionListener.lambda$executionFinished$10(TestExecutionListenerRegistry.java:109)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry.lambda$notifyEach$1(TestExecutionListenerRegistry.java:67)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry.notifyEach(TestExecutionListenerRegistry.java:65)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry.access$200(TestExecutionListenerRegistry.java:32)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry$CompositeTestExecutionListener.executionFinished(TestExecutionListenerRegistry.java:108)
    at org.junit.platform.launcher.core.ExecutionListenerAdapter.executionFinished(ExecutionListenerAdapter.java:56)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.reportCompletion(NodeTestTask.java:179)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:85)
    ... 32 more
Caused by: java.lang.NullPointerException
    at uk.sky.ovp.mvpdb.service.service.strategy.common.executors.DownstreamExecutorIntegrationTest.lambda$nam2e$1(DownstreamExecutorIntegrationTest.java:75)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:378)
    at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:126)
    at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8264)
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:8428)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8235)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8162)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8080)
    at uk.sky.ovp.mvpdb.service.service.strategy.common.executors.DownstreamExecutorIntegrationTest.nam2e(DownstreamExecutorIntegrationTest.java:76)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:675)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:117)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:184)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:180)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:127)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    ... 32 more
Caused by: java.lang.RuntimeException: Duplicate found in causal chain so cropping to prevent loop ...
    at rx.exceptions.CompositeException.getCause(CompositeException.java:145)
    at java.base/java.lang.Throwable.printEnclosedStackTrace(Throwable.java:711)
    at java.base/java.lang.Throwable.printStackTrace(Throwable.java:671)
    at java.base/java.lang.Throwable.printStackTrace(Throwable.java:725)
    at com.intellij.junit5.JUnit5TestExecutionListener.getTrace(JUnit5TestExecutionListener.java:304)
    at com.intellij.junit5.JUnit5TestExecutionListener.testFailure(JUnit5TestExecutionListener.java:289)
    at com.intellij.junit5.JUnit5TestExecutionListener.testFailure(JUnit5TestExecutionListener.java:245)
    at com.intellij.junit5.JUnit5TestExecutionListener.executionFinished(JUnit5TestExecutionListener.java:183)
    at com.intellij.junit5.JUnit5TestExecutionListener.executionFinished(JUnit5TestExecutionListener.java:171)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry$CompositeTestExecutionListener.lambda$executionFinished$10(TestExecutionListenerRegistry.java:109)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry.lambda$notifyEach$1(TestExecutionListenerRegistry.java:67)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry.notifyEach(TestExecutionListenerRegistry.java:65)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry.access$200(TestExecutionListenerRegistry.java:32)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry$CompositeTestExecutionListener.executionFinished(TestExecutionListenerRegistry.java:108)
    at org.junit.platform.launcher.core.ExecutionListenerAdapter.executionFinished(ExecutionListenerAdapter.java:56)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.reportCompletion(NodeTestTask.java:179)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:85)
    ... 32 more

1 Answer

0
votes

I think using RxReactiveStreams.toPublisher() is wrong here. If you want to experiment with an error occurring in the stream, you should map it to Observable.error() and see how it goes.

From this similar thread with the same exception:

What's happening is that your onError implementation in a Subscriber is throwing an unchecked exception which is against the Observable contract and this aborts the observable processing throwing an OnErrorFailedException in the observeOn scheduler.

This can be seen in your stack trace:

rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:187)
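
To make that mechanism concrete, here is a rough, library-free sketch. It is not RxJava's actual SafeSubscriber or CompositeException code; the class and method names (OnErrorFailureSketch, CompositeFailure, safeOnError, hasDuplicateInChain) are hypothetical and only model the idea. It assumes that the error handler throws a new exception whose cause is the original error, so that when both are collected into one composite, the original appears twice in the flattened causal chain. Whether this is exactly what happens with the no-argument subscribe() in the question depends on the Reactor version, so treat that link as an assumption.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

public class OnErrorFailureSketch {

    /** Hypothetical stand-in for a composite exception that holds several failures. */
    static final class CompositeFailure extends RuntimeException {
        final List<Throwable> failures;

        CompositeFailure(List<Throwable> failures) {
            super(failures.size() + " exceptions occurred.");
            this.failures = failures;
        }
    }

    /** Delivers the error to the handler; if the handler itself throws, both errors are wrapped. */
    static void safeOnError(Throwable error, Consumer<Throwable> handler) {
        try {
            handler.accept(error);
        } catch (RuntimeException thrownByHandler) {
            throw new IllegalStateException(
                    "Error occurred when trying to propagate error to onError",
                    new CompositeFailure(List.of(error, thrownByHandler)));
        }
    }

    /** Walks every failure and its causes; true if the same instance is reached twice. */
    static boolean hasDuplicateInChain(CompositeFailure composite) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable failure : composite.failures) {
            for (Throwable t = failure; t != null; t = t.getCause()) {
                if (!seen.add(t)) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Runs one scenario and describes the outcome. */
    static String demo(Consumer<Throwable> handler) {
        try {
            safeOnError(new NullPointerException(), handler);
            return "delivered";
        } catch (IllegalStateException e) {
            CompositeFailure composite = (CompositeFailure) e.getCause();
            return hasDuplicateInChain(composite)
                    ? "failed, duplicate in causal chain"
                    : "failed, no duplicate";
        }
    }

    public static void main(String[] args) {
        // Handler rethrows, wrapping the original error as its cause.
        System.out.println(demo(e -> { throw new UnsupportedOperationException(e); }));
        // Handler throws an unrelated exception.
        System.out.println(demo(e -> { throw new IllegalArgumentException("unrelated"); }));
        // Handler consumes the error without throwing.
        System.out.println(demo(e -> { }));
    }
}
```

In this model only the third handler lets the error be delivered cleanly, which suggests that supplying an error callback that does not throw (for example, passing an error consumer to subscribe) is the first thing to try.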

As a side note, RxJava 1 reached end of life in 2018 and shouldn't be used anymore anyway.