1
votes

I am trying to implement an array-based ring buffer that is thread-safe for multiple producers and a single consumer. The main idea is to have atomic head and tail indices. When pushing an element to the queue, the head is increased atomically to reserve a slot in the buffer:

#include <atomic>
#include <chrono>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <vector>

template <class T> class MPSC {
private:
  int MAX_SIZE;

  std::atomic<int> head{0}; ///< index of first free slot
  std::atomic<int> tail{0}; ///< index of first occupied slot

  std::unique_ptr<T[]> data;
  std::unique_ptr<std::atomic<bool>[]> valid; ///< indicates whether data at an
                                              ///< index has been fully written

  /// Compute next index modulo size.
  inline int advance(int x) { return (x + 1) % MAX_SIZE; }

public:
  explicit MPSC(int size) {
    if (size <= 0)
      throw std::invalid_argument("size must be greater than 0");

    MAX_SIZE = size + 1;
    data = std::make_unique<T[]>(MAX_SIZE);
    valid = std::make_unique<std::atomic<bool>[]>(MAX_SIZE);
  }

  /// Add an element to the queue.
  ///
  /// If the queue is full, this method blocks until a slot is available for
  /// writing. This method is not starvation-free, i.e. it is possible that one
  /// thread always fills up the queue and prevents others from pushing.
  void push(const T &msg) {
    int idx;
    int next_idx;
    int k = 100;
    do {
      idx = head;
      next_idx = advance(idx);

      while (next_idx == tail) {     // queue is full
        k = k >= 100000 ? k : k * 2; // exponential backoff
        std::this_thread::sleep_for(std::chrono::nanoseconds(k));
      } // spin

    } while (!head.compare_exchange_weak(idx, next_idx));

    if (valid[idx])
      // this throws, suggesting that two threads are writing to the same index. I have no idea how this is possible.
      throw std::runtime_error("message slot already written");

    data[idx] = msg;
    valid[idx] = true; // this was set to false by the reader,
                       // set it to true to indicate completed data write
  }

  /// Read an element from the queue.
  ///
  /// If the queue is empty, this method blocks until a message is available.
  /// This method is only safe to be called from one single reader thread.
  T pop() {
    int k = 100;
    while (is_empty() || !valid[tail]) {
      k = k >= 100000 ? k : k * 2;
      std::this_thread::sleep_for(std::chrono::nanoseconds(k));
    } // spin
    T res = data[tail];
    valid[tail] = false;
    tail = advance(tail);
    return res;
  }

  bool is_full() { return (head + 1) % MAX_SIZE == tail; }

  bool is_empty() { return head == tail; }
};

When there is a lot of congestion, some messages get overwritten by other threads. Hence there must be something fundamentally wrong with what I'm doing here.

What seems to be happening is that two threads are acquiring the same index to write their data to. Why could that be?

Even if a producer were to pause just before writing it's data, the tail could not increase past this threads idx and hence no other thread should be able to overtake and claim that same idx.

EDIT

At the risk of posting too much code, here is a simple program that reproduces the problem. It sends some incrementing numbers from many threads and checks whether all numbers are received by the consumer:

#include "mpsc.hpp" // or whatever; the above queue
#include <thread>
#include <iostream>

int main() {
  static constexpr int N_THREADS = 10; ///< number of threads
  static constexpr int N_MSG = 1E+5;   ///< number of messages per thread

  struct msg {
    int t_id;
    int i;
  };

  MPSC<msg> q(N_THREADS / 2);

  std::thread threads[N_THREADS];

  // consumer
  threads[0] = std::thread([&q] {
    int expected[N_THREADS] {};

    for (int i = 0; i < N_MSG * (N_THREADS - 1); ++i) {
      msg m = q.pop();
      std::cout << "Got message from T-" << m.t_id << ": " << m.i << std::endl;
      if (expected[m.t_id] != m.i) {
        std::cout << "T-" << m.t_id << " unexpected msg " << m.i << "; expected " << expected[m.t_id] << std::endl;
        return -1;
      }
      expected[m.t_id] = m.i + 1;
    }
  });

  // producers
  for (int id = 1; id < N_THREADS; ++id) {
    threads[id] = std::thread([id, &q] {
      for (int i = 0; i < N_MSG; ++i) {
        q.push(msg{id, i});
      }
    });
  }

  for (auto &t : threads)
    t.join();
}
1
I'm afraid your question is off-topic because it lacks a minimal reproducible example. In particular, the calling code is missing and the whole template stuff is superfluous.Ulrich Eckhardt
@UlrichEckhardt Thanks, I have edited the question to include an example that reproduces the problem.henrikl
when you check while (next_idx == tail) this can be yet false. but after this check and before head.compare_exchange_weak(idx, next_idx) - can be next_idx == tail already. while head the same (the head make full loop (advance MAX_SIZE times, while tail once). this will be most visible on 2 element buffer MAX_SIZE == 2 . say for (h,t) pair - intially it (0,0). when thread #1 check next_idx == tail all ok here. and then another threads do - (0,0)-push->(1,0)-pop->(1,1)-push->(0,1) so now (0,1) state at time CAS for thread #1. head == 0 again, but t == 1. queue is fullRbMm

1 Answers

1
votes

I am trying to implement an array-based ring buffer that is thread-safe for multiple producers and a single consumer.

I assume you are doing this as a learning exercise. Implementing a lock-free queue yourself is most probably the wrong thing to do if you want to solve a real problem.

What seems to be happening is that two threads are acquiring the same index to write their data to. Why could that be?

The combination of that producer spinlock with the outer CAS loop does not work in the intended way:

do {
  idx = head;
  next_idx = advance(idx);

  while (next_idx == tail) {     // queue is full
    k = k >= 100000 ? k : k * 2; // exponential backoff
    std::this_thread::sleep_for(std::chrono::nanoseconds(k));
  } // spin

// 
// ...
//
// All other threads (producers and consumers) can progress.
//
// ...
//

} while (!head.compare_exchange_weak(idx, next_idx));

The queue may be full when the CAS happens because those checks are performed independently. In addition, the CAS may succeed because the other threads may have advanced head to exactly match idx.