I'm using Mule 3.5.0 CE. I have two requirements:
- zero message loss
- handle payloads ranging from small to large
To handle this with Mule, I was thinking of using an ActiveMQ BlobMessage to carry the payload, combined with a reliable acquisition pattern. First of all, is this the best approach?
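To make clear what I mean by reliable acquisition, here is a minimal plain-Java sketch of the idea. It is not Mule's implementation and uses no Mule or ActiveMQ API; the class `ReliableAcquisitionSketch`, the `Handoff` interface and the `acquire` method are hypothetical names chosen for illustration. The assumption is that a file is only deleted once the hand-off to the broker has succeeded, and is put back otherwise.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class ReliableAcquisitionSketch {

    /** Hypothetical hand-off step, for example "send the content to the broker". */
    public interface Handoff {
        void send(Path file) throws Exception;
    }

    /**
     * Claims a file by moving it into workDir, hands it off, and deletes it
     * only after the hand-off succeeded. If the hand-off throws, the file is
     * moved back to its original location so that a later poll can retry it.
     * Returns true if the file was handed off and deleted, false otherwise.
     */
    public static boolean acquire(Path source, Path workDir, Handoff handoff) throws IOException {
        Path claimed = workDir.resolve(source.getFileName());
        // Claim the file so that the next poll does not pick it up again.
        Files.move(source, claimed);
        try {
            handoff.send(claimed);
        } catch (Exception e) {
            // Hand-off failed (e.g. broker down): restore the file for a retry.
            Files.move(claimed, source);
            return false;
        }
        // Hand-off succeeded: only now is the local copy removed.
        Files.delete(claimed);
        return true;
    }
}
```

In this sketch a broker outage leaves the file in the input directory instead of losing it, which is the behaviour I am trying to get from the Mule flow.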
Here is what I have created:
- a file connector with streaming enabled and a work directory
- an ActiveMQ connector with persistent delivery and an upload URL that stores my BlobMessages on the ActiveMQ Jetty web server
- a file inbound endpoint that picks up the files
- a component that takes the input stream and creates a BlobMessage
- a JMS outbound endpoint that sends the created BlobMessage
However, when ActiveMQ crashes I lose messages.
Mule logs warnings such as "Failure trying to remove file '...' from list of files under processing", along with the following error:
ERROR 2015-05-23 12:55:38,291 [[opx].File.receiver.01] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message : Cannot process event as "Active_MQ" is stopped
Type : org.mule.api.lifecycle.LifecycleException
Code : MULE_ERROR-70167
JavaDoc : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/lifecycle/LifecycleException.html
********************************************************************************
Exception stack is:
1. Cannot process event as "Active_MQ" is stopped (org.mule.api.lifecycle.LifecycleException)
  org.mule.lifecycle.processor.ProcessIfStartedMessageProcessor:38 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/lifecycle/LifecycleException.html)
********************************************************************************
Root Exception stack trace:
org.mule.api.lifecycle.LifecycleException: Cannot process event as "Active_MQ" is stopped
    at org.mule.lifecycle.processor.ProcessIfStartedMessageProcessor.handleUnaccepted(ProcessIfStartedMessageProcessor.java:38)
    at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.endpoint.DefaultOutboundEndpoint.process(DefaultOutboundEndpoint.java:100)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.construct.DynamicPipelineMessageProcessor.process(DynamicPipelineMessageProcessor.java:54)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:51)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:40)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.construct.AbstractPipeline$1.process(AbstractPipeline.java:109)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.construct.AbstractPipeline$3.process(AbstractPipeline.java:207)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule...
********************************************************************************
EDIT: Here is the configuration.
Flow:
<jms:activemq-connector name="Active_MQ" specification="1.1" brokerURL="tcp://localhost:61616?jms.redeliveryPolicy.initialRedeliveryDelay=3000&amp;jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/" validateConnections="true" maxRedelivery="-1" cacheJmsSessions="false" persistentDelivery="true" doc:name="Active MQ">
    <reconnect frequency="60000" count="20"/>
</jms:activemq-connector>
<file:connector name="File" workDirectory="/home/fs/workDirectory" autoDelete="true" streaming="true" validateConnections="true"/>
<flow name="ReceiveFromFS" processingStrategy="synchronous">
    <file:inbound-endpoint path="/home/fs/in" pollingFrequency="5000" fileAge="1000" connector-ref="File"/>
    <component class="mypackage.InputStreamToBlobMessage" doc:name="Java"/>
    <jms:outbound-endpoint queue="queue1" connector-ref="Active_MQ" doc:name="JMS"/>
</flow>
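My Java component:
import java.io.InputStream;

import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.mule.api.MuleEventContext;
import org.mule.api.MuleMessage;
import org.mule.api.lifecycle.Callable;
import org.mule.transport.jms.JmsConnector;

public class InputStreamToBlobMessage implements Callable {
    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {
        MuleMessage muleMsg = eventContext.getMessage();
        // With streaming="true" on the file connector, the payload is an InputStream
        InputStream is = (InputStream) muleMsg.getPayload();
        JmsConnector amqConnector = (JmsConnector) eventContext.getMuleContext().getRegistry().lookupConnector("Active_MQ");
        BlobMessage bm = null;
        if (amqConnector.isConnected()) {
            // Non-transacted session on a queue (transacted = false, topic = false)
            ActiveMQSession session = (ActiveMQSession) amqConnector.getSession(false, false);
            bm = session.createBlobMessage(is);
        }
        // Note: bm is still null here when the connector is not connected
        return bm;
    }
}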