\$\begingroup\$

This is a follow-up to my previous post. I've made a number of improvements to the thread pool and fixed some bugs as well.

The most up-to-date version of the code is available on my GitHub.

I have since removed the use of std::binary_semaphore and instead moved to using std::condition_variable_any. I was playing around with using a std::counting_semaphore instead but couldn't figure out a good way to do so.

thread_pool.h

#pragma once

#include <concepts>
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <thread>
#include <type_traits>

#include "thread_pool/thread_safe_queue.h"

namespace dp {

    namespace detail {
        template <class T>
        std::decay_t<T> decay_copy(T &&v) {
            return std::forward<T>(v);
        }

        // bind F and parameter pack into a nullary one shot. Lambda captures by value.
        template <typename... Args, typename F>
        auto bind(F &&f, Args &&...args) {
            return [f = decay_copy(std::forward<F>(f)),
                    ... args = decay_copy(std::forward<Args>(args))]() mutable -> decltype(auto) {
                return std::invoke(std::move(f), std::move(args)...);
            };
        }

    }  // namespace detail

    template <typename FunctionType = std::function<void()>>
    requires std::invocable<FunctionType> &&
        std::is_same_v<void, std::invoke_result_t<FunctionType>>
    class thread_pool {
      public:
        thread_pool(const unsigned int &number_of_threads = std::thread::hardware_concurrency()) {
            for (std::size_t i = 0; i < number_of_threads; ++i) {
                threads_.emplace_back([&](const std::stop_token stop_tok) {
                    do {
                        // check if we have task
                        if (queue_.empty()) {
                            // no tasks, so we wait instead of spinning
                            std::unique_lock lock(condition_mutex_);
                            condition_.wait(lock, stop_tok, [this]() { return !queue_.empty(); });
                        }

                        // ensure we have a task before getting task
                        // since the dtor notifies via the condition variable as well
                        if (!queue_.empty()) {
                            // get the task
                            auto task = queue_.pop();
                            // invoke the task
                            std::invoke(std::move(task));
                            // decrement in-flight counter
                            --in_flight_;
                        }
                    } while (!stop_tok.stop_requested());
                });
            }
        }

        ~thread_pool() {
            // wait for tasks to complete first
            do {
                std::this_thread::yield();
            } while (in_flight_ > 0);

            // stop all threads
            for (auto &thread : threads_) {
                thread.request_stop();
            }
            condition_.notify_all();
        }

        /// thread pool is non-copyable
        thread_pool(const thread_pool &) = delete;
        thread_pool &operator=(const thread_pool &) = delete;

        template <typename Function, typename... Args,
                  typename ReturnType = std::invoke_result_t<Function &&, Args &&...>>
        requires std::invocable<Function, Args...>
        [[nodiscard]] std::future<ReturnType> enqueue(Function f, Args... args) {
            /*
             * use shared promise here so that we don't break the promise later (until C++23)
             *
             * with C++23 we can do the following:
             *
             * std::promise<ReturnType> promise;
             * auto future = promise.get_future();
             * auto task = [func = std::move(f), ...largs = std::move(args),
                              promise = std::move(promise)]() mutable {...};
             */
            auto shared_promise = std::make_shared<std::promise<ReturnType>>();
            auto task = [func = std::move(f), ... largs = std::move(args),
                         promise = shared_promise]() { promise->set_value(func(largs...)); };

            // get the future before enqueuing the task
            auto future = shared_promise->get_future();
            // enqueue the task
            enqueue_task(std::move(task));
            return future;
        }

        template <typename Function, typename... Args>
        requires std::invocable<Function, Args...> &&
            std::is_same_v<void, std::invoke_result_t<Function &&, Args &&...>>
        void enqueue_detach(Function &&func, Args &&...args) {
            enqueue_task(detail::bind(std::forward<Function>(func), std::forward<Args>(args)...));
        }

      private:
        template <typename Function>
        void enqueue_task(Function &&f) {
            ++in_flight_;
            {
                std::lock_guard lock(condition_mutex_);
                queue_.push(std::forward<Function>(f));
            }
            condition_.notify_all();
        }

        std::condition_variable_any condition_;
        std::mutex condition_mutex_;
        std::vector<std::jthread> threads_;
        dp::thread_safe_queue<FunctionType> queue_;
        std::atomic<int64_t> in_flight_{0};
    };
}  // namespace dp

Again for clarity, below is my thread safe queue implementation:

thread_safe_queue.h

#pragma once

#include <condition_variable>
#include <deque>
#include <mutex>

namespace dp {
    template <typename T>
    class thread_safe_queue {
      public:
        using value_type = T;
        using size_type = typename std::deque<T>::size_type;

        thread_safe_queue() = default;

        void push(T&& value) {
            {
                std::lock_guard lock(mutex_);
                data_.push_back(std::forward<T>(value));
            }
            condition_variable_.notify_all();
        }

        bool empty() {
            std::lock_guard lock(mutex_);
            return data_.empty();
        }

        [[nodiscard]] size_type size() {
            std::lock_guard lock(mutex_);
            return data_.size();
        }

        [[nodiscard]] T pop() {
            std::unique_lock lock(mutex_);
            condition_variable_.wait(lock, [this] { return !data_.empty(); });
            auto front = data_.front();
            data_.pop_front();
            return front;
        }

      private:
        using mutex_type = std::mutex;
        std::deque<T> data_;
        mutable mutex_type mutex_{};
        std::condition_variable condition_variable_{};
    };
}  // namespace dp

Example driver code:

#include <thread_pool/thread_pool.h>

dp::thread_pool pool(4);
const auto total_tasks = 30;
std::vector<std::future<int>> futures;

for (auto i = 0; i < total_tasks; i++) {
    auto task = [index = i]() { return index; };
    futures.push_back(pool.enqueue(task));
}

Any and all feedback is much appreciated. Would you use this implementation in one of your projects? If not, please share why! I'm curious to hear where this can be improved. My goal is to have something that is not only performant, but reliable and "bulletproof".

\$\endgroup\$

2 Answers

\$\begingroup\$

This looks pretty solid. I just have a few minor nit-picks.


thread_safe_queue

There's no need to declare the default constructor just to explicitly default it.


empty() and size() ought to be declared const (the mutex is already mutable to support this, so I'm guessing that's just an oversight).


I don't see a good reason for [[nodiscard]] on size() (though strangely not on empty()). There's no real risk if the return value isn't used (unlike, say, sscanf()). I'm not sure about pop(): it's certainly conceivable that we'd want to discard a series of values, dependent on other processing. But it might be good to explicitly mark those with a (void) cast.


We missed an initialiser for data_. Though default initialisation is fine, that's inconsistent with other members, and providing {} keeps GCC's -Weffc++ quiet.
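
As a rough illustration of the points above, here is a minimal sketch of a cut-down queue with const accessors, no user-declared default constructor, and an initialiser on every member. The class name `const_queue` is hypothetical and the sketch leaves out pop() entirely; it is not the author's full class.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical cut-down queue illustrating the review points; not the full class.
template <typename T>
class const_queue {
  public:
    // No user-declared default constructor: the implicit one is sufficient.

    void push(T value) {
        std::lock_guard lock(mutex_);
        data_.push_back(std::move(value));
    }

    // These can be const because mutex_ is declared mutable.
    bool empty() const {
        std::lock_guard lock(mutex_);
        return data_.empty();
    }

    std::size_t size() const {
        std::lock_guard lock(mutex_);
        return data_.size();
    }

  private:
    std::deque<T> data_{};        // explicit {} for consistency with the other members
    mutable std::mutex mutex_{};  // mutable so const members can lock it
};
```

With this shape, a function that only receives a `const const_queue<int>&` can still call empty() and size().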


thread_pool

We misspelt std::int64_t when declaring in_flight_. Does this really need to be exactly 64 bits, or would std::int_fast64_t be a better (and more portable) choice? Does it really need to be a signed type? I think we might be better with std::size_t here.

The pool worker in the constructor should probably catch exceptions and find a way to report them. At present, any exception that escapes a queued task leaves the thread function, which calls std::terminate and takes down the whole program, not just that worker.
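
One possible way to do that, sketched under assumptions: the worker calls a small wrapper instead of invoking the task directly, and the wrapper stores any escaping exception for later inspection. The names `exception_log` and `run_guarded` are hypothetical, and collecting `std::exception_ptr` values is only one of several reasonable reporting policies.

```cpp
#include <cstddef>
#include <exception>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical helper: collects exceptions that escape from tasks so that
// a worker thread can carry on with the next task.
class exception_log {
  public:
    void record(std::exception_ptr e) {
        std::lock_guard lock(mutex_);
        errors_.push_back(std::move(e));
    }

    std::size_t count() const {
        std::lock_guard lock(mutex_);
        return errors_.size();
    }

  private:
    mutable std::mutex mutex_{};
    std::vector<std::exception_ptr> errors_{};
};

// What the worker loop would call in place of a bare std::invoke(task).
inline void run_guarded(const std::function<void()> &task, exception_log &log) {
    try {
        task();
    } catch (...) {
        // Keep the exception instead of letting it leave the thread function.
        log.record(std::current_exception());
    }
}
```

The pool owner could later rethrow a stored exception with std::rethrow_exception, or simply log how many tasks failed.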


Instead of testing !queue_.empty() (which takes a lock) before attempting queue_.pop(), perhaps we could test !stop_tok.stop_requested()? I'm thinking something like

                for (;;) {
                    // check if we have task
                    if (queue_.empty()) {
                        // no tasks, so we wait for task or destructor
                        std::unique_lock lock(condition_mutex_);
                        condition_.wait(lock, stop_tok, [this]() { return !queue_.empty(); });
                        // was it the destructor?
                        if (stop_tok.stop_requested()) { break; }
                    }

                    std::invoke(queue_.pop());
                    --in_flight_;
                }

If two threads find the queue non-empty, they could both call pop() and one of them may block (forever, if no new tasks get queued) since there's no locking covering both test and access. I think we need to lock the mutex from the empty test right through to the pop():

    for (;;) {
        // declared outside the locked scope so it is still alive when invoked
        FunctionType task;
        {
            std::unique_lock lock(condition_mutex_);
            if (queue_.empty()) {
                condition_.wait(lock, stop_tok, [this]() {
                    return !queue_.empty();
                });
                // was it the destructor?
                if (stop_tok.stop_requested()) { break; }
            }
            task = queue_.pop();
        }

        // run the task after releasing the lock
        std::invoke(std::move(task));
        --in_flight_;
    }

This then causes us to reconsider whether our thread-safe queue class is really what we need, or whether we should handle all the locking externally, in thread_pool.


The destructor busy-loops through yield (at least, I saw one CPU pegged while waiting for tasks that just sleep). We could use a more efficient mechanism here.

In fact, I don't think we need to wait for the queue to empty, as the worker threads won't look at the stop token until that happens. And we don't need to signal the condition variable, because setting the stop token will interrupt any wait:

    ~thread_pool() {
        for (auto &thread : threads_) {
            thread.request_stop();
        }
        for (auto &thread : threads_) {
            thread.join();
        }
    }

Instead of explicitly deleting the copy constructor and assignment operator, we could just write a comment saying that the mutex member makes thread_pool non-copyable.
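
The claim can be checked at compile time. In this small sketch, `has_mutex` is a hypothetical stand-in for thread_pool that contains nothing but the mutex member:

```cpp
#include <mutex>
#include <type_traits>

// Hypothetical stand-in for thread_pool: it only has the mutex member.
struct has_mutex {
    std::mutex m;
};

// std::mutex is neither copyable nor movable, so the implicitly declared
// copy operations of has_mutex are defined as deleted.
static_assert(!std::is_copy_constructible_v<has_mutex>);
static_assert(!std::is_copy_assignable_v<has_mutex>);
```

A static_assert like this inside the real header would document the property and fail loudly if a later refactoring accidentally made the class copyable.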


In enqueue, perhaps the ReturnType template parameter should be first, as that's the one thing that the user might want to explicitly provide, whereas Function and Args must always be consistent with the argument list. But then we couldn't reasonably default it; I think that means we need two versions of the function:

    template <typename ReturnType, typename Function, typename... Args>
       requires std::assignable_from<ReturnType&, std::invoke_result_t<Function &&, Args &&...>>
    [[nodiscard]] std::future<ReturnType> enqueue(Function f, Args... args) {
        ⋮
    }

    template <typename Function, typename... Args>
    requires std::invocable<Function, Args...>
    [[nodiscard]] auto enqueue(Function&& f, Args&&... args) {
        return enqueue<std::invoke_result_t<Function &&, Args &&...>>
            (std::forward<Function>(f), std::forward<Args>(args)...);
    }

Future directions

Could the queue use lock-free techniques? Would that be worthwhile, or just extra complexity for very small gain?

\$\endgroup\$
  • \$\begingroup\$ Good points overall! Thank you for the feedback. I think I can address most of these issues. As for performance, I want to add some benchmarks to get a baseline before making changes or adding features. Eventually I want to look into lock-free queues and work stealing. For the yield() issue, do you have any ideas for alternative approaches? \$\endgroup\$ Commented Feb 25, 2022 at 16:05
  • 2
    \$\begingroup\$ I think it might work to just remove the yield loop. I've updated. I also found a race condition (all threads share the same queue, so we can't assume that !empty() implies we can pop() if other threads can operate on it in between). \$\endgroup\$ Commented Feb 25, 2022 at 16:54
\$\begingroup\$

Make thread_safe_queue handle termination

Toby Speight already pointed out some thread safety issues. I think a big issue is that both thread_safe_queue and thread_pool have mutexes and condition variables, and thread_pool is also keeping track of the number of items in the queue. This can all very easily go out of sync if you are not careful. I think the best way is to modify thread_safe_queue so it can tell the worker threads when to terminate. Consider adding a request_stop() function to thread_safe_queue, and have pop() return a std::optional<T> so it can signal when there is nothing left to do:

template <typename T>
class thread_safe_queue {
public:
    ...
    void request_stop() {
        {
            std::unique_lock lock(mutex_);
            stop_ = true;
        }
        condition_variable_.notify_all();
    }

    [[nodiscard]] std::optional<T> pop() {
        std::unique_lock lock(mutex_);
        condition_variable_.wait(lock, [this] { return !data_.empty() || stop_; });
        if (data_.empty()) {
            return std::nullopt;
        }
        auto front = std::move(data_.front());
        data_.pop_front();
        return front;
    }
    ...
private:
    ...
    bool stop_{};
};

Then the thread pool can just do:

thread_pool(unsigned int number_of_threads = std::thread::hardware_concurrency()) {
    for (std::size_t i = 0; i < number_of_threads; ++i) {
        threads_.emplace_back([&] {
            while (auto task = queue_.pop()) {
                std::invoke(std::move(*task));
            }
        });
    }
}

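~thread_pool() {
    queue_.request_stop();
    // join before queue_ (declared after threads_) is destroyed
    for (auto &thread : threads_) { thread.join(); }
}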

You might want to throw an exception if someone tries to push() to a queue after request_stop() has been used.

Use notify_one() if not all threads need to wake up

Normally, when you push a new item to the queue, only one thread will be able to use it. So it doesn't make sense to wake up all threads that are waiting for an item. Instead, use notify_one(). You should use notify_all() only if you really need all threads to be woken, which is only when you want to terminate the threads.
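
A self-contained sketch combining these suggestions might look like the following. The class name `stoppable_queue` and the choice of std::logic_error are assumptions made for illustration; push() wakes a single waiter, request_stop() wakes all of them, and pushing after a stop request throws.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <utility>

// Sketch only: the class name and the exception type are assumptions.
template <typename T>
class stoppable_queue {
  public:
    void push(T value) {
        {
            std::lock_guard lock(mutex_);
            if (stop_) {
                throw std::logic_error("push() after request_stop()");
            }
            data_.push_back(std::move(value));
        }
        // Only one consumer can take the new item, so wake only one.
        condition_variable_.notify_one();
    }

    void request_stop() {
        {
            std::lock_guard lock(mutex_);
            stop_ = true;
        }
        // Every waiting consumer has to notice the stop request.
        condition_variable_.notify_all();
    }

    // Returns std::nullopt once the queue is stopped and drained.
    [[nodiscard]] std::optional<T> pop() {
        std::unique_lock lock(mutex_);
        condition_variable_.wait(lock, [this] { return !data_.empty() || stop_; });
        if (data_.empty()) {
            return std::nullopt;
        }
        std::optional<T> front{std::move(data_.front())};
        data_.pop_front();
        return front;
    }

  private:
    std::deque<T> data_{};
    std::mutex mutex_{};
    std::condition_variable condition_variable_{};
    bool stop_{};
};
```

Items that were already queued when request_stop() is called are still handed out by pop(); only after the queue is empty does pop() return std::nullopt.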

\$\endgroup\$
  • \$\begingroup\$ Thanks for the feedback. I've actually made a number of changes to the queue already, including removing the condition variable as well as the size() function. I've also made pop() return an optional and removed the second queue_.empty() check in the thread lambda in the thread pool. This should help mitigate some of the issues mentioned (I think). If I'm wrong, please let me know. \$\endgroup\$ Commented Feb 26, 2022 at 14:16
  • 1
    \$\begingroup\$ Great! If you create a new question here on Code Review with the revised code, we can have a look at it. \$\endgroup\$
    – G. Sliepen
    Commented Feb 26, 2022 at 16:01
  • \$\begingroup\$ I've created a new question here: codereview.stackexchange.com/questions/276593/… \$\endgroup\$ Commented May 16, 2022 at 16:31
