
This dataflow network has a single fork and prints the correct output. Why does it never complete?

            // Connect multiple blocks
            // source -> convertToDouble -> multiply -> multiplyBuffer -> summation -> writeOut
            //                                                        |-> multiply2 -> writeListOut
            var source = new BufferBlock<List<int>>();
            var convertToDouble = new TransformBlock<List<int>, List<double>>((List<int> l) =>
            {
                return l.Select(_l => (double)_l).ToList();
            });
            source.LinkTo(convertToDouble);
            Func<List<double>, List<double>> multiplyFunc = (List<double> l) =>
            {
                return l.Select(_l => _l * 10.0).ToList();
            };
            var multiply = new TransformBlock<List<double>, List<double>>(multiplyFunc);
            convertToDouble.LinkTo(multiply);
            var multiplyBuffer = new BroadcastBlock<List<double>>((List<double> l) =>
            {
                return l;
            });
            multiply.LinkTo(multiplyBuffer);
            var summation = new TransformBlock<List<double>, double>((List<double> l) =>
            {
                return l.Sum();
            });
            multiplyBuffer.LinkTo(summation);
            var writeOut = new ActionBlock<double>((double d) =>
            {
                Console.WriteLine("Writing out: " + d.ToString());
            });
            summation.LinkTo(writeOut);
            var multiply2 = new TransformBlock<List<double>, List<double>>(multiplyFunc);
            multiplyBuffer.LinkTo(multiply2);
            var writeListOut = new ActionBlock<List<double>>((List<double> l) =>
            {
                Console.WriteLine("Writing list out: " + string.Join(", ", l.Select(_l => 
                    _l.ToString()).ToList()));
            });
            multiply2.LinkTo(writeListOut);

            source.Post(new List<int> { 1, 2, 3 });

            Task.Run(async () =>
            {
                await Task.Delay(3000);
                Console.WriteLine("posting 2nd...");
                source.Post(new List<int> { 4, 5, 6 });
                source.Complete();
            });

            // Never completes
            try
            {
                writeOut.Completion.Wait();
                writeListOut.Completion.Wait();
            }
            catch (AggregateException ex)
            {
                ex.Handle(e =>
                {
                    Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
                    return true;
                });
            }

I've noticed that if the Completion.Wait() calls are omitted, the program returns. No errors are reported when the network runs.

Sample output:

Writing list out: 100, 200, 300
Writing out: 60
posting 2nd...
Writing out: 150
Writing list out: 400, 500, 600
(hangs)

Expected output:

Writing list out: 100, 200, 300
Writing out: 60
posting 2nd...
Writing out: 150
Writing list out: 400, 500, 600
(returns)

Can you supply some sample input and output? Also, what's the signature of the method? We need to be able to reproduce the behavior. - Mark McWhirter
I don't know if I understood the question, but the input is specified in the source.Post() calls. - Pedro OS

1 Answer


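In TPL Dataflow, completion of a source block is not propagated to its linked targets by default. Here, source.Complete() completes only source, so writeOut and writeListOut never complete and the Wait() calls block forever.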

You need to construct a System.Threading.Tasks.Dataflow.DataflowLinkOptions, set its PropagateCompletion property to true, and pass it to each of your LinkTo calls.
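As a minimal sketch of that approach, the network from the question could be relinked as below. The class name PropagateCompletionSketch, the RunAsync method, and collecting results into lists instead of writing to the console are assumptions made for illustration; only DataflowLinkOptions, PropagateCompletion and LinkTo come from the library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PropagateCompletionSketch
{
    // Hypothetical entry point: builds the question's network and returns what each branch produced.
    public static async Task<(List<double> Sums, List<string> Lists)> RunAsync()
    {
        var sums = new List<double>();
        var lists = new List<string>();

        // One options instance, reused for every link.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        var source = new BufferBlock<List<int>>();
        var convertToDouble = new TransformBlock<List<int>, List<double>>(
            l => l.Select(x => (double)x).ToList());
        Func<List<double>, List<double>> multiplyFunc = l => l.Select(x => x * 10.0).ToList();
        var multiply = new TransformBlock<List<double>, List<double>>(multiplyFunc);
        var multiplyBuffer = new BroadcastBlock<List<double>>(l => l);
        var summation = new TransformBlock<List<double>, double>(l => l.Sum());
        var writeOut = new ActionBlock<double>(d => sums.Add(d));
        var multiply2 = new TransformBlock<List<double>, List<double>>(multiplyFunc);
        var writeListOut = new ActionBlock<List<double>>(l => lists.Add(string.Join(", ", l)));

        // Same topology as the question, but every link now forwards completion.
        source.LinkTo(convertToDouble, linkOptions);
        convertToDouble.LinkTo(multiply, linkOptions);
        multiply.LinkTo(multiplyBuffer, linkOptions);
        multiplyBuffer.LinkTo(summation, linkOptions);
        summation.LinkTo(writeOut, linkOptions);
        multiplyBuffer.LinkTo(multiply2, linkOptions);
        multiply2.LinkTo(writeListOut, linkOptions);

        source.Post(new List<int> { 1, 2, 3 });
        source.Post(new List<int> { 4, 5, 6 });

        // Completing the head is now enough: completion flows down both branches.
        source.Complete();
        await Task.WhenAll(writeOut.Completion, writeListOut.Completion);

        return (sums, lists);
    }
}
```

Awaiting PropagateCompletionSketch.RunAsync() from an async Main should return instead of hanging, because both terminal blocks complete once the data has drained. The two branches run concurrently, so the relative order of their output is not guaranteed, only the order within each branch.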

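Alternatively, you can call the Complete method on each of your blocks in pipeline order, waiting for each block's Completion before completing the next one so that messages still in flight are not declined.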
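The manual approach might look like the following sketch, shown on a cut-down linear version of the pipeline to keep it short. The class ManualCompletionSketch and the helper CompleteInOrderAsync are hypothetical names introduced here; they are not part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ManualCompletionSketch
{
    // Hypothetical helper: completes each stage, then waits for it to drain
    // before touching the next stage, so downstream blocks still accept its output.
    public static async Task CompleteInOrderAsync(params IDataflowBlock[] stages)
    {
        foreach (var stage in stages)
        {
            stage.Complete();
            await stage.Completion;
        }
    }

    public static async Task<List<double>> RunAsync()
    {
        var results = new List<double>();

        var source = new BufferBlock<List<int>>();
        var convertToDouble = new TransformBlock<List<int>, List<double>>(
            l => l.Select(x => (double)x).ToList());
        var summation = new TransformBlock<List<double>, double>(l => l.Sum());
        var writeOut = new ActionBlock<double>(d => results.Add(d));

        // Plain links, as in the question: completion is NOT propagated.
        source.LinkTo(convertToDouble);
        convertToDouble.LinkTo(summation);
        summation.LinkTo(writeOut);

        source.Post(new List<int> { 1, 2, 3 });
        source.Post(new List<int> { 4, 5, 6 });

        // Walk the pipeline from head to tail.
        await CompleteInOrderAsync(source, convertToDouble, summation, writeOut);
        return results;
    }
}
```

For the forked network in the question, the same idea would presumably apply per branch: complete the blocks up to and including multiplyBuffer in order, then complete each branch's blocks in order. This is more bookkeeping than PropagateCompletion, which is why the link option is usually the simpler choice.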