0
votes

I have JMS in my Mule flow: a producer reads records from a cache and puts them on a queue, and a consumer consumes the messages and does further processing. The flow looks like this:

Service 1 (read data from file) -> Service 2 (put each line in cache) -> Service 3 (JMS producer: read data from cache line by line and put it on the queue) -> JMS consumer (read from queue) -> Service 4

In the flow above, processing becomes asynchronous from the JMS component onward. As soon as the producer has put all records on the queue, the response goes back to the client saying the process has completed, even though the consumer may still be consuming messages.

I want the producer to hold back the response until the consumer has consumed all the messages.

Any idea how to achieve this?


3 Answers

0
votes

Since the async part works on a copy of the message and processes it independently in a separate thread, the producer may put messages on the queue faster than the consumer is able to consume them.
One way I can think of to hold up the process of putting messages on the queue is to add a sleep() before it.
You can use a Groovy component and call sleep() in it to hold the flow or slow down the process.
For example, if you put the following:

<scripting:component doc:name="Groovy">
  <scripting:script engine="Groovy"><![CDATA[
    sleep(10000);
    return message.payload;]]>
  </scripting:script>
</scripting:component>

before putting the message on the queue, the process will slow down and the flow will pause for 10000 ms, giving the consumer on the other side time to consume the messages.

0
votes

Polling for completion status as described above may work OK but there's still a risk of some transactions not being completed after wait time, or waiting long after all messages have been processed.

Depending on the end goal of this exercise, you could perhaps leverage Mule batch, which already implements the splitting of the inbound request into individual messages, processing the messages in one or multiple consumer threads, keeping track of the chunks processed and remaining, and reporting the results / executing final steps once all data is processed.
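As a rough illustration of the batch approach, the fragment below sketches a Mule 3 batch job (batch is an Enterprise feature). The job, step and flow names are hypothetical, and it assumes the payload handed to the job is the collection of lines and that Service 4 is available as a flow named service4. Treat it as a sketch under those assumptions, not a drop-in configuration.

```xml
<!-- Sketch only: processLinesBatch, processLineStep, triggerFlow and service4 are hypothetical names -->
<batch:job name="processLinesBatch">
    <batch:process-records>
        <batch:step name="processLineStep">
            <!-- Each record (assumed: one line) is handed to the existing processing logic -->
            <flow-ref name="service4" doc:name="Service 4"/>
        </batch:step>
    </batch:process-records>
    <batch:on-complete>
        <!-- Runs once every record has been processed; the payload here is the batch job result -->
        <logger level="INFO"
                message="Total: #[payload.totalRecords], succeeded: #[payload.successfulRecords], failed: #[payload.failedRecords]"
                doc:name="Logger"/>
    </batch:on-complete>
</batch:job>

<flow name="triggerFlow">
    <!-- The payload at this point is assumed to be the collection of lines to process -->
    <batch:execute name="processLinesBatch" doc:name="Run batch"/>
</flow>
```

Note that the batch job itself still runs asynchronously from the flow that triggers it, so any "all done" notification would go in the on-complete phase rather than in the triggering flow.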

If you can't use batch and need to reassemble the processed messages into a single list or map, you may be able to get the Collection Aggregator to do the job of tracking the messages by correlation ID and handling the timeouts.

The crude DIY way to implement it is to build some sort of dispatcher logic for the JMS publishing component. It submits all messages to JMS, then waits for each consumer / worker thread to respond (via a separate JMS queue) with a completion message carrying the same correlation ID. The dispatcher tracks all submitted / processed messages in in-memory or persistent storage and responds once the last message in the batch has been acknowledged, or after a pre-defined timeout. This is very close to what Mule batch already does.

Cheers! Dima

0
votes

You can set the exchange pattern on the JMS endpoint to request-response so that the flow waits for a response from JMS.
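A minimal sketch of what that could look like in a Mule 3 configuration is shown below. The flow names, queue name, connector reference and timeout value are all assumptions made for illustration; adapt them to the actual setup.

```xml
<!-- Sketch only: producerFlow, consumerFlow, lines.queue, jmsConnector and service4 are hypothetical names -->
<flow name="producerFlow">
    <!-- Preceding steps (reading the next line from the cache) omitted -->
    <jms:outbound-endpoint queue="lines.queue"
                           connector-ref="jmsConnector"
                           exchange-pattern="request-response"
                           responseTimeout="30000"
                           doc:name="JMS"/>
    <!-- Execution continues here only after a reply arrives or the timeout (in ms) expires -->
</flow>

<flow name="consumerFlow">
    <jms:inbound-endpoint queue="lines.queue"
                          connector-ref="jmsConnector"
                          exchange-pattern="request-response"
                          doc:name="JMS"/>
    <!-- The result of this flow is sent back to the producer as the reply -->
    <flow-ref name="service4" doc:name="Service 4"/>
</flow>
```

With this setup the producer blocks on every message until the consumer has replied, so throughput drops compared with the one-way pattern.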