summaryrefslogtreecommitdiffstats
path: root/src/common/bounded_threadsafe_queue.h
blob: e83064c7f0ef30229b9971ed7b96e3ce92c59063 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// SPDX-FileCopyrightText: Copyright (c) 2020 Erik Rigtorp <erik@rigtorp.se>
// SPDX-License-Identifier: MIT
#pragma once
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4324)
#endif

#include <atomic>
#include <bit>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <new>
#include <stdexcept>
#include <stop_token>
#include <type_traits>
#include <utility>

namespace Common {
namespace mpsc {
// Alignment used to keep independently-written members on separate cache lines
// (false-sharing avoidance). Uses the standard library's value when it provides
// one, otherwise falls back to 64 bytes. Declared `inline` so that every
// translation unit including this header shares a single definition.
#if defined(__cpp_lib_hardware_interference_size)
inline constexpr size_t hardware_interference_size = std::hardware_destructive_interference_size;
#else
inline constexpr size_t hardware_interference_size = 64;
#endif

/// Allocator used by default for the queue's slot array.
/// This is simply std::allocator: since C++17 its allocate() requests storage
/// through the alignment-aware operator new when T is over-aligned.
template <class T>
using AlignedAllocator = std::allocator<T>;

/// One cell of the queue's ring buffer: raw storage for a single T plus a flag
/// (`turn`) recording whether that storage currently holds a live object.
template <typename T>
struct Slot {
    /// Destroys the stored object if `turn` marks the slot as occupied.
    ~Slot() noexcept {
        if (turn.test()) {
            destroy();
        }
    }

    /// Constructs a T in `storage` from `args`. Does not touch `turn`;
    /// ~Slot() only destroys the object when `turn` is set.
    template <typename... Args>
    void construct(Args&&... args) noexcept {
        static_assert(std::is_nothrow_constructible_v<T, Args&&...>,
                      "T must be nothrow constructible with Args&&...");
        std::construct_at(reinterpret_cast<T*>(&storage), std::forward<Args>(args)...);
    }

    /// Ends the lifetime of the stored object. The slot must hold a live T.
    void destroy() noexcept {
        static_assert(std::is_nothrow_destructible_v<T>, "T must be nothrow destructible");
        std::destroy_at(object());
    }

    /// Returns the stored object as an rvalue so the caller can move from it.
    /// The object remains alive (in a moved-from state) until destroy().
    T&& move() noexcept {
        return std::move(*object());
    }

    // Align to avoid false sharing between adjacent slots
    alignas(hardware_interference_size) std::atomic_flag turn{};
    struct aligned_store {
        struct type {
            alignas(T) unsigned char data[sizeof(T)];
        };
    };
    typename aligned_store::type storage;

private:
    /// Pointer to the T living in `storage`. The object was created by
    /// placement construction inside a byte buffer, so a plain reinterpret_cast
    /// of the buffer's address does not point to it; std::launder does.
    T* object() noexcept {
        return std::launder(reinterpret_cast<T*>(&storage));
    }
};

/// Bounded multi-producer / single-consumer queue.
///
/// Producers claim consecutive indices from `head_` and write into the slot at
/// `idx(index)`. The consumer reads slots in index order starting at `tail_`.
/// A producer whose slot still holds the element from the previous lap blocks
/// until the consumer has retired it. The consumer blocks on a condition
/// variable until its slot is filled or the supplied stop token is triggered.
///
/// Push() may be called from any number of threads; Pop() must only be called
/// from one thread at a time.
template <typename T, typename Allocator = AlignedAllocator<Slot<T>>>
class Queue {
public:
    /// @param capacity  number of slots; must be a power of two (>= 1)
    /// @param allocator allocator used for the slot array
    /// @throws std::invalid_argument if capacity is 0 or not a power of two
    /// @throws std::bad_alloc if the slot array is not aligned to alignof(Slot<T>)
    ///         (the allocator itself may also throw on allocation failure)
    explicit Queue(const size_t capacity, const Allocator& allocator = Allocator())
        : allocator_(allocator) {
        if (capacity < 1) {
            throw std::invalid_argument("capacity < 1");
        }
        // Ensure that the queue length is an integer power of 2
        // This is so that idx(i) can be a simple i & mask_ instead of i % capacity
        // https://github.com/rigtorp/MPMCQueue/pull/36
        if (!std::has_single_bit(capacity)) {
            throw std::invalid_argument("capacity must be an integer power of 2");
        }

        mask_ = capacity - 1;

        // Allocate one extra slot to prevent false sharing on the last slot
        slots_ = allocator_.allocate(mask_ + 2);
        // Allocators are not required to honor alignment for over-aligned types
        // (see http://eel.is/c++draft/allocator.requirements#10) so we verify
        // alignment here
        if (reinterpret_cast<std::uintptr_t>(slots_) % alignof(Slot<T>) != 0) {
            allocator_.deallocate(slots_, mask_ + 2);
            throw std::bad_alloc();
        }
        for (size_t i = 0; i < mask_ + 1; ++i) {
            std::construct_at(&slots_[i]);
        }
        static_assert(alignof(Slot<T>) == hardware_interference_size,
                      "Slot must be aligned to cache line boundary to prevent false sharing");
        static_assert(sizeof(Slot<T>) % hardware_interference_size == 0,
                      "Slot size must be a multiple of cache line size to prevent "
                      "false sharing between adjacent slots");
        static_assert(sizeof(Queue) % hardware_interference_size == 0,
                      "Queue size must be a multiple of cache line size to "
                      "prevent false sharing between adjacent queues");
    }

