
Background

I wanted to write an event loop that supports scheduling, but found that no priority queue implementation I know of supports waiting on a condition variable (the idea is that the looping thread waits on the condition variable; if the wait times out, it can pop the top of the queue and execute that, otherwise it was interrupted and needs to recompute its sleeping time).
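
For illustration, here is a minimal sketch of that looping-thread pattern using a plain std::mutex (the event type and loop_once are my names, added for illustration, not part of the code under review):

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <vector>

struct event {
    std::chrono::steady_clock::time_point when;
    // payload omitted for brevity
};

struct later {
    bool operator()(const event& a, const event& b) const { return a.when > b.when; }
};

std::mutex queue_mutex;
std::condition_variable queue_cv;
std::priority_queue<event, std::vector<event>, later> events;  // earliest event on top

// One iteration of the looping thread.
void loop_once() {
    std::unique_lock<std::mutex> lk(queue_mutex);
    if (events.empty())
        return;
    // Sleep until the earliest event is due; being notified means the queue
    // changed and the sleeping time must be recomputed.
    if (queue_cv.wait_until(lk, events.top().when) == std::cv_status::timeout) {
        event next = events.top();
        events.pop();
        lk.unlock();
        // execute next here
    }
}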

I thought about using plain std::mutexes. Somebody already found out how to do that. But I thought perhaps I could do better. So I ventured forth into a world of atomics ... and there was no end to surprises.

Priority lock

The basic idea was stolen from a bakery lock (I believe it is also called a ticket lock). Normal priority threads take a "ticket" on attempting to enter the critical section; when the lock's current number matches their ticket number, they enter the critical section. The leaving thread increments the ticket number that should enter next. The only change is an additional atomic flag that is used to control mutual exclusion for the priority thread in case it needs to skip the line.
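
For reference, a plain ticket lock (without the priority flag) looks roughly like this; a minimal sketch for orientation, not the reviewed code:

#include <atomic>
#include <cstddef>

class ticket_lock {
    std::atomic<std::size_t> next_label{0};     // next ticket to hand out
    std::atomic<std::size_t> current_label{0};  // ticket currently being served
public:
    void lock() {
        // take a ticket, then wait until it is being served
        const std::size_t my_label = next_label.fetch_add(1, std::memory_order_relaxed);
        while (current_label.load(std::memory_order_acquire) != my_label) {
            // spin (a pause instruction would go here)
        }
    }

    void unlock() {
        // only the owner advances the serving number
        current_label.fetch_add(1, std::memory_order_release);
    }
};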

Guarantees

  • Mutual exclusion

The mutual exclusion guarantee is provided by the atomic flag is_lock_held. An entering thread may enter only if it is false, and it acquires the lock via a strong compare-and-exchange (CAS). When the thread that owns the lock exits the critical section, it sets the flag back to false.

  • Strong order among normal priority threads

This is guaranteed by the ticketing system. The thread that enters the wait state earlier in the view of next_label will enter the critical section earlier than the others. Only the highest priority thread (of which there is only one) can skip the line.

  • Deadlock freedom

Unless there is some serious error (a process crash or some other way to bypass RAII), the lock is guaranteed to be released in case of exceptions and any other control flow that RAII can handle. There is also no way for threads to actively block each other by doing something that would prevent the others from making progress. Only the current owner of the critical section decides whether anybody will progress, and then cache effects determine which thread goes next (since atomics are used, exactly one thread is guaranteed to progress).

  • Only one high priority lock at a time

The implementation does a CAS on high priority lock creation (if the counter is 0, replace it with 1). The destructor of the high priority lock resets it back to 0.

Facepalm. I somehow managed to put the reset to zero in the constructor instead of the destructor at first. I guess writing code at night is a bad idea.

The ugly parts

  • Potential starvation

As with any atomics-only algorithm, the lock can starve any given thread. The high priority lock might get starved because it is not fast enough to acquire the boolean flag (e.g. by getting scheduled off the CPU at an inopportune moment). The normal priority threads can be starved by design.

Code

#include <cstddef>
#include <atomic>
#include <stdexcept>
#include <immintrin.h>

namespace shino {

    /**
     * @brief the shared state of the normal priority and highest priority locks. This class is intended to be
     * a factory for the mentioned locks, and is thus not usable directly.
     *
     * Use `create_normal_priority_lock` and `create_highest_priority_lock` to obtain locks which are akin to
     * std::unique_lock (they do provide exception safety by unlocking in the destructor).
     */
    class priority_mutex {
        std::atomic<std::size_t> current_label = 0;
        std::atomic<std::size_t> next_label = 0;
        std::atomic<bool> is_lock_held = false;

        std::atomic<int> priority_thread_count = 0;
    public:
        /**
         * @brief a lock class intended to be used by normal priority thread.
         *
         * The mechanism of acquiring the lock is to obtain a new label to wait on; once another normal priority
         * thread sets the current label of the lock to the label obtained by this thread, this thread tries to
         * lock the boolean which is used to allow the high priority thread to skip the waiting line and be the
         * next owner of the lock.
         */
        class normal_priority_thread_lock {
            priority_mutex* shared_state = nullptr;
            std::size_t my_label = -1;

            bool is_locked = false;
        public:
            normal_priority_thread_lock(const normal_priority_thread_lock&) = delete;
            normal_priority_thread_lock& operator=(const normal_priority_thread_lock&) = delete;

            /**
             * @brief Lock the priority_mutex using normal priority (will yield to high priority thread if contended),
             * the calling thread will be granted exclusive ownership to the critical section guarded by this `priority_mutex`
             *
             * @pre the lock is in unlocked state (all previous `lock()` calls were followed by `unlock()` in one by one manner)
             * @post the lock is in locked state
             */
            void lock() {
                if (is_locked) {
                    throw std::logic_error("attempt to deadlock by double locking");
                }

                my_label = shared_state->next_label++;
                while (shared_state->current_label.load(std::memory_order_consume) != my_label) {
                    _mm_pause();
                }

                /*
                 * attempt to reduce starvation of the priority thread. Since pause takes more than one instruction,
                 * when the pause points don't exactly align, the priority thread might miss the window to lock the
                 * atomic bool. That is why there is exactly one pause after the loop
                 */
                _mm_pause();


                /*
                 * don't spin on CAS to decrease cache traffic
                 */
                bool expected = false;
                do {
                    expected = false;
                    while (shared_state->is_lock_held.load(std::memory_order_consume) != expected)
                    {
                        _mm_pause();
                    }
                } while (!shared_state->is_lock_held.compare_exchange_strong(expected, true, std::memory_order_acq_rel, std::memory_order_consume));

                is_locked = true;
            }

            /**
             * @brief Unlock the critical section guarded by this `priority_mutex` thus allowing the next thread to progress.
             * If highest_priority_thread_lock contends with other locks, the highest priority one will get priority.
             *
             * @pre the lock is in locked state (all previous `unlock()` calls were followed by `lock()` in one by one manner)
             * @post the lock is in unlocked state
             */
            void unlock() {
                if (!is_locked) {
                    throw std::logic_error("attempting to unlock non locked lock");
                }

                bool expected = true;
                if (!shared_state->is_lock_held.compare_exchange_strong(expected, false, std::memory_order_acq_rel, std::memory_order_consume)) {
                    throw std::logic_error("either double unlock or something wrong with the lock itself");
                }

                const auto next_in_line = my_label + 1;
                auto stored_label = my_label;
                if (!shared_state->current_label.compare_exchange_strong(stored_label, next_in_line, std::memory_order_acq_rel, std::memory_order_consume)) {
                    throw std::logic_error("somebody acquired the lock before this lock unlocked?");
                }
                is_locked = false;
            }

            ~normal_priority_thread_lock() {
                if (is_locked) {
                    unlock();
                }
            }

        private:
            friend priority_mutex;
            normal_priority_thread_lock(priority_mutex* shared_state, bool start_locked):
                shared_state(shared_state)
            {
                if (start_locked) {
                    lock();
                }
            }
        };

        /**
         * @brief a lock class intended to be used by highest priority thread.
         *
         * The mechanism of locking is to acquire the atomic boolean by setting it to true when it is false. A normal
         * priority thread is unlikely to contend if both were waiting on the locked lock, because the boolean is
         * unlocked first and only then is the label set to the label of the waiting normal priority thread.
         * **Starvation is possible in theory**.
         */
        class highest_priority_thread_lock {
            priority_mutex* shared_state = nullptr;
            bool is_locked = false;
        public:
            highest_priority_thread_lock(const highest_priority_thread_lock&) = delete;
            highest_priority_thread_lock& operator=(const highest_priority_thread_lock&) = delete;

            /**
             * @brief Lock the priority_mutex using highest priority (will skip the line if contended on a locked lock),
             * the calling thread will be granted exclusive ownership to the critical section guarded by this `priority_mutex`
             *
             * @pre the lock is in unlocked state (all previous `lock()` calls were followed by `unlock()` in one by one manner)
             * @post the lock is in locked state
             */
            void lock() {
                if (is_locked) {
                    throw std::logic_error("attempting to deadlock via double locking");
                }

                /*
                 * don't spin on CAS to decrease cache traffic
                 */
                bool expected = false;
                do {
                    expected = false;
                    while (shared_state->is_lock_held.load(std::memory_order_consume) != expected)
                    {
                        _mm_pause();
                    }
                } while (!shared_state->is_lock_held.compare_exchange_strong(expected, true, std::memory_order_acq_rel, std::memory_order_consume));

                is_locked = true;
            }

            /**
             * @brief Unlock the critical section guarded by this `priority_mutex` thus allowing the next thread to progress.
             * If highest_priority_thread_lock contends with other locks, the highest priority one will get priority.
             *
             * @pre the lock is in locked state (all previous `unlock()` calls were followed by `lock()` in one by one manner)
             * @post the lock is in unlocked state
             */
            void unlock() {
                if (!is_locked) {
                    throw std::logic_error("attempting to unlock non-locked lock");
                }

                bool expected = true;
                if (!shared_state->is_lock_held.compare_exchange_strong(expected, false, std::memory_order_acq_rel, std::memory_order_acquire)) {
                    throw std::logic_error("either double unlock or something wrong with the lock itself");
                }
                is_locked = false;
            }

            ~highest_priority_thread_lock() {
                if (is_locked) {
                    unlock();
                }

                // allow a new highest priority lock to be created again
                shared_state->priority_thread_count.store(0, std::memory_order_release);
            }
        private:
            friend priority_mutex;
            highest_priority_thread_lock(priority_mutex* shared_state, bool start_locked):
                shared_state(shared_state)
            {
                if (start_locked) {
                    lock();
                }
            }
        };

        /**
         * @brief creates an instance of `normal_priority_thread_lock` that shares inner state with this mutex
         * @param start_locked if true the lock will be locked on construction, otherwise it will be constructed in
         * unlocked state
         * @return instance of `normal_priority_thread_lock`
         */
        normal_priority_thread_lock create_normal_priority_lock(bool start_locked = true) {
            return {this, start_locked};
        }

        /**
         * @brief creates an instance of `highest_priority_thread_lock` that shares inner state with this mutex.
         * There can be only one highest priority lock at a time.
         *
         * @param start_locked if true the lock will be locked on construction, otherwise it will be constructed in
         * unlocked state
         * @return instance of `highest_priority_thread_lock`
         */
        highest_priority_thread_lock create_highest_priority_lock(bool start_locked = true) {
            int desired = 0;
            if (priority_thread_count.compare_exchange_strong(desired, 1, std::memory_order_acq_rel)) {
                return {this, start_locked};
            } else {
                throw std::logic_error("attempting to create multiple highest priority locks");
            }
        }
    };
}

/*
 * Test: the idea is to have a normal priority thread to be first and lock for a long amount of time,
 * and then the priority thread should take the lock even though there are threads waiting before it
 */

#include <chrono>
#include <iostream>
#include <thread>

void first_thread_function(shino::priority_mutex& mutex, std::atomic<bool>& flag) {
    auto normal_lock = mutex.create_normal_priority_lock();
    flag.store(true, std::memory_order_release);
    std::cout << "print from normal thread\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

void normal_priority_function(shino::priority_mutex& mutex) {
    auto normal_lock = mutex.create_normal_priority_lock();
    std::cout << "print from normal thread\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

void high_priority_function(shino::priority_mutex& mutex) {
    auto priority_lock = mutex.create_highest_priority_lock();
    std::cout << "print from priority thread\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

int main() {
    for (std::size_t i = 0; i < 4; ++i) {
        std::atomic<bool> start_flag = false;
        shino::priority_mutex mutex;
        auto t0 = std::thread(first_thread_function, std::ref(mutex), std::ref(start_flag));
        while (!start_flag.load(std::memory_order_consume)) {_mm_pause();}
        auto t1 = std::thread(normal_priority_function, std::ref(mutex));
        auto t2 = std::thread(normal_priority_function, std::ref(mutex));
        auto t3 = std::thread(high_priority_function, std::ref(mutex));
        t0.join();
        t1.join();
        t2.join();
        t3.join();
        std::cout << "=======\n";
    }

    return 0;
}

Why is there insufficient testing?

I only did a sanity check because I am planning on getting the tests reviewed separately as well. The reason for the separation is to avoid overwhelming the reviewers with complexity. Please downvote and comment if you think this approach is wrong.

The output of the test should be

print from normal thread
print from priority thread
print from normal thread
print from normal thread
=======
print from normal thread
print from priority thread
print from normal thread
print from normal thread
=======
print from normal thread
print from priority thread
print from normal thread
print from normal thread
=======
print from normal thread
print from priority thread
print from normal thread
print from normal thread
=======

Build

Since this is a single file, any compiler invocation in C++17 mode will do (on an x86 target, because of the _mm_pause() intrinsic).
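
For example, something like this (the file name is assumed):

g++ -std=c++17 -O2 -pthread priority_mutex.cpp -o priority_mutex_test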

What review would I prefer?

I would prefer a code review that focuses on proper usage of atomics and on better mechanisms to reduce starvation. I did think about the whole class layout falling on the same cache line, but making the class bigger (to pad the atomics onto separate cache lines) was not a decision I wanted to make.

  • It's not quite clear from the description: how many "high priority" threads should the implementation support? That is, are we guaranteed there's a single one at any point in time? Commented Jan 26, 2022 at 8:33
  • @MatthieuM., the guarantee is to have only one. The shared state maintains a counter (actually I should've made it a bool). The create function does a CAS (if 0, replace with 1) and throws if that fails. The destructor of the high priority lock will set it to 0 again. Edited into question. Commented Jan 26, 2022 at 8:39
  • @MatthieuM., when I went to double check, I noticed I put the reset into the constructor instead of the destructor ... Was that the reason you were unsure? Commented Jan 26, 2022 at 9:30
  • No, the use of std::atomic<int> priority_thread_count threw me off, as it seemed to suggest there may be several. Commented Jan 26, 2022 at 10:58

2 Answers


Should You Really Create the High-Priority Mutex Separately?

This introduces a new, invalid state (where the high-priority mutex hasn’t been created yet), and prevents you from checking multiple state variables with the same atomic operation (see below).

Consider Using an atomic_flag

You currently are reinventing the wheel with a CAS loop on an atomic<bool>, but there’s already std::atomic_flag::test_and_set() in the standard library.
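
A minimal sketch of the same spin-acquire expressed with an atomic_flag (my illustration, not code from the question):

#include <atomic>
#include <immintrin.h>

std::atomic_flag is_lock_held = ATOMIC_FLAG_INIT;

void lock() {
    // test_and_set returns the previous value: true means another
    // thread already holds the lock, so keep spinning.
    while (is_lock_held.test_and_set(std::memory_order_acquire)) {
        _mm_pause();
    }
}

void unlock() {
    is_lock_held.clear(std::memory_order_release);
}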

One reason not to would be,

Check Multiple State Variables with a Single CAS

Currently, your lock() algorithm has multiple steps and is pretty complicated. But what it's really trying to do is wait for one specific state:

  • This thread’s ticket is next in line
  • The mutex lock is clear
  • There are zero high-priority threads waiting

It always wants to update to a specific state:

  • The current ticket is this thread’s
  • The mutex lock is set
  • There are zero high-priority threads waiting

If you pack a ticket number, a count of high-priority waiting threads, and a lock value into a struct, and wrap that into something like atomic<priority_mutex::state_t> the .lock() algorithm simplifies to a CAS loop that awaits that specific state. If this all fits into 64 bits, you’re golden. If you can fit it into 128 bits, and declare the type alignas(16), many x86_64 compilers (including Clang++ 13, MSVC 19 and ICX 2022) will automatically use the native cmpxchg16b instruction. ICX is even smart enough not to need the alignment hint.

Edit: It is in fact much faster to wait for the expected state with atomic loads than by attempting a CAS, which on x86-64 locks the bus.

The high-priority lock is a little more complicated, as it wants to be able to increment the count of waiting high-priority threads as a separate operation, test-and-set the lock, and ignore the ticket number. You don’t want a high-priority thread to be blocked because a lower-priority one updated the ticket.
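
A hedged sketch of that separate increment, reusing the state_t type defined in the larger example below (the function name is my own):

// Bump only the waiting count; the ticket and lock bit are carried over
// unchanged on every retry, so a ticket update by a lower-priority thread
// merely makes this loop go around once more.
void announce_high_priority(std::atomic<state_t>& state)
{
    state_t current = state.load(std::memory_order_relaxed);
    state_t desired;
    do {
        desired = current;
        ++desired.priority_threads_waiting;
    } while (!state.compare_exchange_weak(current, desired,
                                          std::memory_order_acq_rel));
}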

This implementation doesn’t let you check for double-lock as you currently do, as multiple high-priority threads attempting to wait on the same queue is not a bug. (If you still want to check for double-increment or double-decrement, maybe each thread remembers whether it’s currently holding a lock, and checks that thread variable off the critical path.)

Is All the World an X86?

Currently, you have a non-portable function, _mm_pause(), scattered all over the place. As far as dependencies on specific hardware go, this isn’t the worst; you can do a search-and-replace if you ever have to.

So you might as well do it up front. Here’s a good start:

#if __i386__ || __x86_64__ || _M_IX86 || _M_X64

#include <immintrin.h>

inline void spin_pause()
{
  return _mm_pause();
}

#else
#  error "Implement spin_pause() for this target."
#endif // Target check

Then, if you need to migrate to ARM64, for example, you can update this to generate an isb sy instruction in your spinlocks on Clang, GCC or MSVC with:

#if __i386__ || __x86_64__ || _M_IX86 || _M_X64

#include <immintrin.h>

inline void spin_pause()
{
  return _mm_pause();
}

#elif __aarch64__

inline void spin_pause()
{
  asm volatile("isb sy");
}

#elif _M_ARM64 && _MSC_VER

#include <intrin.h>

inline void spin_pause()
{
  __isb(_ARM64_BARRIER_SY);
}

#else
#  error "Implement spin_pause() for this ISA."
#endif // ISA check

(This is a hypothetical example; many real spinlocks for ARM64 use acquire/release semantics to generate a dmb instruction instead.)

Consider Yielding

On Intel, a PAUSE instruction will tell the CPU to let any other hardware threads on the same core make progress, but it won’t necessarily tell the OS that your thread is spinning. This makes it more likely to lose its timeshare in the middle of a critical section.

A good compromise is to optimistically retry with a barrier wait a few times (I've read that 16 seems to work well on current hardware) and then yield the thread: if the first sixteen or one hundred waits all timed out, maybe you should yield the CPU. That might look something like:

#include <assert.h>
#include <atomic>
#include <bit>
#include <stdint.h>
#include <thread>

#if __i386__ || __x86_64__ || _M_IX86 || _M_X64

#include <immintrin.h>

inline void spin_pause()
{
  return _mm_pause();
}

#else
#  error "Implement spin_pause() for this ISA."
#endif // ISA check

/* The alignas hint is needed for some compilers, such as Clang++ 13 x86_64, to
 * generate cmpxchg instructions.
 */
struct alignas(8) state_t {
  uint32_t ticket;
  uint16_t priority_threads_waiting;
  bool in_use;
  // G++ generates much better code if we pad this out to exactly 64 bits.
  char pad[sizeof(uint64_t) - sizeof(uint32_t) - sizeof(uint16_t) - sizeof(bool)] =
    {};
};

/* The default comparison operator on a state_t does separate comparisons on each
 * member variable.  It is more efficient to compare the object representations.
 */
bool inline operator==( const state_t& a, const state_t& b )
{
  static_assert( sizeof(state_t) == sizeof(uint64_t), "" );
  static_assert( alignof(state_t) >= alignof(uint64_t), "" );
  return std::bit_cast<uint64_t>(a) == std::bit_cast<uint64_t>(b);
  /* A compiler that does not support std::bit_cast might instead need:
  return *reinterpret_cast<const uint64_t*>(&a) ==
         *reinterpret_cast<const uint64_t*>(&b);
   * That is technically undefined behavior, though.  Could also use memcmp().
   */
}

bool inline operator!=( const state_t& a, const state_t& b )
{
  return !(a == b);
}

static_assert( std::atomic<state_t>::is_always_lock_free, "" );

#include <cstdlib>
#include <iomanip>
#include <iostream>

using std::cout;

/* Spins until it successfully observes the expected value of the variable,
 * and replaces it with the desired value.  Returns the desired value.  T 
 * must be comparable and copyable.  It should fit in one or two registers and
 * atomic<T> should be lock-free.  Not currently required as constraints.
 */
template<class T>
T spin_CAS( std::atomic<T>& variable, const T expected, const T desired )
{
  constexpr unsigned spin_optimism = 16;

  /* It is significantly faster to check for the desired state with a load
   * than by attempting a compare-exchange, which locks the bus on x86_64.
   */
  do {
    // Attempt a limited number of retries, then yield the thread.
    for ( unsigned i = spin_optimism; i > 0; --i ) {
      T current = expected;
      if ( variable.load(std::memory_order_relaxed) == current &&
           variable.compare_exchange_weak( current,
                                           desired,
                                           std::memory_order_acquire )
         ) {
         return desired;
      } // end if
      spin_pause();
    } // end for

    // Checking for an erroneous state would go here.
    std::this_thread::yield();
  } while (true);
}

void thread_increment( std::atomic<state_t>& s,
                       const std::atomic_uint32_t& ticket
                     )
/* This simulates the other low-priority threads in the program by updating the
 * counter with acquire-release semantics.
 */
{
  // constexpr unsigned spin_optimism = 16;
  state_t current = s.load(std::memory_order_relaxed);
  state_t desired = { current.ticket+1, 0, false };
  unsigned long counter = 0x01000000;

  while ( !s.compare_exchange_strong( current,
                                      desired,
                                      std::memory_order_acq_rel ) ||
          desired.ticket < ticket.load(std::memory_order_acquire)-1
        ) {
    if (current.ticket >= counter ) {
      cout.flush();
      cout << std::hex << current.ticket << std::endl;
      counter += 0x01000000;
    }
    desired = { current.ticket+1, 0, false };
  } // end while

  cout.flush();
  cout << "Incrementer: " << std::hex << desired.ticket << std::endl;
}

std::atomic_uint32_t our_ticket = 0x12345678;
std::atomic<state_t> test_state = state_t{ 0x01, 0, false };

int main()
{
  auto t1 = std::thread( thread_increment,
                         std::ref(test_state),
                         std::ref(our_ticket)
                       );
  const auto ticket = our_ticket++;
  auto current = spin_CAS( test_state,
                           state_t{ ticket, 0, false },
                           state_t{ ticket, 1, false }
                         );
  const auto punched_ticket = current.ticket;
  cout.flush();
  cout << "Main thread: {"  << std::hex << current.ticket
         << ", " << current.priority_threads_waiting
         << ", " << (current.in_use ? "true" : "false")
         << "}" << std::endl;

  while(!test_state.compare_exchange_strong( current,
                                             { punched_ticket+1, 0, false },
                                             std::memory_order_release
                                           )) {
    cout.flush();
    cout << "Incorrectly spinning: {" << std::hex << current.ticket
         << ", " << current.priority_threads_waiting
         << ", " << (current.in_use ? "true" : "false")
         << "}" << std::endl;
  }

  t1.join();

  return EXIT_SUCCESS;
}

Edit: All compilers I tested generate much better code when the structure used for CAS is padded and aligned to an exact 64-bit boundary. It also compiles correctly on ICX 2022 and MSVC, although neither optimizes it as well. On Clang++ 13.0.0 on Linux with -std=c++20 -Os -march=x86-64-v4, the spin_CAS function compiles to the following inline loop:

.LBB1_1:                                # =>This Loop Header: Depth=1
        mov     ecx, 16
.LBB1_2:                                #   Parent Loop BB1_1 Depth=1
        mov     rax, qword ptr [rip + test_state]
        cmp     rax, rbp
        jne     .LBB1_4
        mov     rax, rbp
        lock            cmpxchg qword ptr [rip + test_state], r14
        je      .LBB1_6
.LBB1_4:                                #   in Loop: Header=BB1_2 Depth=2
        pause
        dec     ecx
        jne     .LBB1_2
        call    sched_yield
        jmp     .LBB1_1

There is, as you can see, a small bug in both Clang++ 13.0.0 and ICX 2022: they generate a wasteful mov instruction in the not-taken branch of a jump-if-not-equal, which sets a register to the register it just compared equal to! So, this loop should ideally be shorter.

You'd have to test to see if this improves performance, but in theory, this should allow other threads to progress, make it less likely that a thread will time out while holding the lock, and perhaps save some battery life.

Are You Using the Right Memory Order?

Right now, you’re performing compare-and-swap with memory_order_acq_rel semantics. If you’re not depending on any data updates to or from other threads to be visible, you probably want memory_order_relaxed (although, if you’re using the right structures, the memory order should not actually generate different code on x86_64). You don’t want your CAS loop to give every other core a chance to alter the variable and starve you before you attempt to set it.

Is There a Better Algorithm?

You’re using a locking algorithm, but you sound like you want to minimize starvation. Would you be able to use a wait-free algorithm instead, at the cost of lower average throughput? This would avoid CAS and spinlocks entirely, using only operations such as test-and-set, increment and decrement.

Can you use a Read-Copy-Update (RCU) pattern that does not need to hold a lock, only to swap an atomic pointer?
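
A heavily hedged sketch of that pattern; safe reclamation of the old snapshot, which is the genuinely hard part, is deliberately omitted:

#include <atomic>

struct snapshot {
    // immutable copy of the event-queue state
};

std::atomic<const snapshot*> current_snapshot{nullptr};

// Readers are wait-free: one atomic load, no lock held.
const snapshot* read_snapshot() {
    return current_snapshot.load(std::memory_order_acquire);
}

// A writer copies the current snapshot, modifies the copy, then
// publishes it with a single atomic swap.
void publish(const snapshot* replacement) {
    const snapshot* old = current_snapshot.exchange(replacement,
                                                    std::memory_order_acq_rel);
    // Deleting `old` here would race with in-flight readers; real RCU
    // reclaims it via epochs, hazard pointers, or reference counting.
    (void)old;
}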

  • yes, I believe RCU on the earliest event will do. That will probably take quite a bit of time though, as the rest of the data structure has to be concurrent too. Commented Jan 25, 2022 at 17:49
  • @Incomputable RCU on 16-byte objects is very efficient on x86-64 because of cmpxchg16b. You will probably need an -march= or /arch: compiler option to enable that, however, and alignas(16) on the structure used for CAS for Clang, MSVC or ICX to optimize a CAS loop to generate optimal code. GCC 11.2 does not seem to be able to. – Davislor, Commented Jan 25, 2022 at 20:55
  • "If you're not depending on any data updates to or from other threads to be visible" => Isn't the point of a lock, typically, to coordinate data updates with other threads, hence requiring an acquire/release pair? Commented Jan 26, 2022 at 8:25
  • Why are you using a strong CAS in a loop, instead of a weak one? And further, should you really retry immediately with a CAS (a small number of times) instead of going back to the lighter-weight (bus-wise) load loop? It seems to me that in case of contention by multiple threads on the lock, that tight CAS loop is going to play havoc with cache lines (bouncing them back and forth across cores). Commented Jan 26, 2022 at 8:27
  • @MatthieuM. My understanding is that, on x86_64, those options don't generate different instructions for CAS anyway. In this particular case, the loop seems to be setting only the ticket/lock/wait-count triple that controls access to a resource, and there might or might not be any other loads or stores that need to be sequenced with acquiring the lock. – Davislor, Commented Jan 26, 2022 at 8:39

Don't use CAS for bools

There is no need to use atomic compare-and-swap when toggling a boolean value. Consider that if you want to set the flag to true, you can just do:

bool oldvalue = flag.exchange(true);

Then either oldvalue == false, in which case you were the one that set it to true, or oldvalue == true, in which case you know someone else already set it to true, but the value will not be changed. So to lock something, you can use:

while (is_lock_held.exchange(true)) {
    _mm_pause();
}

Even better, use std::atomic_flag instead of std::atomic<bool>. Also consider using this for priority_thread_count, as it is only ever 0 or 1.
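
For instance, a hedged sketch of the "only one highest-priority lock" rule expressed with an atomic_flag (free functions here purely for illustration):

#include <atomic>
#include <stdexcept>

std::atomic_flag priority_lock_exists = ATOMIC_FLAG_INIT;

// test_and_set atomically raises the flag and reports whether it was
// already raised, which is exactly the one-at-a-time check.
void register_highest_priority_lock() {
    if (priority_lock_exists.test_and_set(std::memory_order_acq_rel)) {
        throw std::logic_error("attempting to create multiple highest priority locks");
    }
}

void unregister_highest_priority_lock() {
    priority_lock_exists.clear(std::memory_order_release);
}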

Don't rely on _mm_pause() for anything

_mm_pause() helps some CPUs keep cool while a thread is waiting for something to happen, but it doesn't guarantee anything related to thread starvation, nor does it prevent any race conditions. So keeping it inside while-loops that don't do anything else is fine, but the lone _mm_pause() outside the while-loop in lock() is just snake oil.

Let normal priority locks check priority_thread_count

Since you have the variable priority_thread_count, which is non-zero if a high-priority thread wants to take the lock, you can very simply make the low-priority threads back off by having them check that variable before trying to grab the lock:

class normal_priority_thread_lock {
    ...
public:
    void lock() {
        // Grab a ticket and wait for our number
        ...

        // Now it's our turn, take the lock
        while (priority_thread_count || is_lock_held.exchange(true)) {
            _mm_pause();
        }

        is_locked = true;
    }
}

While it may happen that a low-priority thread checks priority_thread_count and reads zero, then the high-priority thread sets it to one, and only then does the low-priority thread perform the exchange on is_lock_held before the high-priority thread can take it, this only happens sporadically, and it cannot starve the high-priority thread: eventually all low-priority threads will see that priority_thread_count is non-zero.

Make the lock classes work like std::lock_guard

I would make the normal_priority_thread_lock and highest_priority_thread_lock classes work like std::lock_guard, and give them a constructor to which you have to pass the priority_mutex. This avoids the need to use the create_*_lock() functions to make a lock object. It also adds extra safety: consider that for the high-priority lock, bad things happen if one thread uses create_highest_priority_lock() and another thread just constructs a highest_priority_thread_lock directly.

You could even consider moving the normal priority lock() and unlock() functions into priority_mutex, so that you can use a regular std::lock_guard object to lock a priority_mutex.
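
A sketch of what that could look like, assuming priority_mutex gained lock()/unlock() members with the normal-priority semantics (an assumption, not the current API):

#include <mutex>

void normal_priority_work(shino::priority_mutex& mutex) {
    // lock_guard calls mutex.lock() in its constructor and mutex.unlock()
    // in its destructor, giving the same RAII guarantee as the lock classes.
    std::lock_guard<shino::priority_mutex> guard(mutex);
    // ... critical section ...
}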

  • Damned... I was preparing a list of remarks while reading the code, and you nailed nearly all but one of them down already. +1 sir! The only remaining one I have, which would be lonely in a dedicated answer, is to possibly consider specifying looser memory orderings: is_lock_held should use acquire and release (it's the core of the mutex), the others can be relaxed. May not matter on x86/x64, but on ARM... Commented Jan 25, 2022 at 13:37
  • I believe it is an antipattern to spin on exchange because it will create more cache traffic. The book I read (The Art of Multiprocessor Programming, second edition) advised exactly against exchange and test_and_set in a loop. The problem is that each thread will experience a cache miss because the value is rewritten on every attempt, so there will be a cache miss on every iteration. It is better to spin on a load and then do the exchange. Great answer otherwise, thanks! Commented Jan 25, 2022 at 15:04
  • But CAS still needs to get an exclusive hold of the cache line in order to do the exchange atomically. Maybe it's not really a miss, but it still blocks something in hardware. The exact cost will surely depend on the platform you are running on; it needs to be benchmarked. – G. Sliepen, Commented Jan 25, 2022 at 18:52
  • @MatthieuM. I think Davislor has addressed the memory order in his answer. – G. Sliepen, Commented Jan 25, 2022 at 18:53
