
I want to consume messages from a RabbitMQ queue in my Storm spout.

We currently use Spring AMQP to send and receive messages from RabbitMQ asynchronously.

Spring AMQP provides two mechanisms for reading messages from a queue: creating a listener, or using the @RabbitListener annotation.

The problem is this: I can have a listener read the message from the queue, but how do I pass that message to my Storm spout, which is running on a Storm cluster?

The topology will start on a cluster, but in my spout's nextTuple() method I need to read messages from this queue. Can Spring AMQP be used here?

I have a listener configured to read messages from the queue:

@RabbitListener(queues = "queueName")
public void processMessage(QueueMessage message) {
    // message handling goes here
}

How can the message received by this listener be passed to my spout running on the cluster?

Alternatively, can the spout's nextTuple() method do this receiving itself? Is that possible?

I am using Java.


1 Answer


You can read messages on-demand (rather than being message-driven) by using one of the RabbitTemplate receive or receiveAndConvert methods.

By default, they will return null if there is no message in the queue.

EDIT:

If you set the receiveTimeout (available in version 1.5 or above), the receive methods will block for that time (it uses an async consumer internally and does not poll).

But it's still not as efficient as the listener, because a new consumer is created for each method call. To use a listener, you would need some internal blocking mechanism in nextTuple() (e.g. a BlockingQueue) to wait for messages.
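As a rough illustration of the BlockingQueue idea, here is a minimal sketch using only java.util.concurrent. The class MessageHandoff and its methods onMessage and nextMessage are hypothetical names, not part of Spring AMQP or Storm, and the payload is assumed to be a String for simplicity. It also assumes the listener runs in the same worker JVM as the spout (for example, a listener container started from the spout's open() method), since an in-memory queue cannot cross process boundaries.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical hand-off buffer between a message listener thread and a spout thread.
class MessageHandoff {
    private final BlockingQueue<String> buffer;

    MessageHandoff(int capacity) {
        // Bounded, so an idle spout cannot cause unbounded memory growth.
        this.buffer = new LinkedBlockingQueue<>(capacity);
    }

    // Intended to be called from the listener (e.g. inside processMessage).
    // Returns false without blocking if the buffer is full.
    boolean onMessage(String payload) {
        return buffer.offer(payload);
    }

    // Intended to be called from nextTuple(): waits up to timeoutMillis
    // and returns null if no message arrived in that time.
    String nextMessage(long timeoutMillis) {
        try {
            return buffer.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }
}
```

In this sketch, nextTuple() would call nextMessage with a short timeout and emit a tuple only when the result is non-null, so the spout thread is never blocked for long. What to do when onMessage returns false (retry, reject, or requeue the message) is left open here and depends on your delivery requirements.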