    /// Destroys any elements still stored in the slots and frees the slot array.
    ~Queue() noexcept {
        for (size_t i = 0; i < mask_ + 1; ++i) {
            std::destroy_at(&slots_[i]);
        }
        allocator_.deallocate(slots_, mask_ + 2);
    }

    // non-copyable and non-movable
    Queue(const Queue&) = delete;
    Queue& operator=(const Queue&) = delete;

    /// Copies `v` into the queue, blocking while the queue is full.
    void Push(const T& v) noexcept {
        static_assert(std::is_nothrow_copy_constructible_v<T>,
                      "T must be nothrow copy constructible");
        emplace(v);
    }

    /// Constructs an element from `v` in the queue, blocking while the queue is full.
    template <typename P, typename = std::enable_if_t<std::is_nothrow_constructible_v<T, P&&>>>
    void Push(P&& v) noexcept {
        emplace(std::forward<P>(v));
    }

    /// Removes the oldest element and assigns it to `v`, blocking until one is
    /// available.
    ///
    /// If `stop` is requested while waiting for an element, returns without
    /// assigning `v` and without consuming an index. So if this returns while
    /// stop.stop_requested() is true, `v` may be unchanged.
    /// Must only be called from one thread at a time.
    void Pop(T& v, std::stop_token stop) noexcept {
        // Only the consumer writes tail_, so read it here and publish the
        // advance only once the element has actually been taken out.
        const size_t tail = tail_.load();
        auto& slot = slots_[idx(tail)];
        if (!slot.turn.test()) {
            std::unique_lock lock{cv_mutex};
            if (!cv.wait(lock, stop, [&slot] { return slot.turn.test(); })) {
                // Stop requested before a producer filled the slot: the storage
                // holds no object, so it must not be read or destroyed.
                return;
            }
        }
        v = slot.move();
        slot.destroy();
        slot.turn.clear();
        // Retire this index and wake producers waiting to reuse a slot.
        tail_.store(tail + 1);
        tail_.notify_all();
    }

private:
    /// Claims the next index and constructs the element in its slot once the
    /// slot's previous occupant (index - capacity) has been retired.
    template <typename... Args>
    void emplace(Args&&... args) noexcept {
        static_assert(std::is_nothrow_constructible_v<T, Args&&...>,
                      "T must be nothrow constructible with Args&&...");
        const size_t head = head_.fetch_add(1);
        auto& slot = slots_[idx(head)];
        // The slot is free once fewer than `capacity` indices separate head from
        // tail_. Waiting on tail_ (instead of on the slot's flag) also prevents two
        // producers whose indices map to the same slot from both claiming it
        // when it is released.
        for (size_t tail = tail_.load(); head - tail > mask_; tail = tail_.load()) {
            tail_.wait(tail);
        }
        slot.construct(std::forward<Args>(args)...);
        {
            // Publish under cv_mutex so the consumer cannot evaluate its wait
            // predicate, see an empty slot, and then miss the notification below.
            std::scoped_lock lock{cv_mutex};
            slot.turn.test_and_set();
        }
        cv.notify_one();
    }

    constexpr size_t idx(size_t i) const noexcept {
        return i & mask_;
    }

    // condition_variable_any is required for the stop_token-aware wait in Pop().
    std::condition_variable_any cv;
    std::mutex cv_mutex;
    size_t mask_;
    Slot<T>* slots_;
    [[no_unique_address]] Allocator allocator_;

    // Align to avoid false sharing between head_ and tail_
    alignas(hardware_interference_size) std::atomic<size_t> head_{0};
    alignas(hardware_interference_size) std::atomic<size_t> tail_{0};

    static_assert(std::is_nothrow_copy_assignable_v<T> || std::is_nothrow_move_assignable_v<T>,
                  "T must be nothrow copy or move assignable");

    static_assert(std::is_nothrow_destructible_v<T>, "T must be nothrow destructible");
};
} // namespace mpsc

/// Shorthand that exposes mpsc::Queue directly in the Common namespace,
/// with the same default allocator.
template <class T, class Allocator = mpsc::AlignedAllocator<mpsc::Slot<T>>>
using MPSCQueue = mpsc::Queue<T, Allocator>;

} // namespace Common

#ifdef _MSC_VER
#pragma warning(pop)
#endif