1
votes

I am trying to create a custom .NET activity in Azure, which creates a pool of VMs and a job to process the extraction of files on the pool nodes. So far I have the code for all this, but I am not sure how to download the processed files from the nodes back to blob storage. The executable I am running is a third-party exe with dlls, and I do not have access to its source code. Here is my code:

using Microsoft.Azure.Batch;
using Microsoft.Azure.Batch.Auth;
using Microsoft.Azure.Batch.Common;
using Microsoft.Azure.Batch.FileStaging;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.IO;

namespace unzipper
{


/// <summary>
/// In this class, the Batch Service is used to process a set of input blobs that are zip files in parallel on multiple 
/// compute nodes. Each task represents a single zip file.
/// 
/// A run-once job is created followed by multiple tasks which each task assigned to process a
/// specific blob. It then waits for each of the tasks to complete where it prints out the results for
/// each input blob.
/// </summary>
public static class Job
{
    // Files that must be staged to every compute node before a task can run:
    // the third-party executable and its Cygwin runtime dependencies.
    private const string UnzipperExeName = "wgrib2.exe";
    private const string StorageClientDllName1 = "cyggcc_s-1.dll";
    private const string StorageClientDllName2 = "cyggfortran-3.dll";
    private const string StorageClientDllName3 = "cyggomp-1.dll";
    private const string StorageClientDllName4 = "cygwin1.dll";

    // Storage account credentials.
    // NOTE(review): placeholder values. All storage clients are now built from the
    // Settings-provided credentials (see JobMain), so these constants are unused.
    private const string StorageAccountName = "account";
    private const string StorageAccountKey = "key...";

    /// <summary>
    /// Program entry point: creates the pool and the run-once job, submits one task
    /// per input zip blob, waits for all tasks to complete, and finally cleans up
    /// the pool, job and staging container according to the configured settings.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static void JobMain(string[] args)
    {
        // Load the configuration.
        Settings unzipperSettings = Settings.Default;

        CloudStorageAccount cloudStorageAccount = new CloudStorageAccount(
            new StorageCredentials(
                unzipperSettings.StorageAccountName,
                unzipperSettings.StorageAccountKey),
            unzipperSettings.StorageServiceUrl,
            useHttps: true);

        // Account used by the file-staging machinery to upload the exe and dlls.
        StagingStorageAccount stagingStorageAccount = new StagingStorageAccount(
            unzipperSettings.StorageAccountName,
            unzipperSettings.StorageAccountKey,
            cloudStorageAccount.BlobEndpoint.ToString());

        // Create the blob client, for use in obtaining references to blob storage containers.
        // Fix: reuse the settings-based account rather than rebuilding one from the
        // hard-coded placeholder constants, which made the output-container SAS URL
        // point at a different (non-existent) storage account.
        CloudBlobClient blobClient = cloudStorageAccount.CreateCloudBlobClient();

        const string outputContainerName = "output";
        // Obtain a shared access signature that provides write access to the output
        // container to which the tasks will upload their output.
        string outputContainerSasUrl = GetContainerSasUrl(blobClient, outputContainerName, SharedAccessBlobPermissions.Write);

        using (BatchClient client = BatchClient.Open(new BatchSharedKeyCredentials(unzipperSettings.BatchServiceUrl, unzipperSettings.BatchAccountName, unzipperSettings.BatchAccountKey)))
        {
            string stagingContainer = null;

            // Create the pool the tasks will run on.
            CloudPool pool = CreatePool(unzipperSettings, client);

            try
            {
                CreateJob(unzipperSettings, client);

                List<CloudTask> tasksToRun = CreateTasks(unzipperSettings, stagingStorageAccount, outputContainerSasUrl);

                // Fix: pass stagingContainer by reference so the container name captured
                // during task submission actually reaches Cleanup below. Previously the
                // assignment happened to a by-value copy and the staging container was
                // never deleted, even with ShouldDeleteContainer enabled.
                AddTasksToJob(unzipperSettings, client, ref stagingContainer, tasksToRun);

                MonitorProgress(unzipperSettings, client);
            }
            finally
            {
                // Always attempt cleanup, even when submission or monitoring failed.
                Cleanup(unzipperSettings, client, stagingContainer);
            }
        }
    }

