My first attempt at writing a thread-safe priority queue. It is not the most efficient: the locking could be made more fine-grained if I implemented the heap myself instead of wrapping std::priority_queue, but that is left for later.
The goal is to create a thread-safe priority queue ordered by timestamp, and then extend it so that a thread pool can pick up the earliest entry by calling wait_pop() and work on it.
// A (timestamp, key, value) record. Both comparison operators look only at
// the timestamp; key and value never take part in the ordering.
struct Entry
{
    int timestamp;
    int key;
    int value;

    // Stores the three fields as given.
    Entry(int ts, int k, int v)
        : timestamp(ts), key(k), value(v)
    {
    }

    // True when this entry's timestamp is strictly smaller than rhs's.
    bool operator<(const Entry& rhs) const { return timestamp < rhs.timestamp; }

    // True when this entry's timestamp is strictly greater than rhs's.
    bool operator>(const Entry& rhs) const { return rhs.timestamp < timestamp; }
};
// Comparison functor that forwards to operator>. Used as the ordering of a
// std::priority_queue, it places the smallest element (per operator>) on top.
struct Compare
{
    // Returns the result of `lhs > rhs`.
    template<typename Value>
    bool operator()(const Value& lhs, const Value& rhs) const
    {
        const bool lhs_is_greater = lhs > rhs;
        return lhs_is_greater;
    }
};
template<typename T>
class threadsafe_priority_queue
{
private:
std::mutex mtx;
priority_queue<T, vector<T>, Compare> pq;
std::condition_variable data_cond;
public:
threadsafe_priority_queue(){}
void push(T val)
{
std::lock_guard<std::mutex> lk(mtx);
pq.push(std::move(val));
data_cond.notify_one();
}
//not using top() to avoid race condition between top() and pop()
//wait version
T wait_pop()
{
std::unique_lock<std::mutex> lk(mtx);
data_cond.wait(lk, [this] { return !pq.empty();});
T val = std::move(pq.top());
pq.pop();
return val;
}
//nowait version
T nowait_pop()
{
std::lock_guard<std::mutex> lk(mtx);
if (pq.empty())
{
throw std::exception("Cannot pop an empty priority queue");
}
T val = std::move(pq.top());
pq.pop();
return val;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mtx);
return pq.empty();
}
};
int main() {
threadsafe_priority_queue<Entry> pq;
pq.push(Entry(1, 2, 4));
pq.push(Entry(5, 2, 4));
pq.push(Entry(2, 2, 4));
pq.push(Entry(7, 2, 4));
Entry e = pq.wait_pop();
cout << e.timestamp << endl;
return 0;
}