
I have the following simplified version of a program consisting of two threads. One thread pushes packets onto the back of a deque while another waits for user input before performing a "heavy" operation on the current deque (here, we just look for the packet with the maximum id).

#include <iostream>
#include <thread>
#include <random>
#include <chrono>
#include <deque>
#include <string>
#include <mutex>

using namespace std;

struct Packet{
  int id;
  int size;
};

mutex deque_mutex;
deque<shared_ptr<Packet> > packet_deque;

void gen_packet(Packet* packet){
  // generate packets
  // packet size is 1-100 bytes
  // packet id is 0-1000
  random_device rd;
  mt19937 gen(rd());
  uniform_int_distribution<> size_dist(1, 100);
  uniform_int_distribution<> id_dist(0, 1000);
  int size = size_dist(gen);
  int id = id_dist(gen);
  packet->size = size;
  packet->id = id;
}

void packet_monitor(){
  // Gets a packet every 100th of a second
  // Add it to the deque
  // If the deque is full, remove the oldest packet
  // A deque is full if it has 1000 packets

  while(true){
    shared_ptr<Packet> packet = make_shared<Packet>();
    gen_packet(packet.get());
    unique_lock<mutex> lock(deque_mutex);
    if(packet_deque.size() == 1000){
      packet_deque.pop_front();
    }
    packet_deque.push_back(packet);
    lock.unlock();
    this_thread::sleep_for(chrono::milliseconds(10));
  }
}

void operate(){
  // Listens for a key stroke
  // It then looks for the packet with the biggest id
  while(true){
    string input;
    getline(std::cin, input);
    if(!input.empty()){
      int max_id = 0;
      int max_index_id = 0;
      unique_lock<mutex> lock(deque_mutex);
      deque<shared_ptr<Packet> > packet_deque_copy(packet_deque);
      lock.unlock();
      for(int i = 0; i < packet_deque_copy.size(); i++){
        if(packet_deque_copy[i]->id > max_id){
          max_id = packet_deque_copy[i]->id;
          max_index_id = i;
        }
      }
      cout << "Max id: " << max_id << " at " << max_index_id << "\n";
    }
  }
}

int main(){
  thread packet_monitor_thread(packet_monitor);
  thread operate_thread(operate);

  packet_monitor_thread.join();
  operate_thread.join();

  return 0;
}

I am not very familiar with C++ or with concurrency, so I have some questions about the best way to solve this:

  1. Do I have any memory leaks here? I am assuming the shared_ptr takes care of freeing the memory for packets that are removed from the deque. Is that correct?
  2. I use a unique_lock here to guard against the potential deadlock that might arise from an exception being thrown in between the lock and unlock. Is there a more idiomatic C++ way of doing this?
  3. I think that locking and unlocking for every packet is a bit of overkill. I could buffer 10 at a time and then push those. What are the implications of locking/unlocking so quickly?
  4. Here I don't need to wait for a packet to arrive, but in the actual implementation I block until another packet is available. Hence, I block both while waiting for input from the user and while waiting for a packet to arrive. Am I introducing any potential deadlocks, or is there a better way to think about this?
  • I've completed my review now. Sorry to have to leave it overnight in a half-finished state! Commented Aug 19, 2023 at 8:28

1 Answer


General review

Please don't using namespace std;. That invites conflict with identifiers (and perhaps even changes in behaviour when compiled with later versions of standard library).

We use std::shared_ptr and std::make_shared but never include <memory> — the code compiles only because another header happens to pull it in transitively. Include what you use.

gen_packet() seems to assume its argument is never null - it's probably clearer to accept a reference instead of a pointer.

It certainly shouldn't be creating a new std::random_device and generator every time it is called. We should do that just once:

    static std::mt19937 gen{std::random_device{}()};

packet_monitor() never leaves its loop. This means that join()ing its thread will never return.

I don't like hardcoding the specific size (1000) in there - why shouldn't we be able to create one with larger size?

Instead of writing lock.unlock(), I find it clearer to use scope for locks:

        {
            std::lock_guard<std::mutex> _{deque_mutex};
            if (packet_deque.size() == 1000) [[likely]] {
                packet_deque.pop_front();
            }
            packet_deque.emplace_back(std::move(packet));
        }