    /// <summary>
    /// Downloads every blob in the named container to the given local directory,
    /// using the blob name as the local file name.
    /// </summary>
    /// <param name="blobClient">Client for the storage account that holds the container.</param>
    /// <param name="containerName">Name of an existing container to download from.</param>
    /// <param name="directoryPath">Local directory that receives the downloaded files.</param>
    private static void DownloadBlobsFromContainerAsync(CloudBlobClient blobClient, string containerName, string directoryPath)
    {
        Console.WriteLine("Downloading all files from container [{0}]...", containerName);

        // Retrieve a reference to a previously created container.
        CloudBlobContainer container = blobClient.GetContainerReference(containerName);

        // Get a flat listing of all the block blobs in the specified container.
        foreach (IListBlobItem item in container.ListBlobs(prefix: null, useFlatBlobListing: true))
        {
            // Retrieve reference to the current blob.
            CloudBlob blob = (CloudBlob)item;

            // Save blob contents to a file in the specified folder.
            string localOutputFile = Path.Combine(directoryPath, blob.Name);

            // Fix: the original fired DownloadToFileAsync without awaiting it, so this
            // method could report completion while downloads were still in flight and
            // any download error was silently dropped. This method is synchronous, so
            // use the synchronous overload instead.
            blob.DownloadToFile(localOutputFile, FileMode.Create);
        }

        Console.WriteLine("All files downloaded to {0}", directoryPath);
    }

    /// <summary>
    /// Deletes the pool, job and staging container created by this run, as dictated
    /// by the ShouldDelete* flags in the settings.
    /// </summary>
    /// <param name="unzipperSettings">The configuration settings.</param>
    /// <param name="client">An open Batch client.</param>
    /// <param name="stagingContainer">Name of the staging container, or null if none was created.</param>
    private static void Cleanup(Settings unzipperSettings, BatchClient client, string stagingContainer)
    {
        // Delete the pool that we created.
        if (unzipperSettings.ShouldDeletePool)
        {
            Console.WriteLine("Deleting pool: {0}", unzipperSettings.PoolId);
            client.PoolOperations.DeletePool(unzipperSettings.PoolId);
        }

        // Delete the job that we created.
        if (unzipperSettings.ShouldDeleteJob)
        {
            Console.WriteLine("Deleting job: {0}", unzipperSettings.JobId);
            client.JobOperations.DeleteJob(unzipperSettings.JobId);
        }

        // Delete the containers we created.
        if (unzipperSettings.ShouldDeleteContainer)
        {
            DeleteContainers(unzipperSettings, stagingContainer);
        }
    }

    /// <summary>
    /// Blocks until every task of the job has completed (or the timeout elapses),
    /// then prints each task's stdout and stderr to the console.
    /// </summary>
    /// <param name="unzipperSettings">The configuration settings.</param>
    /// <param name="client">An open Batch client.</param>
    private static void MonitorProgress(Settings unzipperSettings, BatchClient client)
    {
        // Get the job to monitor status.
        CloudJob job = client.JobOperations.GetJob(unzipperSettings.JobId);

        Console.Write("Waiting for tasks to complete ...   ");
        // Wait 120 minutes for all tasks to reach the completed state. The long timeout
        // is necessary the first time a pool is created, to allow nodes to be added to
        // the pool and initialized before they can run tasks.
        IPagedEnumerable<CloudTask> ourTasks = job.ListTasks(new ODATADetailLevel(selectClause: "id"));
        client.Utilities.CreateTaskStateMonitor().WaitAll(ourTasks, TaskState.Completed, TimeSpan.FromMinutes(120));
        Console.WriteLine("tasks are done.");

        foreach (CloudTask t in ourTasks)
        {
            Console.WriteLine("Task " + t.Id);
            Console.WriteLine("stdout:" + Environment.NewLine + t.GetNodeFile(Microsoft.Azure.Batch.Constants.StandardOutFileName).ReadAsString());
            Console.WriteLine();
            Console.WriteLine("stderr:" + Environment.NewLine + t.GetNodeFile(Microsoft.Azure.Batch.Constants.StandardErrorFileName).ReadAsString());
        }
    }

