I currently have a RabbitMQ broker set up with a direct exchange, and I'm using Spring Integration (SI) to produce and consume messages from the queues. I'm using the chain of responsibility pattern, where I pass the consumed message from one POJO to another using SI. To do this I have used a Spring Integration inbound channel adapter, which requires a reference to a poller.
How can I remove the poller so that consumption is driven by callbacks rather than by polling, i.e. a message is fetched from the queue as soon as it becomes available, while still being able to use the chain of responsibility pattern provided by SI?
I've tried changing the inbound channel adapter to a Spring Rabbit inbound channel adapter, but that offers no way of then forwarding the message to my POJO. The following attributes are not supported on it:
ref="eventConsumer" method="onReceiveEvent"
I've got some simple config defined below. How can I achieve the same thing without the need for polling?
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<import resource="classpath:datasource-config.xml" />
<!-- Logging endpoint that the channel wire-taps point at: writes the full message at DEBUG level. -->
<int:logging-channel-adapter id="logger"
                             level="DEBUG"
                             log-full-message="true"/>
<!-- Channel the inbound adapter publishes consumed events to; a wire-tap copies every message to "logger". -->
<int:channel id="eventChannel">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>
<!--
    Channel named by the poller's error-channel attribute; a wire-tap copies every message to "logger".
    NOTE(review): declaring a channel with the id "errorChannel" presumably replaces the default
    error channel that Spring Integration would otherwise register. No subscriber is visible in
    this file, so confirm that one is defined elsewhere.
-->
<int:channel id="errorChannel">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>
<!-- RabbitMQ connection factory; every setting is supplied through a property placeholder. -->
<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <!-- Broker location -->
    <property name="host" value="${rabbitmq.host}"/>
    <property name="port" value="${rabbitmq.port}"/>
    <property name="virtualHost" value="${rabbitmq.events.virtual.host}"/>
    <!-- Credentials -->
    <property name="username" value="${rabbitmq.username}"/>
    <property name="password" value="${rabbitmq.password}"/>
</bean>
<!-- RabbitTemplate built on "connectionFactory", with the exchange and routing key set from properties. -->
<bean id="rabbitTemplate"
      class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg name="connectionFactory" ref="connectionFactory"/>
    <property name="routingKey" value="${rabbitmq.events.routing.key}"/>
    <property name="exchange" value="${rabbitmq.events.exchange.name}"/>
</bean>
<!--
    String bean holding the polling interval. The poller below no longer reads it; it is kept
    only so that any configuration outside this file that looks up "pollingInterval" by id
    keeps working. It can be removed once nothing references it.
-->
<bean id="pollingInterval" class="java.lang.String">
    <constructor-arg value="${rabbitmq.message.polling.interval}"/>
</bean>

<!--
    Shared poller referenced by the inbound channel adapter. It triggers at a fixed rate and
    sends polling errors to "errorChannel". The rate is bound directly to the property
    placeholder rather than through a SpEL reference to an intermediate String bean, so the
    value has a single, obvious source.
-->
<int:poller id="rabbitConsumerPoller"
            fixed-rate="${rabbitmq.message.polling.interval}"
            error-channel="errorChannel"/>
<!--
    Consumer POJO whose onReceiveEvent method is invoked by the inbound channel adapter.
    It is given the queue name, the RabbitTemplate and a DAO.
    NOTE(review): "eventsConsumerDao" is not defined in this file; presumably it comes from
    the imported datasource-config.xml. Confirm.
-->
<bean id="eventConsumer"
      class="com.idna.events.consumer.EventsConsumer">
    <property name="queueName" value="${rabbitmq.events.queue.name}"/>
    <property name="rabbitTemplate" ref="rabbitTemplate"/>
    <property name="consumerDao" ref="eventsConsumerDao"/>
</bean>
<!--
    Poll-driven source: each time "rabbitConsumerPoller" fires, eventConsumer.onReceiveEvent is
    called and its result is sent to "eventChannel".
    NOTE(review): this generic adapter always needs a poller. For push-style delivery, look at
    the inbound channel adapter in the Spring Integration AMQP namespace, feeding a
    service-activator. That namespace is presumably not available with the 2.0 schema used
    here; confirm against the project's Spring Integration version.
-->
<int:inbound-channel-adapter id="inboundChannelAdapter"
                             ref="eventConsumer"
                             method="onReceiveEvent"
                             channel="eventChannel">
    <int:poller ref="rabbitConsumerPoller"/>
</int:inbound-channel-adapter>
</beans>
Thanks!