0
votes

To understand multithreading, I was trying out a very simple producer-consumer approach using BlockingCollection, following the article described here: MS Docs.

My producer is a single task which reads from an XML file (has around 4000 nodes) and pushes XElement nodes to a blocking collection.

My consumer will have multiple threads reading from the blocking collection and uploading files to a site based on the XElement.

The problem here is that the program unexpectedly closes every time I try to run it. It hits the producer Task.Run but stops after that. I'm not able to understand the reason why. Am I doing something wrong? It doesn't even hit the catch block.

Code is below:

            BlockingCollection<XElement> collection = new BlockingCollection<XElement>(100);                
            string metadataFilePath = exportLocation + listTitle + "\\Metadata\\" + exportJobId + ".xml";
            //create the producer
            Task.Run(() =>
            {                    
                //Process only the files that have not been uploaded                                   
                XDocument xmlFile = XDocument.Load(metadataFilePath);
                var query = from c in xmlFile.Elements("Items").Elements("Item")
                            where c.Attribute("IsUploaded").Value == "No"
                            select c;
                foreach (var item in query)
                {
                    collection.Add(item);
                }
                collection.CompleteAdding();
            });

            //process consumer
            Parallel.ForEach(collection, (new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 2 }), (metadata) => {
                ProcessItems();
            });
Can you put a breakpoint inside of the Task.Run? - 2174714
I did. It executes some statements, then just closes suddenly. - Akhoy
Could it be because you are adding to a collection in a separate thread, and then potentially accessing it in the Parallel.ForEach()? Maybe you need to put a lock around the collection.Add() statement. - mikeyq6
Not sure. The docs mention that this is already handled by the blocking collection. I've tried a very similar variant of the example they mention as well, but with the same result. - Akhoy
Ah yes, I didn't notice you were using BlockingCollection. - mikeyq6

2 Answers

2
votes

Assuming you are running a console application, I can think of the following issues:

  1. Tasks in C# run on thread-pool threads by default, and those are background threads, i.e. they can't keep the application alive. If the main thread (a foreground thread) exits, your background threads stop executing as well.
  2. With #1 in mind, it is possible that your parallel block executes before the producer thread has generated any data, so the program exits and the background producer thread is terminated with it. Try starting a consumer task that reads from the collection using TryTake() inside a loop, and add a call to Console.ReadLine() to your program so that the console can't exit until the user hits Enter. If you want to consume in parallel, see example 2 here.
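As an alternative to Console.ReadLine(), the main thread can simply wait on the tasks themselves. The following is only a minimal sketch of that idea, not the code from the question: the class name, the int payload, and the item count of 1000 are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class KeepAliveSketch
{
    static void Main()
    {
        var collection = new BlockingCollection<int>(boundedCapacity: 100);

        // Producer: runs on a thread-pool (background) thread.
        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < 1000; i++)
            {
                collection.Add(i); // blocks while the collection is full
            }
            collection.CompleteAdding();
        });

        int consumed = 0;

        // Single consumer, also on a background thread. The loop ends once
        // CompleteAdding() has been called and the collection is empty.
        Task consumer = Task.Run(() =>
        {
            foreach (int item in collection.GetConsumingEnumerable())
            {
                consumed++;
            }
        });

        // Blocking the foreground (main) thread here keeps the process
        // alive until both background tasks have finished.
        Task.WaitAll(producer, consumer);
        Console.WriteLine($"Consumed {consumed} items");
    }
}
```

Waiting on the tasks also surfaces any exception thrown inside them (as an AggregateException), which Console.ReadLine() would not do.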

You can see some more examples here. Notice the following things in the example code:

  1. Use of a using block: using (BlockingCollection bc = new BlockingCollection())
  2. The call to CompleteAdding() in the first thread, which indicates that the collection won't accept any more items from the producer. The producer thread calls it once all the items have been added. After a collection has been marked as complete for adding, adding to the collection is not permitted, and attempts to remove from the collection will not wait when the collection is empty.
  3. Use of TryTake(out result) by the consumer thread in the second example. TryTake returns false once the collection has been marked complete by CompleteAdding() and is empty, i.e. once the IsCompleted property becomes true, which lets the consumer thread finish. Note that the overload without a timeout also returns false immediately whenever the collection is merely empty, so it does not wait for a producer that is still adding items. To wait, use Take(), GetConsumingEnumerable(), or TryTake(out result, Timeout.Infinite).
  4. The call to Console.ReadLine(), so that the tasks, which run on background threads, are not terminated before they complete.
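To check how TryTake behaves in each state of the collection, a small single-threaded sketch like the one below may help. It is an illustration only; the class name and the value 42 are arbitrary.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

class TryTakeSketch
{
    static void Main()
    {
        using (var bc = new BlockingCollection<int>())
        {
            // Empty but still open for adding: the overload without a
            // timeout does not wait, it returns false right away.
            bool gotItem = bc.TryTake(out int first);
            Console.WriteLine($"Empty, still open: {gotItem}, IsCompleted: {bc.IsCompleted}");

            // An item is available, so the blocking overload returns at once.
            bc.Add(42);
            gotItem = bc.TryTake(out int value, Timeout.Infinite);
            Console.WriteLine($"After Add: {gotItem}, value {value}");

            // Completed and empty: even with an infinite timeout TryTake
            // returns false instead of waiting, so a consumer loop can end.
            bc.CompleteAdding();
            gotItem = bc.TryTake(out int none, Timeout.Infinite);
            Console.WriteLine($"After CompleteAdding: {gotItem}, IsCompleted: {bc.IsCompleted}");
        }
    }
}
```

In a real consumer loop, `while (bc.TryTake(out item, Timeout.Infinite)) { ... }` therefore waits for the producer and stops on its own once the collection is completed and drained.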

Hope this helps.

2
votes

The answer by Nish26 is correct for the issue in the question.

I would propose solving your producer/consumer problem with Microsoft TPL Dataflow instead:

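using System.Threading.Tasks.Dataflow;
using System.Xml.Linq;

// At most 100 items are buffered, and at most 2 are processed at a time.
var parallelBoundedOptions = new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 100,
    MaxDegreeOfParallelism = 2,
};
// Consumer: the block calls ProcessItem for every item it receives.
var uploadItemBlock = new ActionBlock<XElement>(
    item => ProcessItem(item),
    parallelBoundedOptions
);
string metadataFilePath = exportLocation + listTitle + "\\Metadata\\" + exportJobId + ".xml";
XDocument xmlFile = XDocument.Load(metadataFilePath);
// Process only the files that have not been uploaded.
var query = from c in xmlFile.Elements("Items").Elements("Item")
            where c.Attribute("IsUploaded").Value == "No"
            select c;
// Producer: SendAsync(...).Wait() blocks while the block is at capacity.
foreach (var item in query)
{
    uploadItemBlock.SendAsync(item).Wait();
}
// Signal that no more items will arrive, then wait for the consumer to drain.
uploadItemBlock.Complete();
uploadItemBlock.Completion.Wait();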

Dataflow makes it easier to focus on producing and consuming the items instead of how to pass them from the producer to the consumer.

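The actual issue in the question is that Parallel.ForEach is using BlockingCollection<T>.IEnumerable<T>.GetEnumerator, which enumerates only the items that are in the collection at that moment and does not wait for more, instead of BlockingCollection<T>.GetConsumingEnumerable, which removes items and blocks until CompleteAdding() has been called and the collection is empty. This is demonstrated here: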

static void Main()
{
    var collection = new BlockingCollection<int>(100);
    Task.Run(()=>
    {
        foreach (var element in Enumerable.Range(0, 100_000))
        {
            collection.Add(element);
        }
        collection.CompleteAdding();
    });

    Parallel.ForEach(
        collection, 
        new ParallelOptions { MaxDegreeOfParallelism = 2},
        i => Console.WriteLine(i));

    Console.WriteLine("Done");
}

Prints "Done" immediately

static void Main()
{
    var collection = new BlockingCollection<int>(100);
    Task.Run(()=>
    {
        foreach (var element in Enumerable.Range(0, 100_000))
        {
            collection.Add(element);
        }
        collection.CompleteAdding();
    });

    Parallel.ForEach(
        collection.GetConsumingEnumerable(), 
        new ParallelOptions { MaxDegreeOfParallelism = 2},
        i => Console.WriteLine(i));

    Console.WriteLine("Done");
}

Prints all the numbers
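Applied to the scenario in the question, the fix might look roughly like the sketch below. Everything not in the question is an assumption made so the example runs on its own: the inline XML stands in for the metadata file, the Name attribute is invented, and ProcessItem is a hypothetical stand-in for the upload logic that only counts items.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Linq;

class UploadSketch
{
    static int processed;

    // Hypothetical stand-in for the real upload; it only counts the items.
    static void ProcessItem(XElement item) => Interlocked.Increment(ref processed);

    static void Main()
    {
        // Inline XML replaces XDocument.Load(metadataFilePath) for this sketch.
        XDocument xmlFile = XDocument.Parse(
            "<Items>" +
            "<Item IsUploaded=\"No\" Name=\"a\" />" +
            "<Item IsUploaded=\"Yes\" Name=\"b\" />" +
            "<Item IsUploaded=\"No\" Name=\"c\" />" +
            "</Items>");

        var collection = new BlockingCollection<XElement>(100);

        // Producer: pushes only the items that have not been uploaded.
        Task producer = Task.Run(() =>
        {
            try
            {
                var query = from c in xmlFile.Elements("Items").Elements("Item")
                            where (string)c.Attribute("IsUploaded") == "No"
                            select c;
                foreach (var item in query)
                {
                    collection.Add(item);
                }
            }
            finally
            {
                // Always complete, so the consumers cannot block forever
                // if the producer throws.
                collection.CompleteAdding();
            }
        });

        // Consumers: GetConsumingEnumerable() blocks until items arrive and
        // ends once the collection is completed and empty.
        Parallel.ForEach(
            collection.GetConsumingEnumerable(),
            new ParallelOptions { MaxDegreeOfParallelism = 2 },
            item => ProcessItem(item));

        producer.Wait(); // rethrows any producer exception
        Console.WriteLine($"Processed {processed} items");
    }
}
```

Because Parallel.ForEach now blocks the main thread until the collection is drained, no Console.ReadLine() is needed to keep the process alive.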