    /// <summary>
    /// Returns a shared access signature (SAS) URL providing the specified permissions to the specified container.
    /// </summary>
    /// <param name="blobClient">A <see cref="Microsoft.WindowsAzure.Storage.Blob.CloudBlobClient"/>.</param>
    /// <param name="containerName">The name of the container for which a SAS URL should be obtained.</param>
    /// <param name="permissions">The permissions granted by the SAS URL.</param>
    /// <returns>A SAS URL providing the specified access to the container.</returns>
    /// <remarks>The SAS URL provided is valid for 2 hours from the time this method is called. The container must
    /// already exist within Azure Storage.</remarks>
    private static string GetContainerSasUrl(CloudBlobClient blobClient, string containerName, SharedAccessBlobPermissions permissions)
    {
        // Set the expiry time and permissions for the container access signature. No
        // start time is specified, so the shared access signature becomes valid immediately.
        SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
        {
            SharedAccessExpiryTime = DateTime.UtcNow.AddHours(2),
            Permissions = permissions
        };

        // Generate the shared access signature on the container, setting the constraints directly on the signature.
        CloudBlobContainer container = blobClient.GetContainerReference(containerName);
        string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);

        // Return the URL string for the container, including the SAS token.
        return String.Format("{0}{1}", container.Uri, sasContainerToken);
    }

    /// <summary>
    /// Commits all tasks to the Batch service and records the name of the blob
    /// container that the file-staging machinery created, so the caller can delete
    /// it later.
    /// </summary>
    /// <param name="unzipperSettings">The configuration settings.</param>
    /// <param name="client">An open Batch client.</param>
    /// <param name="stagingContainer">Receives the name of the staging container that was created (by reference).</param>
    /// <param name="tasksToRun">The tasks to submit.</param>
    private static void AddTasksToJob(Settings unzipperSettings, BatchClient client, ref string stagingContainer, List<CloudTask> tasksToRun)
    {
        // Commit all the tasks to the Batch Service. Ask AddTask to return information about the files that were staged.
        // The container information is used later on to remove these files from Storage.
        ConcurrentBag<ConcurrentDictionary<Type, IFileStagingArtifact>> fsArtifactBag = new ConcurrentBag<ConcurrentDictionary<Type, IFileStagingArtifact>>();
        client.JobOperations.AddTask(unzipperSettings.JobId, tasksToRun, fileStagingArtifacts: fsArtifactBag);

        // Loop through the bag of artifacts, looking for the one that matches our staged files. Once there,
        // capture the name of the container holding the files so they can be deleted later on if that option
        // was configured in the settings.
        foreach (var fsBagItem in fsArtifactBag)
        {
            IFileStagingArtifact fsValue;
            if (fsBagItem.TryGetValue(typeof(FileToStage), out fsValue))
            {
                SequentialFileStagingArtifact stagingArtifact = fsValue as SequentialFileStagingArtifact;
                if (stagingArtifact != null)
                {
                    stagingContainer = stagingArtifact.BlobContainerCreated;
                    Console.WriteLine(
                        "Uploaded files to container: {0} -- you will be charged for their storage unless you delete them.",
                        stagingArtifact.BlobContainerCreated);
                }
            }
        }
    }

