
My first attempt at writing a thread-safe priority_queue. It is not the most efficient, because the locking could be made even more fine-grained if I implemented the heap myself instead of using priority_queue, but that can come later.

The goal is to create a thread-safe priority queue ordered by timestamp, and then extend it so that a thread pool can pick up the earliest entry using wait_pop() and work on it.

struct Entry
{
   int timestamp;
   int key;
   int value;

   Entry(int _timestamp, int _key, int _value) : timestamp(_timestamp), key(_key), value(_value) {}

   bool operator < (const Entry& entry) const 
   {
      return timestamp < entry.timestamp; 
   }

   bool operator > (const Entry& entry) const 
   {
      return timestamp > entry.timestamp; 
   }
};

struct Compare
{
    template<class T>
    bool operator()(const T& a, const T& b) const 
    {
       return a > b;
    }
};

template<typename T>
class threadsafe_priority_queue
{
   private:
     mutable std::mutex mtx;
     priority_queue<T, vector<T>, Compare> pq;
     std::condition_variable data_cond;

  public:
    threadsafe_priority_queue(){}

void push(T val)
{
   std::lock_guard<std::mutex> lk(mtx);
   pq.push(std::move(val));
   data_cond.notify_one();
}

//not using top() to avoid race condition between top() and pop()
//wait version
T wait_pop()
{
   std::unique_lock<std::mutex> lk(mtx);
   data_cond.wait(lk, [this] { return !pq.empty();});
   T val = std::move(pq.top());
   pq.pop();
   return val;
}

//nowait version
T nowait_pop()
{
    std::lock_guard<std::mutex> lk(mtx);
    if (pq.empty())
    {
         throw std::runtime_error("Cannot pop an empty priority queue"); 
    }
    T val = std::move(pq.top());
    pq.pop();
    return val;
}

  bool empty() const
  {
     std::lock_guard<std::mutex> lk(mtx);
     return pq.empty();
  }
};

int main() {
   threadsafe_priority_queue<Entry> pq;
   pq.push(Entry(1, 2, 4));
   pq.push(Entry(5, 2, 4));
   pq.push(Entry(2, 2, 4));
   pq.push(Entry(7, 2, 4));
   Entry e = pq.wait_pop();
   cout << e.timestamp << endl;
   return 0;
}

2 Answers


Haven't used locks too often, but a few things I noticed in general:

  • Your code is incomplete and lacks headers, even though it seems to be meant as a complete example (e.g. including main()).

  • Indentation is inconsistent, but this might be a result of pasting the code here.

  • More of a personal preference, but I'm not a fan of repeating template arguments over and over again, so I'd add one or two local aliases to avoid that:

    namespace {
      using unique_lock = std::unique_lock<std::mutex>;
      using lock_guard = std::lock_guard<std::mutex>;
    }
    
  • Instead of implementing your own comparator, you could just use std::greater<T> from <functional>, which calls the operator> that Entry already defines.

  • You're using cout, vector, endl, and priority_queue without namespace. Any chance you're using using namespace std;? As you can see, this causes inconsistent code and might cause trouble if you reuse code somewhere else. Plus there's always the danger of some naming conflicts.

  • Your example/testing code doesn't use multiple threads, so no matter how often you try, you won't run into issues, unless there's some inherent bug somewhere. You should at least spawn one additional thread and let both do something for a bit.

  • Compare should be called greater or greater_than (ideally in a local/anonymous namespace), since that is the only comparison it provides. But, as mentioned, it might be unnecessary altogether.

  • The name nowait_pop() is certainly debatable; how about no_wait_pop() or pop_immediate(), to keep the words separated or to group related functionality together?

  • Entry(int _timestamp, int _key, int _value): I'm not a fan of starting parameters with an underscore. I'd only do it to denote parameters not yet used (e.g. for future revisions). IMO you can totally reuse the member names in such a trivial context:

    Entry(int timestamp, int key, int value) : timestamp(timestamp), key(key), value(value) {}
    

    This code isn't ambiguous in any way and there's no danger of modifying the wrong value by accident.

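  • More of a bonus thing: this is a multiple-readers, one (or more) writers scenario, so for read-only operations one could replace the exclusive std::mutex with a std::shared_mutex and take a std::shared_lock instead of std::lock_guard. If three threads want to check empty() at the same time, just let them do so. Note that std::condition_variable only works with std::unique_lock<std::mutex>, so wait_pop() would then need std::condition_variable_any.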
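Pulling several of these points together, a revised version might look roughly like this. It is a sketch assuming C++17: it adds the headers, uses std::greater<T> in place of the custom Compare, uses the suggested local aliases, makes the mutex mutable so that the const empty() can lock it, and drives the queue from two threads. run_two_thread_demo() is a hypothetical test helper, not part of the original code.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>   // std::greater
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

namespace {
    using lock_guard  = std::lock_guard<std::mutex>;
    using unique_lock = std::unique_lock<std::mutex>;
}

struct Entry {
    int timestamp;
    int key;
    int value;

    Entry(int timestamp, int key, int value)
        : timestamp(timestamp), key(key), value(value) {}

    // std::greater<Entry> calls this, which turns the queue into a min-heap on timestamp.
    bool operator>(const Entry& other) const { return timestamp > other.timestamp; }
};

template <typename T>
class threadsafe_priority_queue {
public:
    void push(T val) {
        {
            lock_guard lk(mtx);
            pq.push(std::move(val));
        }
        data_cond.notify_one();   // notify after releasing the lock
    }

    T wait_pop() {
        unique_lock lk(mtx);
        data_cond.wait(lk, [this] { return !pq.empty(); });
        T val = pq.top();   // top() returns a const reference, so this is a copy
        pq.pop();
        return val;
    }

    bool empty() const {
        lock_guard lk(mtx);
        return pq.empty();
    }

private:
    mutable std::mutex mtx;   // mutable so that the const empty() can lock it
    std::priority_queue<T, std::vector<T>, std::greater<T>> pq;
    std::condition_variable data_cond;
};

// Hypothetical test helper: a producer thread pushes `count` entries while the
// calling thread consumes them. Returns the number of entries popped.
int run_two_thread_demo(int count) {
    threadsafe_priority_queue<Entry> q;
    std::thread producer([&q, count] {
        for (int i = 0; i < count; ++i) q.push(Entry(count - i, i, i));
    });
    int popped = 0;
    for (int i = 0; i < count; ++i) {
        q.wait_pop();
        ++popped;
    }
    producer.join();
    return popped;
}
```

Note that `std::move(pq.top())` in the original also copies, because top() returns a const reference; moving out of a std::priority_queue is not possible without a const_cast.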

  • Another possible alternative name for nowait_pop() could be try_pop(). It would also be nice if it returned a std::optional<T> or something similar, as having to first check the result of empty() before calling nowait_pop() would be a TOCTTOU (time-of-check to time-of-use) bug.
    – G. Sliepen
    Commented Feb 3, 2022 at 11:43
  • Note that since C++17 it is possible to just write std::lock_guard guard(mutex), and class template argument deduction will kick in. Commented Feb 3, 2022 at 11:51

Design

Starvation. std::mutex does not guarantee starvation freedom. If there are many producers and far fewer consumers, then on systems whose mutexes don't guarantee any acquisition order, the consuming threads may starve because they keep failing to acquire the lock. Hand-written locks that grant the lock in arrival order are safer in this respect, but they also perform worse: the threads waiting for the lock wake up repeatedly just to check whether it is their turn, and overall it becomes a bit of a mess. The pros should be weighed against the cons.
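To illustrate the kind of hand-written ordered lock meant here, a minimal sketch of a FIFO "ticket lock" built from a std::mutex and a std::condition_variable (a standard technique, not something from the question; ticket_mutex and run_ticket_demo() are hypothetical names). It provides lock() and unlock(), so std::lock_guard works with it. Every unlock() wakes all waiters with notify_all(), and all but one of them go straight back to sleep, which is the overhead described above.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical FIFO lock: threads are admitted in the order in which they call lock().
class ticket_mutex {
public:
    void lock() {
        std::unique_lock<std::mutex> lk(m);
        const unsigned long my_ticket = next_ticket++;
        // Each waiter is woken on every unlock() and re-checks whether it is its turn.
        cv.wait(lk, [&] { return now_serving == my_ticket; });
    }

    void unlock() {
        {
            std::lock_guard<std::mutex> lk(m);
            ++now_serving;
        }
        cv.notify_all();
    }

private:
    std::mutex m;
    std::condition_variable cv;
    unsigned long next_ticket = 0;
    unsigned long now_serving = 0;
};

// Hypothetical demo: several threads increment a shared counter under the ticket lock.
long run_ticket_demo(int threads, int iterations) {
    ticket_mutex tm;
    long counter = 0;
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&] {
            for (int i = 0; i < iterations; ++i) {
                std::lock_guard<ticket_mutex> lk(tm);
                ++counter;
            }
        });
    }
    for (auto& w : workers) w.join();
    return counter;
}
```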

wait_pop can deadlock. If all the producers are gone, there is no way to wake the waiting threads. There should be a function that wakes everybody who is waiting and prevents them from entering the critical section again, because otherwise they would just get stuck again.
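A minimal sketch of such a shutdown function, assuming C++17 for std::optional. closable_priority_queue, close() and run_close_demo() are hypothetical names. After close(), push() is refused and wait_pop() no longer blocks: it still hands out items that are already queued, then returns std::nullopt once the queue is drained.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class closable_priority_queue {
public:
    // Returns false (and drops the value) if the queue has already been closed.
    bool push(T val) {
        {
            std::lock_guard<std::mutex> lk(mtx);
            if (closed) return false;
            pq.push(std::move(val));
        }
        data_cond.notify_one();
        return true;
    }

    // Blocks until an item is available or the queue is closed.
    std::optional<T> wait_pop() {
        std::unique_lock<std::mutex> lk(mtx);
        data_cond.wait(lk, [this] { return closed || !pq.empty(); });
        if (pq.empty()) return std::nullopt;   // closed and nothing left to hand out
        T val = pq.top();
        pq.pop();
        return val;
    }

    void close() {
        {
            std::lock_guard<std::mutex> lk(mtx);
            closed = true;
        }
        data_cond.notify_all();   // wake every blocked consumer
    }

private:
    std::mutex mtx;
    std::condition_variable data_cond;
    std::priority_queue<T, std::vector<T>, std::greater<T>> pq;
    bool closed = false;
};

// Hypothetical demo: three consumers compete for a single item; close() must
// release the two that find nothing. Returns how many consumers got a value.
int run_close_demo() {
    closable_priority_queue<int> q;
    q.push(42);
    std::vector<std::optional<int>> results(3);
    std::vector<std::thread> consumers;
    for (int i = 0; i < 3; ++i)
        consumers.emplace_back([&q, &results, i] { results[i] = q.wait_pop(); });
    q.close();
    for (auto& c : consumers) c.join();
    int got_value = 0;
    for (const auto& r : results)
        if (r) ++got_value;
    return got_value;
}
```

Whether consumers should drain the remaining items or stop immediately after close() is a design choice; returning std::nullopt unconditionally once the queue is closed would be an equally valid variant.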

Using exceptions for non-exceptional situations. An empty queue is a normal state, so it is much better to use std::optional wherever an empty result is possible. Throwing an exception is comparatively slow, and it diverts control flow to a handler that may be far away from the call site.
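A sketch of the non-throwing alternative, assuming C++17: the pop returns std::optional<T>, and std::nullopt means "the queue was empty". The name try_pop() is an assumption, and the condition variable and wait_pop() are omitted to keep it short. Because the emptiness check and the pop happen under the same lock, there is no gap between checking and popping for another thread to slip into.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>
#include <vector>

template <typename T>
class threadsafe_priority_queue {
public:
    void push(T val) {
        std::lock_guard<std::mutex> lk(mtx);
        pq.push(std::move(val));
    }

    // Non-blocking pop: std::nullopt signals "empty" without throwing.
    std::optional<T> try_pop() {
        std::lock_guard<std::mutex> lk(mtx);
        if (pq.empty()) return std::nullopt;
        T val = pq.top();
        pq.pop();
        return val;
    }

private:
    std::mutex mtx;
    std::priority_queue<T, std::vector<T>, std::greater<T>> pq;
};
```

A caller then writes `if (auto e = q.try_pop()) { /* use *e */ }` instead of wrapping the call in try/catch.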

Allow cancelling a push. Sometimes data becomes stale while it is waiting to be enqueued, because newer data has already arrived. It is better to drop the stale data, since nobody cares about it. Provide a timed push that also accepts a predicate, and cancel the push if the predicate returns false once the lock has been acquired.
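One way to sketch such a cancellable, timed push, assuming a std::timed_mutex in place of std::mutex: the std::unique_lock constructor that takes a duration calls try_lock_for(), and the predicate is evaluated only once the lock is held. cancellable_priority_queue and try_push_for() are hypothetical names. Since std::condition_variable only works with std::unique_lock<std::mutex>, a version that also keeps wait_pop() would need std::condition_variable_any; that part is omitted here.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

template <typename T>
class cancellable_priority_queue {
public:
    // Tries for at most `timeout` to acquire the lock; once it is held, pushes
    // only if still_wanted() returns true. Returns whether the value was pushed.
    template <typename Rep, typename Period, typename Pred>
    bool try_push_for(T val, const std::chrono::duration<Rep, Period>& timeout,
                      Pred still_wanted) {
        std::unique_lock<std::timed_mutex> lk(mtx, timeout);   // calls try_lock_for()
        if (!lk.owns_lock()) return false;   // timed out: drop the value
        if (!still_wanted()) return false;   // the data went stale while waiting
        pq.push(std::move(val));
        return true;
    }

    std::size_t size() {
        std::lock_guard<std::timed_mutex> lk(mtx);
        return pq.size();
    }

private:
    std::timed_mutex mtx;
    std::priority_queue<T, std::vector<T>, std::greater<T>> pq;
};
```

For example, the predicate could compare an entry's timestamp with the newest timestamp seen so far and return false once a newer entry has arrived.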

Implementation

Use semaphores. Semaphores are vastly superior when it comes to producer-consumer situations. Keep the mutex, and add a semaphore whose counter signals that the queue is non-empty: when a consumer empties the queue, the semaphore stays acquired (the counter went from 1 to 0), and when a producer transitions the queue from empty to non-empty, it posts on the semaphore (increasing the counter from 0 to 1). Consumers, of course, should proceed only when the semaphore counter is 1. Condition variables are prone to the lost wake-up problem, which is particularly annoying to deal with.

Use a namespace. Some other library might use the same name, and there might be a name clash. In the best case, it is a compiler error; in the worst, it silently does something unintended.

Put an upper bound. Unbounded queues are an easy target for DDoS attacks. Temporarily blocking producers might greatly reduce the number of requests served, but it will never drop it to zero. This also helps with consumer starvation, since hitting the limit lets consumers make progress. Note, however, that producers can now deadlock: when the consumers are gone, the producers will be left waiting forever. That makes the ability to wake everybody up even more important.
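A sketch of a bounded variant, assuming C++17: push() blocks while the queue holds `capacity` elements, and a close() function wakes every blocked producer and consumer so that neither side can be left waiting forever. bounded_priority_queue, close() and run_bounded_demo() are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class bounded_priority_queue {
public:
    explicit bounded_priority_queue(std::size_t capacity) : capacity(capacity) {}

    // Blocks while the queue is full; returns false if the queue is closed instead.
    bool push(T val) {
        std::unique_lock<std::mutex> lk(mtx);
        not_full.wait(lk, [this] { return closed || pq.size() < capacity; });
        if (closed) return false;
        pq.push(std::move(val));
        lk.unlock();
        not_empty.notify_one();
        return true;
    }

    // Blocks while the queue is empty; returns std::nullopt once closed and drained.
    std::optional<T> wait_pop() {
        std::unique_lock<std::mutex> lk(mtx);
        not_empty.wait(lk, [this] { return closed || !pq.empty(); });
        if (pq.empty()) return std::nullopt;
        T val = pq.top();
        pq.pop();
        lk.unlock();
        not_full.notify_one();   // a slot has just been freed for a producer
        return val;
    }

    // Wakes every blocked producer and consumer.
    void close() {
        {
            std::lock_guard<std::mutex> lk(mtx);
            closed = true;
        }
        not_full.notify_all();
        not_empty.notify_all();
    }

private:
    const std::size_t capacity;
    std::mutex mtx;
    std::condition_variable not_full;
    std::condition_variable not_empty;
    std::priority_queue<T, std::vector<T>, std::greater<T>> pq;
    bool closed = false;
};

// Hypothetical demo: the calling thread pushes 1..n through a queue of capacity 4
// while a consumer thread drains it. Returns the sum of the consumed values.
long run_bounded_demo(int n) {
    bounded_priority_queue<int> q(4);
    long sum = 0;
    std::thread consumer([&] {
        while (auto v = q.wait_pop()) sum += *v;
    });
    for (int i = 1; i <= n; ++i) q.push(i);
    q.close();
    consumer.join();
    return sum;
}
```

In run_bounded_demo() the producer can get at most four items ahead of the consumer; after close(), the consumer drains what is left, and wait_pop() then returns std::nullopt, which ends its loop.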


Usually, solutions are created to solve specific problems. It would be great to see an actual use case in the next review request, if there is one.

