3
votes

I've been tracking down some strange behavior and I've finally managed to isolate it, and I believe that it is a bug. To reproduce the behaviour I have created 4 test flows (in 4 different mule files):

<flow name="testhttpFlow">
    <http:listener config-ref="HTTP_Listener_AMIAB" path="/testsend" allowedMethods="GET, POST" doc:name="HTTP"/>
    <amqp:outbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" routingKey="masterMeep" exchangeType="direct" responseTimeout="10000" exchange-pattern="request-response" connector-ref="connector.amqp.mule.default" doc:name="AMQP-0-9"/>
</flow>

<flow name="testsendFlow" >
    <amqp:inbound-endpoint responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP-0-9" connector-ref="connector.amqp.mule.default" exchangeName="AMQP.DEFAULT.EXCHANGE" exchangeType="direct" queueDurable="true" queueName="masterMeep"/>
    <set-payload value="#[new java.util.ArrayList()]" doc:name="Set Payload to Collection"/>
    <scripting:component doc:name="JavaScript">
        <scripting:script engine="JavaScript"><![CDATA[payload.add("something1");
        payload.add("something2");
        result=payload;]]>
    </scripting:script>
    </scripting:component>
    <set-variable variableName="payloadCollection" value="#[payload]" doc:name="Set payloadCollection"/>
    <foreach doc:name="For Each">
        <choice doc:name="Choice">
            <when expression="payload == 'something1'">
                <amqp:outbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" routingKey="meep" exchangeType="direct" responseTimeout="60000" exchange-pattern="request-response" connector-ref="connector.amqp.mule.default" doc:name="AMQP-0-9 meep1"/>
            </when>
            <when expression="payload == 'something2'">
                <amqp:outbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" routingKey="meep2" exchangeType="direct" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP-0-9 meep2" connector-ref="connector.amqp.mule.default"/>
            </when>
            <otherwise>
                <logger level="INFO" doc:name="Logger"/>
            </otherwise>
        </choice>
        <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="To String"/>
        <logger message="#[payload]" level="INFO" doc:name="Logger"/>
        <scripting:transformer doc:name="Replace array element">
            <scripting:script engine="JavaScript"><![CDATA[var payload = message.getPayload();
                var payloadCollection = message.getInvocationProperty('payloadCollection');
                var counter = message.getInvocationProperty('counter');
                payloadCollection.set(counter - 1, payload);
                result = payload;]]></scripting:script>
        </scripting:transformer>
    </foreach>
    <logger level="INFO" doc:name="Logger" message="#[payload]"/>
</flow>

<flow name="testreceiveFlow">
    <amqp:inbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" queueName="meep"  exchangeType="direct" queueDurable="true" responseTimeout="60000" exchange-pattern="request-response" connector-ref="connector.amqp.mule.default" doc:name="AMQP-0-9"/>
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Set Payload"/>
    <set-variable variableName="subPayload" value="#[payload]" doc:name="Variable"/>
   <http:request config-ref="HTTP_Request_Polestar" path="/api/query/ServiceProxyConfigurationLookup" method="GET" doc:name="HTTP Get ConfigXml" followRedirects="true">
        <http:request-builder>
            <http:query-param paramName="ApiName" value="customer"/>
            <http:query-param paramName="Action" value="GET"/>
            <http:query-param paramName="Section" value="TestGPO"/>
        </http:request-builder>
    </http:request>
    <json:json-to-object-transformer doc:name="JSON to Object" returnClass="java.lang.Object"/>
    <set-variable variableName="serviceProxyConfigurationLookup" value="#[payload]" doc:name="Variable"/>
    <set-payload value="#[payload[0].ServiceProxyConfiguration.ConfigurationList + &quot;\nTestReceive\n&quot; + flowVars.subPayload]" doc:name="Set Payload"/>
    <logger level="INFO" doc:name="Logger" message="&quot;Configuration XML =\n #[payload]&quot;"/>
</flow>

<flow name="testreceiveFlow2">
    <amqp:inbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" queueName="meep2"  exchangeType="direct" queueDurable="true" responseTimeout="60000" exchange-pattern="request-response" connector-ref="connector.amqp.mule.default" doc:name="AMQP-0-9"/>
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Set Payload"/>
    <set-variable variableName="subPayload" value="#[payload]" doc:name="Variable"/>
   <http:request config-ref="HTTP_Request_Polestar" path="/api/query/ServiceProxyConfigurationLookup" method="GET" doc:name="HTTP Get ConfigXml" followRedirects="true">
        <http:request-builder>
            <http:query-param paramName="ApiName" value="customer"/>
            <http:query-param paramName="Action" value="GET"/>
            <http:query-param paramName="Section" value="TestGPO"/>
        </http:request-builder>
    </http:request>
    <json:json-to-object-transformer doc:name="JSON to Object" returnClass="java.lang.Object"/>
    <set-variable variableName="serviceProxyConfigurationLookup" value="#[payload]" doc:name="Variable"/>
    <set-payload value="#[payload[0].ServiceProxyConfiguration.ConfigurationList + &quot;\nTestReceive2\n&quot; + flowVars.subPayload]" doc:name="Set Payload"/>
    <logger level="INFO" doc:name="Logger" message="&quot;Configuration XML =\n #[payload]&quot;"/>
</flow>

Originally, there was no testhttp flow. The endpoint connector in testsend flow was an HTTP connector, and that worked perfectly. Immediately I added the additional flow on the front, moving the HTTP connector to that flow, and adding the additional request-response AMQP (which is therefore held waiting for the inner AMQP request-reponses to complete) the inner AMQP request-response connections stopped working.

Now, they just hang on response, until the timeout occurs, then return with NullPayload.

This doesn't seem like correct bahaviour to me, and feels like some sort of bug to do with have two "layers" of AMQP Request-Response going on at one time.

Has anyone got this pattern working correctly?

Thanks!

It seems that the nested AMQP request-response interaction prevents the first one to work. To be sure, can you send a test message with a payload != something1 and something2 so the flow will go to the otherwise block and there will be no nested AMQP interaction?David Dossot
Thanks David. Yes, I did that during my testing, but have just done it again by inserting an additional element "somethingElse" into position 0 in the collection. That hits the logger in the otherwise block. Also if I remove the initial AMQP connector (and replace it with a VM or Subflow) everything works as it should too. So it definitely seems to be nested AMQP Request-Responses which are the problem.Nich Overend
That definitely sounds like a bug IMO... I suggest you report it: github.com/mulesoft/mule-transport-amqp/issuesDavid Dossot
Thanks David. Have done so.Nich Overend
I reported the bug as suggested, but there's no indication that anyone has even seen the report a month later, so I don't know if it's done any good.Nich Overend