2
votes

I'm using SQS as a queue for video encoding and want to ensure that only a single encoding is performed per video.

SQS works fine in that when a message is queued, it will only be received by a single thread. However, it's possible that multiple messages could be sent to the queue for the same video/encoding, meaning the message content would be the same for the particular 'encoding' queue.

Is there any way to de-duplicate, so that for a specific queue the messages in it (or received from it) are unique?

One option I thought of would be to create a new queue for each encoding as the message is sent. The queue could be named something like encoding-video-id, would only ever hold a single message, and I could check that the queue does not yet exist before sending. The only "issue" is that there could be thousands to tens of thousands of these queues created.

So what could cause you to enqueue the same message multiple times? – Mike Brant
The use case is that users can submit 'encode', which queues the video; in edge cases it's possible for it to be hit multiple times, which would result in multiple messages. – dzm
Just noticed you can create "unlimited" queues in SQS, so possibly the option above could work. – dzm
Even without the possibility of a user queuing a duplicate task, SQS itself does not guarantee "exactly once" delivery of a message. It guarantees "at least once", so SQS itself can deliver duplicate messages. I think the answers to these questions are relevant to your issue: stackoverflow.com/questions/32386877/… and stackoverflow.com/questions/13484845/… – Mark B
@mbaird I think this will end up being what needs to be done. Basically using atomic operations in Redis and setting a lower TTL on it (which is updated while being processed). Could simply use INCR with a unique key based on the video GUID and check if it exists or not. If the TTL on this is say 20s and the SQS visibility timeout is 1m, both being updated every 10s while a job is being processed, I think that should solve the issues of dedup and also allow for retries from SQS. – dzm
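A minimal sketch of that Redis approach, assuming StackExchange.Redis; the key name, GUID and TTL values are placeholders. The first INCR on the key returns 1, so only the first request queues the job:

using System;
using StackExchange.Redis;

class EncodeDedup
{
    static void Main()
    {
        // Assumes a local Redis instance; adjust the connection string as needed.
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("localhost");
        IDatabase db = redis.GetDatabase();

        string videoGuid = "3f2c9a2e-example";        // hypothetical video GUID
        string key = "encoding:" + videoGuid;

        // Atomic INCR: the first caller sees 1, any duplicate sees a value > 1.
        long count = db.StringIncrement(key);
        if (count == 1)
        {
            // TTL shorter than the SQS visibility timeout, refreshed while encoding runs.
            db.KeyExpire(key, TimeSpan.FromSeconds(20));
            Console.WriteLine("First request - queue the encoding job.");
        }
        else
        {
            Console.WriteLine("Duplicate request - skip queuing.");
        }
    }
}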

5 Answers

3
votes

IMO, creating an unlimited number of queues with a single message in each is a really bad design, even if it would theoretically work.

If it were me, I'd try to make sure each video had some sort of unique identifier that stays the same even if the user 'double-clicked' the process button.

I would envision a system where the video, with a unique name (such as a GUID), is uploaded to S3, a message is put on the queue, your threads pick up the message from the queue and do the encoding, and then the video is written back to a different S3 bucket, but with the same base name.

Before processing any video, I would first check the 'output bucket' to see if there is already an encoded video there with the matching name, and if there is, skip the reprocessing and delete the message.
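A rough sketch of that "check the output bucket first" step, assuming the AWS SDK for .NET; the bucket and key names are hypothetical. GetObjectMetadata throws a 404 AmazonS3Exception when no encoded object with that name exists yet:

using System;
using System.Net;
using Amazon.S3;

class OutputBucketCheck
{
    // Returns true if an encoded video with the matching name already exists.
    static bool AlreadyEncoded(IAmazonS3 s3, string outputBucket, string key)
    {
        try
        {
            s3.GetObjectMetadata(outputBucket, key);
            return true;                  // object found - skip reprocessing, delete the message
        }
        catch (AmazonS3Exception e) when (e.StatusCode == HttpStatusCode.NotFound)
        {
            return false;                 // not encoded yet - go ahead and process
        }
    }

    static void Main()
    {
        IAmazonS3 s3 = new AmazonS3Client(Amazon.RegionEndpoint.APSouth1);
        // Hypothetical bucket and key names, for illustration only.
        if (AlreadyEncoded(s3, "my-encoded-videos", "3f2c9a2e-example.mp4"))
            Console.WriteLine("Already encoded - delete the SQS message and skip.");
        else
            Console.WriteLine("Not encoded yet - process the message.");
    }
}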

If everything is running on an EC2 local disk (and you are not using S3), then the same could be done using an input and output directory on the hard disk (but that would assume that multiple machines aren't doing the processing).

It's important to remember that it's possible for the same message to be delivered by SQS more than once, even if the user only submitted it once. It happens, though rarely, so whatever system you set up, you need to make sure that if/when you do get the occasional duplicate it doesn't break anything.

2
votes

There is no way to ensure the uniqueness of messages in an SQS Queue, or ordering for that matter. Also, having too many queues isn't a good idea.

In my opinion, you need to add another component to your system. A metadata service of some kind would suffice. It could work something like this:

  • When you create an encoding task (before adding it to SQS), you would write it to your metadata service.
  • When a worker receives an encoding task, it would query the metadata service to see if the task has already been completed.
  • When a worker completes an encoding task, it would mark the task as completed in the metadata service.

If you're uploading the outputs of these encoding jobs to S3, you could effectively use S3 itself as the metadata service. If each video has a unique name/id, you could save the output in S3 with that unique id as the key, or set it as an S3 object-metadata key-value (this would make the file a little harder to find, since you can't query object metadata without knowing the key). Then, when a worker receives an encoding task, it would check whether the file already exists on S3, in which case it would delete the message from SQS and skip the task.

If you're not saving the outputs to S3, you'll probably need to employ a database of some kind. DynamoDB could probably be helpful in terms of speed and cost.
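If DynamoDB were the metadata store, the worker-side check from the list above might look roughly like this (hypothetical EncodingTasks table keyed on VideoId, with a Status attribute; AWS SDK for .NET assumed):

using System;
using System.Collections.Generic;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;

class MetaDataCheck
{
    static void Main()
    {
        var dynamo = new AmazonDynamoDBClient(Amazon.RegionEndpoint.APSouth1);

        // Look the task up before encoding; table and attribute names are hypothetical.
        GetItemResponse response = dynamo.GetItem(new GetItemRequest
        {
            TableName = "EncodingTasks",
            Key = new Dictionary<string, AttributeValue>
            {
                { "VideoId", new AttributeValue { S = "3f2c9a2e-example" } }
            },
            ConsistentRead = true   // avoid acting on a stale read
        });

        bool completed = response.Item != null
            && response.Item.ContainsKey("Status")
            && response.Item["Status"].S == "Completed";

        if (completed)
            Console.WriteLine("Task already completed - delete the SQS message and skip.");
        else
            Console.WriteLine("Task not completed - run the encoding, then mark it Completed.");
    }
}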

Hope this helps! :)

1
votes

Your suggested solution is a bad design, whether or not it is technically possible. Here is my approach to the problem.

I would use a database (probably DynamoDB) to store a unique id based on the video and its encoding type, and add a column called status. As soon as the user clicks the convert button, first check the database. If the item is not there, push a new record to the database with the status "Converting", then push the work into SQS. After processing the workload, change the status in the database to "Finished". If the user clicks the convert button again, show the result based on the status value in the database.
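A hedged sketch of that flow using DynamoDB conditional writes, with a hypothetical Videos table and attribute names; the conditional put only succeeds for the first click, so a second click never enqueues a duplicate job:

using System;
using System.Collections.Generic;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;

class ConvertButtonHandler
{
    static void Main()
    {
        var dynamo = new AmazonDynamoDBClient(Amazon.RegionEndpoint.APSouth1);
        string videoId = "3f2c9a2e-example-h264";   // unique id per video + encoding type

        try
        {
            // Insert the record only if it does not already exist - the first click wins.
            dynamo.PutItem(new PutItemRequest
            {
                TableName = "Videos",
                Item = new Dictionary<string, AttributeValue>
                {
                    { "VideoId", new AttributeValue { S = videoId } },
                    { "Status",  new AttributeValue { S = "Converting" } }
                },
                ConditionExpression = "attribute_not_exists(VideoId)"
            });
            Console.WriteLine("New record created - push the work into SQS here.");

            // ... after the worker finishes encoding, flip the status:
            dynamo.UpdateItem(new UpdateItemRequest
            {
                TableName = "Videos",
                Key = new Dictionary<string, AttributeValue>
                {
                    { "VideoId", new AttributeValue { S = videoId } }
                },
                UpdateExpression = "SET #s = :finished",
                ExpressionAttributeNames = new Dictionary<string, string> { { "#s", "Status" } },
                ExpressionAttributeValues = new Dictionary<string, AttributeValue>
                {
                    { ":finished", new AttributeValue { S = "Finished" } }
                }
            });
        }
        catch (ConditionalCheckFailedException)
        {
            Console.WriteLine("Record already exists - show the current status instead of re-queuing.");
        }
    }
}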

1
votes

There is a way, though, to keep only the unique messages after receiving data from the queue. I will explain it below.

Let's say you are frequently adding messages (irrespective of any id or anything else) to a single SQS queue. The logic below applies at the time of receiving messages from the queue.

While creating the ReceiveMessageRequest object, you can specify the AttributeNames. So, add the "ApproximateReceiveCount" attribute to the request object. That will fetch the "ApproximateReceiveCount" value along with each message read from the SQS queue.

Now, for messages that are being read for the first time, the "ApproximateReceiveCount" is 1; otherwise the value will be greater than 1. So you can consider only those messages each time you do an SQS read. Just limit the maximum number of messages read each time by setting the "MaxNumberOfMessages" property of the request object, to make sure you don't get a huge payload on each read (each 64 KB chunk of a payload is billed as 1 request).

I know a FIFO queue would do a much better job in some cases. But it has a few limitations:

  • It has limited throughput (only 300 transactions per second (TPS)).
  • Currently it is supported in only two regions (US West (Oregon) and US East (Ohio)).

Please find the C# code below illustrating the logic:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

namespace DriverDataPooler1
{
    class Program
    {
        AmazonSQSClient objClient = new AmazonSQSClient
                ("<AWSAccessKeyId>", "<AWSSecretAccessKey>", Amazon.RegionEndpoint.APSouth1);
        // Response object used when listing the existing queues
        ListQueuesResponse objqueuesResponseList = new ListQueuesResponse();

        // Declare the request and response objects
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
        ReceiveMessageResponse receiveMessageResponse = new ReceiveMessageResponse();

        static void Main(string[] args)
        {
            Program p1 = new Program();
            p1.getQueueData();
        }

        public void getQueueData(){

            objqueuesResponseList = objClient.ListQueues(new ListQueuesRequest());
            List<String> QueueList = objqueuesResponseList.QueueUrls;

            // Receive messages from the first SQS queue
            if (QueueList.Any())
            {
                // I am only considering the first queue here as I have only one SQS queue
                receiveMessageRequest.QueueUrl = QueueList[0];
                receiveMessageRequest.WaitTimeSeconds = 20;

                // You can limit the number of messages to decrease the payload size (depends on the size of each message)
                receiveMessageRequest.MaxNumberOfMessages = 10;
                receiveMessageRequest.AttributeNames = new List<string>() { "ApproximateReceiveCount" };
                receiveMessageResponse = objClient.ReceiveMessage(receiveMessageRequest);
                List<Message> result = receiveMessageResponse.Messages;
                if (result.Any())
                {
                    foreach (Message res in result)
                    {
                        // Checking for the messages that are read for the first time
                        if (Int16.Parse(res.Attributes["ApproximateReceiveCount"]) == 1)
                        {
                            // Process your messages here
                            Console.WriteLine(res.Body);
                        }
                    }
                }
                else
                {
                    Console.WriteLine("You have no new messages in your SQS");
                }
            }
            else
            {
                Console.WriteLine("You have no available SQS");
            }
            Console.ReadKey();

        }
    }
}

Please comment if you have any further queries.

1
votes

SQS FIFO queues have a deduplication ID property. Messages sent with the same deduplication ID within a 5-minute window will be accepted successfully, but not actually added to the queue.

You can use this to prevent extra queuing of the same video.

There is some added complexity: even if the message has already been processed, additional messages with the same deduplication ID won't be queued until the window has elapsed. Likewise, if you send the same ID after the window has elapsed, the message will be queued again, which may also be undesired.

However, rather than maintaining your own record of queued videos, the deduplication ID should give you the behaviour you are asking for.
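A small sketch of sending to a FIFO queue with a deduplication ID, using the same AWS SDK for .NET as the answer above; the queue URL and ids are hypothetical, and the queue itself must be created as a .fifo queue:

using System;
using Amazon.SQS;
using Amazon.SQS.Model;

class FifoSender
{
    static void Main()
    {
        var sqs = new AmazonSQSClient(Amazon.RegionEndpoint.APSouth1);

        var request = new SendMessageRequest
        {
            // Hypothetical FIFO queue URL - the queue must be created with the .fifo suffix.
            QueueUrl = "https://sqs.ap-south-1.amazonaws.com/123456789012/video-encoding.fifo",
            MessageBody = "{\"videoId\":\"3f2c9a2e-example\",\"profile\":\"h264\"}",
            MessageGroupId = "video-encoding",
            // Duplicates of this id within the 5-minute window are accepted but not enqueued again.
            MessageDeduplicationId = "3f2c9a2e-example-h264"
        };

        SendMessageResponse response = sqs.SendMessage(request);
        Console.WriteLine("MessageId: " + response.MessageId);
    }
}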