28
votes

I know SQS isn't built for that, but I'm curious: is it possible to find messages in a queue that meet some criteria?

I can pull messages in a loop, search the message bodies for some pattern (without even deserializing them), and pick out the messages I need. But then it is possible to end up in an infinite loop: the first messages I read will be back in the queue by the time I reach the end of the queue...

Extending the visibility timeout of the messages is possible, but how do I know exactly how long it will take to scan the entire queue, and for how long I should extend the visibility? What if I have literally ten thousand messages in there?

Is there any workaround here? I need to scan the queue for some messages, and delete those...

10
Maybe SNS and topics could help you here, as I think what you are trying to have is a topic-based consumer setup. - igracia

10 Answers

27
votes

Short answer: no.

Queues are designed for things like tasks. A machine grabs a new task (i.e., message) from the queue, executes the task, then deletes the task.

If you're trying to search the messages to filter them, I can't help but wonder if you're using the wrong tool for the job…

6
votes

I do not think the short or long answers are "No".

Here are two counterpoint solutions that are "Yes".

  1. Traversing the Queue, maintaining a visited list
  2. Using Enterprise Integration Patterns (message routing) to split your messages into downstreams based on criteria

1. Traversing the Queue, maintaining a visited list

Consider the case of a queue with N messages, with no messages being added or deleted. Without additional information (e.g. if you knew how many messages should match your criteria), you need to traverse all N messages.

The key point here is knowing when you've traversed all N messages. There are some issues here.

  1. To know exactly, you'd have to track messages as they are added to the queue
  2. To know approximately, you can get the ApproximateNumberOfMessages attribute of the queue
  3. Or you could receive messages in a loop, maintaining a visited list, and assume that you will eventually sample and exhaust messages from each server your queue is sharded over

To maintain the visited list, as you receive messages and evaluate your match criteria, you could store the message_id of all visited messages.

Message IDs are nearly unique. See this thread:

https://forums.aws.amazon.com/message.jspa?messageID=76119

If you went with (3), you wouldn't be certain how many iterations would be required to exhaust the queue. However, if you performed this indefinitely, you'd be guaranteed to exhaust the queue so long as the weighted random distribution over the SQS shard servers gives them all non-zero probability.
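A minimal sketch of option (3), assuming an in-memory stand-in for the queue rather than the AWS SDK. The names Msg, VisitedScan, receive and scan are hypothetical, and the stop rule (give up after idleLimit consecutive polls that return nothing new) is an assumption added here as a practical substitute for polling indefinitely:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical message type: an ID plus a raw body.
final class Msg {
    final String id;
    final String body;
    Msg(String id, String body) { this.id = id; this.body = body; }
}

final class VisitedScan {
    // Stand-in for a receive call: returns up to max randomly sampled
    // messages and leaves them in the queue, as if they became visible again.
    static List<Msg> receive(List<Msg> queue, int max, Random rnd) {
        List<Msg> copy = new ArrayList<>(queue);
        Collections.shuffle(copy, rnd);
        return copy.subList(0, Math.min(max, copy.size()));
    }

    // Polls until idleLimit consecutive polls yield no unseen message ID,
    // collecting every message whose body matches the criterion.
    static List<Msg> scan(List<Msg> queue, Predicate<String> matches,
                          int idleLimit, Random rnd) {
        Set<String> visited = new HashSet<>();
        List<Msg> hits = new ArrayList<>();
        int idle = 0;
        while (idle < idleLimit) {
            boolean sawNew = false;
            for (Msg m : receive(queue, 10, rnd)) {
                if (visited.add(m.id)) {      // true only on first sighting
                    sawNew = true;
                    if (matches.test(m.body)) {
                        hits.add(m);
                    }
                }
            }
            idle = sawNew ? 0 : idle + 1;
        }
        return hits;
    }
}
```

The larger idleLimit is, the less likely a message is missed, but the scan never becomes a hard guarantee; against a real queue the matched messages would be deleted where this sketch adds them to hits.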

2. Using Enterprise Integration Patterns (message routing) to split your messages into downstreams based on criteria

If you have control over your messaging architecture you could use a Message Router as a "front-end" message processor that dispatches messages to various recipients based on criteria.

And specifically you'd use a Content-Based Router:

http://www.enterpriseintegrationpatterns.com/patterns/messaging/ContentBasedRouter.html
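As a rough illustration of the pattern, here is a small in-memory sketch. The class ContentBasedRouter and its route, dispatch and queue methods are hypothetical names, and plain Deque objects stand in for the downstream queues; a real setup would forward each message to a separate SQS queue instead:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

final class ContentBasedRouter {
    // Routes are checked in insertion order; the first match wins.
    private final Map<String, Predicate<String>> routes = new LinkedHashMap<>();
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();
    private final String defaultQueue;

    ContentBasedRouter(String defaultQueue) {
        this.defaultQueue = defaultQueue;
        queues.put(defaultQueue, new ArrayDeque<>());
    }

    // Registers a downstream queue together with the criterion that selects it.
    ContentBasedRouter route(String queueName, Predicate<String> criterion) {
        routes.put(queueName, criterion);
        queues.put(queueName, new ArrayDeque<>());
        return this;
    }

    // Sends the body to the first matching downstream queue, or to the
    // default queue when nothing matches. Returns the chosen queue name.
    String dispatch(String body) {
        for (Map.Entry<String, Predicate<String>> e : routes.entrySet()) {
            if (e.getValue().test(body)) {
                queues.get(e.getKey()).add(body);
                return e.getKey();
            }
        }
        queues.get(defaultQueue).add(body);
        return defaultQueue;
    }

    Deque<String> queue(String name) {
        return queues.get(name);
    }
}
```

With the messages already split by criterion, "finding" the ones you care about reduces to reading the dedicated downstream queue.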

3
votes

We had a similar requirement and ended up with an architecture described in this "Hands On" tutorial: Filter Messages Published to Topics.

Essentially, instead of publishing events/messages to an SQS queue, you publish them to an SNS topic and each consumer will have their own SQS queue that is subscribed to the topic. You can then use SNS Subscription Filters to ensure only the relevant messages are enqueued to each consumer's queue.

This creates additional infrastructure overhead but it worked well as a solution for us.
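To make the idea concrete, here is a simplified in-memory model of how exact-match filtering on message attributes decides which subscriber queues receive a message. It does not call SNS; FilterPolicySketch, accepts and deliver are hypothetical names, and real filter policies support more operators than the exact string matching modelled here:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class FilterPolicySketch {
    // A policy maps attribute names to the set of accepted values.
    // Every key in the policy must be present on the message (AND),
    // and its value must be one of the accepted values (OR).
    static boolean accepts(Map<String, Set<String>> policy,
                           Map<String, String> attributes) {
        for (Map.Entry<String, Set<String>> rule : policy.entrySet()) {
            String actual = attributes.get(rule.getKey());
            if (actual == null || !rule.getValue().contains(actual)) {
                return false;
            }
        }
        return true; // an empty policy accepts every message
    }

    // Returns the names of the subscriber queues that would receive
    // a message carrying the given attributes.
    static List<String> deliver(Map<String, Map<String, Set<String>>> subscriptions,
                                Map<String, String> attributes) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, Map<String, Set<String>>> sub : subscriptions.entrySet()) {
            if (accepts(sub.getValue(), attributes)) {
                targets.add(sub.getKey());
            }
        }
        return targets;
    }
}
```

For example, a subscription whose policy is eventType in {refund, chargeback} (an assumed attribute name) would receive a message tagged eventType=refund but not one tagged eventType=order.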

1
votes

Even though, when you request specific attributes, the value is simply null for the messages that do not contain the attribute, you could still use this to filter in a way. Messages that do not have the attribute set the way you want can have their visibility timeout set to 1 second and then be released, so they remain on the queue. This would give a crude way of doing priority queuing, though you could just as easily do the same based on the message content.

1
votes

I tested this with different cases. Filtering by message attribute does not work. The answer is NO.

TestData

public void fillQueueWithMessages(){

  MessageAttributeValue value1 = new MessageAttributeValue();
  value1.setDataType("String");
  value1.setStringValue("1");

  SendMessageRequest send_msg_request = new SendMessageRequest()
      .withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
      .withMessageBody("test1").addMessageAttributesEntry(value1.getStringValue(), value1);
  amazonSqs.sendMessage(send_msg_request);


  MessageAttributeValue value2 = new MessageAttributeValue();
  value2.setDataType("String");
  value2.setStringValue("2");


  SendMessageRequest send_msg_request2 = new SendMessageRequest()
      .withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
      .withMessageBody("test2").addMessageAttributesEntry(value2.getStringValue(), value2);
  amazonSqs.sendMessage(send_msg_request2);

  SendMessageRequest send_msg_request3 = new SendMessageRequest()
      .withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
      .withMessageBody("test3").addMessageAttributesEntry(value1.getStringValue(), value1);
  amazonSqs.sendMessage(send_msg_request3);

}

Test

public void shouldPollMessagesBasedOnMessageAttribute() throws InterruptedException {

  ReceiveMessageRequest request =
      new ReceiveMessageRequest(env.getProperty("cloud.aws.sqs.readyForTranslation.url"));
  request.setMaxNumberOfMessages(3);
  request.setWaitTimeSeconds(20);
  // Request the attribute named "1", which only test1 and test3 carry
  request.withMessageAttributeNames("1");

  List<Message> messages = amazonSqs.receiveMessage(request).getMessages();

  // Would hold only if the attribute name acted as a filter
  assertEquals(2, messages.size());
}

0
votes

For the humble devops engineer with total control of everything:

(1) Quickly turn off the consumers, so the message is captured in the queue.

(2) Turn off the source.

(3) Read all the SQS queue looking for your message, while also copying to a 'temp' queue.

(4) Copy all of the 'temp' queue back into the SQS queue. Google it; there are many tools for this.

(5) Restart source and consumers.
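Steps (3) and (4) can be sketched as follows, assuming the source and consumers are already stopped. Plain Deque objects stand in for the real queue and the 'temp' queue, and DrainAndRestore and findAndRestore are hypothetical names; against SQS each poll would be a receive plus delete and each add a send:

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

final class DrainAndRestore {
    // Drains the main queue into temp while recording matches,
    // then copies everything back so the main queue ends up unchanged.
    static List<String> findAndRestore(Deque<String> main,
                                       Deque<String> temp,
                                       Predicate<String> wanted) {
        List<String> found = new ArrayList<>();

        // Step 3: read the whole queue, copying every message to temp.
        while (!main.isEmpty()) {
            String body = main.poll();
            if (wanted.test(body)) {
                found.add(body);
            }
            temp.add(body);
        }

        // Step 4: copy the temp queue back into the main queue.
        while (!temp.isEmpty()) {
            main.add(temp.poll());
        }
        return found;
    }
}
```

Because nothing else touches the queue during the scan, the loop terminates once the main queue is empty, which sidesteps the visibility-timeout problem. To delete the matches rather than just find them, skip copying them to temp in step 3.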

Another way, if you had thought of it beforehand, would be to use SNS or something similar to copy messages to an auxiliary 'devops' queue and read through that when you need to find a message. You could set the retention period of the 'devops' queue shortish to keep it reasonable in size.

0
votes

OK, this question is ages old, and so is doron-BGU bgu's claim that attributes work as he described.

I had several "producers" sending the same JSON into my SQS queue. My "consumer" had to handle the messages differently depending on where they originated: a mobile client, an MVC client, or another in-house desktop app.

I seriously want to test doron-BGU bgu's theory. But to be pragmatic, my single consumer, which performed distinctly different processes on these messages from the queue, just checked each one for a value in the JSON that I forced the producers to fill in, which defined the source of the message. Apples go over here, oranges over there, kind of thing.

0
votes

Old topic, but this may be helpful: you can use a FIFO queue with GroupId for a small list of message threads, which helped me.

-1
votes

Let's understand this through an example. First, create 10 messages and send them:

// Send a message
for (int i = 0; i < 10; i++) {
    System.out.println("Sending a message to MyQueue.\n");
    Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
    // extra code

    String sdate;
    Format formatter;
    Date date = new Date();

    // 2012-12-01
    formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
    sdate = formatter.format(date);
    System.out.println(sdate);

    messageAttributes.put("Datestamp"+i, new MessageAttributeValue().withDataType("String").withStringValue(sdate));

    Map<String, MessageAttributeValue> messageAttributes1 = new HashMap<>();
    messageAttributes1.put("attributeName", new MessageAttributeValue().withDataType("String").withStringValue(sdate));
    SendMessageRequest request = new SendMessageRequest();
    request.withMessageBody("A test message body."+sdate);
    request.withQueueUrl(myQueueUrl);
    request.withMessageAttributes(messageAttributes);
    sqs.sendMessage(request);
}

Now, even though you have 10 messages with the attributes Datestamp0 to Datestamp9, filtering by attribute will not work.

Let's try filtering on some myTag attribute:

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);

//ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(queueUrl);

receiveMessageRequest.withMaxNumberOfMessages(10);
receiveMessageRequest.withMessageAttributeNames("myTag");
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();

It returns 10 messages, and the myTag value is null:

message.getMessageAttributes().get("Datestamp") is null
message.getMessageAttributes().get("myTag") is null

So we cannot filter by message attribute: if the key is not found, the messages are still returned. Requesting no message attributes or all message attributes gives the same messages.

So the long answer is NOOOOO

-6
votes

This is actually not entirely true.

In fact, you can 'kinda' filter messages in a queue using a message attributes trick.

Each message can contain attributes that you add while creating the message (you need to provide 3 things for each attribute: name, type, value).

Later on, when you create a new ReceiveMessageRequest object, you can use "withMessageAttributeNames" to specify an attribute, and what actually happens is that your queue gets filtered for the messages containing that specific attribute.

For example:

String queueUrl = sqs.getQueueUrl("myQueue").getQueueUrl();

ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(queueUrl);

receiveRequest.withMaxNumberOfMessages(10);
receiveRequest.withMessageAttributeNames("myTag");

If your queue was holding 5 messages but only 1 had the "myTag" attribute, then only that specific one will be returned.

This was surprising to me, as it is not mentioned in the ReceiveMessageRequest API.

So basically all you have to do is give each message a unique attribute. Please pay attention to the attribute name limits: the message attribute name can contain the following characters: A-Z, a-z, 0-9, underscore (_), hyphen (-), and period (.). The name must not start or end with a period, and it should not have successive periods. The name is case sensitive and must be unique among all attribute names for the message. The name can be up to 256 characters long. The name cannot start with "AWS." or "Amazon." (or any variations in casing).