    /// <summary>
    /// Builds one CloudTask per input zip blob. Each task runs the third-party
    /// executable against its blob and stages the exe plus its dlls to the node.
    /// </summary>
    /// <param name="unzipperSettings">The configuration settings.</param>
    /// <param name="stagingStorageAccount">Storage account used to stage the exe/dlls.</param>
    /// <param name="outputContainerSasUrl">Write-access SAS URL of the output container (currently unused by the command line).</param>
    /// <returns>The list of tasks to submit, one per input blob.</returns>
    private static List<CloudTask> CreateTasks(Settings unzipperSettings, StagingStorageAccount stagingStorageAccount, string outputContainerSasUrl)
    {
        // Create file staging objects that represent the executable and its dependent assemblies.
        // These files are copied to every node before the corresponding task is scheduled to run on that node.
        FileToStage unzipperExe = new FileToStage(UnzipperExeName, stagingStorageAccount);
        FileToStage storageDll1 = new FileToStage(StorageClientDllName1, stagingStorageAccount);
        FileToStage storageDll2 = new FileToStage(StorageClientDllName2, stagingStorageAccount);
        FileToStage storageDll3 = new FileToStage(StorageClientDllName3, stagingStorageAccount);
        FileToStage storageDll4 = new FileToStage(StorageClientDllName4, stagingStorageAccount);

        // Get the list of zipped files to process.
        var zipFiles = GetZipFiles(unzipperSettings).ToList();
        Console.WriteLine("found {0} zipped files", zipFiles.Count);

        // Initialize a collection to hold the tasks that will be submitted in their entirety. One task per file.
        List<CloudTask> tasksToRun = new List<CloudTask>(zipFiles.Count);
        int i = 0;
        foreach (var zipFile in zipFiles)
        {
            // Use the blob's file name (last URI segment) as the base name of the CSV output.
            string outputFileName = System.IO.Path.GetFileName(zipFile.Uri.ToString());

            // Command line mirrors the manual invocation:
            //   wgrib2.exe <input> -csv <input>.csv
            CloudTask task = new CloudTask("task_no_" + i, String.Format("{0} {1} -csv {2}.csv ",
                UnzipperExeName,
                zipFile.Uri,
                outputFileName));

            // This is the list of files to stage to a container -- for each job, one container is created and
            // files all resolve to Azure Blobs by their name (so two tasks with the same named file will create
            // just one blob in the container).
            task.FilesToStage = new List<IFileStagingProvider>
                                        {
                                            unzipperExe,
                                            storageDll1,
                                            storageDll2,
                                            storageDll3,
                                            storageDll4
                                        };

            tasksToRun.Add(task);
            i++;
        }

        return tasksToRun;
    }

    /// <summary>
    /// Creates the run-once job bound to the configured pool.
    /// </summary>
    /// <param name="unzipperSettings">The configuration settings.</param>
    /// <param name="client">An open Batch client.</param>
    private static void CreateJob(Settings unzipperSettings, BatchClient client)
    {
        Console.WriteLine("Creating job: " + unzipperSettings.JobId);
        // Get an empty unbound Job.
        CloudJob unboundJob = client.JobOperations.CreateJob();
        unboundJob.Id = unzipperSettings.JobId;
        unboundJob.PoolInformation = new PoolInformation() { PoolId = unzipperSettings.PoolId };

        // Commit Job to create it in the service.
        unboundJob.Commit();
    }

    /// <summary>
    /// Creates (or reuses) the compute pool the tasks run on.
    /// </summary>
    /// <param name="unzipperSettings">The configuration settings.</param>
    /// <param name="client">An open Batch client.</param>
    /// <returns>The (possibly unbound) pool object.</returns>
    private static CloudPool CreatePool(Settings unzipperSettings, BatchClient client)
    {
        // OSFamily 4 == Windows Server 2012 R2. You can learn more about os families and versions at:
        // http://msdn.microsoft.com/en-us/library/azure/ee924680.aspx
        CloudPool pool = client.PoolOperations.CreatePool(
            poolId: unzipperSettings.PoolId,
            targetDedicated: unzipperSettings.PoolNodeCount,
            virtualMachineSize: unzipperSettings.MachineSize,
            cloudServiceConfiguration: new CloudServiceConfiguration(osFamily: "4"));
        pool.MaxTasksPerComputeNode = unzipperSettings.MaxTasksPerNode;
        Console.WriteLine("Adding pool {0}", unzipperSettings.PoolId);

        try
        {
            pool.Commit();
        }
        catch (AggregateException ae)
        {
            // Go through all exceptions and dump useful information.
            ae.Handle(x =>
            {
                Console.Error.WriteLine("Creating pool ID {0} failed", unzipperSettings.PoolId);
                if (x is BatchException)
                {
                    BatchException be = x as BatchException;

                    Console.WriteLine(be.ToString());
                    Console.WriteLine();
                }
                else
                {
                    Console.WriteLine(x);
                }

                // Can't continue without a pool: returning false makes Handle rethrow.
                return false;
            });
        }
        catch (BatchException be)
        {
            if (be.Message.Contains("conflict"))
            {
                // The pool already exists (e.g. from a previous run) -- reuse it.
                Console.WriteLine("pool already exists");
            }
            else
            {
                // Fix: the original silently swallowed every other BatchException,
                // letting the program continue without a usable pool. Rethrow,
                // preserving the stack trace.
                throw;
            }
        }

        return pool;
    }

