I have a data flow as follows.
1. A task that reads a text file in chunks and adds them to BatchBlock<chunkSize>
2. An ActionBlock
that is linked to the above BatchBlock
that partitions the data into batches and adds them to a BufferBlock
3. A TransformationBlock
that is linked to BufferBlock
, which spawns an async
task for each batch
4. The process is finished when all the spanwed async
calls are finished.
The below code isn't working as expected. It finishes before all batches are processed. What am I missing?
private static void DataFlow(string filePath, int chunkSize, int batchSize)
{
int chunkCount = 0;
int batchCount = 0;
BatchBlock<string> chunkBlock = new BatchBlock<string>(chunkSize);
BufferBlock<IEnumerable<string>> batchBlock = new BufferBlock<IEnumerable<string>>();
Task produceTask = Task.Factory.StartNew(() =>
{
foreach (var line in File.ReadLines(filePath))
{
chunkBlock.Post(line);
}
Console.WriteLine("Finished producing");
chunkBlock.Complete();
});
var makeBatches = new ActionBlock<string[]>(t =>
{
Console.WriteLine("Got a chunk " + ++chunkCount);
// Partition each chunk into smaller chunks grouped on column 1
var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);
// Further beakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
// Get batches from groups
var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));
foreach (var batch in batches)
{
batchBlock.Post(batch);
}
batchBlock.Complete();
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
chunkBlock.LinkTo(makeBatches, new DataflowLinkOptions { PropagateCompletion = true });
var executeBatches = new TransformBlock<IEnumerable<string>, IEnumerable<string>>(async b =>
{
Console.WriteLine("Got a batch " + ++batchCount);
await ExecuteBatch(b);
return b;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
batchBlock.LinkTo(executeBatches, new DataflowLinkOptions { PropagateCompletion = true });
var finishBatches = new ActionBlock<IEnumerable<string>>(b =>
{
Console.WriteLine("Finised executing batch" + batchCount);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
executeBatches.LinkTo(finishBatches, new DataflowLinkOptions { PropagateCompletion = true });
Task.WaitAll(produceTask);
Console.WriteLine("Production complete");
makeBatches.Completion.Wait();
Console.WriteLine("Making batches complete");
executeBatches.Completion.Wait();
Console.WriteLine("Executing batches complete");
Task.WaitAll(finishBatches.Completion);
Console.WriteLine("Process complete with total chunks " + chunkCount + " and total batches " + batchCount);
Console.ReadLine();
}
// async task to simulate network I/O
private static async Task ExecuteBatch(IEnumerable<string> batch)
{
Console.WriteLine("Executing batch ");
await Task.Run(() => System.Threading.Thread.Sleep(2000));
}