\$\begingroup\$

Assume I have a type task_info that stores the task-specific data needed to execute the task. A std::vector of those is built before executing any of the individual tasks; this build-up is done in single-thread fashion. Then the parallel task execution is set up.

To control access to the mutable resource (e.g. the output streams), I need a mutex.

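#include <algorithm> // std::min
#include <atomic>
#include <cstddef>   // std::size_t
#include <iostream>  // std::cout, used in execute_task
#include <mutex>
#include <thread>
#include <vector>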

class task_info{}; // fill the blanks
void execute_task(task_info info, std::mutex& mutex); // fill the blanks

void process_in_parallel(
    std::vector<task_info> task_infos,
    std::size_t threads_count = std::thread::hardware_concurrency()
)
{
    // Never use more threads than there is work.
    threads_count = std::min(threads_count, task_infos.size());

    std::atomic_size_t task_index{ 0 }; // Note: std::atomic_size_t task_index{} is UB!
    std::mutex mutex{};
    // Every thread will do this.
    auto process = [&task_index, &task_infos, &mutex]
    {
        // The loop variable i indicates the next task to do on this thread.
        // Note: `fetch_add` is a post-increment, basically task_index++.
        for (std::size_t i; (i = task_index.fetch_add(1, std::memory_order_relaxed)) < task_infos.size(); )
        {
            execute_task(task_infos[i], mutex);
        }
    };
    std::vector<std::thread> threads(threads_count); // Set up the threads.
    for (auto& thread : threads) thread = std::thread{ process }; // Start the threads.
    for (auto& thread : threads) thread.join(); // Don’t forget to join the threads!
}

What is std::memory_order_relaxed? It is a memory order, not a fence: it specifies how other memory accesses (atomic or not) may be reordered around the atomic operation as seen by other threads. Relaxed guarantees only the atomicity of the operation itself, which is all that is needed here: the counter is the only data the threads communicate through, and task_infos is fully built before the threads are started. It also puts the fewest constraints on the optimizer and the hardware. task_index++ would work as well, but it is equivalent to fetch_add(1, std::memory_order_seq_cst), i.e. it does what you want with a stronger ordering than this loop requires.
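To make that concrete, here is a minimal sketch in which several threads draw indices from one relaxed counter. The helper name count_claims is made up for illustration and is not part of the code under review. Atomicity alone ensures that each index is handed out exactly once; the memory order plays no role in that.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper: returns how often each index in [0, n) was claimed.
std::vector<int> count_claims(std::size_t n, std::size_t threads_count)
{
    std::vector<int> claims(n, 0);
    std::atomic<std::size_t> next{ 0 };

    auto worker = [&claims, &next, n]
    {
        // fetch_add is atomic regardless of the memory order, so no two
        // threads can ever receive the same value of i.
        for (std::size_t i; (i = next.fetch_add(1, std::memory_order_relaxed)) < n; )
        {
            ++claims[i]; // Only the thread that claimed i touches claims[i].
        }
    };

    std::vector<std::thread> threads;
    for (std::size_t t = 0; t < threads_count; ++t) threads.emplace_back(worker);
    for (auto& thread : threads) thread.join(); // After join(), the writes are visible here.
    return claims;
}
```

Calling count_claims(1000, 4) should yield a vector in which every entry is 1, no matter how the threads interleave.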

How to use the mutex? The mutex is locked before the resource is accessed and unlocked as soon as the thread does not use the resource anymore. In simple cases, you use a std::lock_guard object to lock the mutex at its initialization and unlock the mutex at its destruction.

void execute_task(task_info info, std::mutex& mutex)
{
    // work ...
    {
        std::lock_guard<std::mutex> lock{ mutex };
        std::cout << "Task " << info.id() << " done 50%" << std::endl;
        // lock_guard unlocks mutex at the closing brace
    }
    // work ...
    {
        std::lock_guard<std::mutex> lock{ mutex };
        std::cout << "Task " << info.id() << " completed" << std::endl;
    }
}

If the little scopes are too noisy for a single line of code, one can use a comma expression:

// work ...
std::lock_guard<std::mutex>{ mutex }, std::cout << "Task " << info.id() << " done 50%" << std::endl;
// work ...
std::lock_guard<std::mutex>{ mutex }, std::cout << "Task " << info.id() << " completed" << std::endl;

The unnamed temporary lock_guard’s destructor runs at the end of the full-expression, i.e. at the next semicolon, so the mutex stays locked while the output is written.
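The lifetime claim can be checked without any threads. The following sketch uses a made-up probe_mutex type (an assumption for illustration, standing in for std::mutex) that only records whether it is currently locked; std::lock_guard works with any type that provides lock() and unlock().

```cpp
#include <mutex>

// Hypothetical stand-in for std::mutex that merely records its state.
struct probe_mutex
{
    bool locked = false;
    void lock()   { locked = true; }
    void unlock() { locked = false; }
};

// Returns true if the mutex was held while the right-hand operand of the
// comma expression was evaluated.
bool held_during_comma_expression(probe_mutex& mutex)
{
    bool observed = false;
    // The temporary lock_guard is destroyed at the end of the full-expression,
    // i.e. only after the assignment on the right of the comma has run.
    std::lock_guard<probe_mutex>{ mutex }, observed = mutex.locked;
    return observed;
}
```

The function should return true, and the probe should report unlocked again once the statement has finished. Some standard library implementations mark the lock_guard constructor as nodiscard, so a compiler may warn about the discarded temporary in this idiom.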

If you don’t actually have a shared mutable resource, you don’t need the mutex.

If task_infos.size() > SIZE_MAX - threads_count, the index overflows (wraps around), because task_index is incremented once for each task_info (loop condition true) plus one additional time for each thread (loop condition false). Unless threads_count is unreasonably big or you have a gigantic number of task_infos, this won’t be a problem, but if it might be, it can be mitigated by replacing fetch_add with a manual compare-and-exchange loop:

auto process = [&task_index, &task_infos, &mutex]
{
    for (std::size_t i{}; i < task_infos.size(); )
    {
        if (task_index.compare_exchange_weak(i, i + 1, std::memory_order_relaxed))
            execute_task(task_infos[i], mutex);
    }
};

compare_exchange_weak is the crucial part: It compares task_index and i; if they’re equal, it sets task_index to i + 1 (i.e. increments it) and returns true; else, it sets i to the current value of task_index and returns false. In our case, a return value of true, i.e. a successful increment, means that this thread has won the race for i and will perform the corresponding task. A return value of false gives i a new value, which falls into one of two categories: an index to “bargain for” or task_infos.size(). In the latter case, the for loop condition fails and this thread stops doing tasks. Otherwise, compare_exchange_weak is tried again with the new i.

About the “weak” part: it means that compare_exchange_weak is allowed to return false (and not update task_index) even if i and task_index are equal (it may “spuriously fail” in technical terms). Because the loop simply retries in that case, this is fine here, and compare_exchange_strong is not needed.
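As a self-contained check of that reasoning, the sketch below runs the same compare-exchange loop on dummy tasks. The helper name run_cas_loop and its return type are assumptions made for illustration. It reports how often each index was claimed and where the counter ends up.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: runs the compare-exchange loop on n dummy tasks and
// returns the per-index claim counts together with the final counter value.
std::pair<std::vector<int>, std::size_t>
run_cas_loop(std::size_t n, std::size_t threads_count)
{
    std::vector<int> claims(n, 0);
    std::atomic<std::size_t> next{ 0 };

    auto worker = [&claims, &next, n]
    {
        for (std::size_t i{}; i < n; )
        {
            // Success: next went from i to i + 1, and this thread owns index i.
            // Failure (possibly spurious): i is refreshed with the current
            // value of next, and the loop condition is checked again.
            if (next.compare_exchange_weak(i, i + 1, std::memory_order_relaxed))
                ++claims[i];
        }
    };

    std::vector<std::thread> threads;
    for (std::size_t t = 0; t < threads_count; ++t) threads.emplace_back(worker);
    for (auto& thread : threads) thread.join();
    return { claims, next.load() };
}
```

Every index should be claimed exactly once, and the counter should stop at exactly n, because an increment is only attempted while i < n. With the fetch_add version, the counter would end at n + threads_count instead, which is where the overflow concern comes from.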

Is my reasoning in all of this correct?

\$\endgroup\$

1 Answer

\$\begingroup\$

Turn the pattern into a generic function

Your process_in_parallel() function only works with one specific task_info type and one execute_task() function. If you had different kinds of tasks to perform, you'd have to create multiple functions like process_in_parallel() as well. But in C++ we can avoid that by making process_in_parallel() a template:

template<typename TaskInfo, typename Executor>
void process_in_parallel(
    std::vector<TaskInfo> task_infos,
    Executor execute_task,
    std::size_t threads_count = std::thread::hardware_concurrency()
) {
    …
}
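For illustration only, this is one way the elided body might be filled in, reusing the loop from the question. It is a sketch under assumptions, not the definitive version: the name process_in_parallel_sketch is made up, the executor is assumed to be callable as execute_task(const TaskInfo&), and the vector is taken by const reference rather than by value.

```cpp
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Sketch only: assumes the executor is callable as execute_task(const TaskInfo&)
// and is safe to call from several threads at once.
template<typename TaskInfo, typename Executor>
void process_in_parallel_sketch(
    const std::vector<TaskInfo>& task_infos,
    Executor execute_task,
    std::size_t threads_count = std::thread::hardware_concurrency()
) {
    // hardware_concurrency() may return 0 if the value cannot be determined.
    threads_count = std::max<std::size_t>(threads_count, 1);
    // Never use more threads than there is work.
    threads_count = std::min(threads_count, task_infos.size());

    std::atomic<std::size_t> task_index{ 0 };
    auto process = [&task_index, &task_infos, &execute_task]
    {
        for (std::size_t i; (i = task_index.fetch_add(1, std::memory_order_relaxed)) < task_infos.size(); )
        {
            execute_task(task_infos[i]);
        }
    };

    std::vector<std::thread> threads;
    threads.reserve(threads_count);
    for (std::size_t t = 0; t < threads_count; ++t) threads.emplace_back(process);
    for (auto& thread : threads) thread.join();
}
```

A caller that needs a mutex can capture it in the lambda passed as execute_task, which keeps the mutex out of the generic function's signature.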

The standard library already provides this functionality

Since C++17 the standard library provides parallel versions of many of the standard algorithms. For example, you can write:

#include <algorithm> // std::for_each
#include <execution> // std::execution::par

class task_info {…};
void execute_task(task_info);

std::vector<task_info> tasks;
std::for_each(std::execution::par, tasks.begin(), tasks.end(), execute_task);

If you need a mutex as well, then you could have it as a global variable, or use lambda expressions to pass a mutex:

void execute_task(task_info, std::mutex&);
…
std::mutex mutex;
std::for_each(std::execution::par, tasks.begin(), tasks.end(),
              [&mutex](auto& task_info){ execute_task(task_info, mutex); });
\$\endgroup\$
  • You’re right about making it a template. I didn’t tag it, but the code base I’m working with is C++14. But are you sure std::execution::par_unseq is right? It imposes a lot of restrictions, e.g. you cannot lock a mutex or allocate memory. As far as I understand, std::execution::par might be the right one.
    – Bolpat
    Commented Apr 20, 2023 at 12:23
  • You're right, it should just be std::execution::par.
    – G. Sliepen
    Commented Apr 20, 2023 at 12:49
  • I just tested this with GCC 12.2.0 and Clang 16.0.1 using -O3, giving it 100 task_info elements: Clang does not support execution policies at all and GCC does not parallelize (as I understand, an execution policy is an optimization suggestion, not something it must do).
    – Bolpat
    Commented Apr 27, 2023 at 17:44
  • You might need to link with the TBB library for the parallel execution policies to work as expected.
    – G. Sliepen
    Commented Apr 28, 2023 at 6:18