    /// <summary>
    /// Creates a client for accessing blob storage.
    /// </summary>
    /// <param name="accountName">The storage account name.</param>
    /// <param name="accountKey">The storage account key.</param>
    /// <param name="accountUrl">The storage service endpoint URL.</param>
    /// <returns>A blob client for the account.</returns>
    private static CloudBlobClient GetCloudBlobClient(string accountName, string accountKey, string accountUrl)
    {
        StorageCredentials cred = new StorageCredentials(accountName, accountKey);
        CloudStorageAccount storageAccount = new CloudStorageAccount(cred, accountUrl, useHttps: true);
        CloudBlobClient client = storageAccount.CreateCloudBlobClient();

        return client;
    }

    /// <summary>
    /// Delete the containers in Azure Storage which were created by this run.
    /// </summary>
    /// <param name="unzipperSettings">The configuration settings.</param>
    /// <param name="fileStagingContainer">Name of the staging container, or null/empty if none was created.</param>
    private static void DeleteContainers(Settings unzipperSettings, string fileStagingContainer)
    {
        CloudBlobClient client = GetCloudBlobClient(
            unzipperSettings.StorageAccountName,
            unzipperSettings.StorageAccountKey,
            unzipperSettings.StorageServiceUrl);

        // Delete the file staging container, if one was recorded.
        if (!string.IsNullOrEmpty(fileStagingContainer))
        {
            CloudBlobContainer container = client.GetContainerReference(fileStagingContainer);
            Console.WriteLine("Deleting container: {0}", fileStagingContainer);
            container.DeleteIfExists();
        }
    }

    /// <summary>
    /// Gets all blobs in the configured input container.
    /// </summary>
    /// <param name="unzipperSettings">The account settings.</param>
    /// <returns>The flat list of blob items in the container.</returns>
    private static IEnumerable<IListBlobItem> GetZipFiles(Settings unzipperSettings)
    {
        CloudBlobClient client = GetCloudBlobClient(
            unzipperSettings.StorageAccountName,
            unzipperSettings.StorageAccountKey,
            unzipperSettings.StorageServiceUrl);
        var container = client.GetContainerReference(unzipperSettings.Container);
        var list = container.ListBlobs(null, true, BlobListingDetails.None);

        return list;
    }
}
}

This code creates the pool and job for all the nodes with the startup task. The exe, dlls and the files to be processed are passed to the nodes.

My question is: how do I get the processed files out to the blob storage? How can I stream them to the blob, in case there is not enough disk space on the VM to store them and copy them?

1

1 Answer

1
votes

You could use this way to persist the task output:
https://docs.microsoft.com/en-us/azure/batch/batch-task-output-files

Azure Batch provides more than one way to persist task output. Using the Batch service API is a convenient approach that's best suited to these scenarios:

  • You want to write code to persist task output from within your client application, without modifying the application that your task is running.
  • You want to persist output from Batch tasks and job manager tasks in pools created with the virtual machine configuration.
  • You want to persist output to an Azure Storage container with an arbitrary name.
  • You want to persist output to an Azure Storage container named according to the Batch File Conventions standard.

Sample project:
https://github.com/Azure/azure-batch-samples/tree/master/CSharp/ArticleProjects/PersistOutputs

Sample code:

<!-- language: lang-cs -->
OutputFiles = new List<OutputFile>
{
    new OutputFile(
        filePattern: @"..\std*.txt",
        destination: new OutputFileDestination(new OutputFileBlobContainerDestination(
            containerUrl: containerUrl,
            path: taskId)),
        uploadOptions: new OutputFileUploadOptions(
            uploadCondition: OutputFileUploadCondition.TaskCompletion)),
    new OutputFile(
        filePattern: @"output.txt",
        destination: new OutputFileDestination(new OutputFileBlobContainerDestination(
            containerUrl: containerUrl,
            path: taskId + @"\output.txt")),
        uploadOptions: new OutputFileUploadOptions(
            uploadCondition: OutputFileUploadCondition.TaskCompletion)),
}