From 25429998e373c12287ae8da2a1c9c1bbe7bd7047 Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Tue, 14 Jun 2022 08:57:19 -0400 Subject: bounded_threadsafe_queue: Use constexpr capacity and mask While this is the primary change, we also: - Remove the mpsc namespace and rename Queue to MPSCQueue - Make Slot a private struct within MPSCQueue - Remove the AlignedAllocator template argument, as we use std::allocator - Replace instances of mask + 1 with capacity, and mask + 2 with capacity + 1 --- src/common/bounded_threadsafe_queue.h | 159 ++++++++++++++++------------------ src/video_core/gpu_thread.h | 2 +- 2 files changed, 74 insertions(+), 87 deletions(-) diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index e83064c7f..7e465549b 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -1,10 +1,7 @@ // SPDX-FileCopyrightText: Copyright (c) 2020 Erik Rigtorp // SPDX-License-Identifier: MIT + #pragma once -#ifdef _MSC_VER -#pragma warning(push) -#pragma warning(disable : 4324) -#endif #include #include @@ -12,105 +9,63 @@ #include #include #include -#include #include #include #include namespace Common { -namespace mpsc { + #if defined(__cpp_lib_hardware_interference_size) constexpr size_t hardware_interference_size = std::hardware_destructive_interference_size; #else constexpr size_t hardware_interference_size = 64; #endif -template -using AlignedAllocator = std::allocator; - -template -struct Slot { - ~Slot() noexcept { - if (turn.test()) { - destroy(); - } - } - - template - void construct(Args&&... args) noexcept { - static_assert(std::is_nothrow_constructible_v, - "T must be nothrow constructible with Args&&..."); - std::construct_at(reinterpret_cast(&storage), std::forward(args)...); - } - - void destroy() noexcept { - static_assert(std::is_nothrow_destructible_v, "T must be nothrow destructible"); - std::destroy_at(reinterpret_cast(&storage)); - } - - T&& move() noexcept { - return reinterpret_cast(storage); - } - - // Align to avoid false sharing between adjacent slots - alignas(hardware_interference_size) std::atomic_flag turn{}; - struct aligned_store { - struct type { - alignas(T) unsigned char data[sizeof(T)]; - }; - }; - typename aligned_store::type storage; -}; +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4324) +#endif -template >> -class Queue { +template +class MPSCQueue { public: - explicit Queue(const size_t capacity, const Allocator& allocator = Allocator()) - : allocator_(allocator) { - if (capacity < 1) { - throw std::invalid_argument("capacity < 1"); - } - // Ensure that the queue length is an integer power of 2 - // This is so that idx(i) can be a simple i & mask_ insted of i % capacity - // https://github.com/rigtorp/MPMCQueue/pull/36 - if (!std::has_single_bit(capacity)) { - throw std::invalid_argument("capacity must be an integer power of 2"); - } - - mask_ = capacity - 1; - + explicit MPSCQueue() : allocator{std::allocator>()} { // Allocate one extra slot to prevent false sharing on the last slot - slots_ = allocator_.allocate(mask_ + 2); + slots = allocator.allocate(capacity + 1); // Allocators are not required to honor alignment for over-aligned types // (see http://eel.is/c++draft/allocator.requirements#10) so we verify // alignment here - if (reinterpret_cast(slots_) % alignof(Slot) != 0) { - allocator_.deallocate(slots_, mask_ + 2); + if (reinterpret_cast(slots) % alignof(Slot) != 0) { + allocator.deallocate(slots, capacity + 1); throw std::bad_alloc(); } - for (size_t i = 0; i < mask_ + 1; ++i) { - std::construct_at(&slots_[i]); + for (size_t i = 0; i < capacity; ++i) { + std::construct_at(&slots[i]); } + static_assert(std::has_single_bit(capacity), "capacity must be an integer power of 2"); static_assert(alignof(Slot) == hardware_interference_size, "Slot must be aligned to cache line boundary to prevent false sharing"); static_assert(sizeof(Slot) % hardware_interference_size == 0, "Slot size must be a multiple of cache line size to prevent " "false sharing between adjacent slots"); - static_assert(sizeof(Queue) % hardware_interference_size == 0, + static_assert(sizeof(MPSCQueue) % hardware_interference_size == 0, "Queue size must be a multiple of cache line size to " "prevent false sharing between adjacent queues"); } - ~Queue() noexcept { - for (size_t i = 0; i < mask_ + 1; ++i) { - slots_[i].~Slot(); + ~MPSCQueue() noexcept { + for (size_t i = 0; i < capacity; ++i) { + std::destroy_at(&slots[i]); } - allocator_.deallocate(slots_, mask_ + 2); + allocator.deallocate(slots, capacity + 1); } - // non-copyable and non-movable - Queue(const Queue&) = delete; - Queue& operator=(const Queue&) = delete; + // The queue must be both non-copyable and non-movable + MPSCQueue(const MPSCQueue&) = delete; + MPSCQueue& operator=(const MPSCQueue&) = delete; + + MPSCQueue(MPSCQueue&&) = delete; + MPSCQueue& operator=(MPSCQueue&&) = delete; void Push(const T& v) noexcept { static_assert(std::is_nothrow_copy_constructible_v, @@ -125,8 +80,8 @@ public: void Pop(T& v, std::stop_token stop) noexcept { auto const tail = tail_.fetch_add(1); - auto& slot = slots_[idx(tail)]; - if (false == slot.turn.test()) { + auto& slot = slots[idx(tail)]; + if (!slot.turn.test()) { std::unique_lock lock{cv_mutex}; cv.wait(lock, stop, [&slot] { return slot.turn.test(); }); } @@ -137,12 +92,46 @@ public: } private: + template + struct Slot { + ~Slot() noexcept { + if (turn.test()) { + destroy(); + } + } + + template + void construct(Args&&... args) noexcept { + static_assert(std::is_nothrow_constructible_v, + "T must be nothrow constructible with Args&&..."); + std::construct_at(reinterpret_cast(&storage), std::forward(args)...); + } + + void destroy() noexcept { + static_assert(std::is_nothrow_destructible_v, "T must be nothrow destructible"); + std::destroy_at(reinterpret_cast(&storage)); + } + + U&& move() noexcept { + return reinterpret_cast(storage); + } + + // Align to avoid false sharing between adjacent slots + alignas(hardware_interference_size) std::atomic_flag turn{}; + struct aligned_store { + struct type { + alignas(U) unsigned char data[sizeof(U)]; + }; + }; + typename aligned_store::type storage; + }; + template void emplace(Args&&... args) noexcept { static_assert(std::is_nothrow_constructible_v, "T must be nothrow constructible with Args&&..."); auto const head = head_.fetch_add(1); - auto& slot = slots_[idx(head)]; + auto& slot = slots[idx(head)]; slot.turn.wait(true); slot.construct(std::forward(args)...); slot.turn.test_and_set(); @@ -150,31 +139,29 @@ private: } constexpr size_t idx(size_t i) const noexcept { - return i & mask_; + return i & mask; } - std::conditional_t cv; - std::mutex cv_mutex; - size_t mask_; - Slot* slots_; - [[no_unique_address]] Allocator allocator_; + static constexpr size_t mask = capacity - 1; // Align to avoid false sharing between head_ and tail_ alignas(hardware_interference_size) std::atomic head_{0}; alignas(hardware_interference_size) std::atomic tail_{0}; + std::mutex cv_mutex; + std::condition_variable_any cv; + + Slot* slots; + [[no_unique_address]] std::allocator> allocator; + static_assert(std::is_nothrow_copy_assignable_v || std::is_nothrow_move_assignable_v, "T must be nothrow copy or move assignable"); static_assert(std::is_nothrow_destructible_v, "T must be nothrow destructible"); }; -} // namespace mpsc - -template >> -using MPSCQueue = mpsc::Queue; - -} // namespace Common #ifdef _MSC_VER #pragma warning(pop) #endif + +} // namespace Common diff --git a/src/video_core/gpu_thread.h b/src/video_core/gpu_thread.h index ad9fd5eff..be0ac2214 100644 --- a/src/video_core/gpu_thread.h +++ b/src/video_core/gpu_thread.h @@ -98,7 +98,7 @@ struct CommandDataContainer { struct SynchState final { using CommandQueue = Common::MPSCQueue; std::mutex write_lock; - CommandQueue queue{512}; // size must be 2^n + CommandQueue queue; u64 last_fence{}; std::atomic signaled_fence{}; std::condition_variable_any cv; -- cgit v1.2.3