You have a race condition between incrementing the write index and assigning the entry pointer.
Consider the case where producer A increments the write index but then runs out of its timeslice. Meanwhile, producer B increments the write index again, populates the next entry (remember, A has not populated its own entry yet), and increments the semaphore. Now, if consumer C is woken up before A resumes, it has every reason to believe that A's entry has already been populated, and grabs it. Because that entry has not been populated yet, it is NULL.
In other words:
Producer A        Producer B        Consumer C
write_pos++
                  write_pos++
                  sets buffer[]
                  sem_post()
                                    sem_wait()
                                    read_pos++
                                    uses buffer[]   (A's slot: still NULL)
sets buffer[]
sem_post()
                                    sem_wait()
                                    read_pos++
                                    uses buffer[]
The more producers you have, the higher the probability of hitting the scenario above.
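The solution is simple: add a second write counter (write_pos2; wrindex in the example below) that serializes the writers, so that each writer publishes its entry and posts the semaphore only after all earlier writers have published theirs.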
Consider the following example program:
#define _POSIX_C_SOURCE 200809L
#include <unistd.h>
#include <stdint.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <string.h>
#include <errno.h>
#include <stdio.h>
/*
 * Multi-producer circular buffer of pointers.
 * A NULL slot is free; a non-NULL slot holds an entry waiting to be consumed.
 */
typedef struct cbuffer {
    sem_t semaphore;           /* Number of published, not yet consumed entries */
    uint64_t size;             /* Number of slots in entry[] */
    volatile uint64_t wrnext;  /* Next sequence number a producer reserves */
    volatile uint64_t wrindex; /* Publication counter, advanced in reservation order */
    volatile uint64_t rdindex; /* Next sequence number a consumer claims */
    void *entry[];             /* `size` slots (flexible array member) */
} cbuffer;
/*
 * Releases a buffer obtained from cbuffer_create(); NULL is accepted.
 * The pointers still stored in entry[] are not touched or freed.
 * Always returns NULL, so callers can write `cb = cbuffer_destroy(cb);`.
 */
static cbuffer *cbuffer_destroy(cbuffer *const cbuf)
{
    if (cbuf == NULL)
        return NULL;

    /* Scrub the bookkeeping before handing the memory back. */
    cbuf->size = 0;
    cbuf->wrnext = 0;
    cbuf->wrindex = 0;
    cbuf->rdindex = 0;

    sem_destroy(&cbuf->semaphore);
    free(cbuf);

    return NULL;
}
/*
 * Allocates an empty circular buffer with `size` pointer slots.
 *
 * Returns the new buffer, or NULL with errno set:
 *   EINVAL - size < 2
 *   ENOMEM - the byte count would overflow size_t, or malloc() failed
 *   other  - the errno reported by sem_init()
 */
static cbuffer *cbuffer_create(const size_t size)
{
    cbuffer *cbuf;

    if (size < 2) {
        errno = EINVAL;
        return NULL;
    }

    /* size * sizeof entry must not wrap around; sizeof does not evaluate cbuf. */
    if (size > (SIZE_MAX - sizeof *cbuf) / sizeof cbuf->entry[0]) {
        errno = ENOMEM;
        return NULL;
    }

    cbuf = malloc(sizeof *cbuf + size * sizeof cbuf->entry[0]);
    if (!cbuf) {
        errno = ENOMEM;
        return NULL;
    }

    /* All slots start free (NULL). */
    memset(cbuf->entry, 0, size * sizeof cbuf->entry[0]);

    /* Process-private semaphore with an initial count of zero (nothing to
       consume yet). Do not hand out a buffer whose semaphore is unusable. */
    if (sem_init(&cbuf->semaphore, 0, 0) == -1) {
        const int saved_errno = errno;
        free(cbuf);
        errno = saved_errno;
        return NULL;
    }

    cbuf->size = size;
    cbuf->wrnext = 0;
    cbuf->wrindex = 0;
    cbuf->rdindex = 0;
    return cbuf;
}
/*
 * Appends `entry`; may be called concurrently by any number of producers.
 *
 * A producer reserves a sequence number ("ticket") from wrnext, busy-waits
 * until that ticket's slot is free (NULL), and stores the pointer there.
 * It then busy-waits until wrindex equals its ticket before advancing it and
 * posting the semaphore. The wrindex step makes entries become visible in
 * reservation order: after k posts, the first k reserved slots are filled.
 */
static void cbuffer_add(cbuffer *const cbuf, void *const entry)
{
    const uint64_t ticket = __sync_fetch_and_add(&cbuf->wrnext, (uint64_t)1);
    void **const slot = &cbuf->entry[ticket % cbuf->size];

    /* Slot still occupied means the buffer is full here: spin until consumed. */
    for (;;) {
        if (__sync_bool_compare_and_swap(slot, NULL, entry))
            break;
    }

    /* Wait for every earlier ticket to be published, then publish ours. */
    for (;;) {
        if (__sync_bool_compare_and_swap(&cbuf->wrindex, ticket, ticket + (uint64_t)1))
            break;
    }

    /* TODO: check for -1 and errno == EOVERFLOW */
    sem_post(&cbuf->semaphore);
}
/*
 * Removes and returns the oldest published entry, blocking until one exists.
 *
 * The semaphore is waited on BEFORE a read index is claimed. A consumer that
 * obtains index r has seen r+1 successful waits (its own plus one per earlier
 * claim), so at least r+1 entries were published and slot r is filled. This
 * holds even with several concurrent consumers. Claiming the index first
 * would let a consumer woken for a later entry read an earlier, still-empty
 * slot.
 *
 * sem_wait() is retried on EINTR. On any other failure this returns NULL with
 * errno left as set by sem_wait(), without claiming an index.
 */
static void *cbuffer_get(cbuffer *const cbuf)
{
    uint64_t rdindex;

    while (sem_wait(&cbuf->semaphore) == -1) {
        if (errno != EINTR)
            return NULL;
    }

    /* Get the index of the oldest unclaimed entry. */
    rdindex = __sync_fetch_and_add(&cbuf->rdindex, (uint64_t)1);

    /* Take the pointer and mark the slot free (NULL) in one atomic step. */
    return __sync_fetch_and_and(&cbuf->entry[rdindex % cbuf->size], NULL);
}
/* Stop flag: set to 1 by main() or by a consumer that receives NULL; all
   threads poll it. NOTE(review): volatile does not synchronize threads in
   C; an atomic flag would be the portable choice. */
static volatile int done;

/* The buffer shared by every producer and consumer thread. */
static cbuffer *cb;
/*
 * Consumer thread body: takes entries from `cb` until `done` is set.
 * `payload` carries the consumer's numeric id (cast to a pointer).
 * Receiving a NULL entry is reported as a failure on stderr and sets `done`
 * so that the whole test stops.
 */
void *consumer_thread(void *payload)
{
    const long id = (long)payload;
    unsigned long count = 0UL;
    void *entry;

    /* Let main() cancel this thread at any point.
       NOTE(review): POSIX only guarantees pthread_cancel(),
       pthread_setcancelstate() and pthread_setcanceltype() to be
       async-cancel-safe; the stdio and sem_wait() calls below are not. */
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

    while (!done) {
        entry = cbuffer_get(cb);
        count++;
        if (!entry) {
            /* Write the diagnostic to the same stream that is flushed. */
            fprintf(stderr, "Consumer %ld: NULL pointer at %lu encountered!\n", id, count);
            fflush(stderr);
            done = 1;
            return NULL;
        }
    }
    return NULL;
}
/*
 * Producer thread body: adds dummy entries to `cb` until `done` is set.
 * The values cycle through 256..511. They are never NULL, so any NULL a
 * consumer sees indicates a bug. A consumer can also check the range, since
 * the values are only compared and never dereferenced.
 */
void *producer_thread(void *payload __attribute__((unused)))
{
    unsigned long count = 0UL;

    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

    while (!done) {
        cbuffer_add(cb, (void *)(256UL + (count & 255UL)));
        /* Advance so successive entries differ (previously count stayed 0). */
        count++;
    }
    return NULL;
}
/* Parses `text` as exactly one decimal int (surrounding whitespace allowed)
   whose value is at least `minimum`. Returns nonzero on success; on failure
   *value may or may not have been written. */
static int parse_int_arg(const char *const text, const int minimum, int *const value)
{
    char trailing;

    if (sscanf(text, " %d %c", value, &trailing) != 1)
        return 0;
    return *value >= minimum;
}

/*
 * Test driver, invoked as: PROGRAM SIZE PRODUCERS CONSUMERS.
 * Starts the requested threads on a shared buffer of SIZE slots. It runs
 * until SIGINT/SIGTERM arrives, sigtimedwait() fails unexpectedly, or a
 * consumer sets `done`. It then cancels and joins every thread and frees
 * everything.
 */
int main(int argc, char *argv[])
{
    pthread_attr_t attrs;
    pthread_t *producer_id;
    pthread_t *consumer_id;
    sigset_t blocked;
    siginfo_t info;
    struct timespec timeout;
    int size, producers, consumers;
    int i, result;

    if (argc != 4 || strcmp(argv[1], "-h") == 0 || strcmp(argv[1], "--help") == 0) {
        fprintf(stderr, "\n");
        fprintf(stderr, "Usage: %s [ -h | --help ]\n", argv[0]);
        fprintf(stderr, " %s SIZE PRODUCERS CONSUMERS\n", argv[0]);
        fprintf(stderr, "\n");
        return 1;
    }

    if (!parse_int_arg(argv[1], 2, &size)) {
        fprintf(stderr, "%s: Invalid circular buffer size.\n", argv[1]);
        return 1;
    }
    if (!parse_int_arg(argv[2], 1, &producers)) {
        fprintf(stderr, "%s: Invalid number of producer threads.\n", argv[2]);
        return 1;
    }
    if (!parse_int_arg(argv[3], 1, &consumers)) {
        fprintf(stderr, "%s: Invalid number of consumer threads.\n", argv[3]);
        return 1;
    }

    cb = cbuffer_create(size);
    producer_id = malloc((size_t)producers * sizeof producer_id[0]);
    consumer_id = malloc((size_t)consumers * sizeof consumer_id[0]);
    if (cb == NULL || producer_id == NULL || consumer_id == NULL) {
        fprintf(stderr, "%s.\n", strerror(ENOMEM));
        return 1;
    }

    /* Block the stop signals before any thread exists. New threads inherit
       this mask, so the signals stay pending for sigtimedwait() below. */
    sigemptyset(&blocked);
    sigaddset(&blocked, SIGINT);
    sigaddset(&blocked, SIGTERM);
    sigprocmask(SIG_BLOCK, &blocked, NULL);

    pthread_attr_init(&attrs);
    pthread_attr_setstacksize(&attrs, 32768);

    /* Consumers are started first, then producers; ids are 1-based. */
    for (i = 0; i < consumers; i++) {
        result = pthread_create(consumer_id + i, &attrs, consumer_thread, (void *)(long)(i + 1));
        if (result != 0) {
            fprintf(stderr, "Cannot start consumer threads: %s.\n", strerror(result));
            exit(1);
        }
    }
    for (i = 0; i < producers; i++) {
        result = pthread_create(producer_id + i, &attrs, producer_thread, (void *)(long)(i + 1));
        if (result != 0) {
            fprintf(stderr, "Cannot start producer threads: %s.\n", strerror(result));
            exit(1);
        }
    }
    pthread_attr_destroy(&attrs);

    printf("Press CTRL+C or send SIGTERM to process %ld to stop testing.\n", (long)getpid());
    fflush(stdout);

    /* Wait in 10 ms slices. A timeout (EAGAIN) just loops again; a delivered
       signal or any other error stops the test. A consumer may also stop it
       by setting `done`. */
    while (!done) {
        timeout.tv_sec = (time_t)0;
        timeout.tv_nsec = 10000000L; /* 0.010000000 seconds */
        result = sigtimedwait(&blocked, &info, &timeout);
        if (result == -1 && errno == EAGAIN)
            continue;
        done = 1;
    }

    printf("Exiting...\n");
    fflush(stdout);

    for (i = 0; i < producers; i++)
        pthread_cancel(producer_id[i]);
    for (i = 0; i < consumers; i++)
        pthread_cancel(consumer_id[i]);
    for (i = 0; i < producers; i++)
        pthread_join(producer_id[i], NULL);
    for (i = 0; i < consumers; i++)
        pthread_join(consumer_id[i], NULL);

    cb = cbuffer_destroy(cb);
    free(producer_id);
    free(consumer_id);
    return 0;
}
While I could be wrong about this, I can run the above with any number of producers (with a single consumer only, obviously) without encountering NULL pointers. You can easily add some logic to verify the pointers.
I believe you're spinning quite a lot, even in the uncontested case.
I would personally consider using two linked lists instead: one for unused/free slots, and the other for the added entries. (If your pointed-to entries start with a next pointer field, then you only need the used list. I prefer this myself.)
Producers always grab the first node from the free list and prepend it to the used list. The consumer grabs the entire used list at once. All of these operations use a simple do { } while (!__sync_bool_compare_and_swap()); loop, or do { } while (!__atomic_compare_exchange()); with GCC 4.7 and later; in the uncontested case the loop body executes only once. Something similar to the following (untested) code:
/*
 * Minimal lock-free LIFO list built on GCC __sync builtins.
 * A list is a head pointer; NULL means the list is empty.
 *
 * NOTE(review): get_one() has the classic ABA problem and dereferences
 * first->next without protection. It is only safe if popped nodes are not
 * freed or pushed back while another thread may still be inside get_one().
 */
struct node {
    struct node *next;
    /* whatever data here */
};

/* Pushes `item` onto the front of `*list`. */
void add_one(volatile struct node **const list, struct node *item)
{
    struct node *head;

    do {
        head = (struct node *)*list;
        /* Link to the current head. The CAS only succeeds if the head is
           still the one we linked to. */
        item->next = head;
    } while (!__sync_bool_compare_and_swap(list, head, item));
}

/* Pops the first node of `*list` and returns it with `next` cleared,
   or returns NULL if the list is empty. */
struct node *get_one(volatile struct node **const list)
{
    struct node *first, *next;

    do {
        first = (struct node *)*list;
        next = (first) ? first->next : NULL;
    } while (!__sync_bool_compare_and_swap(list, first, next));

    if (first)
        first->next = NULL;
    return first;
}

/* Detaches the whole list, leaving `*list` empty. Returns the nodes in
   reverse push order (oldest first), or NULL if the list was empty. */
struct node *get_all(volatile struct node **const list)
{
    struct node *all, *root;

    do {
        all = (struct node *)*list;
    } while (!__sync_bool_compare_and_swap(list, all, NULL));

    /* Reverse in place: the list is LIFO, callers want FIFO order. */
    root = NULL;
    while (all) {
        struct node *const curr = all;
        all = all->next;
        curr->next = root;
        root = curr;
    }
    return root;
}
Note that the get_all() above reverses the list, so that the oldest entry is first in the returned list. This makes it easy for the consumer to process all entries in the order they were added, with minimal overhead in the common case.
Questions?
posix_memalign instead of plain malloc, have you done measurements to see that it's worthwhile? – Some programmer dude