I have the following simplification of a program which consists of 2 threads. One thread pushes packets to the back of a deque while another waits for user input before performing a "heavy" operation on the current deque (Here, we just look for the maximum id).
#include <iostream>
#include <thread>
#include <random>
#include <chrono>
#include <deque>
#include <string>
#include <mutex>
using namespace std;
struct Packet{
int id;
int size;
};
mutex deque_mutex;
deque<shared_ptr<Packet> > packet_deque;
void gen_packet(Packet* packet){
// generate packets
// packet size is 1-100 bytes
// packet id is 0-1000
random_device rd;
mt19937 gen(rd());
uniform_int_distribution<> size_dist(1, 100);
uniform_int_distribution<> id_dist(0, 1000);
int size = size_dist(gen);
int id = id_dist(gen);
packet->size = size;
packet->id = id;
}
void packet_monitor(){
// Gets a packet every 100th of a second
// Add it to the deque
// If the deque is full, remove the oldest packet
// A deque is full if it has 1000 packets
while(true){
shared_ptr<Packet> packet = make_shared<Packet>();
gen_packet(packet.get());
unique_lock<mutex> lock(deque_mutex);
if(packet_deque.size() == 1000){
packet_deque.pop_front();
}
packet_deque.push_back(packet);
lock.unlock();
this_thread::sleep_for(chrono::milliseconds(10));
}
}
void operate(){
// Listens for a key stroke
// It then looks for the packet with the biggest id
while(true){
string input;
getline(std::cin, input);
if(!input.empty()){
int max_id = 0;
int max_index_id = 0;
unique_lock<mutex> lock(deque_mutex);
deque<shared_ptr<Packet> > packet_deque_copy(packet_deque);
lock.unlock();
for(int i = 0; i < packet_deque_copy.size(); i++){
if(packet_deque_copy[i]->id > max_id){
max_id = packet_deque_copy[i]->id;
max_index_id = i;
}
}
cout << "Max id: " << max_id << " at " << max_index_id << "\n";
}
}
}
int main(){
thread packet_monitor_thread(packet_monitor);
thread operate_thread(operate);
packet_monitor_thread.join();
operate_thread.join();
return 0;
}
I am not very familiar with cpp nor with concurrency, so I have some questions about the best way to solve this
- Do I have any memory leaks here? I am assuming the
shared_ptr
takes care of freeing the memories for packets that are deleted from the deque. Is that correct? - I use a
unique_lock
here to try to tackle the potential deadlock that might arise from an exception being thrown inbetween thelock
andunlock
. Is there a more cpp way of doing this? - I think that the locking and unlocking for every packet is a bit overkill. I can buffer 10 at a time and then push those. What are some of the implications of locking/unlocking so quickly?
- Here, I don't need to wait for a packet to come, but in the actual implementation I block until there is another packet available. Hence, I have both a blocking operation while waiting for input from the user as well as blocking for a packet to arrive. Am I introducing any potential deadlocks, or is there a better way to think about this?