Purpose
I created MyListener.java to monitor my Oracle queue MY_QUEUE, and MyConsumer.java to implement my own MessageListener.onMessage functionality.
As soon as I enqueue entries into MY_QUEUE, I want the MessageListener to print "New Message..." to the console.
Problem
The queue entries are only processed on the initial application start. If additional entries are enqueued while the application is already running, MessageListener.onMessage is not triggered.
Example
1. Run the application with 5 entries already in the queue. Output:
Initialized...
Sleeping...
New Message...
New Message...
New Message...
New Message...
New Message...
Sleeping...
Sleeping...
2. Quit the application and run it again. Enqueue entries at runtime. Output:
Initialized...
Sleeping...
Sleeping...
Sleeping...
Sleeping... (the queue entries were enqueued at about this time)
Sleeping...
Sleeping...
Sleeping...
Sleeping...
3. Quit the application and run it again (the entries from 2. are still in the queue). Output:
Initialized...
Sleeping...
New Message...
New Message...
New Message...
New Message...
New Message...
Sleeping...
Sleeping...
MyListener
package example;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;

public class MyListener {
    private static final String QUEUE_NAME = "MY_QUEUE";
    private static final String QUEUE_USER = "myuser";
    private static final String QUEUE_PW = "mypassword";

    private QueueConnection queueConnection;
    private QueueSession queueSession;

    public MyListener() throws JMSException {
        QueueConnectionFactory QFac = AQjmsFactory.getQueueConnectionFactory("xxx.xxx.xxx.xxx", "orcl", 1521, "thin");
        this.queueConnection = QFac.createQueueConnection(QUEUE_USER, QUEUE_PW);
        this.queueSession = this.queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public static void main(String[] args) {
        try {
            MyListener myListener = new MyListener();
            Queue queue = ((AQjmsSession) myListener.queueSession).getQueue(QUEUE_USER, QUEUE_NAME);
            MessageConsumer mq = ((AQjmsSession) myListener.queueSession).createReceiver(queue);
            MyConsumer mc = new MyConsumer();
            mq.setMessageListener(mc);
            myListener.queueConnection.start();
            System.out.println("Initialized...");
            while (true) {
                try {
                    System.out.println("Sleeping...");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            System.out.println("Application closed");
        }
    }
}
MyConsumer
package example;

import javax.jms.Message;
import javax.jms.MessageListener;

public class MyConsumer implements MessageListener {
    @Override
    public void onMessage(Message arg0) {
        System.out.println("New Message...");
    }
}
PL/SQL script to enqueue entries
DECLARE
    msg                SYS.aq$_jms_text_message;
    enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
    message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    message_handle     RAW (16);
    i                  NUMBER;
BEGIN
    msg := sys.aq$_jms_text_message.construct;
    msg.set_text ('Testmessage');
    enqueue_options.visibility := DBMS_AQ.immediate;
    message_properties.priority := 1;
    i := 0;
    WHILE i < 5
    LOOP
        DBMS_AQ.enqueue (queue_name         => 'MY_QUEUE',
                         enqueue_options    => enqueue_options,
                         message_properties => message_properties,
                         payload            => msg,
                         msgid              => message_handle);
        i := i + 1;
    END LOOP;
    COMMIT;
END;
Additional Information
Database: Oracle 11g Release 2
Java Runtime: 1.6
Maven Dependencies:
- oracle-jdbc (11.2.0.4.0)
- xdb (1.0)
- aqapi (1.3)
- jmscommon (1.3.1_02)
Can someone tell me why onMessage is not triggered when I enqueue new entries at runtime?
EDIT: OK, I stopped using JMS and now use the old AQ dequeue approach to get my messages asynchronously. I might come back and try to figure out why it's not working with the code above, but that's low priority right now.
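For reference, the general shape of that kind of workaround (a background thread that blocks on a dequeue call and hands each entry to a callback) can be sketched as below. This is only an illustration under assumptions: a java.util.concurrent.BlockingQueue stands in for MY_QUEUE, and PollingDequeueSketch, TextHandler and Dequeuer are hypothetical names, not part of the code above or of the Oracle AQ API. In a real application the poll call would be replaced by the actual AQ dequeue with a wait time.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch only: a BlockingQueue stands in for the Oracle queue MY_QUEUE.
final class PollingDequeueSketch {

    /** Hypothetical callback, playing the role of MessageListener.onMessage. */
    interface TextHandler {
        void handle(String text);
    }

    /** Polls the queue on its own thread and passes every entry to the handler. */
    static final class Dequeuer implements Runnable {
        private final BlockingQueue<String> queue;
        private final TextHandler handler;
        private volatile boolean running = true;

        Dequeuer(BlockingQueue<String> queue, TextHandler handler) {
            this.queue = queue;
            this.handler = handler;
        }

        @Override
        public void run() {
            while (running) {
                try {
                    // Blocking dequeue with a timeout, so stop() is noticed promptly.
                    String text = queue.poll(500, TimeUnit.MILLISECONDS);
                    if (text != null) {
                        handler.handle(text);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and end the loop.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        void stop() {
            running = false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        Dequeuer dequeuer = new Dequeuer(queue, new TextHandler() {
            @Override
            public void handle(String text) {
                System.out.println("New Message...");
            }
        });
        Thread worker = new Thread(dequeuer, "dequeuer");
        worker.start();
        System.out.println("Initialized...");

        // Entries enqueued while the application is already running.
        for (int i = 0; i < 5; i++) {
            queue.put("Testmessage");
        }

        Thread.sleep(1000); // give the worker time to drain the queue
        dequeuer.stop();
        worker.join();
    }
}
```

The timeout on the dequeue call is a design choice: it keeps the loop responsive to stop() without busy-waiting. The sketch sticks to anonymous classes and explicit generic types so that it stays compatible with the Java 1.6 runtime listed above.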