0
votes

I have a TCP inbound endpoint which references a TCP connector. This is a request-response endpoint. The TCP Client is a 3rd party application which sends requests on only one socket. This is how I have set up my TCP endpoint. Endpoint:

<tcp:inbound-endpoint exchange-pattern="request-response"
            responseTimeout="10000" doc:name="TCP" address="${Endpoint}" encoding="ISO-8859-1" connector-ref="TCP"/>

Connector:

<tcp:connector name="TCP" doc:name="TCP connector"
    clientSoTimeout="${Client_SO_Timeout}" receiveBacklog="0" receiveBufferSize="0"
    sendBufferSize="0" serverSoTimeout="${Server_SO_Timeout}" socketSoLinger="0"
    validateConnections="true" keepAlive="true" sendTcpNoDelay="true">
            <receiver-threading-profile maxThreadsActive="${TCP_MaxThreadsActive}" maxThreadsIdle = "${TCP_MaxThreadsIdle}" />
    <reconnect-forever />
    <service-overrides messageReceiver="CustomMessageReceiver" />
    <tcp:custom-protocol ref="CustomLengthProtocol" />
</tcp:connector>

The flow is working fine. But when concurrent requests have to be processed, the last few requests are timing out. What I understand from this is that the messages are waiting at the receiver to be processed (since only one TCP session is used) until the previous request is completed by the mule flow.

In order to tune this, I am looking for a way to change the mule flow as below: After receiving the requests from the client, I need to send it to the mule-flow which may process it asynchronously and push the response to the same socket. Once the request is received at the endpoint, it does not need to wait for the previous request's flow to complete before processing the next request. There is no requirement to keep the sequence of request/responses from the mule flow. Is there a way to achieve this by extending the mule TCP endpoint functionality? This is similar to the Queued Asynchronous Flow processing strategy, except that the response has to be sent back to the original TCP socket.

1

1 Answers

0
votes

This is how I resolved this : (i) Instead of using a tcp request-response inbound endpoint, I am using a TCP inbound (one-way) and TCP outbound (one-way) (ii) The TCP-Inbound receives the request, the custom length protocol splits the message and provides it to the flow. (iii) Use a request-reply scope with VM endpoints (inbound & outbound) (iv) The vm endpoints direct the message to a separate flow which has a processingStrategy as "queued-asynchronous". I am planning to have the maxThreads set up on this flow level for thread-pooling. (v) The second flow performs the business logic and response is sent back to the main-flow to be sent across to the socket. (vi) Inorder to access the socket from the inbound endpoint, I have overriden the preRouteMuleMessage method in TCPMessageReceiver class and added an outbound property called "ClientSocket" to the mulemessage. I then propagate this property to the end of the main flow to the outbound endpoint. At the TCPOutbound endpoint, I created my own TCPMessageDispatcher and extended the doDispatch method. Instead of using a socket from the outbound endpoint's thread pool, I use the socket object that was shared as part of the mulemessage. Sample Flow:

<tcp:connector name="TCP" doc:name="TCP connector"
    clientSoTimeout="70000" receiveBacklog="0" receiveBufferSize="0"
    sendBufferSize="0" serverSoTimeout="70000" socketSoLinger="0"
    validateConnections="true" keepAlive="true" sendTcpNoDelay="true" keepSendSocketOpen="true">
    <receiver-threading-profile
        maxThreadsActive="1" maxThreadsIdle="1" />
    <reconnect-forever />
    <service-overrides messageReceiver="CustomMessageReceiver" />

    <tcp:custom-protocol ref="CustomLengthProtocol" />
</tcp:connector>

<tcp:connector name="TCP2" doc:name="TCP connector"
    clientSoTimeout="70000" receiveBacklog="0" receiveBufferSize="0"
    sendBufferSize="0" serverSoTimeout="70000" socketSoLinger="0"
    validateConnections="true" keepAlive="true" sendTcpNoDelay="true"
    keepSendSocketOpen="true">
    <receiver-threading-profile
        maxThreadsActive="1" maxThreadsIdle="1" />
    <reconnect-forever />
    <service-overrides dispatcherFactory="CustomMessageDispatcherFactory"/>
    <tcp:custom-protocol ref="CustomLengthProtocol" />

</tcp:connector>


<spring:beans>
    <spring:bean id="CustomLengthProtocol" name="CustomLengthProtocol"
        class="CustomLengthProtocol" />
</spring:beans>
<flow name="tcptestFlow" doc:name="tcptestFlow">
    <tcp:inbound-endpoint address="tcp://localhost:4444" 
        responseTimeout="100000"  doc:name="TCP" connector-ref="TCP" />
    <byte-array-to-string-transformer
        doc:name="Byte Array to String" />
    <logger level="INFO" category="Expression" doc:name="Logger" />

    <set-session-variable variableName="Socket"
        value="#[message.outboundProperties['ClientSocket']]"
        doc:name="Session Variable" />
    <logger message="#[payload] - #[Socket]" level="INFO" category="Request"
        doc:name="Logger" />

    <request-reply doc:name="Request-Reply">
        <vm:outbound-endpoint exchange-pattern="one-way"
            path="/qin" doc:name="VM" >
            <message-properties-transformer scope="outbound"> 
                <delete-message-property key="MULE_REPLYTO"></delete-message-property>  
            </message-properties-transformer> 
          </vm:outbound-endpoint>
        <vm:inbound-endpoint exchange-pattern="one-way"
            path="/qout" doc:name="VM" />
    </request-reply>
    <logger message="#[payload]" level="INFO" doc:name="Logger"
        category="Response" />
    <string-to-byte-array-transformer
        doc:name="String to Byte Array" />
    <tcp:outbound-endpoint address="tcp://localhost:4444" 
        responseTimeout="10000" doc:name="TCP" connector-ref="TCP2" />

</flow>
<flow name="tcptestFlow1" processingStrategy="queued-asynchronous"
    doc:name="tcptestFlow1">

    <vm:inbound-endpoint exchange-pattern="one-way"
        path="/qin" doc:name="VM" />
    <logger message="Inside VM Flow" level="INFO" doc:name="Logger" />
    <set-payload value="Appended Response - #[payload]"
        doc:name="Set Payload" />
    <vm:outbound-endpoint exchange-pattern="one-way"
        path="/qout" doc:name="VM" />

</flow>