I have set up an SQS trigger on a Lambda function with:
batch size: 3
batch window: 300 seconds
concurrency: 1
The SQS queue is configured with:
visibility timeout: 3 minutes
The idea is to process 3 files at a time.
This is what the Lambda code looks like:
    import json

    import boto3


    def lambda_handler(event, context):
        maximum_jobs = 3
        sqs_client = boto3.client('sqs')
        for record in event["Records"]:
            msg_string = record["body"]
            if get_active_executions() < maximum_jobs:
                # Capacity available: start the job, then delete the message manually.
                start_execution()
                receipt_handle = record["receiptHandle"]
                delete_sqs_message(sqs_client, "myqueue", receipt_handle)
            else:
                # No capacity: the message is only logged, not deleted here.
                print(f"Already {maximum_jobs} jobs running")
        return {
            'statusCode': 200,
            'body': json.dumps('Hello from Lambda!')
        }
To test the success scenario, I pushed 6 entries into the queue.
- The first 3 were processed immediately and deleted from the queue.
- The other 3 waited in flight (since I set the concurrency to 1 and the batch size to 3).
- After the visibility timeout of 3 minutes, the remaining 3 messages came back to the queue and the trigger picked them up. These 3 were also deleted.
- All 6 files were processed successfully.
Next, I tested the same code with 9 files.
- The first 3 were processed immediately (say file1, file2, file3).
- The second batch (say file4, file5, file6) was triggered after 3 minutes. However, the first batch was still running at that time, and I could see the log "Already 3 jobs running". Hence, these files were not processed. As per the code, I did not delete them either.
- Then the third batch was triggered (say file7, file8, file9). By this time, the first batch had finished, so all 3 were processed and deleted from the queue successfully.
I was expecting another batch trigger after 3 minutes (file4, file5, file6). However, it did not happen, and I noticed that the queue was empty.
Does a message disappear from the queue if the event is triggered and the message is not manually deleted? What is the expected behaviour? Am I missing something in the code?
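To make the question concrete, here is a toy model of what I suspect is happening. It is only a sketch based on my reading of the documented rule that a batch is deleted when the function returns successfully. `poll_once`, `skipping_handler` and `raising_handler` are hypothetical names made up for illustration, and nothing here calls AWS.

```python
def poll_once(queue, handler, batch_size=3):
    """Toy stand-in for the Lambda SQS poller (not real AWS code).

    Delivers one batch and deletes it only if the handler returns
    without raising.
    """
    batch = queue[:batch_size]
    event = {"Records": [{"messageId": m, "body": m} for m in batch]}
    try:
        handler(event, None)
    except Exception:
        return  # failed invocation: messages stay on the queue
    del queue[:len(batch)]  # successful invocation: whole batch is deleted


def skipping_handler(event, context):
    # Mirrors the "Already 3 jobs running" branch: logs and returns normally.
    for record in event["Records"]:
        print(f"Already 3 jobs running, skipping {record['body']}")
    return {"statusCode": 200}


def raising_handler(event, context):
    # Alternative: signal failure so the batch is retried later.
    raise RuntimeError("no capacity, retry this batch later")


queue_a = ["file4", "file5", "file6"]
poll_once(queue_a, skipping_handler)

queue_b = ["file4", "file5", "file6"]
poll_once(queue_b, raising_handler)

print(queue_a)
print(queue_b)
```

In this model, the handler that only logs and returns leaves an empty queue, which matches what I observed for file4, file5 and file6, while the handler that raises keeps the messages.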
start_execution()? Are you triggering another task outside of your Lambda function? If so, do you need to wait for that? It sounds a bit like getActiveExecutions() is checking some outside task if something is running and this task takes longer than expected, hence you're seeing this "Already 3 jobs running" because you didn't wait for the task to complete in your first batch. - s.hesse
When your function successfully processes a batch, Lambda deletes its messages from the queue. - s.hesse
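Following on from the comment above, one possible way to keep the skipped messages is a partial batch response: the handler returns the IDs of the messages it did not process, and Lambda deletes only the rest. This is a minimal sketch, assuming the event source mapping has `ReportBatchItemFailures` enabled in its function response types. `build_handler` is a hypothetical wrapper added so the example runs without AWS, and passing the message body to `start_execution` is also an assumption, since the original helper's signature is not shown.

```python
MAXIMUM_JOBS = 3


def build_handler(get_active_executions, start_execution):
    """Return a handler wired to the given (hypothetical) helper callables."""

    def lambda_handler(event, context):
        failures = []
        for record in event["Records"]:
            if get_active_executions() < MAXIMUM_JOBS:
                # Assumption: the helper accepts the message body.
                start_execution(record["body"])
            else:
                print(f"Already {MAXIMUM_JOBS} jobs running")
                # Report the message as failed so it is not deleted
                # and becomes visible again after the visibility timeout.
                failures.append({"itemIdentifier": record["messageId"]})
        # Messages not listed here are treated as processed and deleted.
        return {"batchItemFailures": failures}

    return lambda_handler


# Local usage with stubs standing in for the real helpers.
running = []
handler = build_handler(lambda: len(running), running.append)
event = {
    "Records": [
        {"messageId": f"id-{n}", "body": f"file{n}"} for n in range(1, 5)
    ]
}
result = handler(event, None)
print(result)
```

With this approach the manual `delete_sqs_message` call should no longer be needed, because successfully handled messages are removed when the function returns. If the queue has a redrive policy, repeatedly reported messages count toward its maximum receive count, so that limit may need to be generous.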