
A Spring Integration TCP gateway can be set up as follows:

<!-- Server side -->

<int-ip:tcp-connection-factory id="crLfServer"
    type="server"
    port="${availableServerSocket}"/>

<int-ip:tcp-inbound-gateway id="gatewayCrLf"
    connection-factory="crLfServer"
    request-channel="serverBytes2StringChannel"
    error-channel="errorChannel"
    reply-timeout="10000" />

<int:channel id="toSA" />

<int:service-activator input-channel="toSA"
    ref="echoService"
    method="test"/>

<bean id="echoService"
    class="org.springframework.integration.samples.tcpclientserver.EchoService" />
<int:object-to-string-transformer id="serverBytes2String"
    input-channel="serverBytes2StringChannel"
    output-channel="toSA"/>
<int:transformer id="errorHandler"
    input-channel="errorChannel"
    expression="'Error processing payload'"/>

Notice the reply-timeout, which is set to 10 seconds.

Does it mean that the TCP server calls the service and waits for at most 10 seconds? If the service does not reply within 10 seconds, will the TCP server send the message to errorChannel, which in turn sends the client the error message "Error processing payload"?

When I tested the TCP server with a service that takes 20 seconds, the client took 20 seconds to get the response, and I did not see the error message.

Can you please help me understand the reply-timeout on the TCP inbound gateway?

Thanks

UPDATE: Thanks to Artem for helping out with this issue. The best way to solve this problem is with the following config:

<beans>
    <int-ip:tcp-connection-factory id="crLfServer" type="server" port="${availableServerSocket}"/>
    <int-ip:tcp-inbound-gateway id="gatewayCrLf" connection-factory="crLfServer" request-channel="requestChannel" error-channel="errorChannel" reply-timeout="5000" />
    <int:service-activator input-channel="requestChannel" ref="gateway" requires-reply="true"/>
    <int:gateway id="gateway" default-request-channel="timeoutChannel" default-reply-timeout="5000" />
    <int:object-to-string-transformer id="serverBytes2String" input-channel="timeoutChannel" output-channel="serviceChannel"/>
    <int:channel id="timeoutChannel">
        <int:dispatcher task-executor="executor"/>
    </int:channel>
    <bean id="executor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="5" />
        <property name="maxPoolSize" value="10" />
        <property name="queueCapacity" value="25" />
    </bean>
    <int:service-activator input-channel="serviceChannel" ref="echoService" method="test"/>
    <bean id="echoService" class="org.springframework.integration.samples.tcpclientserver.EchoService" />
    <int:transformer id="errorHandler" input-channel="errorChannel" expression="payload.failedMessage.payload + ' errorHandleMsg: may be timeout error'"/>
</beans>

Thanks

1 Answer

Well, actually we should add a description to that attribute, as we have in other similar places, e.g. the HTTP Inbound Gateway:

<xsd:attribute name="reply-timeout" type="xsd:string">
    <xsd:annotation>
        <xsd:documentation><![CDATA[
Used to set the receiveTimeout on the underlying MessagingTemplate instance
(org.springframework.integration.core.MessagingTemplate) for receiving messages
from the reply channel. If not specified this property will default to "1000"
(1 second).
        ]]></xsd:documentation>
    </xsd:annotation>
</xsd:attribute>

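That timeout specifies how long to wait for a reply from the downstream flow. But it only takes effect if your flow is shifted to another thread somewhere. Otherwise everything is performed on the caller's thread, so the wait is not governed by the timeout: the gateway simply blocks until the downstream flow returns, which is why your client waited the full 20 seconds.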
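To make the threading point concrete, here is a minimal sketch in plain JDK code. It is not Spring Integration's implementation; the class and method names (`ReplyTimeoutSketch`, `slowService`, `callOnCallerThread`, `callOnOtherThread`) are made up for illustration. It only shows why a timeout cannot cut a call short when the work runs on the waiting thread itself, and why it can once the work is handed to another thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustration only: plain JDK code, not Spring Integration internals. */
class ReplyTimeoutSketch {

    /** Hypothetical stand-in for a slow downstream service. */
    static String slowService(long workMillis) {
        try {
            Thread.sleep(workMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "reply";
    }

    /** Same-thread call: the caller is busy doing the work, so the timeout is never consulted. */
    static String callOnCallerThread(long workMillis, long timeoutMillis) {
        return slowService(workMillis);
    }

    /** Hand-off: the work runs elsewhere, so the caller can give up after timeoutMillis (null = no reply). */
    static String callOnOtherThread(long workMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> slowService(workMillis);
            Future<String> future = executor.submit(task);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // timed out waiting for the reply
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdownNow(); // interrupts the still-running task, if any
        }
    }

    static long elapsedMillis(long startNanos) {
        return (System.nanoTime() - startNanos) / 1_000_000;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        String direct = callOnCallerThread(400, 100);
        System.out.println("caller thread: " + direct + " after " + elapsedMillis(start) + " ms");

        start = System.nanoTime();
        String handedOff = callOnOtherThread(400, 100);
        System.out.println("other thread:  " + handedOff + " after " + elapsedMillis(start) + " ms");
    }
}
```

The first call returns the reply only after the full work time, regardless of the timeout value. The second gives up early and returns null, which is the situation the gateway code has to deal with.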

Anyway, we return null there when the timeout expires without a reply, and this is reflected in the TcpInboundGateway:

Message<?> reply = this.sendAndReceiveMessage(message);
if (reply == null) {
    if (logger.isDebugEnabled()) {
        logger.debug("null reply received for " + message + " nothing to send");
    }
    return false;
}

We could reconsider the logic in the TcpInboundGateway and do something like this:

if (reply == null && this.errorOnTimeout) {
    if (object instanceof Message) {
        error = new MessageTimeoutException((Message<?>) object, "No reply received within timeout");
    }
    else {
        error = new MessageTimeoutException("No reply received within timeout");
    }
}

But it seems to me that it really would be better to rely on the timeout on the client side.

UPDATE

I think we can overcome the limitation and meet your requirements with a midflow <gateway>:

<gateway id="gateway" default-request-channel="timeoutChannel" default-reply-timeout="10000"/>

<channel id="timeoutChannel">
    <dispatcher task-executor="executor"/>
</channel>

<service-activator input-channel="requestChannel"
                   ref="gateway"
                   requires-reply="true"/>  

So, the <service-activator> calls the <gateway> and waits for a reply from it. Because the reply is required (requires-reply="true"), a missing reply ends up as a ReplyRequiredException, which you can convert into the desired MessageTimeoutException in your error flow on the error-channel="errorChannel".

The timeoutChannel is an executor channel, which is what makes our default-reply-timeout="10000" effective: the gateway shifts the message to a separate thread immediately and goes straight into waiting for the reply, with that wait bounded by the timeout on a CountDownLatch.
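The shape of that flow can be sketched in plain Java. This is only a rough model under my own assumptions, not the framework's code: `MidflowGatewaySketch`, `sendAndReceive`, `requireReply`, `handle`, `slowEcho` and `NoReplyException` are hypothetical names standing in for the <gateway> on an executor channel, the <service-activator> with requires-reply="true", the error-channel transformer, the echo service and ReplyRequiredException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/** Illustration only: mimics the shape of the midflow gateway, not its real implementation. */
class MidflowGatewaySketch {

    /** Hypothetical stand-in for ReplyRequiredException. */
    static class NoReplyException extends RuntimeException {
        NoReplyException(String message) {
            super(message);
        }
    }

    /** Role of <gateway> plus executor channel: null if no reply arrives within the timeout. */
    static String sendAndReceive(String request, Function<String, String> service,
                                 ExecutorService executor, long timeoutMillis) {
        CountDownLatch replyArrived = new CountDownLatch(1);
        AtomicReference<String> reply = new AtomicReference<>();
        // Shift the message to another thread right away.
        executor.execute(() -> {
            reply.set(service.apply(request));
            replyArrived.countDown();
        });
        try {
            // The caller goes straight into a wait bounded by the timeout.
            return replyArrived.await(timeoutMillis, TimeUnit.MILLISECONDS) ? reply.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Role of the service-activator with requires-reply="true": a null reply becomes an exception. */
    static String requireReply(String request, Function<String, String> service,
                               ExecutorService executor, long timeoutMillis) {
        String reply = sendAndReceive(request, service, executor, timeoutMillis);
        if (reply == null) {
            throw new NoReplyException("No reply for '" + request + "' within " + timeoutMillis + " ms");
        }
        return reply;
    }

    /** Hypothetical slow echo service. */
    static String slowEcho(String payload, long workMillis) {
        try {
            Thread.sleep(workMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "echo:" + payload;
    }

    /** Role of the inbound gateway plus error flow: the client gets a reply or an error text. */
    static String handle(String request, long workMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            return requireReply(request, payload -> slowEcho(payload, workMillis), executor, timeoutMillis);
        } catch (NoReplyException e) {
            return request + " errorHandleMsg: may be timeout error";
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(handle("hello", 10, 500));
        System.out.println(handle("hello", 400, 50));
    }
}
```

Without the hand-off to the executor, the waiting thread would be the one running the service, and the latch wait would never get the chance to time out.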

Hope that is clear.