
I need to move a whole partition of records in an Azure Table Storage table from Partition1 to Partition2: thousands, if not millions, of entities.

I know there is no way to move an entity from one partition to another in Azure Table Storage: you have to delete the old entity and insert a new one with the updated PartitionKey. So my task is to do that for many records.

Is there a standard way to do this?

I came up with the following solution (simplified):

public async Task Migrate(string oldPartition, string newPartition)
{
    TableContinuationToken token = null;

    List<Task> migrationTasks = new List<Task>();

    do
    {
        // Read the next segment of the old partition
        TableQuerySegment<MyTableEntity> entries = await GetEntriesSegment(
            oldPartition,
            token);

        token = entries.ContinuationToken;

        // Start migrating this segment without awaiting it, then read the next one
        migrationTasks.Add(MigrateEntries(entries, newPartition));
    } while (token != null);

    await Task.WhenAll(migrationTasks);
}

private async Task MigrateEntries(IEnumerable<MyTableEntity> entries, string newPartition)
{
    // Insert the copies and delete the originals concurrently
    await Task.WhenAll(
        InsertInBatches(entries.Select(
            e => GetEntryWithUpdatedPartitionKey(e, newPartition))),
        DeleteInBatches(entries));
}

  • GetEntriesSegment wraps the logic to access the table and fetch one query segment.
  • GetEntryWithUpdatedPartitionKey copies all properties of a MyTableEntity into a new instance, but with a different PartitionKey.
  • InsertInBatches splits the collection of entries into batches of 100 (the Azure Table Storage limit per batch) and runs all the batch inserts in parallel (via another await Task.WhenAll(insertTasks) inside).
  • DeleteInBatches does the same for deletes: it splits the entries into batches of 100 and runs all the batch deletes in parallel (via another await Task.WhenAll(deleteTasks) inside).
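The question does not show InsertInBatches or DeleteInBatches, so here is only a rough sketch of the splitting-and-parallel-execution idea they describe. SplitIntoBatches, RunInBatchesAsync and FakeExecuteBatchAsync are hypothetical names, and the fake executor merely stands in for a real table batch call (it records batch sizes instead of talking to Azure). On .NET 6 or later, Enumerable.Chunk can replace the hand-written splitter.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var batchSizes = new List<int>();

// Hypothetical stand-in for a real batch call such as table.ExecuteBatchAsync:
// it only records how many operations the batch would contain.
Task FakeExecuteBatchAsync(IReadOnlyList<int> batch)
{
    lock (batchSizes) { batchSizes.Add(batch.Count); }
    return Task.CompletedTask;
}

var entities = Enumerable.Range(0, 250).ToList();
await RunInBatchesAsync(entities, 100, FakeExecuteBatchAsync);
Console.WriteLine(string.Join(", ", batchSizes)); // prints "100, 100, 50"

// Splits a sequence into consecutive chunks of at most batchSize items
// (100 is the Azure Table Storage limit per batch).
static IEnumerable<List<T>> SplitIntoBatches<T>(IEnumerable<T> source, int batchSize)
{
    var batch = new List<T>(batchSize);
    foreach (var item in source)
    {
        batch.Add(item);
        if (batch.Count == batchSize)
        {
            yield return batch;
            batch = new List<T>(batchSize);
        }
    }

    if (batch.Count > 0)
    {
        yield return batch; // the final, possibly smaller, chunk
    }
}

// Starts one task per chunk, then waits for all of them, so real batches run concurrently.
static Task RunInBatchesAsync<T>(IEnumerable<T> source, int batchSize, Func<IReadOnlyList<T>, Task> executeBatch)
{
    List<Task> tasks = SplitIntoBatches(source, batchSize)
        .Select(batch => executeBatch(batch))
        .ToList();
    return Task.WhenAll(tasks);
}
```

Starting every batch at once is the simplest form of "all in parallel"; with very large segments you may want to cap how many batch calls are in flight.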

My main goal is to parallelize everything, i.e., new entries should be read while already-read ones are being deleted and their copies are being inserted.
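A minimal sketch of that overlap with bounded memory, under stated assumptions: the loop reads the next segment while earlier segments are still migrating, but a SemaphoreSlim caps how many segment migrations run at once (the Migrate method above starts one per segment with no limit). GetSegmentAsync, MigrateSegmentAsync and MigrateAndReleaseAsync are hypothetical stand-ins for GetEntriesSegment and MigrateEntries, integers stand in for entities, and the limit of 4 is an arbitrary assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int MaxInFlightSegments = 4; // assumption: tune against your table's throughput
var throttle = new SemaphoreSlim(MaxInFlightSegments);
var inFlight = new List<Task>();
int migrated = 0;

int? continuation = null;
do
{
    // Read the next segment while earlier segments may still be migrating.
    var (records, next) = await GetSegmentAsync(continuation);
    continuation = next;

    // Wait for a free slot, so at most MaxInFlightSegments segments are being
    // migrated (and held in memory) at any time.
    await throttle.WaitAsync();
    inFlight.Add(MigrateAndReleaseAsync(records));
} while (continuation != null);

await Task.WhenAll(inFlight);
Console.WriteLine($"Migrated {migrated} records");

// Runs one segment's migration and frees its slot even if the migration fails.
async Task MigrateAndReleaseAsync(List<int> records)
{
    try { await MigrateSegmentAsync(records); }
    finally { throttle.Release(); }
}

// Hypothetical stand-in for GetEntriesSegment: five segments of 1,000 fake
// records, followed by a null continuation token.
Task<(List<int> Records, int? Next)> GetSegmentAsync(int? token)
{
    int index = token ?? 0;
    var records = Enumerable.Range(index * 1000, 1000).ToList();
    int? next = index < 4 ? index + 1 : (int?)null;
    return Task.FromResult<(List<int> Records, int? Next)>((records, next));
}

// Hypothetical stand-in for MigrateEntries (the insert and delete batches).
async Task MigrateSegmentAsync(List<int> records)
{
    await Task.Delay(20);
    Interlocked.Add(ref migrated, records.Count);
}
```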

Does this solution look reasonable? Do you know of a time-proven alternative (well tested, used in production projects)?


2 Answers


You could try TPL Dataflow: https://msdn.microsoft.com/library/hh228603.aspx

The Task Parallel Library (TPL) provides dataflow components to help increase the robustness of concurrency-enabled applications.


You can accomplish this using the TPL Dataflow library, which also handles the requirement to group the operations into at most 100 per table batch (via BatchBlock). I would also like to emphasize that this code does not wait for the entire partition to be read before migrating; it streams the records through, migrating them one batch at a time as each batch fills up. This is what the Dataflow library gives you.

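using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.WindowsAzure.Storage.Table;

public async Task MigratePartitionAsync<T>(CloudTable table, string oldPartitionKey, string newPartitionKey)
    where T : TableEntity, new()
{
    // batch up to 100 records per table operation (the table batch limit)
    var buffer = new BatchBlock<T>(100);

    // migrate the records; by default an ActionBlock processes one batch at a time
    // (MaxDegreeOfParallelism = 1); pass ExecutionDataflowBlockOptions to run batches concurrently
    var migrator = new ActionBlock<T[]>(async x => await MigrateRecordsAsync(table, x, newPartitionKey));

    // link the blocks and set them to propagate their completion
    buffer.LinkTo(migrator, new DataflowLinkOptions { PropagateCompletion = true });

    // read the old partition
    await ReadPartitionAsync(table, buffer, oldPartitionKey);

    // wait until every batch has been migrated
    await migrator.Completion;
}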

public async Task ReadPartitionAsync<T>(CloudTable table, ITargetBlock<T> buffer, string partitionKey)
    where T : TableEntity, new()
{
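    // Enumerating this query pages through the whole partition (following continuation
    // tokens), but each page is fetched synchronously
    var results = table.CreateQuery<T>().Where(x => x.PartitionKey == partitionKey);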

    foreach (var record in results)
    {
        await buffer.SendAsync(record);
    }

    buffer.Complete();
}

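public async Task MigrateRecordsAsync<T>(CloudTable table, IEnumerable<T> records, string newPartitionKey)
    where T : TableEntity, new()
{
    // Each batch is atomic on its own, but the delete and the insert are two separate
    // transactions: if the insert batch fails after the delete batch has succeeded,
    // these records are lost, so consider logging or retrying failed batches.

    // delete the originals from the old partition
    var deleteBatch = new TableBatchOperation();

    foreach (var element in records)
    {
        deleteBatch.Delete(element);
    }

    await table.ExecuteBatchAsync(deleteBatch);

    // re-insert the same entities under the new partition key
    var insertBatch = new TableBatchOperation();

    foreach (var element in records)
    {
        element.PartitionKey = newPartitionKey;
        insertBatch.Insert(element);
    }

    await table.ExecuteBatchAsync(insertBatch);
}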

You would use it like this:

CloudTable table = GetCloudTable();

await MigratePartitionAsync<MyTableEntityClass>(table, "OldPartitionKey", "NewPartitionKey");
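To observe the batching and streaming behavior this answer relies on without a storage account, here is a self-contained sketch in which integers stand in for entities and Task.Delay stands in for the table batch calls. Two settings are additions not in the answer, included as assumptions you may tune: BoundedCapacity on both blocks, so SendAsync waits when the pipeline is full instead of buffering the whole partition in memory, and MaxDegreeOfParallelism = 4, so several batches are migrated concurrently.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int batchesProcessed = 0;
int recordsProcessed = 0;

// Groups records into arrays of up to 100. BoundedCapacity (an assumption) must be
// at least the batch size; here it allows roughly three batches to be buffered.
var buffer = new BatchBlock<int>(100, new GroupingDataflowBlockOptions { BoundedCapacity = 300 });

// Up to 4 batches are "migrated" at once; the ActionBlock default would be 1.
var migrator = new ActionBlock<int[]>(async batch =>
{
    await Task.Delay(10); // stand-in for the delete and insert batch calls
    Interlocked.Increment(ref batchesProcessed);
    Interlocked.Add(ref recordsProcessed, batch.Length);
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 4 });

buffer.LinkTo(migrator, new DataflowLinkOptions { PropagateCompletion = true });

// Stand-in for enumerating the old partition: 1,050 fake records.
foreach (var record in Enumerable.Range(0, 1050))
{
    await buffer.SendAsync(record); // waits while the bounded blocks are full
}

buffer.Complete(); // flushes the final partial batch of 50
await migrator.Completion;

Console.WriteLine($"{recordsProcessed} records in {batchesProcessed} batches");
```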