
Can any expert help me understand why CNode in src/net.h needs the following members?

std::deque<CSerializeData> vSendMsg;
std::deque<CInv> vRecvGetData;
std::deque<CNetMessage> vRecvMsg;

If I understand the source code correctly, vRecvMsg stores incoming messages and vSendMsg is a queue for outgoing messages.

  • Why do the input and output queues hold different element types (CNetMessage vs. CSerializeData)?
  • What is the purpose of vRecvGetData? Could you also help me understand the concept behind its usage?



The question is from 2015; some things have changed since then.

CNetMessage is a transport-protocol-agnostic message container. It holds the received message data (a DataStream), the time the message was received, the payload size, and other metadata. The transport produces one CNetMessage each time it finishes decomposing a message from the incoming network bytes. See the following code:

bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
{
    complete = false;
    const auto time = GetTime<std::chrono::microseconds>();
    LOCK(cs_vRecv);
    m_last_recv = std::chrono::duration_cast<std::chrono::seconds>(time);
    nRecvBytes += msg_bytes.size();
    while (msg_bytes.size() > 0) {
        // absorb network data
        if (!m_transport->ReceivedBytes(msg_bytes)) {
            // Serious transport problem, disconnect from the peer.
            return false;
        }

        if (m_transport->ReceivedMessageComplete()) {
            // decompose a transport agnostic CNetMessage from the deserializer
            bool reject_message{false};
            CNetMessage msg = m_transport->GetReceivedMessage(time, reject_message);
            if (reject_message) {
                // Message deserialization failed. Drop the message but don't disconnect the peer.
                // store the size of the corrupt message
                mapRecvBytesPerMsgType.at(NET_MESSAGE_TYPE_OTHER) += msg.m_raw_message_size;
                continue;
            }

            // Store received bytes per message type.
            // To prevent a memory DOS, only allow known message types.
            auto i = mapRecvBytesPerMsgType.find(msg.m_type);
            if (i == mapRecvBytesPerMsgType.end()) {
                i = mapRecvBytesPerMsgType.find(NET_MESSAGE_TYPE_OTHER);
            }
            assert(i != mapRecvBytesPerMsgType.end());
            i->second += msg.m_raw_message_size;

            // push the message to the process queue,
            vRecvMsg.push_back(std::move(msg));

            complete = true;
        }
    }

    return true;
}
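The receive path can be illustrated with a simplified, self-contained sketch. Everything in it is hypothetical: ToyNetMessage mimics a few CNetMessage fields, and ToyTransport uses a made-up framing (12-byte zero-padded type, 4-byte little-endian payload length, then the payload) instead of Bitcoin Core's real transports, with no magic bytes, checksums, or error handling. It only shows the pattern from ReceiveMsgBytes: bytes arrive in arbitrary chunks, the transport absorbs them until a message is complete, and each complete message is moved onto a deque playing the role of vRecvMsg.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for CNetMessage: one decoded message plus metadata.
struct ToyNetMessage {
    std::string m_type;                   // message type, e.g. "ping"
    std::vector<uint8_t> m_recv;          // payload (a DataStream in Bitcoin Core)
    std::chrono::microseconds m_time{0};  // when the message completed
    uint32_t m_message_size{0};           // payload size
    uint32_t m_raw_message_size{0};       // header + payload size on the wire
};

// Hypothetical transport with a made-up framing (NOT the real P2P format):
// 12-byte zero-padded type, 4-byte little-endian payload length, payload.
class ToyTransport {
public:
    static constexpr size_t HEADER_SIZE = 16;

    // Absorb bytes until the current message is complete; return bytes used.
    size_t ReceivedBytes(const uint8_t* data, size_t len) {
        size_t used = 0;
        while (used < len && !Complete()) m_buf.push_back(data[used++]);
        return used;
    }

    bool Complete() const {
        return m_buf.size() >= HEADER_SIZE && m_buf.size() == HEADER_SIZE + PayloadLen();
    }

    // Decompose the buffered bytes into a message and reset for the next one.
    ToyNetMessage GetReceivedMessage(std::chrono::microseconds time) {
        ToyNetMessage msg;
        size_t n = 0;
        while (n < 12 && m_buf[n] != 0) ++n;  // type ends at the first zero byte
        msg.m_type.assign(m_buf.begin(), m_buf.begin() + n);
        msg.m_recv.assign(m_buf.begin() + HEADER_SIZE, m_buf.end());
        msg.m_time = time;
        msg.m_message_size = static_cast<uint32_t>(msg.m_recv.size());
        msg.m_raw_message_size = static_cast<uint32_t>(m_buf.size());
        m_buf.clear();
        return msg;
    }

private:
    uint32_t PayloadLen() const {
        return uint32_t{m_buf[12]} | uint32_t{m_buf[13]} << 8 |
               uint32_t{m_buf[14]} << 16 | uint32_t{m_buf[15]} << 24;
    }
    std::vector<uint8_t> m_buf;
};

// Hypothetical helper that builds one frame in the toy format above.
std::vector<uint8_t> MakeFrame(const std::string& type, const std::vector<uint8_t>& payload) {
    assert(type.size() <= 12);
    std::vector<uint8_t> frame(ToyTransport::HEADER_SIZE, 0);
    std::copy(type.begin(), type.end(), frame.begin());
    const auto len = static_cast<uint32_t>(payload.size());
    for (int i = 0; i < 4; ++i) frame[12 + i] = static_cast<uint8_t>(len >> (8 * i));
    frame.insert(frame.end(), payload.begin(), payload.end());
    return frame;
}

// Same shape as CNode::ReceiveMsgBytes: feed bytes to the transport and move
// every completed message onto the receive queue. Returns true if at least
// one message completed during this call.
bool FeedBytes(ToyTransport& transport, std::deque<ToyNetMessage>& recv_queue,
               const uint8_t* data, size_t len, std::chrono::microseconds now) {
    bool complete = false;
    while (len > 0) {
        const size_t used = transport.ReceivedBytes(data, len);
        data += used;
        len -= used;
        if (transport.Complete()) {
            recv_queue.push_back(transport.GetReceivedMessage(now));
            complete = true;
        }
    }
    return complete;
}
```

Because the transport keeps its partial buffer between calls, a message split across two socket reads is still reassembled correctly, which is exactly why CNode keeps a per-connection transport object.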

Once a message is complete, it is pushed to vRecvMsg and later moved to a separate queue for processing. ProcessMessages then takes one message at a time from that queue (via PollMessage), and ProcessMessage handles its data (the DataStream in m_recv) according to the message type.

bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc)
{
    AssertLockHeld(g_msgproc_mutex);

    PeerRef peer = GetPeerRef(pfrom->GetId());
    if (peer == nullptr) return false;

    {
        LOCK(peer->m_getdata_requests_mutex);
        if (!peer->m_getdata_requests.empty()) {
            ProcessGetData(*pfrom, *peer, interruptMsgProc);
        }
    }

    const bool processed_orphan = ProcessOrphanTx(*peer);

    if (pfrom->fDisconnect)
        return false;

    if (processed_orphan) return true;

    // this maintains the order of responses
    // and prevents m_getdata_requests to grow unbounded
    {
        LOCK(peer->m_getdata_requests_mutex);
        if (!peer->m_getdata_requests.empty()) return true;
    }

    // Don't bother if send buffer is too full to respond anyway
    if (pfrom->fPauseSend) return false;

    auto poll_result{pfrom->PollMessage()};
    if (!poll_result) {
        // No message to process
        return false;
    }

    CNetMessage& msg{poll_result->first};
    bool fMoreWork = poll_result->second;

    TRACE6(net, inbound_message,
        pfrom->GetId(),
        pfrom->m_addr_name.c_str(),
        pfrom->ConnectionTypeAsString().c_str(),
        msg.m_type.c_str(),
        msg.m_recv.size(),
        msg.m_recv.data()
    );

    if (m_opts.capture_messages) {
        CaptureMessage(pfrom->addr, msg.m_type, MakeUCharSpan(msg.m_recv), /*is_incoming=*/true);
    }

    try {
        ProcessMessage(*pfrom, msg.m_type, msg.m_recv, msg.m_time, interruptMsgProc);
        if (interruptMsgProc) return false;
        {
            LOCK(peer->m_getdata_requests_mutex);
            if (!peer->m_getdata_requests.empty()) fMoreWork = true;
        }
        // Does this peer have an orphan ready to reconsider?
        // (Note: we may have provided a parent for an orphan provided
        //  by another peer that was already processed; in that case,
        //  the extra work may not be noticed, possibly resulting in an
        //  unnecessary 100ms delay)
        if (m_orphanage.HaveTxToReconsider(peer->m_id)) fMoreWork = true;
    } catch (const std::exception& e) {
        LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg.m_type), msg.m_message_size, e.what(), typeid(e).name());
    } catch (...) {
        LogPrint(BCLog::NET, "%s(%s, %u bytes): Unknown exception caught\n", __func__, SanitizeString(msg.m_type), msg.m_message_size);
    }

    return fMoreWork;
}
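To make the queue mechanics concrete, here is a minimal sketch under a simplified model: ToyNode, Msg, m_queue, and RunRounds are hypothetical names, and the "handlers" just return strings. As with PollMessage above, each poll hands back one message plus a flag saying whether more remain, and the driver loop visits every peer once per round, the idea being that one busy peer does not get to monopolize the message handler.

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal message type standing in for CNetMessage.
struct Msg {
    std::string m_type;
    std::vector<unsigned char> m_recv;
};

// Hypothetical per-peer processing queue with a PollMessage-like API:
// return ONE message plus a "more work remains" flag.
class ToyNode {
public:
    void Enqueue(Msg m) { m_queue.push_back(std::move(m)); }

    std::optional<std::pair<Msg, bool>> PollMessage() {
        if (m_queue.empty()) return std::nullopt;
        Msg front = std::move(m_queue.front());
        m_queue.pop_front();
        return std::make_pair(std::move(front), !m_queue.empty());
    }

private:
    std::deque<Msg> m_queue;
};

// Dispatch on the message type, as ProcessMessage does (toy handlers only).
std::string HandleMessage(const Msg& msg) {
    if (msg.m_type == "ping") return "pong";
    if (msg.m_type == "getdata") return "serve";
    return "ignored";
}

// Process at most one message per peer per round, until no peer reports
// more work. Returns handler results in processing order.
std::vector<std::string> RunRounds(std::vector<ToyNode>& nodes) {
    std::vector<std::string> log;
    bool any_more = true;
    while (any_more) {
        any_more = false;
        for (auto& node : nodes) {
            auto polled = node.PollMessage();
            if (!polled) continue;
            log.push_back(HandleMessage(polled->first));
            any_more = any_more || polled->second;
        }
    }
    return log;
}
```

With two messages queued on the first peer and one on the second, the second peer's message is handled between the first peer's two messages rather than after both.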

vSendMsg is now a std::deque of CSerializedNetMsg (in the version from the question it held CSerializeData). CSerializedNetMsg is a simple struct: it holds the message data (a vector of unsigned char) and the message type. As its name indicates, it represents an already-serialized message. In the codebase it is usually constructed with NetMsg::Make, which takes a string with the message type followed by any number of additional arguments; those arguments are serialized to form the message data.

namespace NetMsg {
    template <typename... Args>
    CSerializedNetMsg Make(std::string msg_type, Args&&... args)
    {
        CSerializedNetMsg msg;
        msg.m_type = std::move(msg_type);
        VectorWriter{msg.data, 0, std::forward<Args>(args)...};
        return msg;
    }
} // namespace NetMsg
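A simplified sketch of what a Make-style helper does: take a type string plus any number of arguments and append each argument's serialization, in order, to a byte vector. ToySerializedMsg, SerializeOne, and MakeMsg are hypothetical; the toy encoding (unsigned integers little-endian, strings as a one-byte length followed by the characters) stands in for VectorWriter and Bitcoin Core's serialization framework and is not a reimplementation of them.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical stand-in for CSerializedNetMsg: type plus serialized bytes.
struct ToySerializedMsg {
    std::vector<unsigned char> data;
    std::string m_type;
};

// Toy serializer: supports std::string and unsigned integer types only.
template <typename T>
void SerializeOne(std::vector<unsigned char>& out, const T& value) {
    if constexpr (std::is_same_v<T, std::string>) {
        assert(value.size() < 256);  // toy one-byte length prefix
        out.push_back(static_cast<unsigned char>(value.size()));
        out.insert(out.end(), value.begin(), value.end());
    } else {
        static_assert(std::is_unsigned_v<T>, "toy serializer: unsigned ints or std::string");
        for (size_t i = 0; i < sizeof(T); ++i)  // little-endian, low byte first
            out.push_back(static_cast<unsigned char>(value >> (8 * i)));
    }
}

// Variadic helper in the spirit of NetMsg::Make: set the type, then
// serialize every remaining argument left to right (C++17 fold expression).
template <typename... Args>
ToySerializedMsg MakeMsg(std::string msg_type, const Args&... args) {
    ToySerializedMsg msg;
    msg.m_type = std::move(msg_type);
    (SerializeOne(msg.data, args), ...);
    return msg;
}
```

For example, MakeMsg("ping", uint64_t{nonce}) yields an 8-byte payload, and MakeMsg("verack") yields an empty one. The point of the design is that the message is fully serialized once, up front, so the send path only ever deals with bytes.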

Now, what I understand about the difference between CSerializedNetMsg and CNetMessage is:

  1. CSerializedNetMsg seems lighter.
  2. CNetMessage has additional members (m_time, m_message_size, m_raw_message_size).
  3. Only one CNetMessage object exists for a given message, because its copy constructor and copy assignment operator are deleted. CSerializedNetMsg also disables implicit copies, but it provides an explicit Copy() method.
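The copy policy in point 3 can be expressed directly in C++ by deleting the copy operations and keeping the move operations. This sketch uses the hypothetical types ToySerializedMsg and ToyNetMessage, modeled on the description above; the static_asserts check at compile time that neither type can be copied implicitly, and only ToySerializedMsg offers an explicit Copy().

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical send-side message: move-only, with an explicit Copy().
struct ToySerializedMsg {
    ToySerializedMsg() = default;
    ToySerializedMsg(ToySerializedMsg&&) = default;
    ToySerializedMsg& operator=(ToySerializedMsg&&) = default;
    // No implicit copying: duplicating must be a visible, deliberate call.
    ToySerializedMsg(const ToySerializedMsg&) = delete;
    ToySerializedMsg& operator=(const ToySerializedMsg&) = delete;

    ToySerializedMsg Copy() const {
        ToySerializedMsg copy;
        copy.data = data;
        copy.m_type = m_type;
        return copy;  // moved out, since copying is deleted
    }

    std::vector<unsigned char> data;
    std::string m_type;
};

// Hypothetical receive-side message: move-only, no way to duplicate it.
struct ToyNetMessage {
    ToyNetMessage() = default;
    ToyNetMessage(ToyNetMessage&&) = default;
    ToyNetMessage& operator=(ToyNetMessage&&) = default;
    ToyNetMessage(const ToyNetMessage&) = delete;
    ToyNetMessage& operator=(const ToyNetMessage&) = delete;

    std::vector<unsigned char> m_recv;
};

// Compile-time checks of the policy described above.
static_assert(!std::is_copy_constructible_v<ToySerializedMsg>);
static_assert(std::is_move_constructible_v<ToySerializedMsg>);
static_assert(!std::is_copy_constructible_v<ToyNetMessage>);
static_assert(std::is_move_constructible_v<ToyNetMessage>);
```

Deleting the copy operations means that accidentally writing `auto m = msg;` fails to compile, so a potentially large payload is never duplicated silently; the only way to get a second buffer is to ask for one with Copy().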

Although they look similar, each type is shaped for its purpose. Received messages are handled as CNetMessage partly because we need to know exactly how much data we received, for example to track the size of the process queue. Copying a CSerializedNetMsg, on the other hand, is useful when sending the same message to more than one node, as in this excerpt from PeerManagerImpl::NewPoWValidBlock:

    m_connman.ForEachNode([this, pindex, &lazy_ser, &hashBlock](CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) {
        AssertLockHeld(::cs_main);

        if (pnode->GetCommonVersion() < INVALID_CB_NO_BAN_VERSION || pnode->fDisconnect)
            return;
        ProcessBlockAvailability(pnode->GetId());
        CNodeState &state = *State(pnode->GetId());
        // If the peer has, or we announced to them the previous block already,
        // but we don't think they have this one, go ahead and announce it
        if (state.m_requested_hb_cmpctblocks && !PeerHasHeader(&state, pindex) && PeerHasHeader(&state, pindex->pprev)) {

            LogPrint(BCLog::NET, "%s sending header-and-ids %s to peer=%d\n", "PeerManager::NewPoWValidBlock",
                    hashBlock.ToString(), pnode->GetId());

            const CSerializedNetMsg& ser_cmpctblock{lazy_ser.get()};
            PushMessage(*pnode, ser_cmpctblock.Copy());
            state.pindexBestHeaderSent = pindex;
        }
    });
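Judging by its name and the get() call, lazy_ser appears to serialize the compact block only when it is first needed, after which every qualifying peer receives its own Copy(). A minimal sketch of that pattern, under simplifying assumptions: LazySerializedMsg, ToyPeer, and Broadcast are hypothetical names, the "should we announce" check is a single bool, and each peer's send queue is a plain std::deque standing in for vSendMsg.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical move-only serialized message with an explicit Copy().
struct ToySerializedMsg {
    ToySerializedMsg() = default;
    ToySerializedMsg(ToySerializedMsg&&) = default;
    ToySerializedMsg& operator=(ToySerializedMsg&&) = default;
    ToySerializedMsg(const ToySerializedMsg&) = delete;
    ToySerializedMsg& operator=(const ToySerializedMsg&) = delete;
    ToySerializedMsg Copy() const {
        ToySerializedMsg c;
        c.data = data;
        c.m_type = m_type;
        return c;
    }
    std::vector<unsigned char> data;
    std::string m_type;
};

// Hypothetical lazy holder: runs the (possibly expensive) serializer at most
// once, on the first get(), then returns references to the cached result.
class LazySerializedMsg {
public:
    explicit LazySerializedMsg(std::function<ToySerializedMsg()> make) : m_make(std::move(make)) {}
    const ToySerializedMsg& get() {
        if (!m_msg) m_msg.emplace(m_make());
        return *m_msg;
    }

private:
    std::function<ToySerializedMsg()> m_make;
    std::optional<ToySerializedMsg> m_msg;
};

// Hypothetical peer: whether it should get the announcement, plus its own queue.
struct ToyPeer {
    bool wants_announcement{false};
    std::deque<ToySerializedMsg> send_queue;  // plays the role of vSendMsg
};

// Serialize on first use only, then push an independent copy to each peer.
int Broadcast(std::vector<ToyPeer>& peers, LazySerializedMsg& lazy) {
    int sent = 0;
    for (auto& peer : peers) {
        if (!peer.wants_announcement) continue;
        const ToySerializedMsg& ser = lazy.get();
        peer.send_queue.push_back(ser.Copy());
        ++sent;
    }
    return sent;
}
```

If no peer qualifies, the serializer never runs at all; if several do, serialization still happens once and only the byte buffer is duplicated, because each peer's send queue must own its own message.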

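As for vRecvGetData, it was removed in #19911. Its role is now played by m_getdata_requests, a member of Peer: when a node receives a GETDATA message, it stores the requested INVs in m_getdata_requests, which tracks what the peer has asked us for (e.g., a transaction). As ProcessMessages above shows, pending requests are served by ProcessGetData before any new message from that peer is read, which keeps responses in order and prevents m_getdata_requests from growing unbounded.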
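The getdata flow can be sketched as follows, with hypothetical names throughout (ToyInv for CInv, ToyPeer for Peer, OnGetData, ServeGetData, MayReadNextMessage). The max_items cap is an assumption standing in for whatever conditions make the real ProcessGetData stop early (for example, a full send buffer, as with fPauseSend). What the sketch does show is the ordering: requests are queued under their own mutex, served in FIFO order, and no new message from that peer is read while any remain.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical inventory item standing in for CInv: a type tag and a hash.
struct ToyInv {
    std::string type;  // e.g. "tx" or "block"
    std::string hash;
};

// Hypothetical per-peer state standing in for Peer; the mutex plays the
// role of m_getdata_requests_mutex.
struct ToyPeer {
    std::mutex getdata_mutex;
    std::deque<ToyInv> getdata_requests;
};

// On GETDATA: queue the requested items instead of answering immediately.
void OnGetData(ToyPeer& peer, const std::vector<ToyInv>& invs) {
    std::lock_guard<std::mutex> lock(peer.getdata_mutex);
    peer.getdata_requests.insert(peer.getdata_requests.end(), invs.begin(), invs.end());
}

// Serve at most max_items queued requests in FIFO order; return what was served.
std::vector<ToyInv> ServeGetData(ToyPeer& peer, size_t max_items) {
    std::lock_guard<std::mutex> lock(peer.getdata_mutex);
    std::vector<ToyInv> served;
    while (!peer.getdata_requests.empty() && served.size() < max_items) {
        served.push_back(peer.getdata_requests.front());
        peer.getdata_requests.pop_front();
    }
    return served;
}

bool HasPendingGetData(ToyPeer& peer) {
    std::lock_guard<std::mutex> lock(peer.getdata_mutex);
    return !peer.getdata_requests.empty();
}

// Ordering rule from ProcessMessages: read the next message from this peer
// only once all of its earlier getdata requests have been answered.
bool MayReadNextMessage(ToyPeer& peer) { return !HasPendingGetData(peer); }
```

Because a peer cannot get new messages processed while its earlier requests are pending, it cannot pile up an arbitrarily long backlog of requests, and it receives responses in the order it asked for them.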
