\$\begingroup\$

I decided to implement a vectorset, which is intended to be faster than std::set for the 3 fundamental operations, namely insert, erase, and lower_bound. The basic idea is to store some contiguous nodes in an array in order to improve cache locality, essentially "unrolling" the tree. (Of course this is a standard idea and it seems it is the motivation for B-trees, although it is definitely not a B-tree.) The goal is to have a slight performance improvement, and the implementation should be fairly high-level.

Update: this differs from boost.container's flat_set, which appears to use a single underlying sequence container, whereas vectorset spreads its elements over multiple small arrays.

Some implementation points:

  • In some way it is a wrapper around std::map, allowing a simple design instead of reimplementing balanced trees from scratch. The (map) value is the array type, while the (map) key is the lowest (set) key in the array. The number of elements (set keys) in the array is bounded by a compile-time constant. This is the main invariant, and based on this some insertion strategy is used.
  • The array itself is a veque, a third-party single-header container which is a sort of std::vector with fast push_front() and faster insert, the idea being that you keep some spare capacity on both sides of the array. It has proven to be quite useful. Note that to avoid the dependency it is straightforward to replace it with a std::deque, but the deque turned out to be slower.
  • Some features are not implemented (such as reverse iterators, merge, and an allocator template parameter). But I think the basic idea is there.
  • I don't know if I have implemented constness and forwarding in a fully correct manner. There's an unfortunate const_cast at the moment, which sounds wrong.
  • I used the rule of zero, since resources are managed by the std::map.
  • Instead of writing "manual" benchmarks, I decided to write a "trace" (basically a series of operations on the set structure, the operations being admittedly kind of random). I wrapped this in a very bare-bones executable. Then I used the command line benchmarking tool hyperfine to compare runtimes. This has the major disadvantage that it inherently also counts setup and teardown time, so speedup estimates may be off. On the other hand, it keeps things simple by avoiding boilerplate code and a benchmarking library; it's just to get an idea after all. A possible output is given at the end.
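To make the layout from the first bullet concrete, here is a stripped-down sketch of the two-level lookup. It is not the implementation below: std::vector stands in for veque, the keys are plain int, and the name lower_bound_value is made up for this illustration. It also uses upper_bound on the map rather than lower_bound plus a decrement, which is a variation on what the real code does. Buckets are assumed to be non-empty and sorted.

```cpp
#include <algorithm>
#include <iterator>
#include <map>
#include <optional>
#include <vector>

// Hypothetical stand-ins: std::vector replaces veque, keys are plain int.
using Bucket = std::vector<int>;
// Invariant: each map key equals the smallest element of its bucket.
using BucketMap = std::map<int, Bucket>;

// Returns the smallest stored element >= key, or nullopt if there is none.
inline std::optional<int> lower_bound_value(const BucketMap &buckets, int key) {
    // First bucket whose smallest element is strictly greater than key.
    auto next = buckets.upper_bound(key);
    if (next != buckets.begin()) {
        // The bucket before it is the only one that can contain key itself.
        const Bucket &candidate = std::prev(next)->second;
        auto inner = std::lower_bound(candidate.begin(), candidate.end(), key);
        if (inner != candidate.end()) {
            return *inner;
        }
    }
    // key lies between two buckets (or before the first one).
    if (next != buckets.end()) {
        return next->second.front();
    }
    return std::nullopt;
}
```

The outer search touches only a few map nodes, and the inner binary search runs over contiguous memory, which is where the cache-locality benefit is supposed to come from.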

vectorset.hpp

#ifndef VECTORSET_VECTORSET_HPP
#define VECTORSET_VECTORSET_HPP

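#include <algorithm>   // std::lower_bound
#include <cstddef>     // std::size_t
#include <functional>  // std::less
#include <map>
#include <utility>     // std::pair, std::move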
//see: https://github.com/Shmoopty/veque
#include "veque.hpp"

namespace vectorset{
    template<class Key, class Comparator = std::less<Key>, std::size_t bucketsize = 512>
    class vectorset {
    private:
        using Bucket = veque::veque<Key>;
        using BucketMap = std::map<Key, Bucket, Comparator>;
        BucketMap buckets{};
        std::size_t size_ = 0;
        Comparator comp{};
    public:
        std::size_t size() const noexcept{
            return size_;
        }

        bool empty() const noexcept{
            return size_==0;
        }

        class Iterator {
            friend class vectorset;
            std::size_t idx;
            typename BucketMap::const_iterator mapit;
        public:
            Iterator(const typename BucketMap::const_iterator &mapit, const std::size_t inner) :
                    mapit(mapit), idx(inner) {}

            const Key &operator*() const noexcept{
                return (mapit->second)[idx];
            }

            Iterator operator++() noexcept{
                ++idx;
                if (idx == mapit->second.size()) {
                    ++mapit;
                    idx = 0;
                }
                return *this;
            }

            Iterator operator--() noexcept{
                if (idx == 0) {
                    --mapit;
                    idx = mapit->second.size();
                }
                --idx;
                return *this;
            }

            bool operator==(const Iterator &other) const noexcept{
                return this->idx == other.idx && this->mapit == other.mapit;
            }

            bool operator!=(const Iterator &other) const noexcept{
                return !(*this == other);
            }
        };

        Iterator begin() const noexcept{
            return Iterator{buckets.begin(), 0};
        }

        Iterator end() const noexcept{
            return Iterator{buckets.end(), 0};
        }

        Iterator lower_bound(const Key &t) const noexcept{
            auto it = buckets.lower_bound(t);
            if (it == buckets.begin()) {
                return this->begin();
            }
            auto prev = it;
            --prev;
            auto &vec = prev->second;
            if (comp(vec.back(), t)) {
                // the key is between 2 buckets
                if (it != buckets.end()) {
                    return Iterator{it, 0};
                }
                return this->end();
            }
            auto innerit = std::lower_bound(vec.begin(), vec.end(), t, comp);
            //difference guaranteed to be positive
            std::size_t dist = std::size_t(innerit - vec.begin());
            return Iterator{prev, dist};
        }

        std::pair<Iterator, bool> insert(const Key &t) noexcept{
            if (buckets.empty()) {
                auto insert_it = insert_new_node(t);
                insert_it->second.push_back(t);
                ++size_;
                return {this->begin(), true};
            }

            auto it = buckets.lower_bound(t);

            if (it == buckets.begin()) {
                if (!comp(t, it->first)) {
                    return {this->begin(), false};
                }
                std::size_t currsize = it->second.size();
                if (currsize == bucketsize) {
                    auto insert_it = insert_new_node(t);
                    insert_it->second.push_back(t);
                    ++size_;
                    return {{insert_it, 0}, true};
                }
                update_lower_bound(it, t);
                std::size_t idx = insert_into(it->second, t);
                ++size_;
                return {{it, idx}, true};
            }
            //note that in case we have an input that is larger than all keys,
            //it means we decrement the end iterator.
            auto prev = it;
            --prev;

            auto bucket_it = std::lower_bound(prev->second.begin(), prev->second.end(), t, comp);
            if (bucket_it != prev->second.end() && !comp(t, *bucket_it)) {
                return {{prev, std::size_t(bucket_it - prev->second.begin())}, false};
            }
            // special case when t == key of it
            if (it != buckets.end() && !comp(t, it->first)) {
                return {{it, 0}, false};
            }

            std::size_t prevsize = prev->second.size();
            if (prevsize < bucketsize) {
                std::size_t idx = insert_at(prev->second, bucket_it, t);
                ++size_;
                return {{prev, idx}, true};
            }

            // key needs to be in bucket but bucket is full, split bucket
            if (comp(t, prev->second.back())) {
                return split_and_insert(t, prev);
            }

            if (it != buckets.end() && it->second.size() + 1 < bucketsize) {
                auto back_val = prev->second.back();
                update_lower_bound(it, back_val);
                prev->second.pop_back();

                //we know that val was smaller than smallest bucket element
                auto &nextbucket = it->second;
                nextbucket.push_front(t);
                nextbucket.push_front(back_val);
                ++size_;
                return {{it, 1}, true};
            }
            auto insert_it = insert_new_node(t);
            insert_it->second.push_back(t);
            ++size_;
            return {{insert_it, 0}, true};
        }

        Iterator find(const Key &t) const noexcept{
            auto it = lower_bound(t);
            if (it != this->end() && comp(t, *it)) {
                it = this->end();
            }
            return it;
        }

        std::size_t erase(const Key &t) noexcept{
            auto lb = lower_bound(t);
            if (lb != this->end() && !comp(t, *lb)) {
                erase(lb);
                return 1;
            }
            return 0;
        }

        Iterator erase(Iterator it) noexcept{
            // - by assumption the input `it` is dereferenceable
            // - `it.mapit` is a `map::const_iterator`
            Bucket &vec = *const_cast<Bucket *>( &((it.mapit)->second));
            vec.erase(vec.begin() + it.idx);
            --size_;
            if (vec.empty()) {
                buckets.erase(it.mapit);
            } else if (it.idx == 0) {
                update_lower_bound(it.mapit, vec.front());
            }
            ++it;
            return it;
        }

        bool contains(const Key &t) const noexcept{
            return this->find(t) != this->end();
        }

        Iterator upper_bound(const Key &t) const noexcept{
            auto it = lower_bound(t);
            if (it != this->end() && !comp(t, *it)) {
                ++it;
            }
            return it;
        }

    private:
        std::pair<Iterator, bool> split_and_insert(const Key &t,
                                                   const typename BucketMap::iterator& prev) noexcept {
            auto &vec = prev->second;
            auto &mid = vec[vec.size() / 2];

            auto insert_it = insert_new_node(mid);
            auto &upperbucket = insert_it->second;
            for (size_t i = vec.size() / 2; i < vec.size(); ++i) {
                upperbucket.push_back(vec[i]);
            }
            //pop preserves the capacity
            size_t k = vec.size() - vec.size() / 2;
            for (size_t i = 0; i < k; ++i) {
                vec.pop_back();
            }
            if (comp(t, mid)) {
                size_t idx = insert_into(vec, t);
                ++size_;
                return {{prev, idx}, true};
            }
            size_t idx = insert_into(upperbucket, t);
            ++size_;
            return {{insert_it, idx}, true};
        }

        std::size_t insert_into(Bucket &bucket, const Key &val_) noexcept{
            auto innerit = std::lower_bound(bucket.begin(), bucket.end(), val_, comp);
            std::size_t idx = std::size_t(innerit - bucket.begin());
            bucket.insert(innerit, val_);
            return idx;
        }

        std::size_t insert_at(Bucket &bucket, typename Bucket::iterator &it, const Key &val) noexcept{
            auto idx = std::size_t(it - bucket.begin());
            bucket.insert(it, val);
            return idx;
        }

        typename BucketMap::iterator insert_new_node(const Key &val_) noexcept{
            auto res = buckets.insert({val_, {}});
            res.first->second.reserve(bucketsize);
            return res.first;
        }

        void update_lower_bound(const typename BucketMap::const_iterator &it, const Key &value_new) noexcept{
            auto node = buckets.extract(it);
            node.key() = value_new;
            buckets.insert(std::move(node));
        }
    };
}
#endif //VECTORSET_VECTORSET_HPP

setperformance.cpp

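#include <algorithm>   // std::shuffle
#include <iostream>
#include <numeric>
#include <random>
#include <set>
#include <stdexcept>   // std::invalid_argument
#include <string>
#include <vector>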
#include "vectorset.hpp"

struct TestData {
    std::vector<int> numbers;

    explicit TestData(const size_t N, int seed = 0, int start =1) {
        numbers = std::vector<int>(N);
        std::iota(numbers.begin(), numbers.end(), start);
        if(seed !=0) {
            std::shuffle(numbers.begin(), numbers.end(),
                         std::mt19937_64(seed));
        }
    }
};

template<typename Container>
void run_trace(int N) {
    TestData td(N, 5);
    TestData next(N, 42, N+1);

    Container c{};

    for (const auto &number: td.numbers) {
        c.insert(number);
    }
    long acc = 0;
    for (const auto &number: td.numbers) {
        auto it = c.lower_bound(number);
        acc += *it;
    }

    int step = 50;
    int k =0;
    for (int i = 0; i < N ;) {
        for (int j = 0; j < step && i < N; ++j, ++i) {
            c.erase(td.numbers[i]);
        }
        for (int j = 0; j < step && k < N; ++j, ++k) {
            c.insert(next.numbers[k]);
        }
        acc += *c.begin();
    }

    for (const auto &number: td.numbers) {
        auto it = c.upper_bound(number);
        acc += *it;
    }
    c.insert(-1);
    for (const auto &number: next.numbers) {
        c.erase(number);
    }
    acc += *c.begin();
    while( ! c.empty()){
        c.erase(*c.begin());
    }
    std::cout << "accumulator: " << acc << "\n";
}

int main(int argc, char *argv[]) {
    if (argc > 2) {
        std::string container = argv[1];

        if ( !(container == "set" || container == "vectorset") ) {
            std::cout << "specified container '" << container << "' invalid\n";
            return 1;
        }
        int N;
        try {
            N = std::stoi(argv[2]);
        } catch (std::invalid_argument &) {
            std::cout << "invalid iterations parameter: " << argv[2] << "\n";
            return 1;
        }
        if (container == "set") {
            run_trace<std::set<int>>(N);
        } else {
            run_trace<vectorset::vectorset<int>>(N);
        }

    }
    return 0;
}

possible output:

hyperfine -w 5 './setperformance set 200000' './setperformance vectorset 200000'
Benchmark 1: ./setperformance set 200000
  Time (mean ± σ):     757.6 ms ±  17.0 ms    [User: 734.0 ms, System: 17.4 ms]
  Range (min … max):   733.1 ms … 791.4 ms    10 runs
 
Benchmark 2: ./setperformance vectorset 200000
  Time (mean ± σ):     199.4 ms ±   8.0 ms    [User: 191.1 ms, System: 5.9 ms]
  Range (min … max):   185.7 ms … 212.0 ms    13 runs
 
Summary
  './setperformance vectorset 200000' ran
    3.80 ± 0.17 times faster than './setperformance set 200000'
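
Since hyperfine times the whole process, one way to separate the phases of the trace would be to time them in-process. The following is only a sketch of that idea using std::chrono; the helper name time_ms is made up and is not part of the code above.

```cpp
#include <chrono>
#include <utility>

// Hypothetical helper: runs one phase of the trace and returns the
// elapsed wall-clock time in milliseconds.
template <typename F>
double time_ms(F &&phase) {
    const auto start = std::chrono::steady_clock::now();
    std::forward<F>(phase)();
    const auto stop = std::chrono::steady_clock::now();
    return std::chrono::duration<double, std::milli>(stop - start).count();
}
```

Inside run_trace() each loop could then be wrapped separately, for example `double insert_ms = time_ms([&] { for (const auto &number : td.numbers) c.insert(number); });`, so that insertion, lookup and erasure are reported individually and construction of the test data is excluded.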

\$\endgroup\$
  • \$\begingroup\$ Have you checked out boost.container’s flatset? It has the same idea as yours. \$\endgroup\$ Commented Jun 9, 2022 at 9:13
  • \$\begingroup\$ I think I came across it, haven't tried it though. But from its description it appears it is not the same idea, since Boost's flatset simply inserts into 1 array, whereas here there are multiple arrays. The idea is to avoid expensive array insertions. \$\endgroup\$
    – verven
    Commented Jun 9, 2022 at 19:36
  • \$\begingroup\$ I always use Boost's iterator_adaptor or iterator_facade to define iterator types, since there are a lot of little requirements to make something technically an iterator in various C++ Standards, which helps them easily be used by template code. \$\endgroup\$
    – aschepler
    Commented Jun 10, 2022 at 14:31

1 Answer

\$\begingroup\$

Enable compiler warnings and fix them

My compiler warns about the wrong order of the initializer list for the constructor of vectorset::Iterator. I would simply swap the declaration of idx and mapit to fix this.
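
As a minimal sketch of that fix, assuming a simplified bucket map of int keys and std::vector buckets (the names bucket_map and bucket_iterator are hypothetical, not taken from your code): the members are declared in the same order as they appear in the constructor's initializer list, so GCC's and Clang's reorder warning no longer applies.

```cpp
#include <cstddef>
#include <map>
#include <vector>

// Hypothetical simplified types, standing in for BucketMap and Iterator.
using bucket_map = std::map<int, std::vector<int>>;

class bucket_iterator {
    // Members are always initialized in declaration order,
    // so declare them in the order the initializer list uses.
    bucket_map::const_iterator mapit;
    std::size_t idx;

public:
    bucket_iterator(bucket_map::const_iterator position, std::size_t inner)
        : mapit(position), idx(inner) {}

    const int &operator*() const { return mapit->second[idx]; }
};
```

In this case neither initializer depends on the other member, so the original order was harmless, but keeping the two orders in sync avoids surprises once one does.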

Naming

While in general, the way you named types, variables and functions is good, there are some inconsistencies. Some type names are in all lower case, some start with upper case. For example, there's vectorset but its iterator is named Iterator. If you want to follow the naming conventions of the STL, then keep type names in lower case. The exception is for template arguments like Key and Comparator; those are capitalized.

Then there are some variables that end with an underscore. You only do this when there is a conflict between a member variable and a member function, like size_ and size(). However, I would try to be consistent here and use the same prefix or postfix for all member variables. Also, if function parameters and local variables normally don't get a prefix or postfix, make sure none of them do, unlike val_ in insert_new_node().

It's also weird to see const Key &val. That immediately makes me wonder whether this is a key or a value. I know it doesn't matter for a set, but if you use the type Key, I would ensure the variable is named key. If you do want to talk about values, then I recommend adding using value_type = Key and writing const value_type &val.

Avoid abbreviating too much, lest the meaning of a variable can no longer be discerned from its name. In particular, instead of t, write key.

Finally, it's a bit silly in my opinion to put a class vectorset in a namespace vectorset. That's just redundant. Putting your own classes in a namespace is still a good practice, but then I would use something more general that you can use for multiple classes, like namespace verven or namespace fast_containers.

Too much noexcept

It is great to see functions marked const and noexcept. However, some functions are noexcept when they shouldn't be. Consider insert(): the underlying containers might throw if they cannot allocate memory. By marking your insert() noexcept, you ensure that such an exception can no longer be caught; instead it will cause the program to terminate immediately. Remove noexcept from all functions that do something that might throw. This means checking which std::map and veque functions they call, but also think about Comparator and the constructor of Key potentially throwing. It might be instructive to check which members of std::set are noexcept.
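
The distinction can be checked at compile time with the noexcept operator. Below is a small sketch using a made-up wrapper class tiny_set (not your vectorset): size() only reads state and can be noexcept, while insert() may allocate, copy a Key or call the comparator, and is therefore left without it.

```cpp
#include <cstddef>
#include <set>
#include <utility>

// Hypothetical wrapper, only to illustrate which members can be noexcept.
template <class Key>
class tiny_set {
    std::set<Key> data;

public:
    // Only reads the stored size: cannot throw.
    std::size_t size() const noexcept { return data.size(); }

    // May allocate, copy Key and call the comparator: not noexcept.
    bool insert(const Key &key) { return data.insert(key).second; }
};

// std::set::size() is specified as noexcept by the standard.
static_assert(noexcept(std::declval<const std::set<int> &>().size()),
              "std::set::size() is noexcept");
static_assert(noexcept(std::declval<const tiny_set<int> &>().size()),
              "size() is declared noexcept");
static_assert(!noexcept(std::declval<tiny_set<int> &>().insert(0)),
              "insert() is deliberately not noexcept");
```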

Missing functionality

"Some features are not implemented (such as reverse iterators, merge(), and an allocator template parameter). But I think the basic idea is there."

There are some other things missing that might prevent vectorset from being used as a drop-in replacement for std::set. For example, emplace(), but also the templated overloads of find() and related functions: especially for a set it is important that you allow comparing a Key against an arbitrary type. Consider a Key which is a struct consisting of a field that is a unique ID of some kind, plus some other fields with related information. In that case, you want to be able to search using the unique ID, without having to cast that to a Key first. For this to work, you have to write an overload like:

template<typename K>
Iterator find(const K &x) const {
    auto it = lower_bound(x);
    ...
}

And similarly, lower_bound() has to be templated of course, so that the argument x of type K can be passed all the way down to the std::map's lower_bound().
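
To show what this buys you, here is a sketch of heterogeneous lookup with a plain std::set, which is the behaviour a drop-in replacement would need to mirror. The types Record and ById and the function name_for_id are invented for this example; the only standard requirement used is that the comparator defines a member type is_transparent, which enables the templated find() overload since C++14.

```cpp
#include <set>
#include <string>

// Hypothetical key type: a unique ID plus related information.
struct Record {
    int id;
    std::string name;
};

// Transparent comparator: can compare Record against Record or against a bare ID.
struct ById {
    using is_transparent = void;  // opts in to the templated lookup overloads

    bool operator()(const Record &a, const Record &b) const { return a.id < b.id; }
    bool operator()(const Record &a, int id) const { return a.id < id; }
    bool operator()(int id, const Record &b) const { return id < b.id; }
};

// Looks a record up by ID without constructing a temporary Record.
inline std::string name_for_id(const std::set<Record, ById> &records, int id) {
    auto it = records.find(id);  // templated find(const K &)
    return it == records.end() ? std::string{} : it->name;
}
```

For vectorset the same effect would require that the comparator is also usable by the underlying std::map, since that is where the search starts.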

Unnecessary use of this->

It is almost never necessary to use this-> in C++. While I do think it makes the comparison operators a bit nicer to read (like in this->idx == other.idx), in other situations I don't think it offers any benefit.

Possibility to use more C++17 features

Your code already requires C++17 or later. That means there are a few language features you could use that you are not using already. The first is using if-statements with initializers, for example:

std::size_t erase(const Key &t) noexcept{
    if (auto lb = lower_bound(t); lb != this->end() && !comp(t, *lb)) {
        erase(lb);
        return 1;
    } else {
        return 0;
    }
}

This leads to slightly more symmetric code, and limits the scope of some temporaries. You can also use structured bindings:

auto it = buckets.lower_bound(t);
...
auto &[it_key, bucket] = *it;
if (!comp(t, it_key)) {
    return {begin(), false};
}
std::size_t cursize = bucket.size();

Performance

Just an FYI: I tested the code with a regular std::deque instead of veque, and found that vectorset is then only 6% ± 2% faster than a std::set. This suggests that the performance difference you saw is largely due to veque, and that without it there is little reason to use a vectorset.

\$\endgroup\$
  • \$\begingroup\$ Thanks, +1. Regarding performance: I don't really understand your point, because vectorset was never meant to be used with an std::deque so it's unsurprising that it's slower. However, strangely I somehow never tried it with simple std::vector until now. Turns out it is as fast as with veque. But generally I also tried it on another machine (with veque and vector), and the speedups were inconsequential (or rather, runtimes varied a lot). So I was too quick to jump to conclusions and I'll forget about this project. But your feedback was instructive. \$\endgroup\$
    – verven
    Commented Jun 13, 2022 at 22:37
  • \$\begingroup\$ Make sure you compile with optimizations on (with GCC or Clang, -O3 -march=native). It is very interesting though that changing the vector implementation results in this big difference in performance. It also makes me wonder if run_trace() is perhaps unrealistic. Also consider splitting up the test into multiple parts that are measured separately, like speed of linear and random insertion, speed of lookups, speed of deletions. Using a tool like perf you can try to find out which parts of your code are contributing the most to the run time. \$\endgroup\$
    – G. Sliepen
    Commented Jun 14, 2022 at 5:45
  • \$\begingroup\$ I completely agree that the tests should be much more granular, but right now it's just to get an idea. Nevertheless I retried it properly on a different machine and I get a ~2.5x speedup. If I may ask, did you try it with a std::vector or similar? (push_front needs to be changed to insert(v.begin()... of course) I'm a bit confused about what you tried out, and I would be a bit worried if you still get an insignificant speedup (if you tried something other than std::deque). \$\endgroup\$
    – verven
    Commented Jun 17, 2022 at 0:02
