
Introduction


This is a follow-up to a previous question of mine, where I presented another queue of the same type to get some feedback on it. Some people pointed out fundamental errors I had made, and I came to realise just how naive my approach to padding variables was. This is an updated version, built using the feedback I got in the previous thread.

The new queue is 32-bit and is intended to be portable and lightweight.

Bounded Circular Buffer & Two Cursors


I'm still using a bounded circular buffer to store/load the data, and it still uses two cursors which tell the next producer/consumer which index in the buffer they should work with. When a producer/consumer wishes to increment its cursor, both cursors are loaded at the same time, as they are both contained within a single aligned structure. This is intended to ensure true sharing of the cursors, because they are never loaded individually. Once the cursors have been loaded, a fullness/emptiness check is performed, followed by a CAS on the object holding both cursors. If either cursor has changed in the interim, the CAS fails and another attempt is made. When the CAS succeeds, the data can be put into, or removed from, the buffer. To calculate the index into the buffer, an index mask is used that is one less than a power of two, so a bitwise AND can replace a modulo operation to wrap the index back to zero. For this to work the queue size must be a power of two, so the size specified by the user is rounded up to the next power of two.
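For example, with a queue size of 8 the mask is 7, and cursor & 7 gives the same result as cursor % 8. A tiny standalone check of this (not part of the queue itself; the capacity of 8 is just an example value):

// Standalone sketch: with a power-of-two capacity, "cursor & mask" is
// equivalent to "cursor % capacity", but without a division.
#include <cassert>
#include <cstdint>

int main()
{
    const uint_fast32_t capacity = 8;               // must be a power of two
    const uint_fast32_t index_mask = capacity - 1;  // 0b0111

    for (uint_fast32_t cursor = 0; cursor < 20; ++cursor)
    {
        assert((cursor & index_mask) == (cursor % capacity));
    }

    return 0;
}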

The Circular Buffer Nodes & Spin-locks


While the cursors may be protected by a CAS operation, each node in the buffer is protected by a spin-lock. This is to prevent the case where a consumer tries to read some data before a producer has finished putting it in, or the opposite case, where a producer tries to add some data before a consumer has finished reading it.

The Code


Here's the full source, with many Doxygen style comments removed for clarity.

// SPDX-License-Identifier: GPL-2.0-or-later
/**
 * C++14 32bit Lockless Bounded Circular MPMC Queue type.
 * Author: Primrose Taylor
 */

#ifndef BOUNDED_CIRCULAR_MPMC_QUEUE_H
#define BOUNDED_CIRCULAR_MPMC_QUEUE_H

#include "stdio.h"
#include "stdlib.h"

#include <atomic>
#include <stdint.h>
#include <functional>
#include <thread>

#define CACHE_LINE_SIZE     64U

#if defined(_MSC_VER)
    #define HARDWARE_PAUSE()                _mm_pause();
    #define _ENABLE_ATOMIC_ALIGNMENT_FIX    1 // MSVC atomic alignment fix.
    #define ATOMIC_ALIGNMENT                4
#else
    #define ATOMIC_ALIGNMENT                16
    #if defined(__clang__) || defined(__GNUC__)
        #define HARDWARE_PAUSE()            __builtin_ia32_pause();
    #endif
#endif

/**
 * Lockless, Multi-Producer, Multi-Consumer, Bounded Circular Queue type.
 * The type is intended to be light weight & portable.
 * The sub-types are all padded to fit within cache lines. Padding may be put
 * in between member variables if the variables are accessed separately.
 */
template <typename T, uint_least32_t queue_size, bool should_yield_not_pause = false>
class bounded_circular_mpmc_queue final
{
    /**
     * Simple, efficient spin-lock implementation.
     * A function that takes a void lambda function can be used to
     * conveniently do something which will be protected by the lock.
     * @cite Credit to Erik Rigtorp https://rigtorp.se/spinlock/
     */
    class spin_lock
    {
        std::atomic<bool> lock_flag;
        
    public:
        spin_lock()
            : lock_flag{false}
        {
        }

        void do_work_through_lock(const std::function<void()> functor)
        {
            lock();
            functor();
            unlock();
        }
        
        void lock()
        {
            while (true)
            {
                if (!lock_flag.exchange(true, std::memory_order_acquire))
                {
                    break;
                }

                while (lock_flag.load(std::memory_order_relaxed))
                {
                    should_yield_not_pause ? std::this_thread::yield() : HARDWARE_PAUSE();
                }
            }
        }

        void unlock()
        {
            lock_flag.store(false, std::memory_order_release);
        }
    };

    /**
     * Structure that holds the two cursors.
     * The cursors are held together because we'll only ever be accessing
     * them both at the same time.
     * We don't directly align the struct because we need to use it as an
     * atomic variable, so we must align the atomic variable instead.
     */
    struct cursor_data
    {
        uint_fast32_t producer_cursor;
        uint_fast32_t consumer_cursor;
        uint8_t padding_bytes[CACHE_LINE_SIZE -
            sizeof(uint_fast32_t) -
            sizeof(uint_fast32_t)
            % CACHE_LINE_SIZE];

        cursor_data(const uint_fast32_t in_producer_cursor = 0,
            const uint_fast32_t in_consumer_cursor = 0)
            : producer_cursor(in_producer_cursor),
            consumer_cursor(in_consumer_cursor),
            padding_bytes{0}
        {
        }
    };

    /**
     * Structure that represents each node in the circular buffer.
     * Access to the data is protected by a spin lock.
     * Contention on the spin lock should be minimal, as it's only there
     * to prevent the case where a producer/consumer may try work with an element before
     * someone else has finished working with it. The data and the spin lock are separated by
     * padding to put them in different cache lines, since they are not accessed
     * together in the case mentioned previously. The problem with this is
     * that in low contention cases, they will be accessed together, and thus
     * should be in the same cache line. More testing is needed here.
     */
    struct buffer_node
    {
        T data;
        uint8_t padding_bytes_0[CACHE_LINE_SIZE -
            sizeof(T) % CACHE_LINE_SIZE];
        spin_lock spin_lock_;
        uint8_t padding_bytes_1[CACHE_LINE_SIZE -
            sizeof(spin_lock)
            % CACHE_LINE_SIZE];

        buffer_node()
            : spin_lock_(),
            padding_bytes_0{0},
            padding_bytes_1{0}
        {
        }

        void get_data(T& out_data) const
        {
            spin_lock_.do_work_through_lock([&]()
            {
                out_data = data;
            });
        }

        void set_data(const T& in_data)
        {
            spin_lock_.do_work_through_lock([&]()
            {
               data = in_data; 
            });
        }
    };

    /**
     * Structure that contains the index mask and the circular buffer.
     * Both are accessed at the same time, so they are not separated by padding.
     */
    struct alignas(CACHE_LINE_SIZE) circular_buffer_data
    {
        const uint_fast32_t index_mask;
        buffer_node* circular_buffer;
        uint8_t padding_bytes[CACHE_LINE_SIZE -
            sizeof(const uint_fast32_t) -
            sizeof(buffer_node*)
            % CACHE_LINE_SIZE];

        circular_buffer_data()
            : index_mask(get_next_power_of_two()),
            padding_bytes{0}
        {
            static_assert(queue_size > 0, "Can't have a queue size <= 0!");
            static_assert(queue_size <= 0xffffffffU,
                "Can't have a queue length above 32bits!");
            static_assert(
                std::is_copy_constructible_v<T>     ||
                std::is_copy_assignable_v<T>        ||
                std::is_move_assignable_v<T>        ||
                std::is_move_constructible_v<T>,
                "Can't use non-copyable, non-assignable, non-movable, or non-constructible type!"
        );

            /** Contiguously allocate the buffer.
              * The theory behind using calloc and not aligned_alloc
              * or equivalent, is that the memory should still be aligned,
              * since calloc will align by the type size, which in this case
              * is a multiple of the cache line size.
              */
            circular_buffer = (buffer_node*)calloc(
                index_mask + 1, sizeof(buffer_node));
        }

        ~circular_buffer_data()
        {
            if(circular_buffer != nullptr)
            {
                free(circular_buffer);
            }
        }
        
    private:
        /**
         * @cite https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
         */
        uint_least32_t get_next_power_of_two()
        {
            uint_least32_t v = queue_size;

            v--;
            v |= v >> 1;
            v |= v >> 2;
            v |= v >> 4;
            v |= v >> 8;
            v |= v >> 16;
            v++;
            
            return v;
        }
    };
    
public:
    bounded_circular_mpmc_queue()
        : cursor_data_(cursor_data{}),
        circular_buffer_data_()
    {
    }

    bool push(const T& in_data)
    {
        cursor_data current_cursor_data;

        // An infinite while-loop is used instead of a do-while, to avoid
        // the yield/pause happening before the CAS operation.
        while(true)
        {
            current_cursor_data = cursor_data_.load(std::memory_order_acquire);

            // Check if the buffer is full..
            if (current_cursor_data.producer_cursor + 1 == current_cursor_data.consumer_cursor)
            {
                return false;
            }

            // CAS operation used to make sure the cursors have not been incremented
            // by another producer/consumer before we got to this point, and to then increment
            // the cursor by 1 if it hasn't been changed.
            if (cursor_data_.compare_exchange_weak(current_cursor_data,
            {current_cursor_data.producer_cursor + 1,
                current_cursor_data.consumer_cursor},
            std::memory_order_release, std::memory_order_relaxed))
            {
                break;
            }

            should_yield_not_pause ? std::this_thread::yield() : HARDWARE_PAUSE();
        }

        // Set the data
        circular_buffer_data_.circular_buffer[
            current_cursor_data.producer_cursor & circular_buffer_data_.index_mask
            ].set_data(in_data);
        
        return true;
    }

    bool pop(T& out_data)
    {
        cursor_data current_cursor_data;

        while(true)
        {
            current_cursor_data = cursor_data_.load(std::memory_order_acquire);

            // Check if the queue is empty.. 
            if (current_cursor_data.consumer_cursor == current_cursor_data.producer_cursor)
            {
                return false;
            }

            if (cursor_data_.compare_exchange_weak(current_cursor_data,
            {current_cursor_data.producer_cursor,
                current_cursor_data.consumer_cursor + 1},
                std::memory_order_release, std::memory_order_relaxed))
            {
                break;
            }
            
            should_yield_not_pause ? std::this_thread::yield() : HARDWARE_PAUSE();
        }

        // Get the data
        circular_buffer_data_.circular_buffer[
            current_cursor_data.consumer_cursor & circular_buffer_data_.index_mask
            ].get_data(out_data);
        
        return true;
    }
    
    uint_fast32_t size() const
    {
        const cursor_data cursors = cursor_data_.load(std::memory_order_acquire);
        return cursors.producer_cursor - cursors.consumer_cursor;
    }

    bool empty() const
    {
        return size() == 0;
    }

    bool full() const
    {
        return size() == circular_buffer_data_.index_mask + 1;
    }
    
private:
    alignas(CACHE_LINE_SIZE) std::atomic<cursor_data> cursor_data_;
    circular_buffer_data circular_buffer_data_;
    
private:
    bounded_circular_mpmc_queue(
        const bounded_circular_mpmc_queue&) = delete;
    bounded_circular_mpmc_queue& operator=(
        const bounded_circular_mpmc_queue&) = delete;
};

#endif

I'm wondering if my push/pop methods work as I think they do? Is there any chance of the ABA problem? And is the use of a spin-lock to guard each node the best way of doing it? I'm using one because in theory it shouldn't need to be used very often, since in the vast majority of cases no one else will still be in the middle of working with the node.

Any help would be greatly appreciated! Cheers.


1 Answer

Remove do_work_through_lock()

The intention behind this function is good, and the implementation looks reasonable for its intended use. However, since you added lock() and unlock() member functions, you can use a std::lock_guard to lock your spin_lock. This means that you can write:

void set_data(const T& in_data)
{
    std::lock_guard lg(spin_lock_);
    data = in_data;
}

If the function you pass to do_work_through_lock() were more complicated and could potentially throw an exception, you could not guarantee that unlock() would be called. std::lock_guard, however, takes care of that.
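The matching get_data() would then look roughly like this (a sketch; note that std::lock_guard lives in <mutex>, which the original listing doesn't include, and that locking mutates the spin_lock, so get_data() can no longer be const unless you make the lock member mutable):

void get_data(T& out_data) // non-const now, since locking mutates spin_lock_
{
    std::lock_guard lg(spin_lock_);
    out_data = data;
}

This works because your spin_lock already satisfies the BasicLockable requirements through its lock() and unlock() member functions.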

Use an enum class for should_yield_not_pause

Suppose you want to declare a queue that should yield; you would have to write something like:

bounded_circular_mpmc_queue<int, 10, true> queue;

While it's very normal to see a value type and a size being passed as template parameters for a container, that true says very little. It's not only hard for someone reading this code to understand what it means, it might also be unclear for someone writing this code whether true means yield or pause. You can make it much more explicit by passing it as an enum class type template parameter, like so:

enum class wait_method {
    YIELD,
    PAUSE,
};

template <typename T, uint_least32_t queue_size, wait_method yield_or_pause = wait_method::YIELD>
class bounded_circular_mpmc_queue final
{
    ...
};

And when you need to decide whether to yield or pause write:

yield_or_pause == wait_method::YIELD ? std::this_thread::yield() : HARDWARE_PAUSE();

It might also help to make a private member function that yields-or-pauses, so you only have to write this logic once. Finally, while I wouldn't recommend it over the enum class solution, you could consider passing a function pointer as a template parameter:

template <typename T, uint_least32_t queue_size, void (*wait_method)() = std::this_thread::yield>
class bounded_circular_mpmc_queue final
{
    ...
};

And then just call wait_method() whenever you need to wait. This allows the user to pass an arbitrary non-member function.

Yet another solution is to take away the choice from the user, and instead do something like pausing for the first 10 iterations or so, and if you still haven't got a lock by then, start yielding.
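A rough sketch of that hybrid approach, written as a single helper the retry loops (and spin_lock::lock()) could call with their attempt count; the name back_off and the threshold of 10 are made up here and would need tuning:

// Hybrid back-off: hardware pause for the first few attempts, then yield.
// The attempt counter is kept by the caller's retry loop.
static void back_off(const uint_fast32_t attempt)
{
    if (attempt < 10)
    {
        HARDWARE_PAUSE();
    }
    else
    {
        std::this_thread::yield();
    }
}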

Proper way to align things

Maybe CACHE_LINE_SIZE is set correctly for the CPU you are running your code on, but it might be wrong on another CPU. Since your code only compiles with C++17 and up anyway, consider using std::hardware_destructive_interference_size to get the distance objects need to be apart to avoid cache line sharing. (Note that it might not be implemented in the C++ standard library you are using, so use a fallback as sketched below.)
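A common way to deal with the possibly-missing constant is a feature-test check along these lines (cache_line_size is a name I made up, and the 64-byte fallback is only a guess that happens to match your current CACHE_LINE_SIZE):

#include <cstddef>
#include <new>

#ifdef __cpp_lib_hardware_interference_size
    inline constexpr std::size_t cache_line_size =
        std::hardware_destructive_interference_size;
#else
    inline constexpr std::size_t cache_line_size = 64; // fallback guess
#endif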

Furthermore, there is no need to add padding bytes to structs you want to align. Your size calculations are incorrect anyway, as % has a higher operator precedence than -, and even with the intended parenthesization the expression would misbehave whenever CACHE_LINE_SIZE is smaller than the size of the data you want to pad: the subtraction is done on unsigned types, so it wraps around instead of going negative, and the resulting array size is nonsense (possibly zero, which is not valid C++). So consider writing:

struct buffer_node
{
    alignas(std::hardware_destructive_interference_size) T data;
    alignas(std::hardware_destructive_interference_size) spin_lock spin_lock_;
    ...
};

std::atomic<T> does not guarantee it is lock-free

While most built-in types will be lock-free on most platforms when used atomically, the use of std::atomic does not automatically guarantee that. Check with is_lock_free() that, for example, std::atomic<cursor_data> is lock-free, otherwise your whole queue will no longer be lock-free. Note that if you pad it to be the size of a cache line, it will very likely not be lock-free.
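Since your code requires C++17 anyway, you can make this a compile-time check; with cursor_data padded out to a full cache line the assertion below will almost certainly fire, which is exactly the problem:

// Fails the build if std::atomic<cursor_data> needs an internal lock.
static_assert(std::atomic<cursor_data>::is_always_lock_free,
    "cursor_data cannot be handled lock-free on this platform");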

Avoid memory allocations

The theory behind using calloc() and not aligned_alloc() or equivalent, is that the memory should still be aligned, since calloc will align by the type size, which in this case is a multiple of the cache line size.

Unfortunately, that is not the case. It will return a pointer which is suitably aligned for any built-in type (this will probably be smaller than the cache line size), but it will not align it to the size parameter you pass to calloc(). Also, a buffer_node can be larger than a cache line, given a large enough T.

new will actually see the type of the object you are trying to allocate, including its alignment restrictions. So just by adding alignas attributes to the member variables of buffer_node, new buffer_node[index_mask + 1] will allocate a suitably aligned array.

Even better than new/delete would be to use a std::unique_ptr. But even better than that would be not to have to allocate memory at all. Consider writing:

/* Note: outside circular_buffer_data */
static constexpr uint_least32_t get_next_power_of_two()
{
    uint_least32_t v = queue_size;
    ...
    return v;
}

struct circular_buffer_data
{
    static constexpr uint_fast32_t index_mask = get_next_power_of_two();
    buffer_node circular_buffer[index_mask + 1];
};

Now that this struct has just a single non-static member variable, consider removing it entirely and declaring the buffer directly in bounded_circular_mpmc_queue:

static constexpr uint_fast32_t index_mask = get_next_power_of_two();
buffer_node circular_buffer_data_[index_mask + 1];
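Note that with the buffer stored inline like this, the queue object itself becomes at least as large as the whole buffer, so for a large queue_size (or a large T) you probably don't want to create the queue on the stack.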

Move the static_assert()s to the top of bounded_circular_mpmc_queue

The static_assert()s you had in the constructor of circular_buffer_data don't depend on anything in circular_buffer_data itself. So they should just be in bounded_circular_mpmc_queue directly. Also note that unlike assert(), static_assert() is a declaration, which means you don't need to put it inside a function. You can write it directly at the top of bounded_circular_mpmc_queue:

template <typename T, uint_least32_t queue_size, ...>
class bounded_circular_mpmc_queue final
{
     static_assert(queue_size > 0, "Can't have a queue size <= 0!");
     static_assert(queue_size <= 0xffffffffU, "Can't have a queue length above 32bits!");
     ...
};

Incorrect check for full queue

Your cursors are (at least) 32-bit integers that you increment indefinitely. Only when using them to index an item in circular_buffer[] do you AND them with the index_mask. This is fast and avoids the ABA problem for small queues. However, your check for whether the queue is full looks like this:

if (current_cursor_data.producer_cursor + 1 == current_cursor_data.consumer_cursor)

This is, however, incorrect. Consider what happens if the producers add more than queue_size items to the queue before any consumer finishes consuming a single item. You should apply the mask on both sides of the equality operator (note the extra parentheses; & binds less tightly than ==):

if (((current_cursor_data.producer_cursor + 1) & circular_buffer_data_.index_mask) ==
    (current_cursor_data.consumer_cursor & circular_buffer_data_.index_mask))

Producers can still overwrite data a consumer is working on

You still have the same problem as in the first iteration of your code. Again, simplifying your push() and pop() functions:

push(const T& in_data)
{
    auto producer_cursor = claim_cursor_for_push();
    circular_buffer_data_[producer_cursor].set_data(in_data);
}

pop(T& out_data)
{
    auto consumer_cursor = claim_cursor_for_pop();
    circular_buffer_data_[consumer_cursor].get_data(out_data);
}

In both cases, the act of claiming the cursor itself is an atomic operation, but getting/setting the data is a separate operation. This means that a consumer calling pop() may claim consumer_cursor and then be pre-empted before it reads the data; another thread can then do a push() which overwrites the data at that index, since as far as cursor_data_ is concerned, the consumer has already freed that slot, so it is free for a producer to take.
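For a concrete interleaving: a consumer CASes the cursors to claim slot 0 and is pre-empted before its get_data() runs. The cursors now say slot 0 is free, so producers can keep pushing until the producer cursor wraps back around to slot 0, at which point a set_data() overwrites the element the sleeping consumer is about to read. The per-node spin-lock doesn't help here: it only makes the two threads take turns, it doesn't control which of them goes first.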
