0
votes

I am using Apache camel for routing between ActiveMQ and camel HTTP endpoints. The route is defined in such a way that take data from camel-cxf webservice to the ActiveMQ and post these data to a tomcat which is a HTTP endpoint then once the http uri hit , camel get response from tomcat server.

Can we acknowledge the queue from the camel only after the successful routing, I mean acknowledge AMQ after the LOG_log_4 process done? If this is possible, then I can make this route without any loss of data in the queue even the http link broke down.

I found this link JMS ACKNOWLEDGE_MODE. If I need to use Acknowledge mode CLIENT_ACKNOWLEDGE then do I need to write consumer service at tomcat web app site to acknowledge the msg received success.

I am using Acknowledgemodename as AUTO_ACKNOWLEDGE.

Detail route diagram as follow:-

enter image description here

Spring-DSL application-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:cxf="http://camel.apache.org/schema/cxf"
    xmlns:jaxrs="http://cxf.apache.org/jaxrs"
    xmlns:osgi="http://www.springframework.org/schema/osgi"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans       http://www.springframework.org/schema/beans/spring-beans.xsd       http://www.springframework.org/schema/osgi       http://www.springframework.org/schema/osgi/spring-osgi.xsd       http://camel.apache.org/schema/spring       http://camel.apache.org/schema/spring/camel-spring.xsd       http://cxf.apache.org/jaxrs       http://cxf.apache.org/schemas/jaxrs.xsd       http://camel.apache.org/schema/cxf       http://camel.apache.org/schema/cxf/camel-cxf.xsd">
    <cxf:rsServer address="http://localhost:9000/employeeservice"
        id="restService" serviceClass="com.javainuse.beans.EmployeeServiceResource"/>
    <bean class="com.javainuse.beans.CamelProcessor" id="processor"/>
    <bean class="com.javainuse.beans.CamelAMQResponseProcessor" id="camelAMQResponseProcessor"/>
    <camelContext id="camelId" xmlns="http://camel.apache.org/schema/spring">
        <camel:route id="_route1">
            <camel:from id="_fromrest_1" uri="cxfrs://bean://restService"/>
            <camel:process id="_processrest_1" ref="processor"/>
            <camel:to id="_to1" uri="amq:queue:queue1x"/>
            <camel:log id="_log1" message="hit ws route 1"/>
        </camel:route>
        <camel:route id="_route2">
            <camel:from id="_from2" uri="amq:queue:queue1x?concurrentConsumers=2"/>
            <camel:log id="_log2" message="hit amq route 2"/>
            <camel:log id="_log3" message="Message returned from the queue is ${body}"/>
            <camel:setBody id="_setBody2">
                <simple>${body}</simple>
            </camel:setBody>
            <setHeader headerName="CamelHttpMethod" id="_setHeader1">
                <constant>PUT</constant>
            </setHeader>
            <setHeader headerName="Cookie" id="_setHeader2">
                <constant>JSESSIONID=Z2rLMNdkmpz72nG2FTj5SuLHn_rbZxHzs7wbw3LG</constant>
            </setHeader>
            <setHeader headerName="Authorization" id="_setHeader3">
                <constant>eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJQQVNTV09SRC9hcHBhZG1pbiIsImF1dGgiOiJERUYtcm9sZS1hcHBhZG1pbiIsImV4cCI6MTUzNTQzNjg3Mn0.AqfYVoAFsh6XDiIGGVO8lRpGyb1HcdP2HN1rUWtfDbYNJ6rdsnLnT5nrqmtPSR7YfUui7pWKxEF96NcLKppQAg</constant>
            </setHeader>
            <camel:to id="_to2" uri="http:localhost:8080/api/outboundmsg/"/>
            <camel:log id="_log4" message="Message returned from iConnect is ${body} with header ${in.headers.CamelHttpResponseCode}"/>
        </camel:route>
    </camelContext>
</beans>

ActiveMQ config applicationContext-mq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:cxf="http://camel.apache.org/schema/cxf"
    xmlns:osgi="http://www.springframework.org/schema/osgi"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd  http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd       http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf-2.8.3.xsd">
    <bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amq">
        <property name="connectionFactory" ref="singleCF"/>
        <property name="useSingleConnection" value="true"/>
        <property name="usePooledConnection" value="false"/>
        <property name="preserveMessageQos" value="true"/>
        <property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"></property>
    </bean>
    <bean
        class="org.springframework.jms.connection.SingleConnectionFactory" id="singleCF">
        <property name="targetConnectionFactory" ref="AMQCF"/>
    </bean>
    <bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AMQCF">
        <property name="userName" value="admin"/>
        <property name="password" value="admin"/>
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="AMQCF" />
    </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
    </bean>
</beans>

Thank you in advance.

1

1 Answers

0
votes

Broker receives the acknowledgement as soon as it is consumed by the consumer. i.e. In your case <camel:from id="_from2" uri="amq:queue:queue1x?concurrentConsumers=2"/>

Your requirement is not clear, however If you need to implement request reply over queues, you can set replyTo param to queue and push resoponses received from tomcat to that replyTo queue. Or, Let me know the exact requirement.