I use Spring Integration components to wire my flow:
- A message-driven channel adapter picks up messages from a WebLogic queue, runs them through a message converter, and drops them onto a channel.
- A service activator picks up each message from that channel.
- The POJO behind the service activator processes some data from the DB and then generates a report.
Sometimes I need to stop a consumer that is already processing a request, for example when the POJO is taking too long.
What are my options?
This is what I tried:
- Exposed the Spring message-driven channel adapter through JMX.
- Called the stop method on the message-driven channel adapter.
- This stops the adapter from picking up new messages, which is good.
- But it does not stop a consumer that has already picked up a message and is currently processing it.
How can I stop the processing that is taking too long in my POJO?
Appreciate any help.
Update - 2
This is my code.
<bean id="myListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="destination" ref="requestQueue"/>
    <property name="concurrentConsumers" value="5"/>
    <property name="maxConcurrentConsumers" value="10"/>
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="taskExecutor" ref="threadPoolTaskExecutor"/>
    <property name="sessionTransacted" value="true"/>
</bean>
<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="10"/>
    <property name="maxPoolSize" value="50"/>
    <property name="queueCapacity" value="10"/>
    <property name="waitForTasksToCompleteOnShutdown" value="false"/>
</bean>
<!-- Channel where the service activator drops its message -->
<si:channel id="jmsOutChannel"/>
<!-- This bean calls the DB to get some data and builds a report -->
<bean id="simpleExecutor" class="com.poc.reports.executors.SimpleCode"/>
<!-- Initialize the service activator -->
<si:service-activator id="activator" input-channel="jmsInChannel"
    ref="simpleExecutor" method="execute" output-channel="jmsOutChannel">
</si:service-activator>
<!-- Outbound adapter for the response queue -->
<jms:outbound-channel-adapter id="jmsout"
    channel="jmsOutChannel" destination="responseQueue"/>
I see that the transaction is being maintained, as you mentioned.
But I see 5 to 10 threadPoolTaskExecutors being created, since I have concurrent consumers set to 5-10 (on the adapter).
- Why should it create multiple threadPoolTaskExecutors? Should it not just use one ThreadPoolTaskExecutor?
- If I set concurrent consumers to 1, then it waits to pick up each message, since it is session transacted.
Appreciate your responses. Thank you.
Update - 3
- Maintaining a transaction within the executor service when it is used with the adapter works perfectly.
- Regarding killing a specific thread, I added a method named interruptNamedThread(String threadName) to the executor service and exposed it via JMX.
- I triggered it, and it sets the interrupt flag.
I could check the interrupt status and terminate the thread by returning (only because my test code runs in a loop and has the opportunity to check the interrupt status).
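A minimal sketch of this cooperative approach, not the actual code from the project. Only the method name interruptNamedThread(String threadName) comes from the description above; the class name InterruptDemo, the lookup through Thread.getAllStackTraces(), and the looping worker are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;

public class InterruptDemo {

    // Hypothetical helper: finds a live thread by name and sets its interrupt flag.
    // Returns true if a thread with that name was found.
    public static boolean interruptNamedThread(String threadName) {
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().equals(threadName)) {
                t.interrupt();
                return true;
            }
        }
        return false;
    }

    // Starts a looping worker, interrupts it by name, and reports whether it
    // exited within joinMillis. The worker stands in for the looping test code.
    public static boolean runAndInterrupt(String threadName, long joinMillis) {
        CountDownLatch started = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            started.countDown();
            // The loop checks the flag on every pass, so it can return promptly.
            while (!Thread.currentThread().isInterrupted()) {
                // one small unit of work would go here
            }
        }, threadName);
        worker.start();
        try {
            started.await();
            interruptNamedThread(threadName);
            worker.join(joinMillis);
            return !worker.isAlive();
        } catch (InterruptedException e) {
            worker.interrupt();                  // do not leave the worker spinning
            Thread.currentThread().interrupt();  // restore the caller's flag
            return false;
        }
    }
}
```

This only works because the loop gets back to the isInterrupted() check after each unit of work.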
However, it did not help in my real scenario:
- My thread does not run in a loop. It talks to the DB, generates a report, and then returns to the pool.
- I want to end the thread when it takes too long to return from a DB call or a third-party library call (the report generator).
- In that case it never gets a chance to check the interrupt flag, since it is stuck inside one of those calls (DB or report generator).
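The situation in the list above can be reproduced with a small simulation. This is a sketch under assumptions: blockingCallThatIgnoresInterrupts is a hypothetical stand-in for the DB or report-generator call, and it simply spins until released. Whether a real JDBC driver or reporting library reacts to an interrupt depends on the library; this stand-in assumes it does not.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class StuckCallDemo {

    // Hypothetical stand-in for a long DB or report-generator call.
    // It never checks the interrupt flag and never waits interruptibly.
    static void blockingCallThatIgnoresInterrupts(AtomicBoolean release) {
        while (!release.get()) {
            // busy until the simulated call is allowed to return
        }
    }

    // Returns true if the worker is still alive shortly after being interrupted.
    public static boolean survivesInterrupt() {
        AtomicBoolean release = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            started.countDown();
            blockingCallThatIgnoresInterrupts(release);
        }, "stuck-worker");
        worker.start();
        try {
            started.await();
            worker.interrupt();        // only sets the flag
            worker.join(200);          // give the worker time to react
            boolean stillAlive = worker.isAlive();
            release.set(true);         // let the simulated call return
            worker.join();
            return stillAlive;
        } catch (InterruptedException e) {
            release.set(true);
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Here interrupt() has no visible effect: the flag is set, but the thread keeps running until the simulated call returns on its own.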
In my use case, is restarting the component the only way to kill a thread?
Appreciate any suggestions.