1
votes

I have multiple producers and multiple consumers. My shared resource is the BlockingCollection. However, my code only works if I have one consumer. I know it is a race condition since the output is different each time I run the code.

I thought BlockingCollection would take care of all the syncing etc. but it does not.

How can I sync my shared resource among all the producers and consumers then?

Here is my code:

/// <summary>
/// PURE PRODUCER TYPE
/// </summary>
class Caller
{

    private BlockingCollection<Call> incommingCalls;

    public Caller(BlockingCollection<Call> calls)
    {
        incommingCalls = calls;
        //start the producer thread
        Thread thread = new Thread(new ThreadStart(placeCall));
        thread.Start();
    }

    public void placeCall()
    {
            incommingCalls.Add(myCall);
    }

}



/// <summary>
/// CONSUMER
/// </summary>
class Fresher : Employee
{

    private BlockingCollection<Call> calls;

    public Fresher(BlockingCollection<Call> incalls)
    {
        calls = incalls;
        Thread thread = new Thread(new ThreadStart(HandleCalls));
        thread.Start();

    }

    /// <summary>
    /// 
    /// </summary>
    public void HandleCalls()
    {
        while (!incommingCalls.IsCompleted)
        {
            Call item;
            if (incommingCalls.TryTake(out item, 100000))
            {
                //do something with the call

            } //else do nothing - just wait

        }


}






/// <summary>
/// 
/// </summary>
class CallCenter
{

    private BlockingCollection<Call> fresherCalls;


    private List<Caller> myCallers;
    private List<Employee> myFreshers;


    public CallCenter() 
    {
        //initial incomming calls to the fresher queue
        fresherCalls = new BlockingCollection<Call>();

        myFreshers = new List<Employee>();
        myCallers = new List<Caller>();

        generate_freshers();

        //generate to start the producer
        generate_callers();

    }

    /// <summary>
    /// 
    /// </summary>
    private void generate_freshers() 
    {
        for (int i = 0; i < 1; i++ )
        {
            myFreshers.Add(new Fresher(fresherCalls, tlCalls, locker2));
        }
    }

    /// <summary>
    /// 
    /// </summary>
    private void generate_callers() 
    {
        for (int i = 0; i < 20; i++ )
        {
            myCallers.Add(new Caller(fresherCalls, locker));
        }

    }
}
1
What is the behavior that you're seeing which you expect is incorrect?Reed Copsey
I expect that the first Employee will take the first call, the second will take the second call etc... but they are all rotating around and the result is not predictable...user1261710
You can't guarantee calls are processed in the same order when you have multiple producers/consumers. It's going to depend on the operating system's task scheduler. What makes you say that things aren't working? Does it miss calls or process them incorrectly when you have multiple consumers?Jim Mischel
There is not guarantee of ordering with BlockingCollection<T> like that... In general, specific ordering tends to dramatically reduce or eliminate the gains you get by concurrent programming. Problems that scale well in concurrent space tend to be ones where order of processing doesn't matter.Reed Copsey
@user1261710 If you need the results to be both produced and consumed in a particular order then you cannot paralelize either.Servy

1 Answers

3
votes

I know it is a race condition since the output is different each time I run the code.

This is common with multithreading, and not necessarily due to a race condition (at least not a bad one). Order processing in multithreaded scenarios tends to not be deterministic - which would likely change the output.

That being said, with BlockingCollection<T>, it's typically easier to write your consumers as:

public void HandleCalls()
{
    foreach(var item in incommingCalls.GetConsumingEnumerable())
    {
        //do something with the call
    }
}

This will handle all of the synchronization and checking for you, for any number of consumers on the BlockingCollection<T>.


Edit: If you need to control the scheduling, and implement some form of Round-Robin Scheduling, you may want to take a look at the Parallel Extension Extras within the samples for the TPL. They provide a RoundRobinTaskScheduler which can be used to schedule Task<T> instances which work in a predictable manner.