2
votes

I read messages from RabbitMQ with the pika Python library. Reading messages in a loop is done with:

connection = rpc.connect()
channel = connection.channel()
channel.basic_consume(rpc.consumeCallback, queue=FromQueue, no_ack=Ack)
channel.start_consuming()

This works fine. But I also have the need to read one single message, which I do with:

method, properties, body = channel.basic_get(queue=FromQueue)
rpc.consumeCallback(Channel=channel, Method=method, Properties=properties, Body=body)

But when there is no message in the queue, the script crashes. How do I implement the get_empty() method described here?

3
The channel.start_consuming call is blocking. How can you call channel.basic_get? Are you using separate threads? - noxdafox
No, I use either one of them. A parameter decides which one is used. - cwhisperer

3 Answers

1
votes

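I solved it temporarily by checking the response, like this:

method, properties, body = channel.basic_get(queue=FromQueue)
if method is None:
    # queue is empty: basic_get returned no message, so skip the callback
    pass
else:
    rpc.consumeCallback(Channel=channel, Method=method, Properties=properties, Body=body)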
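
The same check can be wrapped in a small helper. This is only a sketch: the names fetch_one and handler are made up for illustration, and it assumes a pika BlockingChannel, whose basic_get returns (None, None, None) on an empty queue. It also assumes auto-ack is off, so the message is acknowledged explicitly after the handler returns.

```python
def fetch_one(channel, queue, handler):
    """Fetch at most one message from `queue`.

    Returns True if a message was handled, False if the queue was empty.
    `handler` is a hypothetical callback taking (channel, method, properties, body).
    """
    method, properties, body = channel.basic_get(queue=queue)
    if method is None:
        # Empty queue: nothing to hand to the callback.
        return False
    handler(channel, method, properties, body)
    # Assumes auto-ack is disabled, so acknowledge after successful handling.
    channel.basic_ack(delivery_tag=method.delivery_tag)
    return True
```

Calling fetch_one(channel, FromQueue, callback) then returns False instead of crashing when nothing is waiting in the queue.
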
0
votes

You can check for an empty body like this:

def callback(ch, method, properties, body):
    decodeBodyInfo = body.decode('utf-8')
    if decodeBodyInfo != '':
        cacheResult = decodeBodyInfo
        ch.stop_consuming()

That's simple and easy to use :D

0
votes

In case you're using the channel.consume generator in a for loop, you can set the inactivity_timeout parameter.

From the pika docs,

:param float inactivity_timeout: if a number is given (in seconds), will cause the
method to yield (None, None, None) after the given period of inactivity; this
permits for pseudo-regular maintenance activities to be carried out by the user
while waiting for messages to arrive. If None is given (default), then the method
blocks until the next event arrives. NOTE that timing granularity is limited by the
timer resolution of the underlying implementation. NEW in pika 0.10.0.

So changing your code to something like this might help:

        for method_frame, properties, body in channel.consume(queue, inactivity_timeout=120):

            # break out of the loop after 2 min of inactivity (no new item fetched)
            if method_frame is None:
                break

Don't forget to properly handle the channel and the connection after exiting the loop, for example by cancelling the consumer and closing the connection.
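
A rough sketch of that loop with cleanup might look like the following. The names consume_until_idle and handler are hypothetical, and it assumes a pika BlockingChannel with auto-ack off, so each message is acknowledged explicitly. channel.cancel() ends the consumer that channel.consume() created; closing the connection is left to the caller.

```python
def consume_until_idle(channel, queue, handler, idle_seconds=120):
    """Handle messages until `idle_seconds` pass with nothing new.

    Returns the number of messages handled.
    `handler` is a hypothetical callback taking (channel, method, properties, body).
    """
    handled = 0
    try:
        for method, properties, body in channel.consume(
            queue, inactivity_timeout=idle_seconds
        ):
            if method is None:
                # (None, None, None) signals the inactivity timeout.
                break
            handler(channel, method, properties, body)
            # Assumes auto-ack is disabled.
            channel.basic_ack(delivery_tag=method.delivery_tag)
            handled += 1
    finally:
        # Cancel the consumer created by consume(), even if the handler raised.
        channel.cancel()
    return handled
```

After the function returns, the caller would typically call connection.close() once no more work is expected on that connection.
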