2
votes

I am solving a specific kind of producer-consumer problem, which goes as follows: there is a buffer of size n. Consumers take items from the buffer one at a time (that is, whenever a consumer thread has access to the buffer, it takes no more than one item). Whenever the buffer is empty, the producer must be called. The producer completely fills the buffer and then blocks until it is called again. I have modelled each producer and consumer as a thread and implemented it this way:

bool buffer[n];

//Producer
while(true){
    lock(bufferLock);
    while(!buffer.isEmpty())    // loop on the condition so an early signal is not lost
        wait(producerSemaphore,bufferLock);
    completelyFillbuffer();
    signalAll(consumerSemaphore);
    unlock(bufferLock);
}

//Consumer
while(true){
    lock(bufferLock);
    while(buffer.isEmpty()){    // re-check after waking: another consumer may have drained the buffer
        signal(producerSemaphore);
        wait(consumerSemaphore,bufferLock);
    }
    takeItemFrombuffer();
    unlock(bufferLock);
}

takeItemFrombuffer(){
    take any true item and make it false;
}

completelyFillbuffer(){
    make all items true;
}

The problem is that I am using a single lock for the entire buffer, so at any point only a single consumer can take an item. But when the buffer is large, it makes sense to allow several consumers to take items simultaneously. How do I implement this?


3 Answers

0
votes

I think you can safely remove items from the buffer, or safely mark them false; just make that operation atomic. For instance:

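// consumer
item = getTheItem(buffer);
if( item != false)
    checkAndChange(item);

// must run as one atomic step (e.g. a compare-and-swap),
// otherwise two consumers can both pass the check
checkAndChange(item):
    if( item != false)
        atomicChange(item)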

You then only need to lock the buffer for the producer. Another option is to use lock-free data structures.
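A minimal sketch of the check-and-change idea, written in Python for illustration. Python has no built-in compare-and-swap, so the hypothetical `AtomicBool` class below emulates one with a tiny per-slot lock; in a language with real atomics this would typically be a single call such as Java's `AtomicBoolean.compareAndSet` or C++'s `std::atomic<bool>::compare_exchange_strong`. The names `AtomicBool`, `compare_and_set`, and `take_item` are assumptions, not part of the question's code.

```python
import threading


class AtomicBool:
    """Emulated atomic flag; the internal lock stands in for a hardware CAS."""

    def __init__(self, value=False):
        self._value = value
        self._lock = threading.Lock()

    def compare_and_set(self, expected, new):
        # Check and change happen as one indivisible step.
        with self._lock:
            if self._value == expected:
                self._value = new
                return True
            return False

    def set(self, value):
        with self._lock:
            self._value = value


def take_item(buffer):
    """Claim one true slot; return its index, or None if none could be claimed."""
    for i, slot in enumerate(buffer):
        # If another consumer claimed this slot first, the CAS fails and we move on.
        if slot.compare_and_set(True, False):
            return i
    return None
```

Two consumers can scan the same slot at the same time, but only one `compare_and_set(True, False)` succeeds, so no item is handed out twice.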

0
votes

You can't; if you do that then how will you know that multiple consumers do not take the same element? You do need a single lock for the single producer buffer in order to safely remove one item at a time (serially). So, you can't parallelize the fetching of items from that queue/buffer, but you can parallelize the processing of the values.

// consumer
while (true) {
    item = fetchItemFromBuffer();
    process( item );
}

fetchItemFromBuffer() {
    lock(bufferLock);
    while (buffer.isEmpty()) {
        signal(producerSemaphore);
        wait(consumerSemaphore,bufferLock);
    }
    item = buffer.remove( 0 ); 
    unlock(bufferLock);
    return item;
}
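For reference, here is the same pattern as a runnable Python sketch. It assumes `threading.Condition` objects in place of the pseudocode's semaphores, a hypothetical `RefillBuffer` class, and an added `closed` flag so that the example terminates; none of these come from the question.

```python
import threading


class RefillBuffer:
    """Buffer of n items; an empty buffer triggers one complete refill."""

    def __init__(self, n):
        self.n = n
        self.items = []
        lock = threading.Lock()  # the single buffer lock
        self.need_refill = threading.Condition(lock)  # producer waits here
        self.refilled = threading.Condition(lock)     # consumers wait here
        self.closed = False  # assumption: lets the demo shut down cleanly

    def fetch(self):
        """Remove one item under the lock; return None once closed and empty."""
        with self.refilled:
            while not self.items:
                if self.closed:
                    return None
                self.need_refill.notify()  # call the producer
                self.refilled.wait()       # releases the lock while waiting
            return self.items.pop()

    def produce(self, rounds):
        """Fill the buffer completely `rounds` times, then close it."""
        for r in range(rounds):
            with self.need_refill:
                while self.items:
                    self.need_refill.wait()
                self.items = [r * self.n + i for i in range(self.n)]
                self.refilled.notify_all()
        with self.refilled:
            self.closed = True
            self.refilled.notify_all()
```

Each consumer would call `fetch()` in a loop and do its processing after the call returns, so only the removal itself is serialized.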
0
votes

A relatively simple idea would be to split the buffer into chunks.

For example, let's say you have a buffer of size 1024. You could split it into 64 chunks of size 16 each, or in any other way that suits your needs.

You will then need a mutex for each chunk. Each consumer then decides which element it wants to remove and then proceeds to lock the appropriate chunk. However, it may need to re-select and lock other chunks, if the initially selected chunk only has false values.
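The chunking idea can be sketched as follows, in Python for illustration. The `ChunkedBuffer` class, its method names, and the `start_chunk` parameter are assumptions; the logic that wakes the producer when every chunk is empty is left out to keep the sketch short.

```python
import threading


class ChunkedBuffer:
    """Boolean buffer split into fixed-size chunks, with one lock per chunk."""

    def __init__(self, size, chunk_size):
        self.slots = [False] * size
        self.chunk_size = chunk_size
        num_chunks = (size + chunk_size - 1) // chunk_size
        self.locks = [threading.Lock() for _ in range(num_chunks)]

    def _bounds(self, chunk):
        start = chunk * self.chunk_size
        return start, min(start + self.chunk_size, len(self.slots))

    def fill(self):
        # Producer: fill chunk by chunk, holding only that chunk's lock.
        for chunk, lock in enumerate(self.locks):
            with lock:
                start, end = self._bounds(chunk)
                for i in range(start, end):
                    self.slots[i] = True

    def take(self, start_chunk=0):
        """Take one item; return its index, or None if every chunk was empty."""
        n = len(self.locks)
        for k in range(n):
            chunk = (start_chunk + k) % n
            with self.locks[chunk]:
                start, end = self._bounds(chunk)
                for i in range(start, end):
                    if self.slots[i]:
                        self.slots[i] = False
                        return i
            # This chunk held only false values: re-select the next one.
        return None
```

Giving each consumer a different `start_chunk` spreads them across chunks, so they contend for the same lock only when their scans happen to overlap.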

Another approach is lock-free programming, but it depends on how far you want to go into this. A good introduction is here: http://preshing.com/20120612/an-introduction-to-lock-free-programming/