I have an application that requires persistent TCP connections, so I set the appropriate attributes on the TCP connector to support this. The issue is that if the external application goes down and restarts, Mule keeps trying to send messages to the old socket and does not try to re-establish the connection.
<mule ...>
    <tcp:connector name="tcpConn" doc:name="TCP connector" keepAlive="true" keepSendSocketOpen="true" reuseAddress="true" validateConnections="true">
        <reconnect-forever frequency="2000" />
        <tcp:direct-protocol payloadOnly="true"/>
    </tcp:connector>
    <flow name="ExampleFlow1" doc:name="ExampleFlow1">
        <vm:inbound-endpoint exchange-pattern="one-way" path="msg.in" doc:name="VM">
            <vm:transaction action="ALWAYS_BEGIN"/>
        </vm:inbound-endpoint>
        <tcp:outbound-endpoint exchange-pattern="one-way" host="127.0.0.1" port="50002" responseTimeout="10000" doc:name="TCP"/>
    </flow>
    <flow name="example1" doc:name="example1">
        <tcp:inbound-endpoint exchange-pattern="one-way" host="0.0.0.0" port="50001" responseTimeout="10000" doc:name="TCP"/>
        <vm:outbound-endpoint exchange-pattern="one-way" path="msg.in" doc:name="VM">
        </vm:outbound-endpoint>
    </flow>
</mule>
Here is my error log:
ERROR 2014-04-30 17:11:35,436 [[chatroomexample].connector.VM.mule.default.receiver.04] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message : Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=tcp://127.0.0.1:50002, connector=TcpConnector
{
name=tcpConn
lifecycle=start
this=198bdbc
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[tcp]
serviceOverrides=<none>
}
, name='endpoint.tcp.127.0.0.1.50002', mep=ONE_WAY, properties={}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: byte[]
Type : org.mule.api.transport.DispatchException
Code : MULE_ERROR--2
Payload : [B@7cfbc3
JavaDoc : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html
********************************************************************************
Exception stack is:
1. Connection reset by peer: socket write error (java.net.SocketException)
java.net.SocketOutputStream:-2 (null)
2. Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=tcp://127.0.0.1:50002, connector=TcpConnector
{
name=tcpConn
lifecycle=start
this=198bdbc
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[tcp]
serviceOverrides=<none>
}
, name='endpoint.tcp.127.0.0.1.50002', mep=ONE_WAY, properties={}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: byte[] (org.mule.api.transport.DispatchException)
org.mule.transport.AbstractMessageDispatcher:109 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html)
********************************************************************************
Root Exception stack trace:
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(Unknown Source)
at java.net.SocketOutputStream.write(Unknown Source)
at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
at java.io.BufferedOutputStream.flush(Unknown Source)
at org.mule.transport.tcp.TcpMessageDispatcher.write(TcpMessageDispatcher.java:129)
at org.mule.transport.tcp.TcpMessageDispatcher.dispatchToSocket(TcpMessageDispatcher.java:122)
at org.mule.transport.tcp.TcpMessageDispatcher.doDispatch(TcpMessageDispatcher.java:50)
at org.mule.transport.AbstractMessageDispatcher.process(AbstractMessageDispatcher.java:99)
at org.mule.transport.AbstractConnector$DispatcherMessageProcessor.process(AbstractConnector.java:2627)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:101)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.endpoint.outbound.OutboundResponsePropertiesMessageProcessor.process(OutboundResponsePropertiesMessageProcessor.java:39)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:50)
at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:47)
at org.mule.execution.ExecuteCallbackInterceptor.execute(ExecuteCallbackInterceptor.java:20)
at org.mule.execution.BeginAndResolveTransactionInterceptor.execute(BeginAndResolveTransactionInterceptor.java:58)
at org.mule.execution.ResolvePreviousTransactionInterceptor.execute(ResolvePreviousTransactionInterceptor.java:48)
at org.mule.execution.SuspendXaTransactionInterceptor.execute(SuspendXaTransactionInterceptor.java:54)
at org.mule.execution.ValidateTransactionalStateInterceptor.execute(ValidateTransactionalStateInterceptor.java:44)
at org.mule.execution.IsolateCurrentTransactionInterceptor.execute(IsolateCurrentTransactionInterceptor.java:44)
at org.mule.execution.ExternalTransactionInterceptor.execute(ExternalTransactionInterceptor.java:52)
at org.mule.execution.TransactionalExecutionTemplate.execute(TransactionalExecutionTemplate.java:69)
at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor.process(EndpointTransactionalInterceptingMessageProcessor.java:56)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.Messa...
********************************************************************************
Here's the idea:
1. A simple TCP application connects to Mule ESB on port 50001 and sends a hello world message.
2. Mule ESB receives the message and puts it in a VM queue. I use a VM queue because I want to implement the reliable messaging pattern discussed in Mule in Action, 2nd Ed.
3. Mule takes the message off the queue and sends it transactionally to the outbound endpoint at 127.0.0.1:50002, where a netcat-type application simply listens and echoes messages. I use a transaction because I need to ensure the message is delivered.
4. The first time all the applications start up, the messages go through. Great!
5. I restart the netcat application (from step 3).
6. I send another message into Mule ESB on port 50001, and now I get the above exception.
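To make the repro concrete, the listener on port 50002 behaves roughly like the sketch below. This is a hypothetical stand-in written in plain Java, not the actual tool I run: the class name `EchoListener` is made up, and it assumes newline-terminated messages, which my Mule configuration (`direct-protocol`) does not guarantee. Stopping and restarting it corresponds to the restart in the steps above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the netcat-type listener; not part of the Mule app. */
class EchoListener implements AutoCloseable {
    private final ServerSocket server;

    /** Binds to the given port; port 0 asks the OS for a free ephemeral port. */
    EchoListener(int port) throws IOException {
        server = new ServerSocket(port);
    }

    int port() {
        return server.getLocalPort();
    }

    /** Accepts one client, then prints and echoes each line until it disconnects. */
    void serveOne() throws IOException {
        try (Socket client = server.accept();
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println("received: " + line);
                out.println(line); // echo back to the sender
            }
        }
    }

    @Override
    public void close() throws IOException {
        server.close();
    }

    public static void main(String[] args) throws IOException {
        // 50002 matches the outbound endpoint in the Mule configuration.
        try (EchoListener listener = new EchoListener(50002)) {
            while (true) {
                listener.serveOne();
            }
        }
    }
}
```

Killing this process while Mule still holds its kept-open socket, then starting it again, leaves Mule with a socket whose peer no longer exists.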
Here is what I tried:
- Setting a reconnection strategy on the TCP connector. Either I don't understand it or it isn't doing what I expect, which is to attempt to reconnect to the external TCP application.
- Using an Until Successful processing scope. Delivery is reattempted until the retry count is exhausted, but I don't observe any attempt to reconnect to the external TCP application. Mule keeps writing to a broken pipe.
- No transactions. As expected, the message is dropped after a stack trace is printed.
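For clarity, the behavior I am after looks roughly like the following sketch in plain `java.net` code. It uses no Mule APIs; `ReconnectingSender` and its methods are hypothetical names invented for illustration, and newline-terminated messages are an assumption. When a write on the kept-open socket fails, the socket is discarded and the message is retried once on a fresh connection.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper built on java.net only; not a Mule class. */
class ReconnectingSender implements AutoCloseable {
    private final String host;
    private final int port;
    private final int connectTimeoutMs;
    private Socket socket;   // kept open between sends
    private int reconnects;  // how many times a dead socket was replaced

    ReconnectingSender(String host, int port, int connectTimeoutMs) {
        this.host = host;
        this.port = port;
        this.connectTimeoutMs = connectTimeoutMs;
    }

    /** Sends one newline-terminated message, reconnecting once if the write fails. */
    synchronized void send(String message) throws IOException {
        byte[] bytes = (message + "\n").getBytes(StandardCharsets.UTF_8);
        try {
            write(bytes);
        } catch (IOException firstFailure) {
            // The kept-open socket is dead (e.g. "Connection reset by peer"):
            // drop it and retry once on a new connection.
            closeQuietly();
            reconnects++;
            write(bytes);
        }
    }

    int reconnects() {
        return reconnects;
    }

    private void write(byte[] bytes) throws IOException {
        if (socket == null || socket.isClosed()) {
            Socket fresh = new Socket();
            try {
                fresh.setKeepAlive(true);
                fresh.connect(new InetSocketAddress(host, port), connectTimeoutMs);
            } catch (IOException e) {
                fresh.close();
                throw e;
            }
            socket = fresh;
        }
        OutputStream out = socket.getOutputStream();
        out.write(bytes);
        out.flush();
    }

    private void closeQuietly() {
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // nothing useful to do if closing a dead socket fails
            }
            socket = null;
        }
    }

    @Override
    public synchronized void close() {
        closeQuietly();
    }
}
```

One caveat with this approach: a write to a peer that has already gone away can succeed locally, with the failure only surfacing on a later write, so reconnecting on error does not by itself guarantee delivery.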
Is this functionality supported out of the box, or would I need to, say, implement a custom RetryPolicy and RetryPolicyTemplate to apply to my TCP connector? If I need to implement a custom one, how would I get hold of the broken socket and re-establish the connection?
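In case it helps frame the question, this is the kind of fixed-frequency retry loop I have in mind, sketched in plain Java. It deliberately does not implement Mule's RetryPolicy or RetryPolicyTemplate interfaces; `FixedDelayRetry` and its parameters are hypothetical names, and the retried task stands in for whatever would re-open the socket.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper; does not implement Mule's RetryPolicy or RetryPolicyTemplate. */
final class FixedDelayRetry {
    private FixedDelayRetry() {
    }

    /**
     * Runs the task until it succeeds or maxAttempts is reached,
     * sleeping delayMs between attempts. Rethrows the last failure.
     */
    static <T> T run(Callable<T> task, int maxAttempts, long delayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (InterruptedException e) {
                // Do not retry when the thread was asked to stop.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMs); // fixed delay, like frequency="2000"
                }
            }
        }
        throw last;
    }
}
```

With a delay of 2000 ms and a very large attempt count, this approximates what I expected `<reconnect-forever frequency="2000" />` to do for the outbound socket.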
TIA!