(I've added a couple of small optimisations in there, too.)


operate() never checks the state of std::cin after calling std::getline(). So we busy-loop when input is disconnected.

The for loop uses the wrong type for i. Since we're comparing to a std::size_t, prefer to use the same type for i.

Instead of hand-coding the loop, we could be using std::max_element(), though that might cost more if we really do need the index (as we'd need std::distance() to derive that). We could create a std::ranges::enumerate_view to avoid that.


main() spawns two threads and waits for them both. That's three threads in total, but one of them is just waiting. Instead, we could call operate() directly from main(). And we could use std::jthread so that end of scope waits for completion.


Your questions

  1. I see no leaks. With no naked new, no C-library allocations, and no release() calls on smart pointers, every object is accounted for. That assessment is corroborated by a few runs under Valgrind.
  2. Scoped locks such as std::unique_lock or std::lock_guard are entirely appropriate here. I prefer not to unlock() them if avoidable, as mentioned above.
  3. We could buffer input, and push in batches, but only if it's acceptable for the reader to get stale data. For most platforms, lock acquisition is an important performance consideration, and much work has been put into optimising them, particularly in low-contention scenarios such as this. So I wouldn't be concerned about the overhead unless profiling identifies it as a problem.
  4. There should be no starvation as long as we don't hold the lock during the blocking operations. There cannot be deadlock if we only ever hold one lock at a time.

Performance considerations

Use of a std::deque is sub-optimal here, as we never need more than 1000 elements. What we have implemented is a ring buffer, and we can simply overwrite old values with the new ones as we go. Then we never need to allocate and release storage (in push_back() and pop_front()).

Since we're storing (smart) pointers, we have a natural null value for not-yet initialised ring-buffer elements, so we only need to track the insert position, and just filter nulls out when reading.


Suggested code

Here, I create a ring buffer, and move the mutex inside it, giving atomic functions to add a value and to take a copy of the contents.

I also avoid global variables, and use std::jthread to simplify the main().

#include <algorithm>
#include <array>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <iterator>
#include <memory>
#include <mutex>
#include <random>
#include <ranges>
#include <string>
#include <thread>
#include <tuple>
#include <vector>


struct Packet {
    int id;
    int size;
};

template<std::size_t N>
class PacketBuffer
{
    using pkt_ptr = std::shared_ptr<Packet>;
    using arr_t = std::array<pkt_ptr, N>;
    using iterator = arr_t::iterator;
    using const_iterator = arr_t::const_iterator;

    arr_t buffer = {};
    iterator insert_pos = buffer.begin();
    mutable std::mutex mutex = {};

public:
    void emplace_back(pkt_ptr packet)
    {
        std::lock_guard<std::mutex> _{mutex};
        *insert_pos++ = std::move(packet);
        if (insert_pos == buffer.end()) {
            insert_pos = buffer.begin();
        }
    }

    auto to_vector() const
    {
        std::lock_guard<std::mutex> _{mutex};
        const_iterator const p = insert_pos;
        std::vector<pkt_ptr> result;
        auto out = std::back_inserter(result);

        if (*insert_pos) [[likely]] {
            result.reserve(N);
            // put the older data at beginning of output
            std::copy(p, buffer.end(), out);
        } else {
            result.reserve(std::distance(buffer.begin(), p));
        }
        std::copy(buffer.begin(), p, out);
        return result;
    }
};

void gen_packet(Packet& packet)
{
    // generate packets
    // packet size is 1-100 bytes
    // packet id is 0-1000
    static std::mt19937 gen{std::random_device {}()};
    static std::uniform_int_distribution size_dist{1, 100};
    // static std::uniform_int_distribution id_dist{0, 1000};
    static std::poisson_distribution id_dist{600};
    packet.size = size_dist(gen);
    packet.id = id_dist(gen);
}

template<class Buffer>
void packet_monitor(Buffer& buffer)
{
    // Gets a packet every 100th of a second
    // and appends it to the buffer
    while (std::cin) {
        auto packet = std::make_shared<Packet>();
        gen_packet(*packet);
        buffer.emplace_back(std::move(packet));
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

template<class Buffer>
void operate(Buffer& buffer)
{
    // Listens for a newline
    // and then finds the packet with the biggest id
    std::string input;
    while (std::getline(std::cin, input)) {
        auto buf_copy = buffer.to_vector();
        if (buf_copy.empty()) [[unlikely]] {
            // is this really right??
            std::cout << "Max id: 0 at 0\n";
        } else {
            auto const get_id
                // get the id from an enumerated pointer to packet
                = [](auto const& x){return std::get<1>(x)->id;};
            auto const [max_index_id, max_element]
                = *std::ranges::max_element(buf_copy | std::views::enumerate,
                                            std::less<>{}, get_id);
            std::cout << "Max id: " << max_element->id
                      << " at " << max_index_id
                      << '\n';
        }
    }
}

int main()
{
    PacketBuffer<1000> ringbuf;
    std::jthread packet_monitor_thread(packet_monitor<decltype(ringbuf)>,
                                       std::ref(ringbuf));
    operate(ringbuf);
}
  • Instead of to_vector(), you could also consider just having two PacketBuffer objects, and having a way to atomically swap them. I also wonder if std::shared_ptr can be replaced with std::unique_ptr.
    – G. Sliepen
    Commented Aug 19, 2023 at 9:08
  • That would reset the data so far (the consumer is looking at the most recent N packets, potentially overlapping with a previous consumption - I wouldn't have added the producer-consumer tag to the question if I'd understood that to start with). I did initially try copying the buffer, but found the to_vector() approach simpler, because it doesn't require knowledge of the insert position in the consumer. If we didn't care about the index, we could just copy the storage (without locks if we changed to atomic shared pointers) and filter out nulls. Commented Aug 19, 2023 at 9:20
  • One thing we might change is to result.reserve(N) outside the lock so we're not blocking while allocating storage. Given the buffer is more likely to be full, that's only over-allocating in the rare case. Commented Aug 19, 2023 at 9:22
  • Hm, if the consumer is just interested in max(something), then resetting the data so far wouldn't be an issue (it just needs to remember the last maximum), but I get your point. The insert position can be abstracted away from the consumer by adding some more helper functions to PacketBuffer. Well, there are more ways to avoid some memory allocations at the cost of adding more complexity. @Bula: because you only show us a simplification of your actual program, we can't really tell you what the best solution is for your actual problem.
    – G. Sliepen
    Commented Aug 19, 2023 at 9:25
  • It's a rolling maximum, so we'd need some mechanism to expire the remembered value. To do that, we'd need the buffer itself to maintain that knowledge and update as each value is added, I think. Commented Aug 19, 2023 at 9:36
