0
votes

Purpose

I created MyListener.java to monitor my Oracle Queue MY_QUEUE and MyConsumer.java implement my own MessageListener.onMessage functionality.
As soon as I enqueue some entries into MY_QUEUE I want the MessageListener to output "New Message..." onto the console.

Problem

The Queue entries will only be processed on the initial application start. If additional entries get enqueued while the application is already runnning the MessageListener.onMessage function won't get triggered.

Example

  1. Run Application with 5 entries already in queue. Output:

    Initialized...
    Sleeping...
    New Message...
    New Message...
    New Message...
    New Message...
    New Message...
    Sleeping...
    Sleeping...

  2. Quit Application and run the Application. Enqueue entries on runtime. Output:

    Initialized...
    Sleeping...
    Sleeping...
    Sleeping...
    Sleeping... (Queue entries got inserted about at this time)
    Sleeping...
    Sleeping...
    Sleeping...
    Sleeping...

  3. Quit Application and run the Application again (Entries from 2. are still in queue). Output:

    Initialized...
    Sleeping...
    New Message...
    New Message...
    New Message...
    New Message...
    New Message...
    Sleeping...
    Sleeping...

MyListener

package example;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;

public class MyListener {

    private static final String QUEUE_NAME = "MY_QUEUE";
    private static final String QUEUE_USER = "myuser";
    private static final String QUEUE_PW = "mypassword";
    private QueueConnection queueConnection;
    private QueueSession queueSession;

    public MyListener() throws JMSException {
        QueueConnectionFactory QFac = AQjmsFactory.getQueueConnectionFactory("xxx.xxx.xxx.xxx", "orcl", 1521, "thin");
        this.queueConnection = QFac.createQueueConnection(QUEUE_USER, QUEUE_PW);
        this.queueSession = this.queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public static void main(String[] args) {

        try {
            MyListener myListener = new MyListener();
            Queue queue = ((AQjmsSession) myListener.queueSession).getQueue(QUEUE_USER, QUEUE_NAME);

            MessageConsumer mq = ((AQjmsSession) myListener.queueSession).createReceiver(queue);
            MyConsumer mc = new MyConsumer();
            mq.setMessageListener(mc);

            myListener.queueConnection.start();

            System.out.println("Initialized...");

            while (true) {
                try {
                    System.out.println("Sleeping...");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            System.out.println("Application closed");
        }

    }

}

MyConsumer

package example;

import javax.jms.Message;
import javax.jms.MessageListener;

public class MyConsumer implements MessageListener{

    @Override
    public void onMessage(Message arg0) {
        System.out.println("New Message...");

    }

}

PL/SQL script to enqueue entries

DECLARE
   msg                  SYS.aq$_jms_text_message;
   enqueue_options      DBMS_AQ.ENQUEUE_OPTIONS_T;
   message_properties   DBMS_AQ.MESSAGE_PROPERTIES_T;
   message_handle       RAW (16);
   i                    NUMBER;
BEGIN
   msg := sys.aq$_jms_text_message.construct;
   msg.set_text ('Testmessage');
   enqueue_options.visibility := DBMS_AQ.immediate;
   message_properties.priority := 1;
   i := 0;

   WHILE i < 5
   LOOP
      DBMS_AQ.enqueue (queue_name           => 'MY_QUEUE',
                       enqueue_options      => enqueue_options,
                       message_properties   => message_properties,
                       payload              => msg,
                       msgid                => message_handle);
      i := i + 1;
   END LOOP;

   COMMIT;
END;

Additional Information

Database: Oracle 11g2
Java Runtime: 1.6
Maven Dependencies:

  • oracle-jdbc (11.2.0.4.0)
  • xdb (1.0)
  • aqapi (1.3)
  • jmscommon (1.3.1_02)

Can someone tell me why the onMessage function won't get triggered once I enqueue new entries on runtime?

EDIT: OK, I stopped using JMS and now use the old AQ dequeue approach to get my messages asynchronously. I might come back and try to figure out as to why it's not working with the code above but that's low priority right now.

1

1 Answers

0
votes

Essentially as soon as you create the AQjmsQueueReceiver and set its message listener the receive() method will exit and the AQjmsQueueReceiver will fall out of scope. I assume it's being invoked from the main method which also means the program will exit. You need to:

  1. Modify your application so that your JMS objects don't fall out of scope (because they will get garbage-collected).
  2. Prevent your program from exiting while its waiting for messages.