0
votes

I implemented a spring batch project that reads from a weblogic Jms queue (Custom Item Reader not message driven), then pass the Jms message data to an item writer (chunk = 1) where i call some APIs and write in DataBase.

However, i am trying to implement parallel Jms processing, reading in parallel Jms messages and passing them to the writer without waiting for the previous processes to complete.

I’ve used a DefaultMessageListenerContainer in a previous project and it offers a parallel consuming of jms messages, but in this project i have to use the spring batch framework.

  • I tried using the easiest solution (multi-threaded step) but it didn’t work , JmsException : "invalid blocking receive when another receive is in progress" which means probably that my reader is statefull.
  • I thought about using remote partitioning but then i have to read all messages and put the data into step execution contexts before calling the slave steps, which isn't really efficient if dealing with a large number of messages.
  • I looked a little bit into remote chunking, i understand that it passes data via queue channels, but i can't seem to find the utility in reading from a Jms and putting messages in a local queue for slave workers.

How can I approach this?

My code:

 @Bean
    Step step1() {
        return steps.get("step1").<Message, DetectionIncoherenceLiqJmsOut>chunk(1)
                .reader(reader()).processor(processor()).writer(writer())
                .listener(stepListener()).build();
    }

    @Bean
    Job job(@Qualifier("step1") Step step1) {
        return jobs.get("job").start(step1).build();
    }

Jms Code :

  @Override
    public void initQueueConnection() throws NamingException, JMSException {

        Hashtable<String, String> properties = new Hashtable<String, String>();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
        properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_RECEIVE));
        InitialContext vInitialContext = new InitialContext(properties);

        QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
                .lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_RECEIVE));

        vQueueConnection = vQueueConnectionFactory.createQueueConnection();
        vQueueConnection.start();

        vQueueSession = vQueueConnection.createQueueSession(false, 0);
        Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_RECEIVE));
        consumer = vQueueSession.createConsumer(vQueue, "JMSCorrelationID IS NOT NULL");

    }

    @Override
    public Message receiveMessages() throws NamingException, JMSException {

        return consumer.receive(20000);

    }

Item reader :

@Override
    public Message read() throws Exception {

        return jmsServiceReceiver.receiveMessages();
    }

Thanks ! i'll appreciate the help :)

2

2 Answers

1
votes

There's a BatchMessageListenerContainer in the spring-batch-infrastructure-tests sub project.

https://github.com/spring-projects/spring-batch/blob/d8fc58338d3b059b67b5f777adc132d2564d7402/spring-batch-infrastructure-tests/src/main/java/org/springframework/batch/container/jms/BatchMessageListenerContainer.java

Message listener container adapted for intercepting the message reception with advice provided through configuration. To enable batching of messages in a single transaction, use the TransactionInterceptor and the RepeatOperationsInterceptor in the advice chain (with or without a transaction manager set in the base class). Instead of receiving a single message and processing it, the container will then use a RepeatOperations to receive multiple messages in the same thread. Use with a RepeatOperations and a transaction interceptor. If the transaction interceptor uses XA then use an XA connection factory, or else the TransactionAwareConnectionFactoryProxy to synchronize the JMS session with the ongoing transaction (opening up the possibility of duplicate messages after a failure). In the latter case you will not need to provide a transaction manager in the base class - it only gets on the way and prevents the JMS session from synchronizing with the database transaction.

Perhaps you could adapt it for your use case.

0
votes

I was able to do so with a multithreaded step :

  // Jobs et Steps
@Bean
Step stepDetectionIncoherencesLiq(@Autowired StepBuilderFactory steps) {

    int threadSize = Integer.parseInt(env.getProperty(PropertyConstant.THREAD_POOL_SIZE));

    return steps.get("stepDetectionIncoherencesLiq").<Message, DetectionIncoherenceLiqJmsOut>chunk(1)
            .reader(reader()).processor(processor()).writer(writer())
            .readerIsTransactionalQueue()
            .faultTolerant()
            .taskExecutor(taskExecutor())
            .throttleLimit(threadSize)
            .listener(stepListener())
            .build();
}

And a jmsItemReader with jmsTemplate instead of creating session and connections explicitly, it manages connections so i dont have the jms exception anymore:( JmsException : "invalid blocking receive when another receive is in progress" )

 @Bean
    public JmsItemReader<Message> reader() {
        JmsItemReader<Message> itemReader = new JmsItemReader<>();
        itemReader.setItemType(Message.class);
        itemReader.setJmsTemplate(jmsTemplate());
        return itemReader;
    }