2
votes

I am trying to use XD JMS source to read data from activeMq queue and just log it. My requirement is to read the queue at specific interval only, trying to implement throttling. I need my stream to process only 1 message per second, the queue may have messages at any rate i.e 20 message/sec etc.

The out of box JMS source implements message-driven-channel-adapter which reads data immediately from queue. So I created a custom module (polledJms) with below :

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <import resource="../../../common/jms-${provider}-infrastructure-context.xml"/>
    <int:channel id="output"/>
    <int-jms:inbound-channel-adapter id="jmsPolledSource"
            channel="output"
            destination-name="${destination}"
            connection-factory="connectionFactory">
    <int:poller fixed-rate="5000"/>
    </int-jms:inbound-channel-adapter>

Now when I post something on the queue its not picked up immmediately but after a delay. However that delay isnt consistent. I expect the delay to be always 5sec but sometimes its 10sec, 1minute etc. Not sure what I am doing wrong here.

My stream definition is as below :

"polledJms --destination=readQ | log"

I even tried cron expression in the xml instead of fixed-rate to read every 10sec in the poller and still see same behavior.

Is my custom module correct way to implement throttling on JMS queues or XD provides an out of box capability to do so which I overlooked. Please help.

1

1 Answers

1
votes

The <poller> has max-messages-per-poll option, which is Integer.MIN_VALUE by default meaning "read messages until they are in the MessageSource".

From other side fixed-rate means "start a new polling task after that time just after the start of the prevoius".

To make it "delayable" you should consider to use fixed-delay. In this case the new polling task will be started over that period only after the finish of the previous task.

Otherwise your custom module looks good.

From other side you can even configure an existing jms source with <delayer> in front of outputChannel.

UPDATE

fixed-delay means: start new polling task over the period after the finish of the previous one. Since your poller reads a message and sends it to the channel within the same Thread you should add for the polling task duration the message handling time. Because the polling thread is busy. So, yes: your message may not be polled each second.

From other side (using fixed-rate) you should keep in mind that Spring Integration uses TaskScheduler with 10 pool size by default. You can change it using META-INF/spring.integration.properties with the spring.integraton.taskScheduler.poolSize property. With your case of 100 messages in the queue and 10 concurrent threads for fixed-rate to poll only one message you can reach the thread pool starvation and end up with unexpected polling time results.