I am working on a producer/consumer problem with a modification. However, there is a race condition, and I am debating the best way to handle it. There might be cleaner approaches, so if anyone has done something similar, I'd appreciate it if you could share a better solution.
It starts as a normal producer/consumer setup using a queue. One producer thread reads items from disk and enqueues them on a shared queue. Multiple consumer threads then try to dequeue the items for processing. However, each item has a tag (like a thread ID) that MUST match the consumer's tag. A consumer thread looks at the front of the queue and checks the item's tag. If it does not match the consumer's tag, the consumer must go to sleep and wait until the item at the front of the queue matches its tag. This is a bit confusing, but the pseudocode below hopefully clarifies the algorithm:
```
struct item
{
    // Unique tag: only the consumer with this tag may consume the item
    int consumerTag;
    // Data for the consumer to consume
    void *pData;
};

///////////////////////////////
// PRODUCER THREAD -> only 1
///////////////////////////////
// The producer reads items;
// each item is tagged for a specific consumer
while (item = read())
{
    lock(queue);
    while (queueFull)
    {
        Sleep(); // Releases the queue mutex while asleep
                 // and reacquires it after being woken
    }
    enqueue(item);
    unlock(queue);
    wakeUpAllConsumers();
}
```
```
///////////////////////////////
// CONSUMER THREAD -> many threads
///////////////////////////////
// My tag is like a thread ID;
// each consumer has a unique tag
myTag = getThreadTAG();
while (true)
{
    lock(queue);
    if (queueNotEmpty)
    {
        item = queueFront();
        if (myTag == item->consumerTag)
        {
            // This item is for me: dequeue and process it
            item = dequeue();
            process(item);
        }
        else
        {
            // This item is not for me: go to sleep
            Sleep(); // Releases the queue mutex
                     // and reacquires it on wake-up
        }
    }
    else
    {
        Sleep(); // Releases the queue mutex
                 // and reacquires it on wake-up
    }
    unlock(queue);
    wakeUpProducer();
}
```
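For reference, the tag check at the heart of the consumer loop can be isolated as a small C++ sketch. `Item`, `TaggedQueue`, and `tryPopFor` are hypothetical names, not part of the pseudocode above. The sketch covers only the non-blocking "peek, compare, dequeue" step and deliberately leaves out sleeping and waking, which is where the trouble lies.

```cpp
#include <deque>
#include <mutex>
#include <optional>

// Mirrors the pseudocode's `struct item`; pData stays an opaque pointer.
struct Item {
    int consumerTag;  // only the consumer with this tag may dequeue the item
    void *pData;      // payload for the consumer
};

// Hypothetical wrapper: a FIFO guarded by a mutex, with a tag-checked pop.
class TaggedQueue {
public:
    void push(const Item &item) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(item);
    }

    // Peek at the front and dequeue it only if it carries `myTag`.
    // Returns std::nullopt if the queue is empty or the front belongs to another consumer.
    std::optional<Item> tryPopFor(int myTag) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty() || items_.front().consumerTag != myTag) {
            return std::nullopt;
        }
        Item item = items_.front();
        items_.pop_front();
        return item;
    }

private:
    std::mutex mutex_;
    std::deque<Item> items_;
};
```

With tag 1 followed by tag 2 in the queue, `tryPopFor(2)` returns `std::nullopt` even though an item for consumer 2 is present; that is the "not for me, go to sleep" branch of the pseudocode.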
However, there are problems with the algorithm above. Consider the following sequence of events, using this notation: item.tag=1 means the item must be consumed only by the consumer with the same tag, which I will write as consumer.tag=1.
- The producer reads item.tag=1 and enqueues it.
- The producer wakes up all consumer threads (consumer.tag=1, consumer.tag=2, etc. are all awake now and checking the front of the queue).
- The producer reads item.tag=2 and enqueues it.
- The producer wakes up all consumer threads.
- The queue now holds [item.tag=1, item.tag=2].
- consumer.tag=2 wakes up and peeks at the front of the queue, but item.tag=1 does not match consumer.tag=2, so it goes back to sleep. consumer.tag=2 is sleeping now.
- consumer.tag=1 wakes up and peeks at the front of the queue, and item.tag=1 matches consumer.tag=1. It dequeues the item and notifies the producer that it can produce more.
- The producer finishes reading the data and exits.

Now the queue holds item.tag=2, but consumer.tag=2 is sleeping and never consumes it. Note that there can be many consumers, so at the end many consumers can end up sleeping while the queue is left with unconsumed items.
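To make the interleaving concrete, here is a deterministic, single-threaded replay of the events above in C++. `Model`, `produce`, `consumerStep`, and `replay` are hypothetical names; no real threads are involved, and "wake up all consumers" is modelled simply by clearing the set of sleeping consumers.

```cpp
#include <deque>
#include <set>

// Hypothetical single-threaded model of the interleaving (no real threads).
struct Model {
    std::deque<int> queue;  // tags of queued items, front first
    std::set<int> asleep;   // tags of consumers currently blocked in Sleep()

    // Producer: enqueue an item, then wakeUpAllConsumers().
    void produce(int itemTag) {
        queue.push_back(itemTag);
        asleep.clear();
    }

    // One pass of the consumer loop for the consumer with `myTag`.
    // Returns true if it dequeued an item.
    bool consumerStep(int myTag) {
        if (!queue.empty() && queue.front() == myTag) {
            queue.pop_front();  // dequeue + process; only the producer is woken
            return true;
        }
        asleep.insert(myTag);   // empty queue or not my item: Sleep()
        return false;
    }
};

// Replays the event sequence from the question and returns the final state.
Model replay() {
    Model m;
    m.produce(1);       // producer enqueues item.tag=1, wakes all consumers
    m.produce(2);       // producer enqueues item.tag=2, wakes all consumers
    m.consumerStep(2);  // consumer.tag=2 sees item.tag=1 at the front -> sleeps
    m.consumerStep(1);  // consumer.tag=1 dequeues item.tag=1, wakes only the producer
    // The producer has no more data and exits: nobody wakes consumer.tag=2 again.
    return m;
}
```

After the replay, the queue still holds item.tag=2 and consumer.tag=2 is in the sleeping set. The only event that changed the front of the queue was consumer.tag=1's dequeue, and in the pseudocode that dequeue wakes only the producer.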
I thought of simply adding a loop at the end of the producer thread that keeps waking up all the sleeping consumers until the queue is empty:
```
// PRODUCER THREAD
// After the last item has been read, process the rest
while (queueNotEmpty)
{
    wakeUpAllConsumers();
    Sleep();
}
```
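A runnable C++ sketch of this workaround, assuming `std::mutex` and `std::condition_variable` stand in for `lock()`, `Sleep()`, and the `wakeUp*()` calls, and that item tags are plain ints. The `done` flag, the queue capacity of 4, the 1 ms polling interval, and the helper names (`Shared`, `producer`, `consumer`, `run`) are my own assumptions. The producer also waits in a loop while the queue is full, so a freshly read item is never dropped.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical shared state standing in for the question's queue and Sleep()/wakeUp*() calls.
struct Shared {
    std::mutex m;
    std::condition_variable consumersCv;  // consumers Sleep() here
    std::condition_variable producerCv;   // the producer Sleep()s here while the queue is full
    std::deque<int> queue;                // item tags, front first
    std::size_t capacity = 4;             // assumed bound on the queue
    bool done = false;                    // set once reading is finished and the queue is drained
    std::map<int, int> consumed;          // tag -> items consumed by that consumer
};

void producer(Shared &s, const std::vector<int> &itemTags) {
    for (int tag : itemTags) {            // stands in for `while (item = read())`
        std::unique_lock<std::mutex> lock(s.m);
        // Wait while full, so the item just read is never dropped.
        s.producerCv.wait(lock, [&] { return s.queue.size() < s.capacity; });
        s.queue.push_back(tag);
        lock.unlock();
        s.consumersCv.notify_all();       // wakeUpAllConsumers()
    }
    // The proposed workaround: keep waking every consumer until the queue is empty.
    std::unique_lock<std::mutex> lock(s.m);
    while (!s.queue.empty()) {
        s.consumersCv.notify_all();
        lock.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        lock.lock();
    }
    s.done = true;                        // lets idle consumers exit
    lock.unlock();
    s.consumersCv.notify_all();
}

void consumer(Shared &s, int myTag) {
    std::unique_lock<std::mutex> lock(s.m);
    while (true) {
        if (!s.queue.empty() && s.queue.front() == myTag) {
            s.queue.pop_front();          // this item is for me: dequeue
            ++s.consumed[myTag];          // "process" (under the lock for brevity, as in the pseudocode)
            s.producerCv.notify_one();    // wakeUpProducer()
        } else if (s.queue.empty() && s.done) {
            return;                       // nothing left and nothing more coming
        } else {
            s.consumersCv.wait(lock);     // Sleep(): releases the mutex, reacquires it on wake-up
        }
    }
}

// Runs one producer and one consumer per tag in consumerTags; returns per-tag counts.
std::map<int, int> run(const std::vector<int> &itemTags, const std::vector<int> &consumerTags) {
    Shared s;
    std::vector<std::thread> threads;
    for (int tag : consumerTags) threads.emplace_back(consumer, std::ref(s), tag);
    threads.emplace_back(producer, std::ref(s), std::cref(itemTags));
    for (std::thread &t : threads) t.join();
    return s.consumed;
}
```

The drain loop works because only a dequeue changes the front of the queue, and once the producer stops enqueuing, nothing else broadcasts to the consumers; the loop stands in for that missing broadcast. The cost is polling at a fixed interval, and the loop never ends if an item's tag has no live consumer.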
But I believe there must be a more elegant way to handle this problem. Any ideas? Let me know.

Thanks!