39
votes

I have recently made a port to C++11 using std::atomic of a triple buffer to be used as a concurrency sync mechanism. The idea behind this thread sync approach is that for a producer-consumer situation where you have a producer that is running faster that the consumer, triple buffering can give some benefits since the producer thread won't be "slowed" down by having to wait for the consumer. In my case, I have a physics thread that is updated at ~120fps, and a render thread that it running at ~60fps. Obviously, I want the render thread to always gets the most recent state possible, but I also know that I will be skipping a lot of frames from the physics thread, because of the difference in rates. On the other hand, I want my physics thread to maintain its constant update rate and not be limited by the slower render thread locking my data.

The original C code was made by remis-thoughts and the full explanation is in his blog. I encourage anyone interested in reading it for a further understanding of the original implementation.

My implementation can be found here.

The basic idea is to have an array with 3 positions (buffers) and an atomic flag that is compare-and-swapped to define which array elements correspond to what state, at any given time. This way, only one atomic variable is used to model all 3 indexes of the array and the logic behind the triple buffering. The buffer's 3 positions are named Dirty, Clean and Snap. The producer always writes to the Dirty index, and can flip the writer to swap the Dirty with the current Clean index. The consumer can request a new Snap, which swaps the current Snap index with the Clean index to get the most recent buffer. The consumer always reads the buffer in the Snap position.

The flag consists of an 8 bit unsigned int and the bits correspond to:

(unused) (new write) (2x dirty) (2x clean) (2x snap)

The newWrite extra bit flag is set by the writer and cleared by the reader. The reader can use this to check if there have been any writes since the last snap, and if not it won't take another snap. The flag and indexes can be obtained using simple bitwise operations.

Ok now for the code:

template <typename T>
class TripleBuffer
{

public:

  TripleBuffer<T>();
  TripleBuffer<T>(const T& init);

  // non-copyable behavior
  TripleBuffer<T>(const TripleBuffer<T>&) = delete;
  TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete;

  T snap() const; // get the current snap to read
  void write(const T newT); // write a new value
  bool newSnap(); // swap to the latest value, if any
  void flipWriter(); // flip writer positions dirty / clean

  T readLast(); // wrapper to read the last available element (newSnap + snap)
  void update(T newT); // wrapper to update with a new element (write + flipWriter)

private:

  bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1
  uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes
  uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes

  // 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
  // newWrite   = (flags & 0x40)
  // dirtyIndex = (flags & 0x30) >> 4
  // cleanIndex = (flags & 0xC) >> 2
  // snapIndex  = (flags & 0x3)
  mutable atomic_uint_fast8_t flags;

  T buffer[3];
};

implementation:

template <typename T>
TripleBuffer<T>::TripleBuffer(){

  T dummy = T();

  buffer[0] = dummy;
  buffer[1] = dummy;
  buffer[2] = dummy;

  flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}

template <typename T>
TripleBuffer<T>::TripleBuffer(const T& init){

  buffer[0] = init;
  buffer[1] = init;
  buffer[2] = init;

  flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}

template <typename T>
T TripleBuffer<T>::snap() const{

  return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index
}

template <typename T>
void TripleBuffer<T>::write(const T newT){

  buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index
}

template <typename T>
bool TripleBuffer<T>::newSnap(){

  uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
  do {
    if( !isNewWrite(flagsNow) ) // nothing new, no need to swap
      return false;
  } while(!flags.compare_exchange_weak(flagsNow,
                                       swapSnapWithClean(flagsNow),
                                       memory_order_release,
                                       memory_order_consume));
  return true;
}

template <typename T>
void TripleBuffer<T>::flipWriter(){

  uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
  while(!flags.compare_exchange_weak(flagsNow,
                                     newWriteSwapCleanWithDirty(flagsNow),
                                     memory_order_release,
                                     memory_order_consume));
}

template <typename T>
T TripleBuffer<T>::readLast(){
    newSnap(); // get most recent value
    return snap(); // return it
}

template <typename T>
void TripleBuffer<T>::update(T newT){
    write(newT); // write new value
    flipWriter(); // change dirty/clean buffer positions for the next update
}

template <typename T>
bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){
    // check if the newWrite bit is 1
    return ((flags & 0x40) != 0);
}

template <typename T>
uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){
    // swap snap with clean
    return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2);
}

template <typename T>
uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){
    // set newWrite bit to 1 and swap clean with dirty 
    return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3);
}

As you can see, I have decided to use a Release-Consume pattern for memory ordering. The Release (memory_order_release) for the store assures no writes in the current thread can be reordered after the store. On the other side, the Consume assures no reads in the current thread dependent on the value currently loaded can be reordered before this load. This ensures that writes to dependent variables in other threads that release the same atomic variable are visible in the current thread.

If my understanding is correct, since I only need the flags to be atomically set, operations on other variables that don't affect directly the flags can be reordered freely by the compiler, allowing for more optimizations. From reading some documents on the new memory model, I am also aware that these relaxed atomics will only have noticeable effect on platforms such as ARM and POWER (they were introduced mainly because of them). Since I am targeting ARM, I believe that I could benefit from these operations and be able to squeeze a little bit more performance out.

Now for the question:

Am I using correctly the Release-Consume relaxed ordering for this specific problem?

