I have a staging queue with two JMS connector in Mule Application for reading and writing on the queue. If I stage/enqueue 40 messages on the queue only 20 messages are getting dequeued remaining messages sit as pending messages for processing and doesn't dequeue until Mule App is not restarted.
Below is the code for connectors and mule flow. Can anybody suggest what the problem may be here? And Why message processing stops at 20 messages.
Let me know if more information is required.
<!-- PROCESSING BEAN WHICH HAS ALL STREAM INTERACTION AND ERROR HANDLING -->
<spring:bean name="processorPR" class="net.prcins.esb.claims.vci.policereports.processing.PoliceReportProcessor" scope="prototype">
<spring:property name="policeReportBO" ref="policeReportBO" />
</spring:bean>
<!-- This is for validation of inital transformation -->
<spring:bean name="receiverPR" class="net.prcins.esb.claims.vci.policereports.staging.PoliceReportReceiver" scope="prototype" />
<!-- PROCESSING BEAN WHICH HAS ALL BUSINESS LOGIC -->
<spring:bean name="policeReportBO" class="net.prcins.esb.claims.vci.policereports.processing.PoliceReportBO" scope="prototype">
<spring:property name="rulesFileCovLogic_NE" value="${policeReport.rules.covLogic.NE}"/>
<spring:property name="rulesFileCovLogic_NJ" value="${policeReport.rules.covLogic.NJ}"/>
</spring:bean>
<jms:activemq-connector name="jmsConnectorStagingQReaderNormal"
brokerURL="${mule.activemq.broker.read.normal.url}"
specification="1.1"
maxRedelivery="-1"
persistentDelivery="true"
createMultipleTransactedReceivers="true"
numberOfConcurrentTransactedReceivers="10"
connectionFactory-ref="connectionFactoryPR"
disableTemporaryReplyToDestinations="false">
</jms:activemq-connector>
<!-- JMS CONNECTOR FOR WRITING FROM STAGING QUEUE -->
<jms:activemq-connector name="jmsConnectorStagingQWriter"
brokerURL="${mule.activemq.broker.write.url}"
specification="1.1"
maxRedelivery="-1"
persistentDelivery="true"
createMultipleTransactedReceivers="true"
numberOfConcurrentTransactedReceivers="10"
connectionFactory-ref="connectionFactoryPR"
disableTemporaryReplyToDestinations="false">
</jms:activemq-connector>
<jms:endpoint name="inboundProcessReaderNormalPR" queue="${policereport.queue.staging.name}" exchange-pattern="one-way" connector-ref="jmsConnectorStagingQReaderNormal" />
<flow name="pr-flow-stage-individual-message">
<http:inbound-endpoint host= "localhost"
port= "${policeReport.inbound.port}"
path= "claims/vci/policeReport/record"
exchange-pattern= "request-response"
>
</http:inbound-endpoint>
<object-to-string-transformer />
<logger message="PoliceRecord: SPLIT MESSAGE:#[payload]" level="ERROR" />
<transformer ref="XMLToRecord"></transformer>
<!-- COMPONENT TO PROCESS THE DATA -->
<!-- Component returns Invoice XML if all validations are successful, else it returns error XML containing error message, error code and original payload -->
<component>
<spring-object bean="receiverPR"></spring-object>
</component>
<transformer ref="RecordToXML"></transformer>
<object-to-string-transformer />
<!-- DECIDE SUCCESS/FAILURE AND PLACE ON QUEUE IF SUCCESS -->
<choice>
<when expression="//Record/Response/Status = 'ACCEPTED'" evaluator="xpath">
<jms:outbound-endpoint ref="inboundProcessReaderNormalPR" doc:name="JMS">
<jms:transaction action="NONE"/>
</jms:outbound-endpoint>
</when>
<otherwise>
<!-- Relevant endpoint for error handling -->
<logger message="Skipped staging message due to errors" level="ERROR" />
</otherwise>
</choice>
<logger message="PoliceReport: STAGED PAYLOAD:#[payload]" level="ERROR" />
<default-exception-strategy>
<logger message="PoliceReport: Exception recived in flow-stage-individual-message. Payload is :#[payload]" level="ERROR" />
</default-exception-strategy>
</flow>
<!-- ============================== PROCESSING =============================== -->
<!-- ====================================================================== -->
<!--=================== PROCESSING JOB FOR Police Report (NORMAL) ========================-->
<flow name="pr-flow-process-jms-input-normal" >
<jms:inbound-endpoint ref="inboundProcessReaderNormalPR"/>
<flow-ref name="pr-flow-process-jms-input-subbflow" />
</flow>
<!--=================== PROCESSING JOB FOR Police Report (DR) ========================-->
<flow name="pr-flow-process-jms-input-dr" >
<jms:inbound-endpoint ref="inboundProcessReaderDRPR" />
<flow-ref name="pr-flow-process-jms-input-subbflow" />
</flow>
<!--=================== PROCESSING JOB FOR PoliceReport (NE) ========================-->
<sub-flow name="pr-flow-process-jms-input-subbflow" >
<transformer ref="DomToXmlPR" doc:name="DOM to XML"/>
<transformer ref="XMLToRecord"></transformer>
<!-- COMPONENT TO PROCESS THE DATA. WILL PROCESS THE OBJECT AND RETURN ANOTHER OBJECT AS RESPONSE -->
<component>
<spring-object bean="processorPR"></spring-object>
</component>
<!-- Component above performs all business logic, including validations, creation/editing of claim, error handling etc.
Returns success/failure of claim import processing in the end -->
<transformer ref="RecordToXML"></transformer>
<object-to-string-transformer />
<!-- DECIDE SUCCESS OR FAILURE -->
<choice>
<when expression="//Record/Response/Status = 'ERROR' and contains(//Record/Response/Description, 'SysError')" evaluator="xpath">
<!-- DROP XFORMED BATCH FORMAT STRING ON THE JMS QUEUE
RETURNS RIGHT AWAY, NO WAITING -->
<logger message="PoliceReport: FAILED TO PROCESS MESSAGE :#[payload]" level="ERROR" />
<jms:outbound-endpoint queue="${policeReport.queue.errors.name}" exchange-pattern="one-way" connector-ref="policeReportJMSConnectorErrorQWriter" />
</when>
<otherwise>
<logger message="Messages processd without error :#[payload]" level="ERROR" />
</otherwise>
</choice>
<transformer ref="record2balancingXFormer" />
<object-to-string-transformer />
<splitter evaluator="xpath" expression="//Batch_Status" />
<!-- <logger message="OUTPUT OF SPLITTER IS: #[payload]" level="ERROR" /> -->
<custom-transformer class="net.prcins.esb.claims.vci.policereports.transformers.DocumentToString" />
<!-- Not sure why only splitter makes the payload workable for HTTP call. Without this, I keep getting error that the Rest Endpoint does not accept XML -->
<flow-ref name="pr-flow-balancing"/>
</sub-flow>
flow-process-jms-input-subbflow
to the client performing the initial HTTP call? – David Dossot