3
votes

The problem is as follows.

Given a POD object that has two parts: index and data. I want to perform an atomic conditional exchange operation over it with a condition that checks equality for the index only.

Something like this:

struct Data { size_t m_index; char m_data; };
std::atomic<Data> dd; // some initialization
Data zz; // some initialization

// so I want something like this
dd.exchange_if_equals<&Data::m_index>(10,zz);

So this is a sort of "partial-compare-and-full-swap" operation. Maybe this will require an appropriate alignment for Data::m_index.

Obviously std::atomic doesn't support this, but can one implement this by his own, or maybe there is another library that supports this?

3
I would probably do a read, then your custom condition, then a compare-and-swap, where the comparison is that the current value is totally equal to the read value. If the last step fails, loop. - Mooing Duck
Hardware CAS (like x86-64 or ARMv8.1) doesn't support this in asm, you'd have to roll your own. An LL/SC machine like ARM or PowerPC could do this easily in assembly, but there are no libraries that expose that portably. (Most importantly because it couldn't compile for machines like x86.) - Peter Cordes

3 Answers

2
votes

I think you have to do a load, then your custom condition, then a compare-and-swap, where the comparison is that the current value is totally equal to the read value. If the last step fails, loop.

template<class T, class F>
bool swap_if(std::atomic<T>& atomic, T desired, F&& condition) {
    for (;;) {
        T data = atomic.load();
        if (!condition(data)) break;
        if (atomic.compare_exchange_weak(data, desired)) return true;
    }
    return false;
}

http://coliru.stacked-crooked.com/a/a394e336628246a9

Due to the complexity, you should probably just use a mutex. Separately, std::atomic<Data> may already be using a mutex under the covers, since Data is so big.

2
votes

Just like C++, hardware CAS (like x86-64 or ARMv8.1) doesn't support this in asm, you'd have to roll your own.

In C++, it's fairly easy: load the original value and replace part of it. This can of course introduce spurious failure if another core changed the other part that you didn't want to compare against.

If possible use unsigned m_index instead of size_t, so the whole struct can fit in 8 bytes on typical 64-bit machines, instead of 16. 16-byte atomics are slower (especially the pure-load part) on x86-64, or not even lock-free at all on some implementations and/or some ISAs. See How can I implement ABA counter with c++11 CAS? re: x86-64 lock cmpgxchg16b with current GCC/clang.

If each atomic<> access separately takes a lock, it would be vastly better to just take a mutex around the whole custom compare and set.


I wrote a simple implementation of one CAS attempt (like cas_weak) as an example. You could maybe use it in a template specialization or derived class of std::atomic<Data> to provide a new member function for atomic<Data> objects.

#include <atomic>
struct Data {
    // without alignment, clang's atomic<Data> doesn't inline load + CAS?!?  even though return d.is_always_lock_free; is true
    alignas(long long)  char m_data;
    unsigned m_index;               // this last so compilers can replace it slightly more efficiently
};

inline bool partial_cas_weak(std::atomic<Data> &d, unsigned expected_idx, Data zz, std::memory_order order = std::memory_order_seq_cst)
{
    Data expected = d.load(std::memory_order_relaxed);
    expected.m_index = expected_idx;            // new index, same everything else
    return d.compare_exchange_weak(expected, zz, order);
    // updated value of "expected" discarded on CAS failure
    // If you make this a retry loop, use it instead of repeated d.load
}

This compiles nicely in practice with clang for x86-64 (Godbolt), inlining into a caller that passes a compile-time-constant order (else clang goes berserk branching on that order arg for a stand-alone non-inline version of the function)

# clang10.0 -O3 for x86-64
test_pcw(std::atomic<Data>&, unsigned int, Data):
    mov     rax, qword ptr [rdi]                  # load the whole thing
    shl     rsi, 32
    mov     eax, eax                              # zero-extend the low 32 bits, clearing m_index
    or      rax, rsi                              # OR in a new high half = expected_idx
    lock            cmpxchg qword ptr [rdi], rdx      # the actual 8-byte CAS
    sete    al                                        # boolean FLAG result into register
    ret

Unfortunately compilers are too dumb to only load the part of the atomic struct they actually need, instead loading the whole thing and then zeroing out the part they didn't want. (See How can I implement ABA counter with c++11 CAS? for union hacks to work around that on some compilers.)

Unfortunately GCC makes messy asm that stores/reloads temporaries to the stack, leading to a store-forwarding stall. GCC also zeros the padding after char m_data (whether it's the first or last member), possibly leading to CAS always failing if the actual object in memory had non-zero padding. That might be impossible if pure stores and initialization always make it zero.


An LL/SC machine like ARM or PowerPC could do this easily in assembly (the compare/branch is done manually, between the load-linked and the store-conditional), but there are no libraries that expose that portably. (Most importantly because it couldn't compile for machines like x86, and because what you can do in an LL/SC transaction is severely limited and debug-mode spill/reload of local vars could result in code that always failed.)

0
votes

If using a std::mutex instead of atomic is an option you could put the mutex in your own atomic-like wrapper.

Here's a beginning of what it could look like:

#include <iostream>
#include <type_traits>
#include <mutex>

template<typename T>
class myatomic {
public:
    static_assert(
        // std::is_trivially_copyable_v<T> && // used in std::atomic, not needed here
        std::is_copy_constructible_v<T> &&
        std::is_move_constructible_v<T> &&
        std::is_copy_assignable_v<T> &&
        std::is_move_assignable_v<T>, "unsupported type");

    using value_type = T;

    myatomic() : data{} {}
    explicit myatomic(const T& v) : data{v} {}

    myatomic(const myatomic& rhs) : myatomic(rhs.load()) {}

    myatomic& operator=(const myatomic& rhs) {
        std::scoped_lock lock(mtx, rhs.mtx);
        data = rhs.data;
        return *this;
    }

    T load() const {
        const std::lock_guard<std::mutex> lock(mtx);
        return data;
    }

    operator T() const {
        return load();
    }

    void store(const T& v) {
        const std::lock_guard<std::mutex> lock(mtx);
        data = v;
    }

    myatomic& operator=(const T& v) {
        store(v);
        return *this;
    }

    // partial compare and full swap
    template<typename Mptr, typename V>
    bool exchange_if_equals(Mptr mvar, V mval, const T& oval) {
        const std::lock_guard<std::mutex> lock(mtx);
        if(data.*mvar == mval) {
            data = oval;
            return true;
        }
        return false;
    }

    template<typename Mptr>
    auto get(Mptr mvar) const {
        const std::lock_guard<std::mutex> lock(mtx);
        return data.*mvar;
    }

    template<typename Mptr, typename V>
    void set(Mptr mvar, const V& v) {
        const std::lock_guard<std::mutex> lock(mtx);
        data.*mvar = v;
    }

private:
    mutable std::mutex mtx;
    T data;
};

struct Data {
    size_t m_index;
    char m_data;
};

int main() {
    Data orig{10, 'a'};
    Data zz; // some initialization

    myatomic<Data> dd(orig);
    dd.exchange_if_equals(&Data::m_index, 10U, zz);
    std::cout << dd.get(&Data::m_index);
}