2
votes

I have an application that required persistent TCP connections. So I set some appropriate attributes on the Tcp connector to support this. The issue I am having is if an external application goes down, and restarts, Mule will continue trying to send the messages to the socket and not try to re-establish connection.

<mule ...>
    <tcp:connector name="tcpConn" doc:name="TCP connector" keepAlive="true" keepSendSocketOpen="true" reuseAddress="true"  validateConnections="true">
        <reconnect-forever frequency="2000" />
        <tcp:direct-protocol payloadOnly="true"/>
    </tcp:connector>
    <flow name="ExampleFlow1" doc:name="ExampleFlow1">
        <vm:inbound-endpoint exchange-pattern="one-way" path="msg.in" doc:name="VM">
           <vm:transaction action="ALWAYS_BEGIN"/>
        </vm:inbound-endpoint>
        <tcp:outbound-endpoint exchange-pattern="one-way" host="127.0.0.1" port="50002" responseTimeout="10000" doc:name="TCP"/>
    </flow>

    <flow name="example1" doc:name="example1">
        <tcp:inbound-endpoint  exchange-pattern="one-way" host="0.0.0.0" port="50001" responseTimeout="10000" doc:name="TCP"/>
        <vm:outbound-endpoint exchange-pattern="one-way" path="msg.in" doc:name="VM">
        </vm:outbound-endpoint>
    </flow>
</mule>

Here is my error log:

ERROR 2014-04-30 17:11:35,436 [[chatroomexample].connector.VM.mule.default.receiver.04] org.mule.exception.DefaultMessagingExceptionStrategy: 
********************************************************************************
Message               : Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=tcp://127.0.0.1:50002, connector=TcpConnector
{
  name=tcpConn
  lifecycle=start
  this=198bdbc
  numberOfConcurrentTransactedReceivers=4
  createMultipleTransactedReceivers=true
  connected=true
  supportedProtocols=[tcp]
  serviceOverrides=<none>
}
,  name='endpoint.tcp.127.0.0.1.50002', mep=ONE_WAY, properties={}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: byte[]
Type                  : org.mule.api.transport.DispatchException
Code                  : MULE_ERROR--2
Payload               : [B@7cfbc3
JavaDoc               : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html
********************************************************************************
Exception stack is:
1. Connection reset by peer: socket write error (java.net.SocketException)
  java.net.SocketOutputStream:-2 (null)
2. Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=tcp://127.0.0.1:50002, connector=TcpConnector
{
  name=tcpConn
  lifecycle=start
  this=198bdbc
  numberOfConcurrentTransactedReceivers=4
  createMultipleTransactedReceivers=true
  connected=true
  supportedProtocols=[tcp]
  serviceOverrides=<none>
}
,  name='endpoint.tcp.127.0.0.1.50002', mep=ONE_WAY, properties={}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: byte[] (org.mule.api.transport.DispatchException)
  org.mule.transport.AbstractMessageDispatcher:109 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html)
********************************************************************************
Root Exception stack trace:
java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
    at java.io.BufferedOutputStream.flush(Unknown Source)
    at org.mule.transport.tcp.TcpMessageDispatcher.write(TcpMessageDispatcher.java:129)
    at org.mule.transport.tcp.TcpMessageDispatcher.dispatchToSocket(TcpMessageDispatcher.java:122)
    at org.mule.transport.tcp.TcpMessageDispatcher.doDispatch(TcpMessageDispatcher.java:50)
    at org.mule.transport.AbstractMessageDispatcher.process(AbstractMessageDispatcher.java:99)
    at org.mule.transport.AbstractConnector$DispatcherMessageProcessor.process(AbstractConnector.java:2627)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:101)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.endpoint.outbound.OutboundResponsePropertiesMessageProcessor.process(OutboundResponsePropertiesMessageProcessor.java:39)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:50)
    at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:47)
    at org.mule.execution.ExecuteCallbackInterceptor.execute(ExecuteCallbackInterceptor.java:20)
    at org.mule.execution.BeginAndResolveTransactionInterceptor.execute(BeginAndResolveTransactionInterceptor.java:58)
    at org.mule.execution.ResolvePreviousTransactionInterceptor.execute(ResolvePreviousTransactionInterceptor.java:48)
    at org.mule.execution.SuspendXaTransactionInterceptor.execute(SuspendXaTransactionInterceptor.java:54)
    at org.mule.execution.ValidateTransactionalStateInterceptor.execute(ValidateTransactionalStateInterceptor.java:44)
    at org.mule.execution.IsolateCurrentTransactionInterceptor.execute(IsolateCurrentTransactionInterceptor.java:44)
    at org.mule.execution.ExternalTransactionInterceptor.execute(ExternalTransactionInterceptor.java:52)
    at org.mule.execution.TransactionalExecutionTemplate.execute(TransactionalExecutionTemplate.java:69)
    at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor.process(EndpointTransactionalInterceptingMessageProcessor.java:56)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.Messa...
