I implemented a spring batch project that reads from a weblogic Jms queue (Custom Item Reader not message driven), then pass the Jms message data to an item writer (chunk = 1) where i call some APIs and write in DataBase.
However, i am trying to implement parallel Jms processing, reading in parallel Jms messages and passing them to the writer without waiting for the previous processes to complete.
I’ve used a DefaultMessageListenerContainer in a previous project and it offers a parallel consuming of jms messages, but in this project i have to use the spring batch framework.
- I tried using the easiest solution (multi-threaded step) but it didn’t work , JmsException : "invalid blocking receive when another receive is in progress" which means probably that my reader is statefull.
- I thought about using remote partitioning but then i have to read all messages and put the data into step execution contexts before calling the slave steps, which isn't really efficient if dealing with a large number of messages.
- I looked a little bit into remote chunking, i understand that it passes data via queue channels, but i can't seem to find the utility in reading from a Jms and putting messages in a local queue for slave workers.
How can I approach this?
My code:
@Bean
Step step1() {
return steps.get("step1").<Message, DetectionIncoherenceLiqJmsOut>chunk(1)
.reader(reader()).processor(processor()).writer(writer())
.listener(stepListener()).build();
}
@Bean
Job job(@Qualifier("step1") Step step1) {
return jobs.get("job").start(step1).build();
}
Jms Code :
@Override
public void initQueueConnection() throws NamingException, JMSException {
Hashtable<String, String> properties = new Hashtable<String, String>();
properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_RECEIVE));
InitialContext vInitialContext = new InitialContext(properties);
QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
.lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_RECEIVE));
vQueueConnection = vQueueConnectionFactory.createQueueConnection();
vQueueConnection.start();
vQueueSession = vQueueConnection.createQueueSession(false, 0);
Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_RECEIVE));
consumer = vQueueSession.createConsumer(vQueue, "JMSCorrelationID IS NOT NULL");
}
@Override
public Message receiveMessages() throws NamingException, JMSException {
return consumer.receive(20000);
}
Item reader :
@Override
public Message read() throws Exception {
return jmsServiceReceiver.receiveMessages();
}
Thanks ! i'll appreciate the help :)