2
votes

I'm using Mule 3.5.0 CE edition. I have 2 requirements :

  • 0 message loss
  • be able to manage small to large payload

To manage this with Mule I was thinking of using ActiveMQ BlobMessage for managing the payload and a reliable acquisition pattern. First of all I was wondering if this is the best approach ?

Here what I have created :

  • fileconnector with streaming and a workdirectory
  • AMQ connector with an URI to put my blobmessages in AMQ Jetty web server in persistent mode
  • file endpoint picking the files
  • component getting the input stream and creating a blobmessage
  • jms endpoint sending the blobmessage created

However, in case of AMQ crash I'm losing messages...

I have some warnings in mule "Failure trying to remove file '...' from list of files under processing and the following error :

ERROR 2015-05-23 12:55:38,291 [[opx].File.receiver.01] org.mule.exception.DefaultMessagingExceptionStrategy: 
********************************************************************************
Message               : Cannot process event as "Active_MQ" is stopped
Type                  : org.mule.api.lifecycle.LifecycleException
Code                  : MULE_ERROR-70167
JavaDoc               : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/lifecycle/LifecycleException.html
********************************************************************************
Exception stack is:
1. Cannot process event as "Active_MQ" is stopped (org.mule.api.lifecycle.LifecycleException)
  org.mule.lifecycle.processor.ProcessIfStartedMessageProcessor:38 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/lifecycle/LifecycleException.html)
********************************************************************************
Root Exception stack trace:
org.mule.api.lifecycle.LifecycleException: Cannot process event as "Active_MQ" is stopped
    at org.mule.lifecycle.processor.ProcessIfStartedMessageProcessor.handleUnaccepted(ProcessIfStartedMessageProcessor.java:38)
    at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.endpoint.DefaultOutboundEndpoint.process(DefaultOutboundEndpoint.java:100)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.construct.DynamicPipelineMessageProcessor.process(DynamicPipelineMessageProcessor.java:54)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:51)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:40)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.construct.AbstractPipeline$1.process(AbstractPipeline.java:109)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.construct.AbstractPipeline$3.process(AbstractPipeline.java:207)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule...
********************************************************************************

EDIT : Here is the configuration.

Flow :

<jms:activemq-connector name="Active_MQ" specification="1.1" brokerURL="tcp://localhost:61616?jms.redeliveryPolicy.initialRedeliveryDelay=3000&amp;jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/"  validateConnections="true" maxRedelivery="-1" cacheJmsSessions="false" persistentDelivery="true"   doc:name="Active MQ" >
    <reconnect frequency="60000" count="20"/>
</jms:activemq-connector>
<file:connector name="File" workDirectory="/home/fs/workDirectory" autoDelete="true" streaming="true"  validateConnections="true"/>
<flow name="ReceiveFromFS" processingStrategy="synchronous">
    <file:inbound-endpoint path="/home/fs/in" pollingFrequency="5000" fileAge="1000" connector-ref="File"/>
    <component class="mypackage.InputStreamToBlobMessage" doc:name="Java"/>
    <jms:outbound-endpoint queue="queue1" connector-ref="Active_MQ" doc:name="JMS"/>
</flow>

My java component :

public class InputStreamToBlobMessage implements Callable {

@Override
public Object onCall(MuleEventContext eventContext) throws Exception {

    MuleMessage muleMsg = eventContext.getMessage();
    InputStream is = (InputStream) muleMsg.getPayload();
    JmsConnector amqConnector = (JmsConnector) eventContext.getMuleContext().getRegistry().lookupConnector("Active_MQ");
    BlobMessage bm = null;
    if (amqConnector.isConnected())
    {
        ActiveMQSession session = (ActiveMQSession) amqConnector.getSession(false, false);
        bm = session.createBlobMessage(is);
    }

    return bm;
}

}

2
Can you add in your configurations please.Sudarshan
Just updated the question with the configuration. Regarding the java component, after creating the blob message I would have like to close the stream but if I do so I have an error stream closed...Astr0
Thanks, I did'nt know that ActiveMQ had this capability and this looks a little beyond me, however I am upvoting the question and looking forward to a solutionSudarshan

2 Answers

0
votes

In this case it looks like your reconnection attempts have been exhausted and that is why your connector stays at the 'stopped' state.

Please try and replace your reconnection strategy with

<reconnect-forever frequency="60000" />
0
votes

Here the config I used to make it works :

<jms:activemq-connector name="Active_MQ" specification="1.1" brokerURL="tcp://localhost:61616" connectionFactory-ref="connectionFactory" validateConnections="true" maxRedelivery="-1" persistentDelivery="true" doc:name="Active MQ">
</jms:activemq-connector>
    <spring:beans>
    <spring:bean name="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" doc:name="Bean">
        <spring:property name="redeliveryPolicy">
            <spring:bean name="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
                <spring:property name="redeliveryDelay" value="60000"/>
                <spring:property name="maximumRedeliveries" value="20"/>
                <spring:property name="initialRedeliveryDelay" value="10000"/>
            </spring:bean>
        </spring:property>
        <spring:property name="blobTransferPolicy">
            <spring:bean name="blobTransferPolicy" class="org.apache.activemq.blob.BlobTransferPolicy">
                <spring:property name="defaultUploadUrl" value="http://localhost:8161/fileserver/"/>
            </spring:bean>
        </spring:property>
    </spring:bean>
</spring:beans>
<file:connector name="File" workDirectory="/home/fs/workDirectory" autoDelete="true" streaming="true" validateConnections="true" doc:name="File"/>
<flow name="ReceiveFromFS" processingStrategy="synchronous">
  <file:inbound-endpoint path="/home/fs/in" pollingFrequency="5000" fileAge="1000" connector-ref="File"/>
  <component class="mypackage.InputStreamToBlobMessage" doc:name="Java"/>
    <jms:outbound-endpoint queue="queue1" connector-ref="Active_MQ" doc:name="JMS">
        <jms:transaction action="ALWAYS_BEGIN"/>
    </jms:outbound-endpoint>
</flow>

However, when I was using a connectionFactory in MuleStudio, it's was not working properly and did manage the retries. Bug ?? Using the same config in Mule embedded in Tomcat worked fine.

One more thing, in Mule 3.6 the JMS sessions are cached by default so the Session can be accessed the way I did in the component or cacheJmsSessions="false" has to be used.

Voilà :)