6
votes

I use a BlockingCollection to implement a producer-consumer pattern in C# 4.0.

The BlockingCollection holds items which take up quite a lot of memory. I would like the consumer to take one item out of the BlockingCollection at a time and process it.

I was thinking that by using foreach on BlockingCollection.GetConsumingEnumerable(), the BlockingCollection would remove each item from the underlying queue as it is consumed (that is, together with its reference), so that at the end of the Process() method which processes the item, the item could be garbage collected.

But this is not true. It seems the foreach loop on BlockingCollection.GetConsumingEnumerable() holds the references of all the items that entered the queue. All items are held (and thus prevented from being garbage collected) until the foreach loop exits.

Instead of the simple foreach loop on BlockingCollection.GetConsumingEnumerable(), I tried a while loop that tests the BlockingCollection.IsCompleted flag, and inside the loop I use BlockingCollection.Take() to grab a consumable item. I assumed that BlockingCollection.Take() has a similar effect to List.Remove(), which removes the reference to the item from the collection. But again this is wrong. All items are only garbage collected after the while loop exits.

So my question is: how can we easily implement the requirement that a BlockingCollection holds potentially memory-consuming items and each item can be garbage collected as soon as it has been consumed by the consumer? Thank you very much for any help.

EDIT: as requested, here is some simple demo code:

// Entity is what we are going to process.
// The finalizer will tell us when Entity is going to be garbage collected.
class Entity
{
    private static int counter_;
    private int id_;
    public int ID { get{ return id_; } }
    public Entity() { id_ = counter_++; }
    ~Entity() { Console.WriteLine("Destroying entity {0}.", id_); }
}

...

private BlockingCollection<Entity> jobQueue_ = new BlockingCollection<Entity>();
private List<Task> tasks_ = new List<Task>();

// This is the method to launch and wait for the tasks to finish the work.
void Run()
{
    tasks_.Add(Task.Factory.StartNew(ProduceEntity));
    Console.WriteLine("Start processing.");
    tasks_.Add(Task.Factory.StartNew(ConsumeEntity));
    Task.WaitAll(tasks_.ToArray());
}

// The producer creates Entity instances and adds them to the BlockingCollection.
void ProduceEntity()
{
    for(int i = 0; i < 10; i++) // Add 10 entities in total.
    {
        var newEntity = new Entity();
        Console.WriteLine("Create entity {0}.", newEntity.ID);
        jobQueue_.Add(newEntity);
    }
    jobQueue_.CompleteAdding();
}

// The consumer takes an entity and processes it (and, what I need, destroys it).
void ConsumeEntity()
{
    while(!jobQueue_.IsCompleted){
        Entity entity;
        if(jobQueue_.TryTake(out entity))
        {
            Console.WriteLine("Process entity {0}.", entity.ID);
            entity = null;

            // I would expect the entity to be finalized and collected after these GC calls, but it is NOT.
            GC.Collect();
            GC.WaitForPendingFinalizers();
            GC.Collect();
        }
    }
    Console.WriteLine("Finish processing.");
}

The output shows all the creation and processing messages, followed by "Finish processing.", followed by all the destruction messages from the entities. The creation messages show Entity.ID running from 0 to 9, and the destruction messages show Entity.ID running from 9 to 0.

EDIT:

Even when I set the BlockingCollection's bounded capacity, all the items that ever entered it are finalized only when the loop exits, which is weird.

3
Just because there is no ref being held does not mean the GC will step in right away and collect it... sample code that demonstrates your issue with the appropriate GC.Collect etc. methods would be helpful. - Sam Saffron

3 Answers

6
votes

ConcurrentQueue contains segments, each with an internal array of 32 items. The Entity items will not be garbage collected until the segment that holds them is garbage collected. This happens after all 32 items are taken out of the queue. If you change your example to add 32 items you will see the “Destroying entity” messages before "Finish processing."
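One possible workaround, given the segment behaviour described above, is to queue a small wrapper instead of the large item and have the consumer clear the wrapper when it takes the item. The following is only a sketch under that assumption: Holder<T>, Release() and HolderDemo are hypothetical names that are not part of the framework or of the question's code, and a byte[] stands in for the memory-heavy Entity.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical wrapper: the queue stores the small Holder, not the large payload.
sealed class Holder<T> where T : class
{
    private T value_;
    public Holder(T value) { value_ = value; }

    // Hands the payload to the caller and drops the holder's own reference to it.
    public T Release()
    {
        T value = value_;
        value_ = null;
        return value;
    }
}

static class HolderDemo
{
    static void Main()
    {
        var jobQueue = new BlockingCollection<Holder<byte[]>>();

        var producer = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                // A 1 MB array stands in for a memory-consuming item.
                jobQueue.Add(new Holder<byte[]>(new byte[1024 * 1024]));
            }
            jobQueue.CompleteAdding();
        });

        var consumer = Task.Factory.StartNew(() =>
        {
            foreach (Holder<byte[]> holder in jobQueue.GetConsumingEnumerable())
            {
                byte[] payload = holder.Release();
                Console.WriteLine("Processing {0} bytes.", payload.Length);
                // After this iteration, the queue's internal segment can at most
                // still reference the emptied holder, not the payload itself.
            }
        });

        Task.WaitAll(producer, consumer);
    }
}
```

With this shape, any reference the queue keeps internally points only at the small, emptied holder, so the large payload becomes eligible for collection once the consumer is done with it. When the collector actually runs is still up to the runtime.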

2
votes

Whether the BlockingCollection continues to hold references depends on the collection type which it is using.

The default collection type for BlockingCollection<T> is ConcurrentQueue<T>.
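As a small illustration of the point about the underlying collection, BlockingCollection<T> has a constructor that accepts any IProducerConsumerCollection<T>. The sketch below only shows how the backing store is chosen and how it changes the take order; BackingCollectionDemo is a hypothetical name, and the example makes no claim about how long each backing type keeps references to removed items.

```csharp
using System;
using System.Collections.Concurrent;

static class BackingCollectionDemo
{
    static void Main()
    {
        // Same as the default: FIFO order, backed by ConcurrentQueue<T>.
        var fifo = new BlockingCollection<int>(new ConcurrentQueue<int>());

        // Same BlockingCollection API, but LIFO order, backed by ConcurrentStack<T>.
        var lifo = new BlockingCollection<int>(new ConcurrentStack<int>());

        for (int i = 0; i < 3; i++)
        {
            fifo.Add(i);
            lifo.Add(i);
        }

        Console.WriteLine("FIFO take: {0}", fifo.Take()); // the first item added
        Console.WriteLine("LIFO take: {0}", lifo.Take()); // the last item added
    }
}
```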

So the garbage collection behaviour will depend on the collection type. In the case of the ConcurrentQueue<T> this is a FIFO structure, so I would be extremely surprised if this did not release references from the data structure after they were removed from the queue (it's kind of the definition of a queue)!

How exactly are you determining the objects are not being garbage collected?