This data flow network has a single bifurcation and produces the correct text output with proper results. Why does it not complete?
// Connect multiple blocks
// source -> convertToDouble -> multiply -> multiplyBuffer -> summation -> writeOut
// |-> multiply2 -> writeListOut
var source = new BufferBlock<List<int>>();
var convertToDouble = new TransformBlock<List<int>, List<double>>((List<int> l) =>
{
return l.Select(_l => (double)_l).ToList();
});
source.LinkTo(convertToDouble);
Func<List<double>, List<double>> multiplyFunc = (List<double> l) =>
{
return l.Select(_l => _l * 10.0).ToList();
};
var multiply = new TransformBlock<List<double>, List<double>>(multiplyFunc);
convertToDouble.LinkTo(multiply);
var multiplyBuffer = new BroadcastBlock<List<double>>((List<double> l) =>
{
return l;
});
multiply.LinkTo(multiplyBuffer);
var summation = new TransformBlock<List<double>, double>((List<double> l) =>
{
return l.Sum();
});
multiplyBuffer.LinkTo(summation);
var writeOut = new ActionBlock<double>((double d) =>
{
Console.WriteLine("Writing out: " + d.ToString());
});
summation.LinkTo(writeOut);
var multiply2 = new TransformBlock<List<double>, List<double>>(multiplyFunc);
multiplyBuffer.LinkTo(multiply2);
var writeListOut = new ActionBlock<List<double>>((List<double> l) =>
{
Console.WriteLine("Writing list out: " + string.Join(", ", l.Select(_l =>
_l.ToString()).ToList()));
});
multiply2.LinkTo(writeListOut);
source.Post(new List<int> { 1, 2, 3 });
Task.Run(async () =>
{
await Task.Delay(3000);
Console.WriteLine("posting 2nd...");
source.Post(new List<int> { 4, 5, 6 });
source.Complete();
});
// Never completes
try
{
writeOut.Completion.Wait();
writeListOut.Completion.Wait();
}
catch (AggregateException ex)
{
ex.Handle(e =>
{
Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
return true;
});
}
I've noticed if the Completion.wait()
calls are omitted, then the program returns. There are no errors to be observed upon execution of the network.
Sample output:
Writing list out: 100, 200, 300 Writing out: 60 posting 2nd... Writing out: 150 Writing list out: 400, 500, 600 (hangs)
Expected output:
Writing list out: 100, 200, 300 Writing out: 60 posting 2nd... Writing out: 150 Writing list out: 400, 500, 600 (returns)
source.Post()
calls. – Pedro OS