I have a staging queue with two JMS connectors in a Mule application, one for reading from the queue and one for writing to it. If I enqueue 40 messages, only 20 are dequeued; the remaining messages sit as pending and are not dequeued until the Mule app is restarted.

Below is the code for the connectors and the Mule flows. Can anybody suggest what the problem may be, and why message processing stops at 20 messages?

Let me know if more information is required.

<!-- PROCESSING BEAN WHICH HAS ALL STREAM INTERACTION AND ERROR HANDLING -->
        <spring:bean name="processorPR" class="net.prcins.esb.claims.vci.policereports.processing.PoliceReportProcessor" scope="prototype">
            <spring:property name="policeReportBO" ref="policeReportBO" />
        </spring:bean>

        <!-- This is for validation of the initial transformation -->
        <spring:bean name="receiverPR" class="net.prcins.esb.claims.vci.policereports.staging.PoliceReportReceiver" scope="prototype" />            


        <!-- PROCESSING BEAN WHICH HAS ALL BUSINESS LOGIC -->
        <spring:bean name="policeReportBO" class="net.prcins.esb.claims.vci.policereports.processing.PoliceReportBO" scope="prototype">     
            <spring:property name="rulesFileCovLogic_NE" value="${policeReport.rules.covLogic.NE}"/>
            <spring:property name="rulesFileCovLogic_NJ" value="${policeReport.rules.covLogic.NJ}"/>                        
        </spring:bean>



<jms:activemq-connector name="jmsConnectorStagingQReaderNormal"
                            brokerURL="${mule.activemq.broker.read.normal.url}" 
                            specification="1.1"
                            maxRedelivery="-1"
                            persistentDelivery="true"   
                            createMultipleTransactedReceivers="true"
                            numberOfConcurrentTransactedReceivers="10"        
                            connectionFactory-ref="connectionFactoryPR"
                            disableTemporaryReplyToDestinations="false">
        </jms:activemq-connector>


        <!-- JMS CONNECTOR FOR WRITING TO THE STAGING QUEUE -->
        <jms:activemq-connector name="jmsConnectorStagingQWriter"
                            brokerURL="${mule.activemq.broker.write.url}" 
                            specification="1.1"
                            maxRedelivery="-1"
                            persistentDelivery="true"
                            createMultipleTransactedReceivers="true"
                            numberOfConcurrentTransactedReceivers="10" 
                            connectionFactory-ref="connectionFactoryPR"
                            disableTemporaryReplyToDestinations="false">
        </jms:activemq-connector>

    <jms:endpoint name="inboundProcessReaderNormalPR" queue="${policereport.queue.staging.name}" exchange-pattern="one-way" connector-ref="jmsConnectorStagingQReaderNormal" />


<flow name="pr-flow-stage-individual-message">
        <http:inbound-endpoint  host=   "localhost" 
                                    port=   "${policeReport.inbound.port}" 
                                    path=   "claims/vci/policeReport/record"
                                    exchange-pattern=   "request-response"
                                    >

         </http:inbound-endpoint>
         <object-to-string-transformer />
        <logger message="PoliceRecord: SPLIT MESSAGE:#[payload]" level="ERROR" /> 
        <transformer ref="XMLToRecord"></transformer>

        <!--  COMPONENT TO PROCESS THE DATA -->
        <!-- Component returns Invoice XML if all validations are successful, else it returns error XML containing error message, error code and original payload -->
        <component>
            <spring-object bean="receiverPR"></spring-object>
        </component>
        <transformer ref="RecordToXML"></transformer>
        <object-to-string-transformer />
        <!-- DECIDE SUCCESS/FAILURE AND PLACE ON QUEUE IF SUCCESS --> 
         <choice>
               <when expression="//Record/Response/Status = 'ACCEPTED'" evaluator="xpath">
                <jms:outbound-endpoint  ref="inboundProcessReaderNormalPR" doc:name="JMS">
                            <jms:transaction action="NONE"/>
                </jms:outbound-endpoint>
            </when>
            <otherwise>
                <!-- Relevant endpoint for error handling -->
                <logger message="Skipped staging message due to errors" level="ERROR" /> 
            </otherwise>
         </choice>

        <logger message="PoliceReport: STAGED PAYLOAD:#[payload]" level="ERROR" />

      <default-exception-strategy>
            <logger message="PoliceReport: Exception received in flow-stage-individual-message. Payload is :#[payload]" level="ERROR" />
      </default-exception-strategy>

    </flow>

    <!-- ============================== PROCESSING =============================== -->
     <!-- ====================================================================== -->
    <!--===================  PROCESSING JOB FOR Police Report (NORMAL)  ========================-->
        <flow name="pr-flow-process-jms-input-normal" > 
          <jms:inbound-endpoint    ref="inboundProcessReaderNormalPR"/> 
          <flow-ref name="pr-flow-process-jms-input-subbflow" />
        </flow>

        <!--===================  PROCESSING JOB FOR Police Report (DR)  ========================-->

        <flow name="pr-flow-process-jms-input-dr" > 
          <jms:inbound-endpoint ref="inboundProcessReaderDRPR" /> 
        <flow-ref name="pr-flow-process-jms-input-subbflow" />
        </flow>

        <!--===================  PROCESSING JOB FOR PoliceReport (NE)  ========================-->
       <sub-flow name="pr-flow-process-jms-input-subbflow" > 

        <transformer  ref="DomToXmlPR" doc:name="DOM to XML"/>    
        <transformer ref="XMLToRecord"></transformer>

          <!--  COMPONENT TO PROCESS THE DATA. WILL PROCESS THE OBJECT AND RETURN ANOTHER OBJECT AS RESPONSE -->
        <component> 
            <spring-object bean="processorPR"></spring-object>
        </component>

        <!--  Component above performs all business logic, including validations, creation/editing of claim, error handling etc. 
        Returns success/failure of claim import processing in the end -->

        <transformer ref="RecordToXML"></transformer>
        <object-to-string-transformer />

        <!-- DECIDE SUCCESS OR FAILURE  -->
        <choice>
              <when expression="//Record/Response/Status = 'ERROR' and contains(//Record/Response/Description, 'SysError')" evaluator="xpath">
             <!-- DROP XFORMED BATCH FORMAT STRING ON THE JMS QUEUE
                RETURNS RIGHT AWAY, NO WAITING -->
                <logger message="PoliceReport: FAILED TO PROCESS MESSAGE :#[payload]" level="ERROR" />
                <jms:outbound-endpoint queue="${policeReport.queue.errors.name}" exchange-pattern="one-way" connector-ref="policeReportJMSConnectorErrorQWriter" />
            </when>
             <otherwise>
                     <logger message="Messages processed without error :#[payload]" level="ERROR" />
             </otherwise>
        </choice>
        <transformer ref="record2balancingXFormer" />
        <object-to-string-transformer />
        <splitter evaluator="xpath" expression="//Batch_Status"  />
        <!-- <logger message="OUTPUT OF SPLITTER IS: #[payload]" level="ERROR" /> --> 
        <custom-transformer class="net.prcins.esb.claims.vci.policereports.transformers.DocumentToString" />
        <!--  Not sure why only splitter makes the payload workable for HTTP call. Without this, I keep getting error that the Rest Endpoint does not accept XML -->
        <flow-ref name="pr-flow-balancing"/>

      </sub-flow>
Can you share the Mule config as well? Especially the flow that processes the JMS messages. - David Dossot
@David, I have updated the question above with the Mule config, along with the flows that read from and write to the queue. - user2026435
@David, thanks for looking into this. We are using a pooled component for multithreaded processing. We will have a number of client requests that need to be handled simultaneously. Will a plain component provide this multithreaded behavior? Also, I tried both of these options, but they don't seem to help. I don't know why processing stops at 20 and the rest of the messages remain pending in the queue. Not sure where this magical number (20) is coming from :). Do you have any other suggestions that we can try? - user2026435
If your component is thread safe, there is no reason to pool it. A single instance can process all the requests concurrently. I've made a third suggestion. Can you update your question with the current state of your config? Also, is it your intention to return the result of flow-process-jms-input-subbflow to the client performing the initial HTTP call? - David Dossot
@David, the components are not thread safe, so I have scoped them as prototype since they are stateful beans. I have applied all three suggestions and updated the code. Initially I staged 40 messages and all 40 were dequeued from the queue. Then I staged 40 more, and only 20 were dequeued. I'm not sure why it is behaving like this. Let me know if you have any other suggestions. Thanks for your response. - user2026435

1 Answer

Not sure what's going on, but a few things in your config can introduce unnecessary threading/contention, and they worry me a little:

  • The pooled-component can act as a bottleneck if you have fewer available instances than concurrent threads. Are you sure you really need pooled-component? What is the justification for using it instead of a standard component?
  • flow-process-jms-input-subbflow is a private flow, not a sub-flow, and uses the default processing strategy. This means that the flow-ref call in flow-process-jms-input-normal will actually transfer execution to another thread. Unless it's really what you want to happen, I suggest you turn flow-process-jms-input-subbflow into a sub-flow, that way it'll be executed on the same thread that received the JMS message in <jms:inbound-endpoint ref="inboundProcessReaderNormal" />.
  • Reference the global endpoint inboundProcessReaderNormal in jms:outbound-endpoint instead of redefining it.

It's quite possible these changes won't solve the issue, but at least they will simplify the execution model and make it easier to reason about what's going on.
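
As a rough sketch of what the three suggestions look like once applied, the fragment below reuses the names from the config currently shown in the question (the names in the bullets above lack the `pr-` prefix and `PR` suffix, presumably because they refer to an earlier revision). It is a trimmed fragment meant to slot into the existing Mule config, not a complete or verified one: most transformers and the choice router are left out, and it assumes the prototype-scoped `processorPR` bean and the connectors are defined as in the question.

```xml
<!-- Sketch only: fragment of an existing Mule 3 config, trimmed for illustration. -->

<!-- Global endpoint, declared once and referenced by both writer and reader. -->
<jms:endpoint name="inboundProcessReaderNormalPR"
              queue="${policereport.queue.staging.name}"
              exchange-pattern="one-way"
              connector-ref="jmsConnectorStagingQReaderNormal" />

<!-- Inside the staging flow: reference the global endpoint instead of redefining it. -->
<jms:outbound-endpoint ref="inboundProcessReaderNormalPR" />

<flow name="pr-flow-process-jms-input-normal">
    <jms:inbound-endpoint ref="inboundProcessReaderNormalPR" />
    <!-- The target is a sub-flow, so it is invoked inline by the calling flow. -->
    <flow-ref name="pr-flow-process-jms-input-subbflow" />
</flow>

<sub-flow name="pr-flow-process-jms-input-subbflow">
    <transformer ref="XMLToRecord" />
    <!-- Standard component in place of a pooled-component. -->
    <component>
        <spring-object bean="processorPR" />
    </component>
    <transformer ref="RecordToXML" />
</sub-flow>
```

The point of this shape is that a message read from the staging queue stays with the thread that received it through the whole sub-flow, so any stall can be traced to the JMS receivers or the component itself rather than to an extra thread hand-off.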