6
\$\begingroup\$

Here's a rather hastily made thread pool to suit my current requirements and my future projects. It works and executes tasks as intended, but I want to improve it, and I am not sure it's the proper way to design a thread pool: I am a self-taught enthusiast and rather weak as a language lawyer.

Tasks are stored in the queue. They are executed in order only when an idle thread is available from the pool. New tasks can be pushed into the queue. To initialize the pool, one has to specify the total number of threads and a bool variable which will be used to terminate the pool and threads gracefully.

#include "stdafx.h"
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
#include <queue>
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <functional>

using namespace std;

typedef function<void(void)> task_t;

class thread_t
{
public:
    thread_t(int id, bool& running, condition_variable& cv)
        :id_(id)
        , running_(running)
        , idle_notify_cv_(cv)
    {
        idle_ = true;
        thread_ = new thread([=]() { run(); });
    }

    ~thread_t()
    {
        notify();
        cout << id_ << "  stopping \n";
        thread_->join();
    }

    void push(task_t task)
    {
        task_ = task;
        idle_ = false;
        cv_.notify_one();
    }

    void notify()
    {
        cv_.notify_all();
    }

    bool is_idle() const
    {
        return idle_;
    }
    int get_id() const
    {
        return id_;
    }

private:
    void run()
    {
        cout << id_ << "  starting  \n";
        while (running_)
        {
            unique_lock<mutex> lock(mu_);
            cv_.wait(lock, [=]() { return idle_ == false || !running_; });
            if (!running_) return;
            task_();
            cout << id_ << " :work done  \n";
            idle_ = true;
            idle_notify_cv_.notify_all();
        }
    }
private:
    condition_variable& idle_notify_cv_;
    mutex mu_;
    condition_variable cv_;
    task_t task_;
    thread* thread_;
    bool idle_;
    int id_;
    bool& running_;
};

class pool
{
public:
    pool(int n, bool& running)
        :nthreads_(n)
        ,running_(running)
    {
        if (n > std::thread::hardware_concurrency()) nthreads_ = n = std::thread::hardware_concurrency()-1;
        for (int i = 0; i < n; i++)
        {
            threads_.push_back(make_unique<thread_t >(i, running_, idle_notify_cv_));
        }

        pool_thread_ = new thread([=]() { run(); });
    }

    void push(task_t task)
    {
        unique_lock<mutex> lock(write_queue_mu_);
        tasks_.push(task);
        idle_notify_cv_.notify_one();
    }

    int get_idle()
    {
        for (int i = 0; i < nthreads_; i++)
        {
            if (threads_[i]->is_idle())
            {
                return i;
            }
        }
        return -1;
    }

    void run()
    {
        cout << " pool thread started \n " ;
        while (running_)
        {
            int idle;
            if (!tasks_.empty() && (idle = get_idle()) != -1)
            {
                unique_lock<mutex> lock(write_queue_mu_);
                idle_notify_cv_.wait(lock, [=]() { return idle != -1 || !running_; });
                if (!running_) return;
                auto task = tasks_.front();
                tasks_.pop(); 
                lock.unlock();
                cout << " thread# " << threads_[idle]->get_id() << " assigned a task \n";
                threads_[idle]->push(task);
            }
        }
    }


    ~pool()
    {
        pool_thread_->join();
        cout << " thread pool destroyed \n ";
    }

private:
    mutex write_queue_mu_;
    queue<task_t> tasks_;
    vector<unique_ptr<thread_t>> threads_;
    int nthreads_;
    bool& running_;
    condition_variable idle_notify_cv_;
    thread* pool_thread_;
};



int main()
{
    bool running = true;
    pool pool1(2, running);

    task_t task1 = []()
    {
        this_thread::sleep_for(chrono::seconds(2s));
        cout << " Task 1 executed \n";
    };

    task_t task2 = []()
    {
        this_thread::sleep_for(chrono::seconds(1s));
        cout << " Task 2 executed \n";
    };

    task_t task3= []()
    {
        this_thread::sleep_for(chrono::seconds(2s));
        cout << " Task 3 executed \n";
    };

    task_t task4 = []()
    {
        this_thread::sleep_for(chrono::seconds(1s));
        cout << " Task 4 executed \n";
    };

    pool1.push(task1);
    pool1.push(task2);
    pool1.push(task3);
    pool1.push(task4);

    this_thread::sleep_for(chrono::seconds(5s));
    running = false;

    return 0;
}
\$\endgroup\$
12
  • 2
    \$\begingroup\$ Welcome to CodeReview. Please do not add, remove, or edit code in a question after you've received an answer. The site policy is explained in What to do when someone answers. Since I've removed that section from my UB section it's fine since nothing was invalidated, but please keep that in mind for future questions. \$\endgroup\$
    – Zeta
    Commented Mar 17, 2018 at 2:18
  • \$\begingroup\$ @Zeta: Ok I got it, new to codereview \$\endgroup\$
    – ark1974
    Commented Mar 17, 2018 at 2:19
  • \$\begingroup\$ Please do not update the code in your question to incorporate feedback from answers, doing so goes against the Question + Answer style of Code Review. This is not a forum where you should keep the most updated version in your question. Please see what you may and may not do after receiving answers. \$\endgroup\$
    – Mast
    Commented Mar 17, 2018 at 8:05
  • \$\begingroup\$ calls new but not delete! \$\endgroup\$ Commented Mar 19, 2018 at 3:45
  • \$\begingroup\$ has pointers but does not follow rule of three \$\endgroup\$ Commented Mar 19, 2018 at 3:46

3 Answers

6
\$\begingroup\$

Undefined behaviour

Your code contains undefined behaviour due to data races. The read of thread_t::idle_ in thread_t::is_idle isn't synchronized with the possible write in thread_t::run. That's a data race. The same holds for thread_t::push: it writes task_ and idle_ without taking the lock, so I can push a new task while the old one is still being processed.
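One possible way to remove the race on the idle flag is to make it a std::atomic<bool> and to claim a worker with a compare-and-exchange. The sketch below is not the original thread_t: worker_state, try_claim, release and count_successful_claims are hypothetical names used for illustration, and it only covers the flag, not the unsynchronized assignment to task_.

```cpp
#include <atomic>
#include <thread>

// Hypothetical stand-in for the idle flag in thread_t; not the original class.
class worker_state
{
public:
    // Safe to call from the dispatcher while the worker thread is running.
    bool is_idle() const { return idle_.load(std::memory_order_acquire); }

    // Atomically claim the worker: succeeds only if it was idle.
    bool try_claim()
    {
        bool expected = true;
        return idle_.compare_exchange_strong(expected, false,
                                             std::memory_order_acq_rel);
    }

    // Called by the worker thread after it has finished its task.
    void release() { idle_.store(true, std::memory_order_release); }

private:
    std::atomic<bool> idle_{true};
};

// Two threads try to claim the same worker; exactly one of them can win.
inline int count_successful_claims()
{
    worker_state state;
    std::atomic<int> wins{0};
    auto contender = [&] { if (state.try_claim()) ++wins; };
    std::thread a(contender);
    std::thread b(contender);
    a.join();
    b.join();
    return wins.load();
}
```

Claiming and checking in one atomic step also closes the gap between is_idle() and push(), where a second caller could otherwise hand a task to a worker that has just been given one.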

Don't use using namespace std

Do not use using namespace std if your names are likely to collide with the ones from the standard library, and never use it in a header. You had to use thread_t since std::thread already exists. However, names that end with _t are reserved by POSIX. That's usually ignored, though.

Overall design

Your overall design seems sound, but some functions seem strange. Why is thread_t::notify() part of the public interface? Does pool::get_idle return the number of idle threads or the first id? Should a user be able to call pool::run() or thread_t::run()?

Move those functions into the private section. Make your classes easy to use and hard to misuse.

By the way, pool::run might contain a bug. After idle != -1 (in if), the lambda takes idle by copy ([=]). idle's value will never change at that point, so the check in the lambda is superfluous.
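The effect of capturing by copy can be shown in isolation. This is a minimal sketch with hypothetical function names, unrelated to the pool itself: the [=] lambda keeps the value the variable had when the lambda was created, while the [&] lambda sees later changes.

```cpp
// Returns what a [=] lambda observes after the captured variable changes.
inline int observed_by_copy()
{
    int idle = 3;
    auto check = [=]() { return idle; }; // copies idle when the lambda is created
    idle = -1;                           // not visible to the lambda's copy
    return check();
}

// Returns what a [&] lambda observes after the captured variable changes.
inline int observed_by_reference()
{
    int idle = 3;
    auto check = [&]() { return idle; }; // refers to the live variable
    idle = -1;
    return check();
}
```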

Delete forbidden functions

thread_t's copy constructor should be explicitly deleted, as well as its copy assignment. Depending on your use case, you may even want to prevent moves:

thread_t(thread_t&&) = delete;
thread_t(const thread_t&) = delete;
thread_t& operator=(thread_t&&) = delete;
thread_t& operator=(const thread_t&) = delete;
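If you want the compiler to confirm that the deletions took effect, the standard type traits can check them at compile time. The class below is a hypothetical placeholder (pinned_worker is not part of the original code), shown only to illustrate the checks.

```cpp
#include <type_traits>

// Hypothetical placeholder type with copies and moves deleted.
class pinned_worker
{
public:
    pinned_worker() = default;
    pinned_worker(pinned_worker&&) = delete;
    pinned_worker(const pinned_worker&) = delete;
    pinned_worker& operator=(pinned_worker&&) = delete;
    pinned_worker& operator=(const pinned_worker&) = delete;
};

// Compilation fails if any of these operations becomes available again.
static_assert(!std::is_copy_constructible<pinned_worker>::value, "no copies");
static_assert(!std::is_move_constructible<pinned_worker>::value, "no moves");
static_assert(!std::is_copy_assignable<pinned_worker>::value, "no copy assignment");
static_assert(!std::is_move_assignable<pinned_worker>::value, "no move assignment");
```

A type like this cannot be stored by value in a std::vector that grows, which is one reason holding the workers through std::unique_ptr, as the original pool does, fits well with deleted moves.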

Add documentation

This depends on whether you want to re-use your code in a (future) project, but what is currently clear to you might not be as clear in some days/weeks/months. While we're at it, use a code formatter. Some of your single-statement ifs have braces, others don't, e.g.

        // nice
        if (threads_[i]->is_idle())
        {
            return i;
        }

vs

    // not so nice :(
    if (n > std::thread::hardware_concurrency()) nthreads_ = n = std::thread::hardware_concurrency()-1;

Other than that you stuck to a single indentation/braces style, which is great.

Reduce complexity

You have both idle_notify_cv_ as a reference and cv_ as an owned member. I guess you had two std::condition_variables at first and then removed one. If that's the case, I suggest you remove the declaration itself next time. The compiler will then tell you where the variable is still used, and you can decide case by case whether each use still needs to be checked or can be removed.

Split functionality

This is likely a toy program for you to fiddle with std::thread, but for a library split your functionality into several files, e.g.

thread.h
thread.cpp
thread_pool.h
thread_pool.cpp

If you split your implementation from your header, the recompilation time should go down a lot as your application grows in size. It won't be noticeable on your current program, though.

\$\endgroup\$
6
  • \$\begingroup\$ Thanks for the quick inputs I will definitely work on that. \$\endgroup\$
    – ark1974
    Commented Mar 17, 2018 at 2:10
  • \$\begingroup\$ Are you sure about UB? There is idle = get_idle() assignment. \$\endgroup\$
    – vnp
    Commented Mar 17, 2018 at 2:11
  • \$\begingroup\$ @vnp missed that. Didn't expect a side-effect in that if. However, then idle will never be -1 since it's taken by copy. Hm.... \$\endgroup\$
    – Zeta
    Commented Mar 17, 2018 at 2:13
  • \$\begingroup\$ @Zeta: All your inputs are solid, they couldn't be better. I am super excited. \$\endgroup\$
    – ark1974
    Commented Mar 17, 2018 at 2:28
  • \$\begingroup\$ thread_t::notify() signals the thread-pool that an executor is over and if tasks are in queue, they can be executed also. \$\endgroup\$
    – ark1974
    Commented Mar 17, 2018 at 2:50
2
\$\begingroup\$

This is a test of the thread pool above, pushing 100 tasks into the pool.

The result shows that the multi-threaded version is much faster (x10 in this example) than plain function calls, though in some simple cases this may vary. In the next step I want to reserve 1-2 threads in the pool for time-critical tasks.
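As a rough sanity check on the x10 figure: if each task only sleeps for a fixed time and scheduling overhead is ignored, the ideal wall time is ceil(tasks / threads) times the task duration. The helper below is a hypothetical illustration of that arithmetic, not a measurement.

```cpp
// Ideal wall time under the stated assumptions:
// ceil(tasks / threads) rounds, each lasting task_ms milliseconds.
inline long ideal_wall_time_ms(long tasks, long threads, long task_ms)
{
    long rounds = (tasks + threads - 1) / threads; // integer ceiling division
    return rounds * task_ms;
}
```

With 100 tasks of 100 ms each, this gives 1000 ms for 10 threads and 10000 ms for a single thread, so a factor of 10 is what the thread count alone predicts for sleep-only tasks.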

I can't edit my post, but a std::this_thread::sleep_for(std::chrono::milliseconds(1)) may be inserted in the while loop of pool::run() to prevent blocking during task push.

int main()
{
    bool running = true;
    thread_pool pool(10, running);

    task_t task = []()
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    };

    auto start =  std::chrono::system_clock::now(); 
    for(int i= 0;i<100;i++)
        pool.push_task(task);
    while (!pool.empty())
    {
        std::this_thread::sleep_for(std::chrono::nanoseconds(30));
    }
    auto stop = std::chrono::system_clock::now();
    std::cout <<"Multithreaded  took "<< (std::chrono::duration_cast<std::chrono::milliseconds>(stop - start)).count()/1000 << " sec\n";


    start = std::chrono::system_clock::now();
    for (int i = 0; i<100; i++)
        task();
    stop = std::chrono::system_clock::now();
    std::cout <<"Plain function took "<< (std::chrono::duration_cast<std::chrono::milliseconds>(stop - start)).count()/1000 << " sec\n";


    running = false;
    return 0;
}

Here is the interesting result (excluding pool setup time): [screenshot of the program output not available]

\$\endgroup\$
1
  • \$\begingroup\$ As the sleep does no work, it allows the core to be assigned to a new thread. Therefore all 100 sleeps can run in parallel (as no work is being done in any of them), and an optimal implementation should finish in slightly over 100 ms. A run time of 1 second is extremely slow in my opinion for this test. Just saw that you only use 10 threads. So you get the expected result for doing no work (as it can easily run 10 no-work threads in parallel). I don't think this test proves much. \$\endgroup\$ Commented Mar 19, 2018 at 4:01
0
\$\begingroup\$

One very important issue hasn't been mentioned so far. You should definitely redesign the active wait loop in the run method:

void run()
{
    while (running_)
    {
        int idle;
        // spins without blocking whenever the condition is false
        if (!tasks_.empty() && (idle = get_idle()) != -1)
        {
            // ...
        }
    }
}

This active loop consumes CPU time when there is a task pending but no available worker thread. So not only are all the workers busy doing their work, but you also keep the dispatcher busy. A better approach is to use a signalling mechanism that pauses your run method and resumes it when one of the tasks completes.

And one more detail. Passing the running flag by reference to the pool constructor is not a good idea. Note that if you pass false to the constructor and later decide to activate your pool by changing the referenced variable, it will silently fail: nothing starts the run method again. You should consider implementing the enable/disable logic in public methods rather than a simple boolean flag.
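A minimal sketch of what such public methods could look like, assuming start() and stop() are only called from the single thread that owns the object. The class runner and its members are hypothetical; the loop body is a placeholder that just waits for the stop request.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical sketch: the object owns its running flag and exposes
// start()/stop() instead of taking a bool& from the caller.
class runner
{
public:
    runner() = default;
    ~runner() { stop(); }

    runner(const runner&) = delete;
    runner& operator=(const runner&) = delete;

    // Starts the background thread; returns false if already running.
    bool start()
    {
        std::lock_guard<std::mutex> lock(mu_);
        if (running_)
            return false;
        running_ = true;
        thread_ = std::thread([this] { loop(); });
        return true;
    }

    // Requests a stop and joins; calling it again is harmless.
    void stop()
    {
        {
            std::lock_guard<std::mutex> lock(mu_);
            running_ = false;
        }
        cv_.notify_all();
        if (thread_.joinable())
            thread_.join();
    }

    bool is_running() const
    {
        std::lock_guard<std::mutex> lock(mu_);
        return running_;
    }

private:
    void loop()
    {
        std::unique_lock<std::mutex> lock(mu_);
        // Placeholder: a real pool would also wake up here for new tasks.
        cv_.wait(lock, [this] { return !running_; });
    }

    mutable std::mutex mu_;
    std::condition_variable cv_;
    bool running_ = false;
    std::thread thread_;
};
```

With this shape, restarting after a stop works because start() creates a fresh thread, which is exactly the case that silently fails with an external bool reference.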

\$\endgroup\$
1
  • \$\begingroup\$ Nice catch. I missed that. \$\endgroup\$
    – ark1974
    Commented Mar 19, 2018 at 18:15