Thanks,

André

PS: Sorry for the long post, but I believed that some decent context was needed for a better view of the problem.

EDIT : Implemented @Yakk's suggestions:

  • Fixed flags read on newSnap() and flipWriter() which were using direct assignment, hence using default load(std::memory_order_seq_cst).
  • Moved bit fiddling operations to dedicated functions for clarity.
  • Added bool return type to newSnap(), now returns false when there's nothing new and true otherwise.
  • Defined class as non-copyable using = delete idiom since both copy and assignment constructors were unsafe if the TripleBuffer was being used.

EDIT 2: Fixed description, which was incorrect (Thanks @Useless). It is the consumer that requests a new Snap and reads from the Snap index (not the "writer"). Sorry for the distraction and thanks to Useless for pointing it out.

EDIT 3: Optimized the newSnap() and flipriter() functions according to @Display Name's suggestions, effectively removing 2 redundant load()'s per loop cycle.

2
Just a quick code review, no answer here: template <typename T> TripleBuffer<T>::TripleBuffer(const TripleBuffer& t){ doesn't look safe to use in most contexts if the TripleBuffer is in use: I'd be tempted to omit it. Same with operator=. newSnap should return false for "nothing to read" possibly. You should move all that bit fiddling to a single type that takes a uint8_t& and outputs/inputs sensible values from it. You read flags in newSnap without any explicit memory ordering guarantees.Yakk - Adam Nevraumont
Wow, thanks! You are right about the copy and assignment operators. They are indeed unsafe if the TripleBuffer is in use, since type T assignments are not guaranteed to be atomic and the TripleBuffer being copied from can change state between calls. You think it´s better to remove them and let the compiler generate them? Or do you have an idea of how to make it work safely? I am not seeing many real uses for them when the TripleBuffer being copied from is being used. I added them for completeness but forgot to account for all possible situations.André Neves
About the the newSnap function, it can indeed return true/false and will be a nice addition. It can signal that we are attempting to read too fast ;) Really great observation about the flags read in newSnap! As it is, it's a default load(memory_order_seq_cst). It does no harm, but defeats my original purpose to use release / consume orderings :) Thanks a lot!André Neves
Just noticed the flags read has the same "problem" in flipWriter. I totally missed it! About the bit fiddling parts, I could wrap them in dedicated functions for clarity, if that's what you mean. I will edit the code and post.André Neves
I think your description is slightly off: isn't the consumer supposed to swap snap with clean?Useless

2 Answers

3
votes

Why are you loading the old flags value twice in your CAS loops? The first time is by flags.load(), and the second by the compare_exchange_weak(), which the standard specifies on CAS failure will load the previous value into the first argument, which in this case is flagsNow.

According to http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange, "Otherwise, loads the actual value stored in *this into expected (performs load operation)." So what your loop is doing is that on failure, compare_exchange_weak() reloads flagsNow, then the loop repeats, and the first statement loads it once again, immediately after the load by compare_exchange_weak(). It seems to me your loop ought to instead have the load pulled outside the loop. For example, newSnap() would be:

uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do
{
    if( !isNewWrite(flagsNow)) return false; // nothing new, no need to swap
} while(!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow), memory_order_release, memory_order_consume));

and flipWriter():

uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow), memory_order_release, memory_order_consume));
1
votes

Yes, it is a difference between memory_order_acquire and memory_order_consume, but you will not notice it when you use it 180 or so per second. You can run my test with m2 = memory_order_consume if you want to know the answer in numbers. Just change producer_or_consumer_Thread to something like that:

TripleBuffer <int> tb;

void producer_or_consumer_Thread(void *arg)
{
    struct Arg * a = (struct Arg *) arg;
    bool succeeded = false;
    int i = 0, k, kold = -1, kcur;

    while (a->run)
    {
        while (a->wait) a->is_waiting = true; // busy wait
        if (a->producer)
        {
            i++;
            tb.update(i);
            a->counter[0]++;
        }
        else
        {
            kcur = tb.snap();
            if (kold != -1 && kcur != kold) a->counter[1]++;
            succeeded = tb0.newSnap();
            if (succeeded)
            {
                k = tb.readLast();
                if (kold == -1)
                    kold = k;
                else if (kold = k + 1)
                    kold = k;
                else
                    succeeded = false;
            }
            if (succeeded) a->counter[0]++;   
        }
    }
    a->is_waiting =  true;
}

TEST Result:

_#_  __Produced __Consumed _____Total
  1    39258150   19509292   58767442
  2    24598892   14730385   39329277
  3    10615129   10016276   20631405
  4    10617349   10026637   20643986
  5    10600334    9976625   20576959
  6    10624009   10069984   20693993
  7    10609040   10016174   20625214
  8    25864915   15136263   41001178
  9    39847163   19809974   59657137
 10    29981232   16139823   46121055
 11    10555174    9870567   20425741
 12    25975381   15171559   41146940
 13    24311523   14490089   38801612
 14    10512252    9686540   20198792
 15    10520211    9693305   20213516
 16    10523458    9720930   20244388
 17    10576840    9917756   20494596
 18    11048180    9528808   20576988
 19    11500654    9530853   21031507
 20    11264789    9746040   21010829