
I'd like to implement a buffer with a single producer and a single consumer, where only the consumer may block. An important detail is that the producer may drop an update if the queue is full.

I've considered converting a wait-free implementation, but at first glance there seems to be no easy way to notify the consumer that new data has arrived without losing notifications. So I settled on the very simple approach below, using a counting semaphore (some error handling details omitted for clarity):

#include <semaphore.h>

Object ar[SIZE];
int head = 0, tail = 0; // head: next write index, tail: next read index
sem_t semItems; // counts queued items; initialized to 0

void enqueue(Object o) {
    int val;
    sem_getvalue(&semItems, &val);
    if (val < SIZE - 1) {
        ar[head] = o;
        head = (head + 1) % SIZE;
        sem_post(&semItems);
    }
    else {
        // dropped
    }
}

Object dequeue(void) {
    sem_wait(&semItems);
    Object o = ar[tail];
    tail = (tail + 1) % SIZE;
    return o;
}

Are there any safety issues with this code? I was surprised not to see an implementation like it anywhere in the popular literature. An additional question is whether sem_post() could ever block (it calls futex_wake() under the hood on Linux). Simpler solutions are also welcome, of course.

Edit: changed the code to leave an empty slot between the reader and the writer (see Mayurk's response).

May I ask why you need to block only the consumer? - Sumit Gemini
@SumitGemini When there's a lot of traffic, the consumer should run slower than the producer, since the producer talks to hardware. This is just one possible approach I'm looking at; it could be that the implicit CAS loops actually make it worse. - wds
@SumitGemini just came across a proposal for native C++ semaphores that explains it better: open-std.org/jtc1/sc22/wg21/docs/papers/2006/… - wds

1 Answer


I can see one problem with this implementation. Consider the following sequence:

  1. Assume the buffer is full but the consumer has not started yet, so (head=0, tail=0, sem_val=SIZE).
  2. dequeue() is called from the consumer thread and sem_wait() succeeds. At that instant (head=0, tail=0, sem_val=SIZE-1), and the consumer starts reading ar[0].
  3. Now there is a thread switch and enqueue() is called from the producer thread. sem_getvalue() would return SIZE-1, so the producer writes to ar[0] while the consumer may still be reading it.

Basically, I think you need mutex protection around the read and write operations. But a mutex might block the threads, so I am not sure you would get the behavior you expect from this logic.
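
One possible way to avoid the overlap described above without a mutex is to decide fullness from the indices rather than from sem_getvalue(), and to keep the semaphore only for blocking the consumer. The following is a minimal sketch under assumptions, not a tested production queue: it assumes C11 atomics are available, exactly one producer thread and one consumer thread, and a trivially copyable Object. SIZE, the int payload type, queue_init() and the bool return value of enqueue() are hypothetical choices made for illustration.

```c
#include <errno.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdbool.h>

#define SIZE 4                     /* hypothetical capacity; one slot always stays empty */
typedef int Object;                /* hypothetical payload type */

static Object ar[SIZE];
static unsigned head = 0;          /* next write index; touched by the producer only */
static atomic_uint tail = 0;       /* next read index; written by the consumer, read by the producer */
static sem_t semItems;             /* counts queued items; only the consumer waits on it */

/* Returns 0 on success, -1 on failure (same as sem_init). */
int queue_init(void) {
    return sem_init(&semItems, 0, 0);
}

/* Producer side: never waits for the consumer; returns false if the item is dropped. */
bool enqueue(Object o) {
    unsigned next = (head + 1) % SIZE;
    /* The consumer advances tail only after it has copied the slot out,
       so a slot that is still being read is never considered free. */
    if (next == atomic_load_explicit(&tail, memory_order_acquire))
        return false;              /* full: drop the update */
    ar[head] = o;
    head = next;
    sem_post(&semItems);           /* make the item visible and wake the consumer */
    return true;
}

/* Consumer side: blocks until at least one item is available. */
Object dequeue(void) {
    while (sem_wait(&semItems) == -1 && errno == EINTR)
        ;                          /* retry if interrupted by a signal */
    unsigned t = atomic_load_explicit(&tail, memory_order_relaxed);
    Object o = ar[t];
    /* Release the slot only after the copy is complete. */
    atomic_store_explicit(&tail, (t + 1) % SIZE, memory_order_release);
    return o;
}
```

With SIZE set to 4, the producer can queue three items before the next one is dropped, and a slot becomes writable again only once dequeue() has finished copying it. The sketch leans on the semaphore operations to order the producer's write to ar[] before the consumer's read.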