I am writing a Java component for an enterprise-level product and want to use a particular feature of Oracle 11g databases, Advanced Queuing (AQ). The exact scenario I want to accomplish is:
1. write a message to the Oracle AQ queue/queue table, visible on commit
2. read that message from the queue with a JMS consumer
I followed the demo and tutorial at http://docs.oracle.com/cd/B28359_01/java.111/b31224/streamsaq.htm
and in particular, I'd like to focus on the enqueue part of the code -
// Create the actual AQMessage instance:
AQMessage mesg = AQFactory.createAQMessage(msgprop);
// and add a payload:
byte[] rawPayload = new byte[500];
for (int i = 0; i < rawPayload.length; i++) {
rawPayload[i] = 'b';
}
mesg.setPayload(new RAW(rawPayload));
AQEnqueueOptions opt = new AQEnqueueOptions();
opt.setRetrieveMessageId(true);
opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT);
opt.setVisibility(AQEnqueueOptions.VisibilityOption.ON_COMMIT);
// execute the actual enqueue operation:
conn.enqueue(queueName, opt, mesg);
This works just fine for me, because we want to make sure the message is only visible to the consumers when the transaction is committed.
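Since ON_COMMIT visibility ties the message to the enclosing JDBC transaction, the enqueue has to be followed by an explicit commit. A minimal sketch of one way to make that explicit, using only java.sql and assuming auto-commit should be off while the enqueue runs. The names `CommitScope`, `Work`, and `inTransaction` are hypothetical and not part of the Oracle demo; the `conn.enqueue(...)` call from the snippet above would go inside the callback.

```java
import java.sql.Connection;
import java.sql.SQLException;

/** Hypothetical helper: runs work with auto-commit off, then commits or rolls back. */
final class CommitScope {

    /** Hypothetical callback type; the AQ enqueue call would go in here. */
    interface Work {
        void run(Connection conn) throws SQLException;
    }

    static void inTransaction(Connection conn, Work work) throws SQLException {
        boolean previous = conn.getAutoCommit();
        conn.setAutoCommit(false);
        try {
            work.run(conn);
            conn.commit();   // messages enqueued with ON_COMMIT become visible here
        } catch (SQLException | RuntimeException e) {
            conn.rollback(); // the enqueue is undone, consumers never see the message
            throw e;
        } finally {
            conn.setAutoCommit(previous); // restore the caller's setting
        }
    }
}
```

With this in place the enqueue would be invoked as `CommitScope.inTransaction(conn, c -> ...)`, which keeps the commit in one place and needs neither Spring transactions nor JTA.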
The problem: the demo creates queues with payload type RAW:
doUpdateDatabase(conn,
"BEGIN "+
"DBMS_AQADM.CREATE_QUEUE_TABLE( "+
" QUEUE_TABLE => '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE', "+
" QUEUE_PAYLOAD_TYPE => 'RAW', "+
" COMPATIBLE => '10.0'); "+
"END; ");
doUpdateDatabase(conn,
"BEGIN "+
"DBMS_AQADM.CREATE_QUEUE( "+
" QUEUE_NAME => '"+USERNAME+".RAW_SINGLE_QUEUE', "+
" QUEUE_TABLE => '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE'); "+
"END; ");
doUpdateDatabase(conn,
"BEGIN "+
" DBMS_AQADM.START_QUEUE('"+USERNAME+".RAW_SINGLE_QUEUE'); "+
"END; ");
With queues created with the RAW payload type I am able to enqueue messages, but JMS consumers fail to attach to the queue: a NullPointerException is thrown where the consumer expects a parameter for the payload type. In short, the following code throws a NullPointerException on initialization:
Properties env = new Properties();
try (FileInputStream in = new FileInputStream("jndi.properties")) {
    env.load(in); // the stream is closed once the properties are read
}
Context ctx = new InitialContext(env);
ConnectionFactory connFactory = (ConnectionFactory)ctx.lookup(connectionFactoryName);
Connection connection = connFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
AQjmsSession queueSession = (AQjmsSession) session;
Queue queue = (Queue) ctx.lookup(queueName);
MessageConsumer receiver = queueSession.createReceiver(queue);
jndi.properties:
java.naming.factory.initial = oracle.jms.AQjmsInitialContextFactory
java.naming.security.principal = username
java.naming.security.credentials = password
db_url = jdbc:oracle:thin:@host:port:dbname
I get a similar exception when trying to set up consumers in Camel:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<!-- this camel route will read incoming messages from Oracle -->
<route>
<from uri="oracleQueue:queue:RAW_SINGLE_QUEUE" />
<to uri="WebSphereMQ:queue:myWebSphereQueue" />
</route>
</camelContext>
<bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
<constructor-arg index="0">
<value>oracle db URL</value>
</constructor-arg>
<constructor-arg index="1" type="java.util.Properties">
<value></value>
</constructor-arg>
</bean>
<bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory">
<ref bean="connectionFactoryOracleAQQueue" />
</property>
<property name="username">
<value>username</value>
</property>
<property name="password">
<value>password</value>
</property>
</bean>
<bean id="oracleQueue" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory" ref="oracleQueueCredentials" />
</bean>
After some research I suspected the queue payload type might be the issue, so I changed the queue table creation script to use the JMS message type as the payload type:
doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE_TABLE( "
+ " QUEUE_TABLE => 'RAW_SINGLE_QUEUE_TABLE', "
+ " QUEUE_PAYLOAD_TYPE => 'SYS.AQ$_JMS_MESSAGE', " +
" COMPATIBLE => '10.0'); " + "END; ");
In this case the JMS consumers are able to connect, but the enqueue code now fails with: ORA-25215: user_data type and queue type do not match
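To make the difference between the two setups explicit: the two queue table scripts differ only in the QUEUE_PAYLOAD_TYPE argument. A small sketch that builds both PL/SQL blocks from one helper; `QueueTableDdl` and `createQueueTable` are hypothetical names, and the resulting string would be passed to the demo's doUpdateDatabase as before.

```java
/** Hypothetical helper that builds the anonymous PL/SQL block for a queue table. */
final class QueueTableDdl {

    static String createQueueTable(String queueTable, String payloadType) {
        return "BEGIN "
             + "DBMS_AQADM.CREATE_QUEUE_TABLE( "
             + " QUEUE_TABLE => '" + queueTable + "', "
             + " QUEUE_PAYLOAD_TYPE => '" + payloadType + "', "
             + " COMPATIBLE => '10.0'); "
             + "END; ";
    }

    public static void main(String[] args) {
        // Variant 1: AQMessage enqueue works, JMS consumers fail on init.
        System.out.println(createQueueTable("RAW_SINGLE_QUEUE_TABLE", "RAW"));
        // Variant 2: JMS consumers connect, AQMessage enqueue fails with ORA-25215.
        System.out.println(createQueueTable("RAW_SINGLE_QUEUE_TABLE", "SYS.AQ$_JMS_MESSAGE"));
    }
}
```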
The question is: how can I enqueue messages, visible only on commit, from a Java producer and consume them with Camel or a generic JMS consumer?
Constraints (to filter out some of the answers already on the net): I cannot use PL/SQL, Spring transactions, or JTA. I've seen examples like "How to enqueue a JMS message into Oracle AQ using Java", where the queue table is created with the SYS.AQ$_JMS_MESSAGE type, but the example producer there is a JMS MessageProducer rather than the one in the Oracle guide. I am not trying to enqueue JMS messages (AQjmsMessage); I want to use the AQMessage type as explained in the Oracle guide, together with the visible-on-commit option.
My feeling is that if the issue is based on a mismatch of payload types only, then there must be some configuration on the consumer side to specify the payload type, or on the producer side to be able to write messages in a way JMS consumers will understand. Is there a way to accomplish this?