0
votes

I've been attempting to build a flow that effectively, uses a database as a queue. The reason for this is that other processes are expected to read and reply to this message, and were designed this way. Unfortunately, i have no control over this other process and cannot make it respond to a different queuing system.

So the flow would work like this: HTTP Request to insert a record into a database -> Separate application (outside of mule) polls this database table for messages and responds with another message into a different table(this step may take >5 seconds to respond) -> read this new row and respond to original http request.

In this design, the request-reply scope always times out waiting for the reply to appear. (i manually set it to 20 seconds to show this fairly quickly)

Response timed out (20000ms) waiting for message response id "3e1a7750-ee13-11e6-ae40-0c9920524153" or this action was interrupted. Failed to route event via endpoint: null. Message payload is of type: Integer

i've clearly missed something and can't seem to locate the correct documentation for this from mule. I hope one of the good users of this site may correct the error of my ways.

below is the flow and a sample of the view

 <flow name="mainFlow">
    <http:listener config-ref="HTTP_Listener_Configuration" path="hello" doc:name="HTTP"/>
    <cxf:jaxws-service doc:name="CXF" configuration-ref="CXF_Configuration" serviceClass="kansas.MuleTestServiceImpl"/>
    <request-reply doc:name="Request-Reply" timeout="20000">
        <db:insert config-ref="Oracle_Configuration" doc:name="Database">
            <db:parameterized-query><![CDATA[insert into tblRequest (id, correlationId, replyTo) values (#[message.id], #[message.correlationId], #[message.replyTo])]]></db:parameterized-query>
        </db:insert>
        <jms:inbound-endpoint queue="test.response" connector-ref="syncJms" doc:name="JMS">
            <jms:transaction action="JOIN_IF_POSSIBLE"/>
        </jms:inbound-endpoint>
    </request-reply>
    <logger message="payload is #[payload]" level="INFO" doc:name="Logger"/>
</flow>
<flow name="databasePoller">
    <poll doc:name="Poll">
        <fixed-frequency-scheduler frequency="5000"/>
        <db:select config-ref="Oracle_Configuration" doc:name="Database">
            <db:parameterized-query><![CDATA[select id,correlationId,msgresponse,replyto from tblResponse]]></db:parameterized-query>
        </db:select>
    </poll>
    <foreach collection="#[payload]" doc:name="For Each">
        <set-variable variableName="storedPayload" value="#[payload]" doc:name="storePayload"/>
        <db:delete config-ref="Oracle_Configuration" doc:name="Database">
            <db:parameterized-query><![CDATA[delete from tblResponse where correlationId = #[storedPayload.correlationId]]]></db:parameterized-query>
        </db:delete>
        <set-payload value="#[flowVars.storedPayload]" doc:name="restorePayload"/>
        <message-properties-transformer overwrite="true" doc:name="Message Properties">
            <add-message-property key="MULE_CORRELATION_ID" value="#[payload.ID]"/>
            <add-message-property key="MULE_REPLYTO" value="#[payload.REPLYTO]"/>
        </message-properties-transformer>
        <set-payload value="#[payload.MSGRESPONSE]" doc:name="Set Payload"/>
        <jms:outbound-endpoint queue="test.response" connector-ref="syncJms" doc:name="JMS"/>
        <logger level="INFO" doc:name="Logger"/>
    </foreach>
</flow>

enter image description here

EXCEPTION BELOW


Message : Response timed out (20000ms) waiting for message response id "b9a93d10-efa4-11e6-808b-0c9920524153" or this action was interrupted. Failed to route event via endpoint: null. Message payload is of type: Integer Type : org.mule.api.routing.ResponseTimeoutException Code : MULE_ERROR--2 JavaDoc : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/routing/ResponseTimeoutException.html Payload : 1

Root Exception stack trace: org.mule.api.routing.ResponseTimeoutException: Response timed out (20000ms) waiting for message response id "b9a93d10-efa4-11e6-808b-0c9920524153" or this action was interrupted. Failed to route event via endpoint: null. Message payload is of type: Integer at org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.receiveAsyncReply(AbstractAsyncRequestReplyRequester.java:283) at org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.process(AbstractAsyncRequestReplyRequester.java:89)

2
Three things could help. First could you describe a bit more detailed what you would like to achieve. Second add the full log entries and third you can try to refactor. Move the DB part into a subflow that you call via a VM queue. Add a VM queue outbound (as exchange pattern request-reply) in the request reply scope and call the new flow with the VM and DB connector. That way Mule might be able to actually respond as it might not find the response (temp queue in jMS scenario) in your current configuration.brazo
Also that information might be more enlightening too: ricston.com/blog/…brazo

2 Answers

0
votes

I simplified the flow a bit to get it working. It might be a fitting solution if you instead of polling the DB in a separate flow trigger it with a VM queue.

<flow name="mainFlow">
    <http:listener config-ref="HTTP_Listener_Configuration"
        path="hello" doc:name="HTTP" />
    <dw:transform-message doc:name="Transform Message">
        <dw:set-payload>
          <![CDATA[%dw 1.0 %output application/java
          ---
          {
             name: "abc",
             euro: 130,
             usd: 123
           }]]></dw:set-payload>
    </dw:transform-message>
    <request-reply doc:name="Request-Reply" timeout="20000">
        <vm:outbound-endpoint exchange-pattern="one-way" path="db" doc:name="VM">
             <message-properties-transformer scope="outbound"> <delete-message-property key="MULE_REPLYTO"/> </message-properties-transformer>
            </vm:outbound-endpoint>
        <vm:inbound-endpoint exchange-pattern="one-way" path="test.response" doc:name="VM"/>
    </request-reply>
    <logger message="#[message.payloadAs(java.lang.String)]" level="INFO" doc:name="Logger"/>
</flow>
<flow name="soFlow">
    <vm:inbound-endpoint exchange-pattern="one-way" path="db" doc:name="VM"/>
    <db:insert config-ref="MySQL_Configuration" doc:name="Database">
        <db:parameterized-query><![CDATA[insert into item (
           item_name,
           price_euro,
           price_usd) 
              values (#[payload.name], #[payload.euro], #[payload.usd])]]>              </db:parameterized-query>
    </db:insert>
    <vm:outbound-endpoint exchange-pattern="one-way" path="call-db-query" doc:name="VM"/>
</flow>
<flow name="databasePoller">
    <vm:inbound-endpoint exchange-pattern="one-way" path="call-db-query" doc:name="VM"/>
    <db:select config-ref="MySQL_Configuration" doc:name="Database">
        <db:parameterized-query><![CDATA[select item_name,
        price_euro,
        price_usd  from item]]></db:parameterized-query>
    </db:select>
    <vm:outbound-endpoint exchange-pattern="one-way" path="test.response" doc:name="VM"/>
</flow>
0
votes

I believe i've solved this. I got this working completely with a HTTP request endpoint instead of a database poller. When reviewing the message, it appears that one difference was through my database poller, mule places 2 additional properties. MULE_CORRELATION_SEQUENCE and MULE_CORRELATION_GROUP_SIZE.

By removing these properties just before sending the message to jms, allowed the request-reply scope to correctly identify the response in the jms queue.

<remove-property propertyName="MULE_CORRELATION_SEQUENCE" doc:name="Property"/> <remove-property propertyName="MULE_CORRELATION_GROUP_SIZE" doc:name="Property"/>