I'm writing an emailer web service for my company. One of the main requirements is guaranteed delivery, so we have a thin HTTP layer over the JMS transport, using a persistent Qpid queue.

One of the issues I'm running into is the handling of errors during processing. If I simply roll back the transaction when there's an error, the message goes back to the head of the queue and is redelivered immediately. If the error is pervasive enough, this can lock up the entire queue until someone intervenes manually. We would like to avoid this by posting the failed message back to the tail of the queue, so that other messages can be processed in the meantime.
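
To make the difference concrete, here is a toy simulation of the two behaviors using a plain in-memory `Deque`. It is only a sketch: it does not use JMS, Qpid, or CXF, and the class name `RequeueSketch`, the method `drain`, and the "bad" poison message are all made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

public class RequeueSketch {

    /**
     * Attempts up to maxDeliveries deliveries from the queue.
     * A failed message is put back at the head (what a rollback does)
     * or at the tail (the "reject and requeue" behavior we want).
     * Returns the messages that were processed successfully, in order.
     */
    public static List<String> drain(Deque<String> queue,
                                     Predicate<String> fails,
                                     boolean requeueAtTail,
                                     int maxDeliveries) {
        List<String> processed = new ArrayList<>();
        for (int i = 0; i < maxDeliveries && !queue.isEmpty(); i++) {
            String msg = queue.pollFirst();
            if (fails.test(msg)) {
                if (requeueAtTail) {
                    queue.addLast(msg);   // failed message waits behind the others
                } else {
                    queue.addFirst(msg);  // failed message is redelivered next
                }
            } else {
                processed.add(msg);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        Predicate<String> poison = m -> m.equals("bad");

        Deque<String> headQueue = new ArrayDeque<>(Arrays.asList("bad", "a", "b"));
        System.out.println(drain(headQueue, poison, false, 10)); // prints []

        Deque<String> tailQueue = new ArrayDeque<>(Arrays.asList("bad", "a", "b"));
        System.out.println(drain(tailQueue, poison, true, 10));  // prints [a, b]
    }
}
```

With head requeueing, the failing message consumes every delivery attempt and nothing behind it is processed. With tail requeueing, the healthy messages get through and only the failing one keeps cycling.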

However, therein lie my problems. First, while AMQP has a mechanism to "reject and requeue" a message atomically instead of acknowledging it, JMS doesn't seem to have any analog for this feature, so the only way to access it is by casting, which ties me to a specific JMS implementation. Further, the JMS transport for CXF doesn't seem to have any means of overriding or injecting behavior at the transport level, which means I'm stuck either writing a bytecode agent or changing the code and recompiling just to get the behavior I want.

To work around the issue, I've toyed with the idea of implementing a fault handler in CXF that simply reconstructs the JMS message from the CXF message, and re-queues it. But then I can't use the transacted session, because the fault causes a rollback which I can't override, and then I'll wind up with a copy of the message on the head (from the rollback) and on the tail (from the re-queue). And I can't use CLIENT_ACKNOWLEDGE, because the JMS transport acknowledges the message before submitting it for processing, which means if the server goes down at the wrong time, I could lose a message.

So basically, as long as I'm stuck accepting the default behavior of the JMS transport, it seems impossible to get the behavior I want (requeueing of failed messages) without compromising data integrity.

A coworker has suggested eschewing the JMS transport entirely, and invoking the queue directly. The service implementation would then be a skeleton class that exists solely to put messages on the queue, and another process would implement a message listener. To me, this solution is sub-optimal because I lose the elegance of an agnostic web service, and I lose some scalability by coupling my implementation to the underlying technology.

I've also considered just writing a CXF transport for AMQP using the RabbitMQ client library. It would take longer but it would be something our company could continue using going forward, and perhaps something that could be contributed back to the CXF project. That said, I'm not wild about this idea either because of the amount of time involved writing, running and testing the code.

Here's my beans.xml for CXF:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jaxrs="http://cxf.apache.org/jaxrs"
    xmlns:jms="http://cxf.apache.org/transports/jms"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="
    http://www.springframework.org/schema/beans     http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/util      http://www.springframework.org/schema/util/spring-util.xsd
    http://www.springframework.org/schema/context   http://www.springframework.org/schema/context/spring-context.xsd
    http://cxf.apache.org/jaxrs                     http://cxf.apache.org/schemas/jaxrs.xsd
    http://cxf.apache.org/transports/jms            http://cxf.apache.org/schemas/configuration/jms.xsd">

    <import resource="classpath:META-INF/cxf/cxf.xml" />
    <import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />
    <import resource="classpath:META-INF/cxf/cxf-servlet.xml" />

    <context:component-scan base-package="com.edo" />

    <bean id="jmsConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
        <constructor-arg name="broker" value="tcp://localhost:5672"/>
        <constructor-arg name="username" value="guest"/>
        <constructor-arg name="password" value="guest"/>
        <constructor-arg name="clientName" value=""/>
        <constructor-arg name="virtualHost" value=""/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" p:explicitQosEnabled="true" p:deliveryMode="1" p:timeToLive="5000" p:connectionFactory-ref="jmsConnectionFactory" p:sessionTransacted="false" p:sessionAcknowledgeModeName="CLIENT_ACKNOWLEDGE" />
    <bean id="jmsConfig" class="org.apache.cxf.transport.jms.JMSConfiguration" p:connectionFactory-ref="jmsConnectionFactory" p:wrapInSingleConnectionFactory="false" p:jmsTemplate-ref="jmsTemplate" p:timeToLive="500000" p:sessionTransacted="false" p:concurrentConsumers="1" p:maxSuspendedContinuations="0" p:maxConcurrentConsumers="1" />

    <jms:destination id="jms-destination-bean" name="{http://test.jms.jaxrs.edo.com/}HelloWorldImpl.jms-destination">
        <jms:address jndiConnectionFactoryName="ConnectionFactory" jmsDestinationName="TestQueue">
            <jms:JMSNamingProperty name="java.naming.factory.initial" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
            <jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:5672"/>
        </jms:address>
        <jms:jmsConfig-ref>jmsConfig</jms:jmsConfig-ref>
    </jms:destination>

    <jaxrs:server id="helloWorld" address="/HelloWorld" transportId="http://cxf.apache.org/transports/jms">
      <jaxrs:serviceBeans>
        <ref bean="helloWorldBean"/>
      </jaxrs:serviceBeans>
      <jaxrs:inInterceptors>
        <bean class="com.edo.jaxrs.jms.test.FlowControlInInterceptor" p:periodMs="1000" p:permitsPerPeriod="18" />
      </jaxrs:inInterceptors>
      <jaxrs:providers>
        <bean class="org.apache.cxf.jaxrs.provider.JSONProvider">
          <property name="produceMediaTypes" ref="jsonTypes"/>
          <property name="consumeMediaTypes" ref="jsonTypes"/>
        </bean>
      </jaxrs:providers>
    </jaxrs:server>

    <bean id="http-jms-config" class="com.edo.jaxrs.jms.test.HttpOverJmsConfig" 
        p:jmsFactory-ref="jmsConnectionFactory" 
        p:jmsDestinationName="TestQueue" />

    <util:list id="jsonTypes">
      <value>application/json</value>
      <value>application/jettison</value>
    </util:list>

</beans>

Is there something simple I'm missing? Or is there a better way to go about this that would sidestep the problem?

1 Answer

In the end I'm taking my coworker's advice and not using the JMS transport for the web service. Instead, we're going to create a thin web service layer over Spring Integration. This should give us the fine-grained control we need without unnecessarily exposing the messaging layer.