
My requirements are as follows. I have to develop a wrapper service on top of a queue, so I looked at several message queues (ActiveMQ, Apollo, Kafka) and decided to proceed with ActiveMQ because it matches our use cases. The requirements are:

1) A RESTful API through which different publishers publish to a queue; the queue is selected based on clientId.

2) Consumers consume messages through a RESTful API, in batches. For example, a consumer asks "give me 10 messages from the queue". The service should return 10 messages if 10 are available; if fewer (or none) are available, it returns whatever it has. After receiving the messages, the client processes them and sends back an acknowledgement through a different RESTful URI. Upon receiving that acknowledgement, the MQService should commit or roll back the messages on the queue. To do this in the MQService layer, I use a cache in which I keep the JMS connection and session objects until the acknowledgement is received or the TTL expires.

In order to retrieve messages in batches and send them back to the client, I created a multi-threaded consumer: for a batch request of 5 messages, the service layer creates 5 threads, each with its own connection and session object (as described in the ActiveMQ multiple-consumers page, http://activemq.apache.org/multiple-consumers-on-a-queue.html).

Basic use-case:

MQ(BROKER)[A] --> Wrapper(MQService)[B]-->Client [C]

Note: [B] is a RESTful service with a JMS consumer implemented in it. It keeps the connection and session objects in a cache.

[C] requests 3 messages from [B]. [B] must fetch 3 messages if they are available in the queue, wrap them in a batchmsgFormat, and send it to [C]. [C] processes the messages and sends a success/failed acknowledgement to [B] through the /send-ack URI. Upon receiving the ack from [C], [B] commits the JMS session, closes the session and connection objects, and evicts them from the cache.

The above workflow works fine when fetching a single message.
But the queue hangs on JMS MessageConsumer.receive() when I try to fetch messages with multiple consumers using multithreading. ...

Here is the JMS consumer code in the MQService layer:

    public BatchMessageFormat getConsumeMsg(final String clientId, final Integer batchSize) throws Exception {

        BatchMessageFormat batchmsgFormat = new BatchMessageFormat();
        List<MessageFormat> msgdetails = new ArrayList<MessageFormat>();
        List<Future<MessageFormat>> futuremsgdetails = new ArrayList<Future<MessageFormat>>();

        if (batchSize != null) {
            Integer msgCount = getMsgCount(clientId, batchSize);
            // Start one thread per message to fetch.
            for (int batchconnect = 0; batchconnect < msgCount; batchconnect++) {
                FutureTask<MessageFormat> task = new FutureTask<MessageFormat>(new Callable<MessageFormat>() {

                    @Override
                    public MessageFormat call() throws Exception {
                        return consumeBatchMsg(clientId, batchSize);
                    }
                });

                futuremsgdetails.add(task);
                Thread t = new Thread(task);
                t.start();
            }

            // Block until every thread has returned its message.
            for (Future<MessageFormat> msg : futuremsgdetails) {
                msgdetails.add(msg.get());
            }
            batchmsgFormat.setMsgDetails(msgdetails);
        }
        return batchmsgFormat;
    }

Message fetching:

    private MessageFormat consumeBatchMsg(String clientId, Integer batchSize) throws JMSException, IOException {

        MessageFormat msgFormat = new MessageFormat();
        Connection qC = ConnectionUtil.getConnection();
        qC.start();
        // Transacted session: nothing is acknowledged until commit().
        Session session = qC.createSession(true, Session.SESSION_TRANSACTED);
        Destination destination = createQueue(clientId, session);
        MessageConsumer consumer = session.createConsumer(destination);

        // Wait up to 2 seconds; receive() returns null on timeout.
        Message message = consumer.receive(2000);
        // instanceof is false for null, so this also covers the timeout case.
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            msgFormat.setMessageID(textMessage.getJMSMessageID());
            msgFormat.setMessage(textMessage.getText());

            // Keep the connection and session until the client acknowledges.
            CacheObject cacheValue = new CacheObject();
            cacheValue.setConnection(qC);
            cacheValue.setSession(session);
            cacheValue.setJmsQueue(destination);
            MQCache.instance().add(textMessage.getJMSMessageID(), cacheValue);
        }
        consumer.close();

        return msgFormat;
    }

Acknowledgement and session closing:

    public String getACK(String clientId, String msgId, String ack) throws JMSException {

        if (MQCache.instance().get(msgId) != null) {
            Connection connection = MQCache.instance().get(msgId).getConnection();
            Session session = MQCache.instance().get(msgId).getSession();
            Destination destination = MQCache.instance().get(msgId).getJmsQueue();
            MessageConsumer consumer = session.createConsumer(destination);
            if (ack.equalsIgnoreCase("SUCCESS")) {
                session.commit();
            } else {
                session.rollback();
            }

            session.close();
            connection.close();
            MQCache.instance().evictCache(msgId);
            return "Accepted";
        } else {
            return "Rejected";
        }
    }

Has anyone worked on a similar scenario, or can you please shed some light on this? Is there any other way to implement this batch message fetching, as well as handling of client failures?


2 Answers

0
votes

I'll give a few pointers to help you code this logic better.

I'm assuming you are using pure JMS 1.1 as much as possible. Ensure that you have one place where you get the connection from the pool or create a connection. You need not do that inside a thread. You can do that outside. Sessions must be created inside a thread and shouldn't be shared. This will impact the logic in the function consumeBatchMsg().

Secondly, it's simpler to use one thread to consume all the messages of the given batchSize. I see that you are using a transacted session, so you can do one commit after getting all the messages of the batch.
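As a rough illustration of the single-thread approach, here is a minimal sketch. It does not use the JMS API directly: `TransactedSource` and `InMemorySource` are hypothetical stand-ins (not part of JMS or ActiveMQ) that mimic `MessageConsumer.receive(long)`, `Session.commit()` and `Session.rollback()`, so the example runs without a broker. With real JMS you would call those methods on one transacted session instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class SingleThreadBatch {

    /** Hypothetical stand-in for one transacted JMS session plus its consumer. */
    interface TransactedSource {
        String receive(long timeoutMillis) throws InterruptedException; // null on timeout
        void commit();
        void rollback();
    }

    /** In-memory implementation (an assumption, for demonstration only). */
    static class InMemorySource implements TransactedSource {
        private final LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>();
        private final List<String> uncommitted = new ArrayList<>();

        InMemorySource(List<String> messages) {
            queue.addAll(messages);
        }

        public String receive(long timeoutMillis) throws InterruptedException {
            String m = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (m != null) {
                uncommitted.add(m); // remembered until commit or rollback
            }
            return m;
        }

        public void commit() {
            uncommitted.clear();
        }

        public void rollback() {
            // Put uncommitted messages back at the head, preserving their order.
            for (int i = uncommitted.size() - 1; i >= 0; i--) {
                queue.addFirst(uncommitted.get(i));
            }
            uncommitted.clear();
        }

        int size() {
            return queue.size();
        }
    }

    /** Receives up to batchSize messages on the calling thread; returns fewer if the queue runs dry. */
    static List<String> fetchBatch(TransactedSource source, int batchSize, long timeoutMillis)
            throws InterruptedException {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            String message = source.receive(timeoutMillis);
            if (message == null) {
                break; // timed out: nothing more to hand out
            }
            batch.add(message);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        InMemorySource source = new InMemorySource(Arrays.asList("m1", "m2", "m3"));
        List<String> batch = fetchBatch(source, 10, 50);
        System.out.println("fetched " + batch);
        source.rollback(); // client reported failure: the whole batch is redelivered
        batch = fetchBatch(source, 2, 50);
        source.commit();   // client reported success: one commit for the batch
        System.out.println("fetched " + batch + ", left on queue: " + source.size());
    }
}
```

The point of the design is that one session owns the whole batch, so a single commit or rollback covers every message handed to the client.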

If you really want to take the more complicated route of having multiple consumers on a queue (for probably slightly better performance), you can use Java's CountDownLatch or CyclicBarrier, set to batchSize, as the trigger. Once all the threads have received their messages, each thread can commit and close its own session. Never let a session instance leave the context of the thread that created it.
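A minimal sketch of the latch idea follows, under some assumptions: a `BlockingQueue` stands in for the broker so the code runs on its own, and the `commits` counter is a hypothetical placeholder for the point where each worker would call `commit()` and `close()` on the session it created. The names `LatchBatch` and `consume` are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchBatch {

    /**
     * Starts batchSize workers. Each receives at most one message, then waits on
     * the latch until every worker has finished receiving, and only then "commits"
     * on its own thread.
     */
    static List<String> consume(BlockingQueue<String> broker, int batchSize,
                                long timeoutMillis, AtomicInteger commits)
            throws InterruptedException {
        List<String> received = new CopyOnWriteArrayList<>();
        if (batchSize <= 0) {
            return received;
        }
        CountDownLatch allReceived = new CountDownLatch(batchSize);
        // One thread per worker, so no worker waits on the latch for a queued task.
        ExecutorService pool = Executors.newFixedThreadPool(batchSize);
        for (int i = 0; i < batchSize; i++) {
            pool.execute(() -> {
                try {
                    try {
                        // A real worker would create its own session and consumer here.
                        String m = broker.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                        if (m != null) {
                            received.add(m);
                        }
                    } finally {
                        allReceived.countDown(); // always signal, even on failure
                    }
                    allReceived.await();         // wait for the other workers
                    // A real worker would commit and close its session here,
                    // on the same thread that created it.
                    commits.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(timeoutMillis + 5000, TimeUnit.MILLISECONDS);
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> broker = new LinkedBlockingQueue<>(Arrays.asList("m1", "m2"));
        AtomicInteger commits = new AtomicInteger();
        List<String> batch = consume(broker, 3, 100, commits);
        System.out.println("received " + batch.size() + " message(s), commits: " + commits.get());
    }
}
```

Note that this sketch commits right after the batch is assembled. It does not cover holding sessions open across a later acknowledgement request, which is the harder part of the original design.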

0
votes

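Try setting the prefetch limit to 0, as shown below. With the default prefetch, the broker can dispatch all pending messages to the first consumer's buffer, which can leave the other consumers blocked in receive():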

ConnectionFactory connectionFactory
        = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0");