********************************************************************************

Here's the idea:

  1. A simple TCP application that connects to Mule ESB at port 50001 and sends a hello world message.
  2. Mule ESB receives the message and puts it in a VM queue. The VM queue is because I am looking to implement a reliability messaging pattern as discussed in Mule in Action, 2nd Ed.
  3. We pop the message off the queue and send it transactionally to the outbound endpoint at 127.0.0.1:50002, where I have a netcat type of application to simply listen and echo messages. I set a transaction because I need to ensure the message is delivered.
  4. The first time we start up all the applications, the messages go through. Great!
  5. I restart the netcat application (from step 3).
  6. I send another message into Mule ESB at port 50001, and now I get the above exception.

Here is what I tried:

  1. Setting the reconnection strategy in the TCP Connector, but I either don't understand it or it is not doing what it is supposed to do, which is to attempt reconnection to the external TCP applications.
  2. Using an Until Successful processing scope. We reattempt delivery until the retry count is exhausted but I don't observe any attempts to reconnect to the external TCP application. We are continuously writing to a broken pipe.
  3. No transactions. This yields an expected result of dropping the message after printing a stack trace.

Is this functionality supported out of the box, or would I need to say, implement a custom RetryPolicy & RetryPolicyTemplate to apply onto my TCP Connector? If I need to implement a custom one, how would I get a hold of the broken socket and re-establish connection?

TIA!

1

1 Answers

0
votes

I solved the issue...kind of...by creating my own custom TcpConnector, TcpMessageDispatcher, and TcpMessageDispatcherFactory classes, as per Ian Gil Ragudo's suggestion

And here is how I define my connector:

    <spring:bean id="dispatcherFactory" class="com.mycompany.mule.tcp.MyTcpMessageDispatcherFactory"/>
    <spring:bean id="protocol" class="org.mule.transport.tcp.protocols.DirectProtocol"/>
    <custom-connector name="myTcpConn" class="com.mycompany.mule.tcp.MyTcpConnector" >
        <spring:property name="dispatcherFactory" ref="dispatcherFactory" />
        <spring:property name="tcpProtocol" ref="protocol"/>
        <spring:property name="keepAlive" value="true"/>
        <spring:property name="keepSendSocketOpen" value="true"/>
        <spring:property name="reuseAddress" value="true"/>
    </custom-connector>

 <tcp:endpoint connector-ref="myTcpConn" exchange-pattern="one-way"  host="0.0.0.0" port="8081" name="TcpEndpoint" responseTimeout="10000" doc:name="TCP"/>

Unfortunately because of the visibility of the methods and instance variables, I had to resolve to copy+pasta the classes, update the casts, and only edited this bit in the MyTcpMessageDispatcher class:

    @Override
    protected synchronized void doDispatch(MuleEvent event) throws Exception
    {
        Socket socket = connector.getSocket(endpoint);
        try 
        {
            dispatchToSocket(socket, event);
        }
        catch (SocketException e)
        {
            System.err.println(e.toString());
            socket.close();
            throw e;
        }
        finally 
        {
            connector.releaseSocket(socket, endpoint);
        }
    }

Known issue: When the external application to which the outbound endpoint goes down, and then comes back up, I seems to lose one message, even with transactions configured. It seems that it still successfully writes to the socket one last time before throwing socket write exceptions.