
I have a thread that uses a gateway (void return) to send a message over a publish-subscribe channel to both:

  • a barrier, to hold the thread while the real execution runs (requires-reply="true", timeout="XXXX", output-channel="nullChannel")

and to

  • a splitter, which then sends the splits as messages to the service activator (direct channel) with a poller and a thread executor for the actual processing/execution

How do I properly configure handling of the exceptions that might be thrown by the executor threads, so that they are caught in the catch block below?

try {
    gateway.trigger();
} catch (ReplyRequiredException e) {
    // fine here
} catch (Throwable t) {
    // catch every exception here... or somehow configure these exceptions to release the thread that waits on the barrier and throw the business exception below
    throw new SomeExecutionFailedException();
}

EDIT

<!-- gateway.trigger() -->
<int:gateway id="gateway"
             service-interface="com.Gateway"
             default-request-channel="channel1"
             default-reply-timeout="0"/>

<int:publish-subscribe-channel id="channel1"/>

<int:splitter input-channel="channel1" output-channel="channel2"
              order="1">
    <bean class="com.Splitter"/>
</int:splitter>

<!-- hack here: correlation-strategy-expression -->
<int:barrier id="barrier" input-channel="channel1"
    output-channel="nullChannel"
    correlation-strategy-expression="'XXX'"
    requires-reply="true"
    timeout="40000"
    order="2">
</int:barrier>

<int:channel id="channel2">
    <int:queue capacity="30"/>
</int:channel>

<!-- actual processing/execution -->
<int:service-activator id="executionAct" input-channel="channel2"
                       output-channel="channel3" ref="executionService">
    <int:poller fixed-rate="111" time-unit="MILLISECONDS" max-messages-per-poll="22"
                task-executor="exec"/>
</int:service-activator>

<bean id="executionService" class="com.SomeExecService"/>

<bean id="exec" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="threadFactory" ref="execFactory"/>
...
    <property name="rejectedExecutionHandler">
        <bean class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy"/>
    </property>
</bean>

<bean id="execFactory"
      class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
...
</bean>

<int:channel id="channel3"/>

<int:chain input-channel="channel3" output-channel="channel4">
...
    <int:aggregator
            group-timeout="30000"
            discard-channel="discardChannel" release-strategy="com.ReleaseStrategy"
            send-partial-result-on-expiry="false">
        <bean class="com.Aggregator"/>
    </int:aggregator>
</int:chain>

<int:channel id="discardChannel"/>

<int:channel id="channel4"/>

<!-- processing done - wake up barrier -->
<int:service-activator id="barrierReleaseAct" input-channel="channel4" output-channel="nullChannel">
     <bean class="com.ServiceThatSendsXXXMessageToChannel5ToReleaseBarrier"/>
</int:service-activator>

<int:channel id="channel5"/>

<int:outbound-channel-adapter channel="channel5"
                              ref="barrier" method="trigger"/>

1 Answer


You need to provide much more information (configuration, etc.).

What releases the barrier, and when?

Do you want to propagate the exception(s) to the main thread?

What should happen if multiple splits fail, and so on?

The general answer is that sending a message with a Throwable payload to the barrier's trigger method releases the waiting thread by throwing a MessagingException with the Throwable as its cause. The gateway unwraps the MessagingException and throws the cause (which is the original payload sent to the barrier's trigger method).
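
As a rough illustration of that idea against the configuration in the question, the fragment below is a minimal sketch, not a tested setup. It assumes the poller's error-channel attribute is used to capture failures on the executor threads, and it introduces a hypothetical channel named executionErrors that is not part of the original configuration. Because the barrier's correlation expression is the constant 'XXX', the error message should correlate with the suspended thread.

```xml
<!-- Sketch only: "executionErrors" is a hypothetical channel name, not in the original config -->
<int:service-activator id="executionAct" input-channel="channel2"
                       output-channel="channel3" ref="executionService">
    <!-- error-channel receives an ErrorMessage when the service throws on an executor thread -->
    <int:poller fixed-rate="111" time-unit="MILLISECONDS" max-messages-per-poll="22"
                task-executor="exec" error-channel="executionErrors"/>
</int:service-activator>

<int:channel id="executionErrors"/>

<!-- The ErrorMessage payload is a Throwable, so forwarding it to channel5 hands it to
     the barrier's trigger method through the existing outbound-channel-adapter -->
<int:bridge input-channel="executionErrors" output-channel="channel5"/>
```

With something like this in place, the first failure should release the thread in gateway.trigger() with an exception rather than waiting for the timeout. It does not address the case where several splits fail: only one trigger can release the suspended thread, so later error messages would find nobody waiting and would need to be collected or discarded some other way.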

EDIT

I have added a pull request to the barrier sample app to show one technique of collecting exceptions on the async threads and causing the barrier to throw a consolidated exception back to the gateway caller.