1
votes

I am producing to an ActiveMQ instance with a Camel ActiveMQ component:

<camel:log message="YMA_IN" />
<camel:inOnly uri="activemqBroker:queue:queue.test" id="activemqBrokerTestQueue"/>
<camel:log message="YMA_OUT" />

The only jms configuration on the endpoint is TTL and a pooled connection factory.

<amq:connectionFactory id="jmsConnectionFactory" brokerURL="${config.jms.broker.url}" />

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
    <property name="maxConnections" value="15" />
    <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory"/>
    <property name="timeToLive" value="${config.jms.time.to.live}" />
</bean>

<!-- Broker configuration -->
<bean id="activemqBroker" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig"/>
</bean>  

If the broker is down, exchanges remain stuck on the inonly (logging IN but not OUT), seemingly indefinitely until the broker is back up. No exception, no timeout etc...

Is this the expected behaviour if the broker is down? I was expecting an exception? Is there some configuration I am missing?

Camel 2.10.1

3

3 Answers

1
votes

There is also options to use synchronous sends, then the client will detect the error sooner (and not have to wait for a timeout) and be able to throw an exception that Camel can detect. See some details at

0
votes

In fact this seems to be the expected behaviour: timeouts need to be configured on the broker URL itself.

The following answer shows how to configure it:

https://stackoverflow.com/a/15416704/609452

Then a simple try/catch of JMSException works

0
votes

With this config I got Camel to fail messages once ActiveMQ broker goes down:

    ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory();
    mqConnectionFactory.setUseAsyncSend(true);
    mqConnectionFactory.setCloseTimeout(closeTimeout);
    mqConnectionFactory.setBrokerURL(brokerUrl);
    mqConnectionFactory.setUserName(userName);
    mqConnectionFactory.setPassword(password());

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setMaxConnections(maxConnections);
    pooledConnectionFactory.setConnectionFactory(mqConnectionFactory);

    JmsConfiguration jmsConfiguration = new JmsConfiguration();
    jmsConfiguration.setConnectionFactory(pooledConnectionFactory);
    jmsConfiguration.setTransacted(false);

    ActiveMQComponent amqComponent = new ActiveMQComponent();
    amqComponent.setConfiguration(jmsConfiguration);