I'm using the AMQP endpoint and have it working in other places in my Mule application (with one caveat, described in "Possible bug with Mule AMQP transport 3.6.2 community").

I'm trying to build a reliable SMTP queue but am having problems.

I assumed that the SMTP connector would throw an exception if it couldn't send. So by setting message acknowledgement to MANUAL, catching SMTP exceptions, and rejecting (and requeuing) the message in the exception strategy, the system should cope with SMTP outages.

My code (slightly simplified) looks like this:

<amqp:connector name="connector.amqp.mule.default" ackMode="MANUAL" doc:name="AMQP Connector" validateConnections="true" host="${amiab-rabbitmq-hostname}" port="${amiab-rabbitmq-portnumber}" password="${amiab-rabbitmq-password}" username="${amiab-rabbitmq-username}"/>
<smtp:connector name="smtpConnector" contentType="text/html" doc:name="SMTPConnector"/>
<flow name="utility-smtpFlow">
  <amqp:inbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" queueName="utilitySMTP" exchangeType="direct"  responseTimeout="10000"  doc:name="AMQP-0-9" queueDurable="true"/>
    <logger message="Recevied SMTP Message from Queue" level="INFO" doc:name="Logger"/>
    <smtp:outbound-endpoint host="${smtp-hostname}" user="${smtp-username}" password="${smtp-password}" to="#[flowVars.smtpTo]" subject="Testing Message" responseTimeout="10000" doc:name="SMTP"/>
    <amqp:acknowledge-message doc:name="AMQP-0-9 Acknowledge Message"/>
    <catch-exception-strategy doc:name="Catch Exception Strategy">
        <logger message="SMTP Error, Requeuing" level="INFO" doc:name="Logger"/>
        <amqp:reject-message requeue="true" doc:name="AMQP-0-9 Reject Message"/>
    </catch-exception-strategy>
</flow>

When my SMTP connector throws an exception, the exception strategy is invoked as expected, but then AMQP throws an error, as shown below:

INFO  2015-06-18 16:32:22,383 [HTTP_Listener_AMIAB.worker.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default outbound transformer: org.mule.transport.amqp.internal.transformer.ObjectToAmqpMessage
INFO  2015-06-18 16:32:22,383 [HTTP_Listener_AMIAB.worker.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default response transformer: org.mule.transport.amqp.internal.transformer.ObjectToAmqpMessage
INFO  2015-06-18 16:32:22,387 [HTTP_Listener_AMIAB.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'connector.amqp.mule.default.dispatcher.936721322'. Object is: Dispatcher
INFO  2015-06-18 16:32:22,389 [HTTP_Listener_AMIAB.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'connector.amqp.mule.default.dispatcher.936721322'. Object is: Dispatcher
INFO  2015-06-18 16:32:22,446 [[amiab-esb-utility].utility-smtpFlow.stage1.02] org.mule.api.processor.LoggerMessageProcessor: Recevied SMTP Message from Queue
INFO  2015-06-18 16:32:22,457 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default outbound transformer: org.mule.transport.email.transformers.ObjectToMimeMessage
INFO  2015-06-18 16:32:22,481 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'smtpConnector.dispatcher.404056326'. Object is: SmtpMessageDispatcher
ERROR 2015-06-18 16:32:22,568 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.exception.CatchMessagingExceptionStrategy: 
********************************************************************************
Message               : Unable to connect to mail transport.
Code                  : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. mail.lyrical.co.uk (java.net.UnknownHostException)
  java.net.AbstractPlainSocketImpl:178 (null)
2. Unknown SMTP host: mail.lyrical.co.uk (javax.mail.MessagingException)
  com.sun.mail.smtp.SMTPTransport:1704 (http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/mail/MessagingException.html)
3. Unable to connect to mail transport. (org.mule.api.endpoint.EndpointException)
  org.mule.transport.email.SmtpMessageDispatcher:67 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/endpoint/EndpointException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.net.UnknownHostException: mail.lyrical.co.uk
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    + 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

INFO  2015-06-17 15:25:47,190 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.api.processor.LoggerMessageProcessor: SMTP Error, Requeuing
WARN  2015-06-17 15:25:47,193 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MessageReceiverConsumer: Received shutdown signal for consumer tag: amq.ctag-NZ96mnj_oscnpJKAPy16ng, the message receiver will try to restart.
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=90)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) ~[?:?]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) ~[?:?]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[?:?]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[?:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550) ~[?:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_75]
INFO  2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.lifecycle.AbstractLifecycleManager: Stopping: 'null'. Object is: MultiChannelMessageSubReceiver
INFO  2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Connecting clusterizable message receiver
INFO  2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'null'. Object is: MultiChannelMessageSubReceiver
INFO  2015-06-17 15:25:47,196 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Starting clusterizable message receiver
INFO  2015-06-17 15:25:47,203 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Started subscription: amq.ctag-sCCRCnOWKyREyip26pSYDA on channel: AMQChannel(amqp://[email protected]:5672/,3)

Have I missed something obvious, or am I misunderstanding how this should be used? Very grateful if anyone can point me in the right direction.

Do you mind showing a little more of the log lines, for ex. from the "Recevied SMTP Message from Queue" line included? If you could add a plain <logger level="INFO" /> right after it as well, we would then see the message properties (including delivery tag). Thanks. - David Dossot

Thanks David. I've edited the error log to show the complete log. - Nich Overend

I have a theory where acknowledge-message is actually called even in case of exception. Do you mind adding a logger right before it to see if this theory is correct? - David Dossot

You're right! It's called immediately. This is a threading compartment issue. It acknowledges the message instantly before the SMTP component has a chance to get going and throw the exception (on a new thread). The SMTP component is not working in a request-response fashion and holding up the flow until it's finished. Any suggestions to get around that? - Nich Overend

Got it! Force "synchronous" processing strategy on the main flow, and everything works as it should, except that it re-queues and then is re-delivered immediately in a loop. Is it possible to re-queue with an instruction for the AMQP broker to hold the message for a period of time before re-delivery? - Nich Overend

1 Answer

As you have noted in your comment, smtp:outbound-endpoint is a one-way endpoint, so its dispatch operation is performed on another thread, in parallel with the rest of the flow. amqp:acknowledge-message is therefore called regardless of whether the smtp:outbound-endpoint operation succeeds or fails. By the time the exception strategy calls amqp:reject-message, the delivery has already been acknowledged, which is why the broker closes the channel with "PRECONDITION_FAILED - unknown delivery tag 1".

You can get around this issue by setting processingStrategy="synchronous" on the flow: this will force all the endpoint interactions to be performed on the inbound thread.
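
As a rough sketch, this is what the flow from the question might look like with that attribute added. It is untested and reuses the names and placeholders from the question; the connector definitions are assumed to stay unchanged, and the doc:name and responseTimeout attributes are left out only for brevity.

```xml
<!-- Sketch only: the flow from the question with a synchronous processing strategy -->
<flow name="utility-smtpFlow" processingStrategy="synchronous">
    <amqp:inbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" queueName="utilitySMTP" exchangeType="direct" queueDurable="true"/>
    <logger message="Recevied SMTP Message from Queue" level="INFO"/>
    <!-- The SMTP dispatch now runs on the inbound thread, so a failure here
         reaches the exception strategy before the acknowledge below is executed -->
    <smtp:outbound-endpoint host="${smtp-hostname}" user="${smtp-username}" password="${smtp-password}" to="#[flowVars.smtpTo]" subject="Testing Message"/>
    <!-- Only reached when the SMTP dispatch did not throw -->
    <amqp:acknowledge-message/>
    <catch-exception-strategy>
        <logger message="SMTP Error, Requeuing" level="INFO"/>
        <!-- The delivery tag is still unacknowledged at this point, so the reject is valid -->
        <amqp:reject-message requeue="true"/>
    </catch-exception-strategy>
</flow>
```

Note that, as mentioned in the comments, requeue="true" leads to the message being redelivered immediately, so during an SMTP outage this flow will loop on the same message.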

Shameless plug: this is discussed in detail in chapter 11.2 of Mule in Action, Second Edition.