Assume I have a type task_info that stores the task-specific data needed to execute the task. A std::vector of those is built, single-threaded, before any of the individual tasks are executed. Then the parallel task execution is set up. To control access to a shared mutable resource (e.g. the output streams), I need a mutex.
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

class task_info {}; // fill in the blanks
void execute_task(task_info info, std::mutex& mutex); // fill in the blanks

void process_in_parallel(
    std::vector<task_info> task_infos,
    std::size_t threads_count = std::thread::hardware_concurrency()
)
{
    // Never use more threads than there is work.
    threads_count = std::min(threads_count, task_infos.size());

    // Note: before C++20, a default-initialized atomic
    // (std::atomic_size_t task_index; with no initializer) is left
    // uninitialized, and reading it is UB -- hence the explicit { 0 }.
    std::atomic_size_t task_index{ 0 };
    std::mutex mutex{};

    // Every thread will run this.
    auto process = [&task_index, &task_infos, &mutex]
    {
        // The loop variable i is the index of the next task this thread executes.
        // Note: fetch_add is a post-increment, basically task_index++.
        for (std::size_t i; (i = task_index.fetch_add(1, std::memory_order_relaxed)) < task_infos.size(); )
        {
            execute_task(task_infos[i], mutex);
        }
    };

    std::vector<std::thread> threads(threads_count); // Set up the threads.
    for (auto& thread : threads) thread = std::thread{ process }; // Start the threads.
    for (auto& thread : threads) thread.join(); // Don’t forget to join the threads!
}
What is std::memory_order_relaxed? It is a memory-ordering constraint, concerned with how changes to multiple atomic variables become visible across threads. As long as you have only one atomic variable, use std::memory_order_relaxed, as it puts the fewest constraints on the optimizer and the hardware. For that reason, never use task_index++: it does fetch_add(1, std::memory_order_seq_cst), i.e. it does what you want, but with sequentially consistent ordering that imposes unnecessary restrictions.
How to use the mutex? The mutex is locked before the resource is accessed and unlocked as soon as the thread no longer uses the resource. In simple cases, you use a std::lock_guard object that locks the mutex at its initialization and unlocks it at its destruction.
#include <iostream>

void execute_task(task_info info, std::mutex& mutex)
{
    // work ...
    {
        std::lock_guard<std::mutex> lock{ mutex };
        std::cout << "Task " << info.id() << " done 50%" << std::endl;
        // lock_guard unlocks the mutex at the closing brace
    }
    // work ...
    {
        std::lock_guard<std::mutex> lock{ mutex };
        std::cout << "Task " << info.id() << " completed" << std::endl;
    }
}
If the little scopes are too noisy for a single line of code, one can use the comma operator:

    // work ...
    std::lock_guard<std::mutex>{ mutex }, std::cout << "Task " << info.id() << " done 50%" << std::endl;
    // work ...
    std::lock_guard<std::mutex>{ mutex }, std::cout << "Task " << info.id() << " completed" << std::endl;

The unnamed temporary lock_guard’s destructor runs at the end of the full expression, i.e. at the semicolon.
If you don’t actually have a shared mutable resource, you don’t need the mutex.
If task_infos.size() > SIZE_MAX - threads_count, the index i overflows/wraps, because task_index is incremented once for each task_info (loop condition true) plus one additional time per thread (loop condition false). Unless threads_count is unreasonably big or you have a gigantic number of task_infos, this won’t be a problem; but if it might be, it can be mitigated by incrementing manually with a compare-and-swap instead of fetch_add:
auto process = [&task_index, &task_infos, &mutex]
{
    for (std::size_t i{}; i < task_infos.size(); )
    {
        if (task_index.compare_exchange_weak(i, i + 1, std::memory_order_relaxed))
            execute_task(task_infos[i], mutex);
    }
};
compare_exchange_weak is the crucial part: it compares task_index and i; if they are equal, it sets task_index to i + 1 (i.e. increments it) and returns true; otherwise, it sets i to the current value of task_index and returns false. In our case, a return value of true, i.e. a successful increment, means that this thread has won the race for i and will perform the corresponding task. A return value of false sets i to a new value, which falls into one of two categories: an index to “bargain for”, or task_infos.size(). If it is the latter, the for loop condition fails and this thread stops doing tasks. Otherwise, it’s compare_exchange_weak again, but with the new i.
About the “weak” part: it means that compare_exchange_weak is allowed to return false (and not update task_index) even if i and task_index are equal; it may “spuriously fail”, in technical terms. Because that is fine in our case (the loop simply retries), compare_exchange_strong is not needed.
Is my reasoning in all of this correct?