18
votes

I'm working on an application whose workflow is managed by passing messages in SQS, using boto.

My SQS queue is growing gradually, and I have no way to check how many elements it is supposed to contain.

Now I have a daemon that periodically polls the queue and checks whether I have a fixed-size set of elements. For example, consider the following "queue":

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"]

Now I want to check if I have "msg1_comp1", "msg2_comp1" and "msg3_comp1" in the queue together at some point in time, but I don't know the size of the queue.

After looking through the API, it seems you can either get only 1 element, or a fixed number of elements in the queue, but not all:

>>> rs = q.get_messages()
>>> len(rs)
1
>>> rs = q.get_messages(10)
>>> len(rs)
10

One suggestion from the answers is to fetch, say, 10 messages in a loop until nothing comes back. However, messages in SQS have a visibility timeout: polling elements from the queue does not really remove them, it only makes them invisible for a short period of time.

Is there a simple way to get all messages in the queue, without knowing how many there are?

6 Answers

22
votes

I've been working with AWS SQS queues to provide instant notifications, so I need to be processing all of the messages in real time. The following code will help you to efficiently dequeue (all) messages and handle any errors when removing.

Note: to remove messages from the queue you need to delete them. I'm using the updated boto3 AWS Python SDK, the json library, and the following default values:

import boto3
import json

region_name = 'us-east-1'
queue_name = 'example-queue-12345'
max_queue_messages = 10
message_bodies = []
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>'
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>'
sqs = boto3.resource('sqs', region_name=region_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key)
queue = sqs.get_queue_by_name(QueueName=queue_name)
while True:
    messages_to_delete = []
    for message in queue.receive_messages(
            MaxNumberOfMessages=max_queue_messages):
        # process message body
        body = json.loads(message.body)
        message_bodies.append(body)
        # add message to delete
        messages_to_delete.append({
            'Id': message.message_id,
            'ReceiptHandle': message.receipt_handle
        })

    # if you don't receive any notifications the
    # messages_to_delete list will be empty
    if len(messages_to_delete) == 0:
        break
    # delete messages to remove them from SQS queue
    # handle any errors
    else:
        delete_response = queue.delete_messages(
                Entries=messages_to_delete)
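
The loop above stores delete_response but never inspects it. As a minimal sketch, assuming the response follows the documented shape of an SQS DeleteMessageBatch result (a dict that may contain 'Successful' and 'Failed' lists), a hypothetical helper such as failed_deletes could pull out the entries that were not removed so they can be logged or retried. The helper name and the sample values are made up for illustration:

```python
def failed_deletes(delete_response):
    """Return (id, code, message) tuples for entries SQS could not delete.

    Hypothetical helper, not part of boto3. It only reads the 'Failed'
    list that a batch delete response may contain.
    """
    failures = []
    for entry in delete_response.get('Failed', []):
        failures.append((entry.get('Id'), entry.get('Code'), entry.get('Message')))
    return failures


# Hand-written response in the same shape (values are invented)
sample_response = {
    'Successful': [{'Id': 'a1'}],
    'Failed': [{'Id': 'b2', 'SenderFault': True,
                'Code': 'ReceiptHandleIsInvalid',
                'Message': 'The receipt handle has expired'}],
}
print(failed_deletes(sample_response))
```

In the loop, this would be called as failed_deletes(delete_response) right after queue.delete_messages(...).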
21
votes

Put your call to q.get_messages(n) inside a while loop:

all_messages=[]
rs=q.get_messages(10)
while len(rs)>0:
    all_messages.extend(rs)
    rs=q.get_messages(10)

Additionally, dump won't support more than 10 messages either:

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'):
    """Utility function to dump the messages in a queue to a file
    NOTE: Page size must be < 10 else SQS errors"""
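
To see how the loop terminates without touching AWS, here is a small sketch that runs the same pattern against a stand-in object. FakeQueue is purely hypothetical and only mimics the one call used above (get_messages(n) returning at most n items and hiding them afterwards, loosely like the visibility timeout); it is not a boto class.

```python
class FakeQueue(object):
    """Hypothetical stand-in for a boto queue, for illustration only."""

    def __init__(self, bodies):
        self._visible = list(bodies)

    def get_messages(self, num_messages=1):
        # Hand out up to num_messages items; they are no longer visible
        # afterwards, loosely imitating the visibility timeout.
        batch = self._visible[:num_messages]
        self._visible = self._visible[num_messages:]
        return batch


def drain(q, page_size=10):
    """Keep fetching pages until an empty page comes back."""
    all_messages = []
    rs = q.get_messages(page_size)
    while len(rs) > 0:
        all_messages.extend(rs)
        rs = q.get_messages(page_size)
    return all_messages


q = FakeQueue(["msg%d" % i for i in range(23)])
messages = drain(q)
print(len(messages))  # 23, fetched as pages of 10, 10 and 3
```

Against real SQS an empty page does not prove the queue is empty, since a short poll samples only some of the servers, so the result is best effort.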
8
votes

My understanding is that the distributed nature of the SQS service pretty much makes your design unworkable. Every time you call get_messages you are talking to a different set of servers, which will have some but not all of your messages. Thus it is not possible to 'check in from time to time' to see whether a particular group of messages is ready, and then just accept those.

What you need to do is poll continuously, take all the messages as they arrive, and store them locally in your own data structures. After each successful fetch you can check your data structures to see if a complete set of messages has been collected.

Keep in mind that messages will arrive out of order, and some messages will be delivered twice, as deletes have to propagate to all the SQS servers, but subsequent get requests sometimes beat out the delete messages.
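
As a rough sketch of that local bookkeeping, assuming each delivery can be reduced to a (message id, body) pair and that the wanted set is known by body (as in the question's "msg1_comp1" example), a collector might look like this. MessageCollector and the ids are hypothetical names, not part of boto:

```python
class MessageCollector(object):
    """Hypothetical local store: dedupes by message id, checks for a full set."""

    def __init__(self, required_bodies):
        self.required = set(required_bodies)
        self.seen = {}  # message id -> body

    def add(self, message_id, body):
        # A redelivered message carries the same id, so keep the first copy only.
        self.seen.setdefault(message_id, body)

    def is_complete(self):
        # Arrival order does not matter, only membership.
        return self.required.issubset(set(self.seen.values()))


collector = MessageCollector(["msg1_comp1", "msg2_comp1", "msg3_comp1"])
deliveries = [
    ("id-3", "msg3_comp1"),   # arrives out of order
    ("id-1", "msg1_comp1"),
    ("id-9", "msg1_comp2"),   # belongs to another set
    ("id-1", "msg1_comp1"),   # duplicate delivery
]
for message_id, body in deliveries:
    collector.add(message_id, body)
print(collector.is_complete())  # False, msg2_comp1 is still missing

collector.add("id-2", "msg2_comp1")
print(collector.is_complete())  # True
```

The daemon would call add for every message returned by each fetch and test is_complete afterwards.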

3
votes

I execute this in a cron job:

from django.core.mail import EmailMessage
from django.conf import settings
import boto3
import json

sqs = boto3.resource('sqs', aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
         aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
         region_name=settings.AWS_REGION)

queue = sqs.get_queue_by_name(QueueName='email')
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1)

while len(messages) > 0:
    for message in messages:
        mail_body = json.loads(message.body)
        print("E-mail sent to: %s" % mail_body['to'])
        email = EmailMessage(mail_body['subject'], mail_body['message'], to=[mail_body['to']])
        email.send()
        message.delete()

    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1)
0
votes

NOTE: This is not intended as a direct answer to the question. Rather, it is an augmentation to @TimothyLiu's answer, assuming the end user is using the Boto package (aka Boto2), not Boto3. This code is a "Boto-2-ization" of the delete_messages call referred to in his answer.


Calling the Boto delete_message_batch(messages_to_delete), where messages_to_delete holds dict objects carrying id and receipt_handle, fails with:

AttributeError: 'dict' object has no attribute 'id'.

It seems delete_message_batch expects a Message class object; copying the Boto source for delete_message_batch and allowing it to use a non-Message object (ala boto3) also fails if you're deleting more than 10 "messages" at a time. So, I had to use the following work-around.

eprint code from here

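from __future__ import print_function
import sys
from itertools import islice
from boto.sqs.batchresults import BatchResults  # result class passed to get_object below

def static_vars(**kwargs):
    # Small decorator that attaches attributes to a function (used by take)
    def decorate(func):
        for name, value in kwargs.items():
            setattr(func, name, value)
        return func
    return decorate

def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)

@static_vars(counter=0)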
def take(n, iterable, reset=False):
    "Return next n items of the iterable as same type"
    if reset: take.counter = 0
    take.counter += n
    bob = islice(iterable, take.counter-n, take.counter)
    if isinstance(iterable, dict): return dict(bob)
    elif isinstance(iterable, list): return list(bob)
    elif isinstance(iterable, tuple): return tuple(bob)
    elif isinstance(iterable, set): return set(bob)
    elif isinstance(iterable, file): return file(bob)
    else: return bob

def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False
  """
  Deletes a list of messages from a queue in a single request.
  :param cx: A boto connection object.
  :param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted
  :param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects.
  """
  listof10s = []
  asSuc, asErr, acS, acE = "","",0,0
  res = []
  it = tuple(enumerate(messages))
  params = {}
  tenmsg = take(10,it,True)
  while len(tenmsg)>0:
    listof10s.append(tenmsg)
    tenmsg = take(10,it)
  while len(listof10s)>0:
    tenmsg = listof10s.pop()
    params.clear()
    for i, msg in tenmsg: #enumerate(tenmsg):
      prefix = 'DeleteMessageBatchRequestEntry'
      numb = (i%10)+1
      p_name = '%s.%i.Id' % (prefix, numb)
      params[p_name] = msg.get('id')
      p_name = '%s.%i.ReceiptHandle' % (prefix, numb)
      params[p_name] = msg.get('receipt_handle')
    try:
      go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST')
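      # tup_result_messages is not defined in this answer; judging by its use here
      # it returns ((success_ids, success_count), (error_ids, error_count))
      (sSuc,cS),(sErr,cE) = tup_result_messages(go)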
      if cS:
        asSuc += ","+sSuc
        acS += cS
      if cE:
        asErr += ","+sErr
        acE += cE
    except cx.ResponseError:
      eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
    except Exception:
      eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
  return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res

def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0):
  if sSuc == "": sSuc="None"
  if sErr == "": sErr="None"
  if cS == expect: sSuc="All"
  if cE == expect: sErr="All"
  return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr)
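
The batching step on its own can be written more compactly. As a sketch that is independent of Boto (the name chunks is hypothetical), slicing the list in steps of 10 yields groups that each fit within the 10-entry limit of a batch delete:

```python
def chunks(items, size=10):
    """Yield successive lists of at most `size` items, preserving order."""
    items = list(items)
    for start in range(0, len(items), size):
        yield items[start:start + size]


batches = list(chunks(range(23)))
print([len(batch) for batch in batches])  # [10, 10, 3]
```

Each batch could then be passed to whichever delete call is in use, one request per batch.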
-1
votes

Something like the code below should do the trick. Sorry it's in C#, but it shouldn't be hard to convert to Python. The dictionary is used to weed out duplicates.

    public Dictionary<string, Message> GetAllMessages(int pollSeconds)
    {
        var msgs = new Dictionary<string, Message>();
        var end = DateTime.Now.AddSeconds(pollSeconds);

        while (DateTime.Now <= end)
        {
            var request = new ReceiveMessageRequest(Url);
            request.MaxNumberOfMessages = 10;

            var response = GetClient().ReceiveMessage(request);

            foreach (var msg in response.Messages)
            {
                if (!msgs.ContainsKey(msg.MessageId))
                {
                    msgs.Add(msg.MessageId, msg);
                }
            }
        }

        return msgs;
    }
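
A rough Python rendering of the same idea, written as a sketch rather than a drop-in replacement: poll for a fixed time window and key the results by message id so duplicates collapse. The receive argument is a hypothetical callable returning a list of dicts with a 'MessageId' key; with a boto3 client it could be a thin wrapper such as lambda: client.receive_message(QueueUrl=url, MaxNumberOfMessages=10).get('Messages', []), where client and url are assumed to exist. The clock parameter is only there so the example can run deterministically.

```python
import time


def get_all_messages(receive, poll_seconds, clock=time.time):
    """Poll for poll_seconds and return {message_id: message} without duplicates."""
    msgs = {}
    end = clock() + poll_seconds
    while clock() <= end:
        for msg in receive():
            # Keep only the first copy of each message id.
            msgs.setdefault(msg['MessageId'], msg)
    return msgs


# Deterministic demo with a fake receiver and a fake clock (both hypothetical)
pages = [
    [{'MessageId': 'a', 'Body': '1'}, {'MessageId': 'b', 'Body': '2'}],
    [{'MessageId': 'a', 'Body': '1'}],  # duplicate delivery
    [],
]

def fake_receive():
    return pages.pop(0) if pages else []

ticks = iter(range(100))
result = get_all_messages(fake_receive, poll_seconds=3, clock=lambda: next(ticks))
print(sorted(result))  # ['a', 'b']
```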