0
votes

We have a service that generates 3000 files per minute. The file size is less than 5 KB. These are stored in blob storage in azure. We need to concatenate these files and send it to S3. The final file size in s3 should be between 10MB -100MB (This data goes to snowflake through Snowpipe.). How can this be achieved in a fast and cost effective way.

Adding more information: What I have already tried:

1) Sending a blob create event to azure queue. Queue trigger function to load data to S3. Then using aws lambda to concatenate (but lambda usually times out)

2)Python code that uses multiprocessing that reads the azure queue and blob and then concatenate the data to create a 10 MB file and send it to S3. Tried running this code from an azure webjob.(Webjob only has 4 cores). This is not fast enough and not scalable.

I need a solution that can run tasks in parallel in most cost effective way and is scalable. It can be a batch process. The latency of data in S3 can be 24 hours. (Can not use azure batch as we have already exhausted the number of accounts for our subscription plan for some other process. ).

Any recommendations for ETL tools or services that will be best suited for this case.

1
Is the data on Azure ingested to Snowflake first before needing to be sent on to S3?Mike Donovan
You can use pypi.org/project/azure-storage-blob/#downloading-a-blob for downloading blobs and custom python script to concatenate files honoring your limitation and then use boto3.amazonaws.com/v1/documentation/api/latest/guide/… for uploading concatenated files to aws s3prudviraj
@MikeDonovan we have a aws snowflake account and our data is in azure. Snowflake does not have cross cloud auto ingest and I need to move this data to s3 for snowpipes to consume. For snowpipe the recommended file size is 10MB -100 MB.sanjita bisht

1 Answers

0
votes

One creative option that comes to mind is to create AWS Lambda function which will perform several steps:

  • Generate name for the output object in S3
  • List Blobs (probably by the pattern, up to your specifics)
  • Iterate through the list
  • Download blob
  • Save all the content to the object using multipart upload (suggest using smart_open lib)
  • [Optional] Delete all source files

Now you can schedule this function to run every 6-10 minutes (depending on the size of the input files)

The code of the function should resemble the following (though this is not tested, only compiled from docs)

import datetime

import smart_open
from azure.storage.blob import BlockBlobService
from azure.storage.blob import BlobClient

_STORAGE_ACCOUNT_NAME = "<storage account name>"
_STORAGE_ACCOUNT_KEY = "<storage account key>"
_CONTAINER_NAME = "<container name>"

blob_service = BlockBlobService(
    account_name=_STORAGE_ACCOUNT_NAME,
    account_key=_STORAGE_ACCOUNT_KEY)

s3_path = f"s3://your-bucket/prefix/path/{datetime.now().strftime('%Y-%m-%d-%H-%M')}.gz"


def lambda_handler(a, b):
    generator = blob_service.list_blobs(_CONTAINER_NAME)

    with smart_open.open(s3_path, 'wb') as out:
        for blob in generator:
            blob_client = BlobClient.from_blob_url(
                blob_url=f"https://account.blob.core.windows.net/{blob.container}/{blob.name}")
            out.write(blob_client.download_blob().readall())


if __name__ == '__main__':
    lambda_handler(None, None)