From 306840a5808cae10bf5d91e4b6e8a91cd619386b Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 03:19:25 -0400 Subject: bounded_threadsafe_queue: Use simplified impl of bounded queue Provides a simplified SPSC, MPSC, and MPMC bounded queue implementation using mutexes. --- src/common/bounded_threadsafe_queue.h | 311 ++++++++++++++++++++++------------ src/video_core/gpu_thread.cpp | 7 +- 2 files changed, 203 insertions(+), 115 deletions(-) diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index 14e887c70..e03427539 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -1,159 +1,246 @@ -// SPDX-FileCopyrightText: Copyright (c) 2020 Erik Rigtorp -// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: Copyright 2023 yuzu Emulator Project +// SPDX-License-Identifier: GPL-2.0-or-later #pragma once #include -#include #include -#include +#include #include #include -#include -#include #include "common/polyfill_thread.h" namespace Common { -#if defined(__cpp_lib_hardware_interference_size) -constexpr size_t hardware_interference_size = std::hardware_destructive_interference_size; -#else -constexpr size_t hardware_interference_size = 64; -#endif +namespace detail { +constexpr size_t DefaultCapacity = 0x1000; +} // namespace detail + +template +class SPSCQueue { + static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); -template -class MPSCQueue { public: - explicit MPSCQueue() : allocator{std::allocator>()} { - // Allocate one extra slot to prevent false sharing on the last slot - slots = allocator.allocate(capacity + 1); - // Allocators are not required to honor alignment for over-aligned types - // (see http://eel.is/c++draft/allocator.requirements#10) so we verify - // alignment here - if (reinterpret_cast(slots) % alignof(Slot) != 0) { - allocator.deallocate(slots, capacity + 1); - throw std::bad_alloc(); - } - for (size_t i = 0; i < capacity; ++i) { - std::construct_at(&slots[i]); - } - static_assert(std::has_single_bit(capacity), "capacity must be an integer power of 2"); - static_assert(alignof(Slot) == hardware_interference_size, - "Slot must be aligned to cache line boundary to prevent false sharing"); - static_assert(sizeof(Slot) % hardware_interference_size == 0, - "Slot size must be a multiple of cache line size to prevent " - "false sharing between adjacent slots"); - static_assert(sizeof(MPSCQueue) % hardware_interference_size == 0, - "Queue size must be a multiple of cache line size to " - "prevent false sharing between adjacent queues"); - } - - ~MPSCQueue() noexcept { - for (size_t i = 0; i < capacity; ++i) { - std::destroy_at(&slots[i]); + void Push(T&& t) { + const size_t write_index = m_write_index.load(); + + // Wait until we have free slots to write to. + while ((write_index - m_read_index.load()) == Capacity) { + std::this_thread::yield(); } - allocator.deallocate(slots, capacity + 1); + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Push into the queue. + m_data[pos] = std::move(t); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; + cv.notify_one(); } - // The queue must be both non-copyable and non-movable - MPSCQueue(const MPSCQueue&) = delete; - MPSCQueue& operator=(const MPSCQueue&) = delete; + template + void Push(Args&&... args) { + const size_t write_index = m_write_index.load(); + + // Wait until we have free slots to write to. + while ((write_index - m_read_index.load()) == Capacity) { + std::this_thread::yield(); + } + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Emplace into the queue. + std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; + cv.notify_one(); + } - MPSCQueue(MPSCQueue&&) = delete; - MPSCQueue& operator=(MPSCQueue&&) = delete; + bool TryPop(T& t) { + return Pop(t); + } - void Push(const T& v) noexcept { - static_assert(std::is_nothrow_copy_constructible_v, - "T must be nothrow copy constructible"); - emplace(v); + void PopWait(T& t, std::stop_token stop_token) { + Wait(stop_token); + Pop(t); } - template >> - void Push(P&& v) noexcept { - emplace(std::forward

(v)); + T PopWait(std::stop_token stop_token) { + Wait(stop_token); + T t; + Pop(t); + return t; } - void Pop(T& v, std::stop_token stop) noexcept { - auto const tail = tail_.fetch_add(1); - auto& slot = slots[idx(tail)]; - if (!slot.turn.test()) { - std::unique_lock lock{cv_mutex}; - Common::CondvarWait(cv, lock, stop, [&slot] { return slot.turn.test(); }); + void Clear() { + while (!Empty()) { + Pop(); } - v = slot.move(); - slot.destroy(); - slot.turn.clear(); - slot.turn.notify_one(); + } + + bool Empty() const { + return m_read_index.load() == m_write_index.load(); + } + + size_t Size() const { + return m_write_index.load() - m_read_index.load(); } private: - template - struct Slot { - ~Slot() noexcept { - if (turn.test()) { - destroy(); - } - } + void Pop() { + const size_t read_index = m_read_index.load(); - template - void construct(Args&&... args) noexcept { - static_assert(std::is_nothrow_constructible_v, - "T must be nothrow constructible with Args&&..."); - std::construct_at(reinterpret_cast(&storage), std::forward(args)...); + // Check if the queue is empty. + if (read_index == m_write_index.load()) { + return; } - void destroy() noexcept { - static_assert(std::is_nothrow_destructible_v, "T must be nothrow destructible"); - std::destroy_at(reinterpret_cast(&storage)); - } + // Determine the position to read from. + const size_t pos = read_index % Capacity; + + // Pop the data off the queue, deleting it. + std::destroy_at(std::addressof(m_data[pos])); + + // Increment the read index. + ++m_read_index; + } - U&& move() noexcept { - return reinterpret_cast(storage); + bool Pop(T& t) { + const size_t read_index = m_read_index.load(); + + // Check if the queue is empty. + if (read_index == m_write_index.load()) { + return false; } - // Align to avoid false sharing between adjacent slots - alignas(hardware_interference_size) std::atomic_flag turn{}; - struct aligned_store { - struct type { - alignas(U) unsigned char data[sizeof(U)]; - }; - }; - typename aligned_store::type storage; - }; + // Determine the position to read from. + const size_t pos = read_index % Capacity; - template - void emplace(Args&&... args) noexcept { - static_assert(std::is_nothrow_constructible_v, - "T must be nothrow constructible with Args&&..."); - auto const head = head_.fetch_add(1); - auto& slot = slots[idx(head)]; - slot.turn.wait(true); - slot.construct(std::forward(args)...); - slot.turn.test_and_set(); - cv.notify_one(); + // Pop the data off the queue, moving it. + t = std::move(m_data[pos]); + + // Increment the read index. + ++m_read_index; + + return true; } - constexpr size_t idx(size_t i) const noexcept { - return i & mask; + void Wait(std::stop_token stop_token) { + std::unique_lock lock{cv_mutex}; + Common::CondvarWait(cv, lock, stop_token, [this] { return !Empty(); }); } - static constexpr size_t mask = capacity - 1; + alignas(128) std::atomic_size_t m_read_index{0}; + alignas(128) std::atomic_size_t m_write_index{0}; - // Align to avoid false sharing between head_ and tail_ - alignas(hardware_interference_size) std::atomic head_{0}; - alignas(hardware_interference_size) std::atomic tail_{0}; + std::array m_data; - std::mutex cv_mutex; std::condition_variable_any cv; + std::mutex cv_mutex; +}; + +template +class MPSCQueue { +public: + void Push(T&& t) { + std::scoped_lock lock{write_mutex}; + spsc_queue.Push(std::move(t)); + } + + template + void Push(Args&&... args) { + std::scoped_lock lock{write_mutex}; + spsc_queue.Push(std::forward(args)...); + } + + bool TryPop(T& t) { + return spsc_queue.TryPop(t); + } + + void PopWait(T& t, std::stop_token stop_token) { + spsc_queue.PopWait(t, stop_token); + } + + T PopWait(std::stop_token stop_token) { + return spsc_queue.PopWait(stop_token); + } + + void Clear() { + spsc_queue.Clear(); + } + + bool Empty() { + return spsc_queue.Empty(); + } + + size_t Size() { + return spsc_queue.Size(); + } + +private: + SPSCQueue spsc_queue; + std::mutex write_mutex; +}; + +template +class MPMCQueue { +public: + void Push(T&& t) { + std::scoped_lock lock{write_mutex}; + spsc_queue.Push(std::move(t)); + } - Slot* slots; - [[no_unique_address]] std::allocator> allocator; + template + void Push(Args&&... args) { + std::scoped_lock lock{write_mutex}; + spsc_queue.Push(std::forward(args)...); + } - static_assert(std::is_nothrow_copy_assignable_v || std::is_nothrow_move_assignable_v, - "T must be nothrow copy or move assignable"); + bool TryPop(T& t) { + std::scoped_lock lock{read_mutex}; + return spsc_queue.TryPop(t); + } + + void PopWait(T& t, std::stop_token stop_token) { + std::scoped_lock lock{read_mutex}; + spsc_queue.PopWait(t, stop_token); + } - static_assert(std::is_nothrow_destructible_v, "T must be nothrow destructible"); + T PopWait(std::stop_token stop_token) { + std::scoped_lock lock{read_mutex}; + return spsc_queue.PopWait(stop_token); + } + + void Clear() { + std::scoped_lock lock{read_mutex}; + spsc_queue.Clear(); + } + + bool Empty() { + std::scoped_lock lock{read_mutex}; + return spsc_queue.Empty(); + } + + size_t Size() { + std::scoped_lock lock{read_mutex}; + return spsc_queue.Size(); + } + +private: + SPSCQueue spsc_queue; + std::mutex write_mutex; + std::mutex read_mutex; }; } // namespace Common diff --git a/src/video_core/gpu_thread.cpp b/src/video_core/gpu_thread.cpp index f52f9e28f..469a59cf9 100644 --- a/src/video_core/gpu_thread.cpp +++ b/src/video_core/gpu_thread.cpp @@ -31,9 +31,10 @@ static void RunThread(std::stop_token stop_token, Core::System& system, auto current_context = context.Acquire(); VideoCore::RasterizerInterface* const rasterizer = renderer.ReadRasterizer(); + CommandDataContainer next; + while (!stop_token.stop_requested()) { - CommandDataContainer next; - state.queue.Pop(next, stop_token); + state.queue.PopWait(next, stop_token); if (stop_token.stop_requested()) { break; } @@ -117,7 +118,7 @@ u64 ThreadManager::PushCommand(CommandData&& command_data, bool block) { std::unique_lock lk(state.write_lock); const u64 fence{++state.last_fence}; - state.queue.Push(CommandDataContainer(std::move(command_data), fence, block)); + state.queue.Push(std::move(command_data), fence, block); if (block) { Common::CondvarWait(state.cv, lk, thread.get_stop_token(), [this, fence] { -- cgit v1.2.3 From f28ca5361f5242f5b65a5c54bdde55639fba71f5 Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 03:20:05 -0400 Subject: logging: Make use of bounded queue --- src/common/logging/backend.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/common/logging/backend.cpp b/src/common/logging/backend.cpp index 2a3bded40..e1ce9db99 100644 --- a/src/common/logging/backend.cpp +++ b/src/common/logging/backend.cpp @@ -28,7 +28,7 @@ #ifdef _WIN32 #include "common/string_util.h" #endif -#include "common/threadsafe_queue.h" +#include "common/bounded_threadsafe_queue.h" namespace Common::Log { @@ -204,11 +204,11 @@ public: void PushEntry(Class log_class, Level log_level, const char* filename, unsigned int line_num, const char* function, std::string&& message) { - if (!filter.CheckMessage(log_class, log_level)) + if (!filter.CheckMessage(log_class, log_level)) { return; - const Entry& entry = - CreateEntry(log_class, log_level, filename, line_num, function, std::move(message)); - message_queue.Push(entry); + } + message_queue.Push( + CreateEntry(log_class, log_level, filename, line_num, function, std::move(message))); } private: @@ -225,7 +225,7 @@ private: ForEachBackend([&entry](Backend& backend) { backend.Write(entry); }); }; while (!stop_token.stop_requested()) { - entry = message_queue.PopWait(stop_token); + message_queue.PopWait(entry, stop_token); if (entry.filename != nullptr) { write_logs(); } @@ -233,7 +233,7 @@ private: // Drain the logging queue. Only writes out up to MAX_LOGS_TO_WRITE to prevent a // case where a system is repeatedly spamming logs even on close. int max_logs_to_write = filter.IsDebug() ? INT_MAX : 100; - while (max_logs_to_write-- && message_queue.Pop(entry)) { + while (max_logs_to_write-- && message_queue.TryPop(entry)) { write_logs(); } }); @@ -273,7 +273,7 @@ private: ColorConsoleBackend color_console_backend{}; FileBackend file_backend; - MPSCQueue message_queue{}; + MPSCQueue message_queue{}; std::chrono::steady_clock::time_point time_origin{std::chrono::steady_clock::now()}; std::jthread backend_thread; }; -- cgit v1.2.3 From 15d573194c95b95ccf4a5480d8e40a7765a00929 Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 04:01:47 -0400 Subject: bounded_threadsafe_queue: Add TryPush --- src/common/bounded_threadsafe_queue.h | 71 +++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index e03427539..eb88cc1d1 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -22,6 +22,55 @@ class SPSCQueue { static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); public: + bool TryPush(T&& t) { + const size_t write_index = m_write_index.load(); + + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Push into the queue. + m_data[pos] = std::move(t); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; + cv.notify_one(); + + return true; + } + + template + bool TryPush(Args&&... args) { + const size_t write_index = m_write_index.load(); + + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Emplace into the queue. + std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; + cv.notify_one(); + + return true; + } + void Push(T&& t) { const size_t write_index = m_write_index.load(); @@ -153,6 +202,17 @@ private: template class MPSCQueue { public: + bool TryPush(T&& t) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryPush(std::move(t)); + } + + template + bool TryPush(Args&&... args) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryPush(std::forward(args)...); + } + void Push(T&& t) { std::scoped_lock lock{write_mutex}; spsc_queue.Push(std::move(t)); @@ -196,6 +256,17 @@ private: template class MPMCQueue { public: + bool TryPush(T&& t) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryPush(std::move(t)); + } + + template + bool TryPush(Args&&... args) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryPush(std::forward(args)...); + } + void Push(T&& t) { std::scoped_lock lock{write_mutex}; spsc_queue.Push(std::move(t)); -- cgit v1.2.3 From 407dc917f170cc9d08f3f1f9bdeb9e44ddebc653 Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 14:24:18 -0400 Subject: bounded_threadsafe_queue: Deduplicate and add PushModes Adds the PushModes Try and Wait to allow producers to specify how they want to push their data to the queue if the queue is full. If the queue is full: - Try will fail to push to the queue, returning false. Try only returns true if it successfully pushes to the queue. This may result in items not being pushed into the queue. - Wait will wait until a slot is available to push to the queue, resulting in potential for deadlock if a consumer is not running. --- src/common/bounded_threadsafe_queue.h | 170 +++++++++++++++++----------------- src/common/logging/backend.cpp | 2 +- src/video_core/gpu_thread.cpp | 2 +- 3 files changed, 86 insertions(+), 88 deletions(-) diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index eb88cc1d1..975215863 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -23,60 +23,76 @@ class SPSCQueue { public: bool TryPush(T&& t) { - const size_t write_index = m_write_index.load(); - - // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { - return false; - } - - // Determine the position to write to. - const size_t pos = write_index % Capacity; - - // Push into the queue. - m_data[pos] = std::move(t); - - // Increment the write index. - ++m_write_index; + return Push(std::move(t)); + } - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + template + bool TryEmplace(Args&&... args) { + return Emplace(std::forward(args)...); + } - return true; + void PushWait(T&& t) { + Push(std::move(t)); } template - bool TryPush(Args&&... args) { - const size_t write_index = m_write_index.load(); + void EmplaceWait(Args&&... args) { + Emplace(std::forward(args)...); + } - // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { - return false; - } + bool TryPop(T& t) { + return Pop(t); + } - // Determine the position to write to. - const size_t pos = write_index % Capacity; + void PopWait(T& t, std::stop_token stop_token) { + Wait(stop_token); + Pop(t); + } - // Emplace into the queue. - std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); + T PopWait(std::stop_token stop_token) { + Wait(stop_token); + T t; + Pop(t); + return t; + } - // Increment the write index. - ++m_write_index; + void Clear() { + while (!Empty()) { + Pop(); + } + } - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + bool Empty() const { + return m_read_index.load() == m_write_index.load(); + } - return true; + size_t Size() const { + return m_write_index.load() - m_read_index.load(); } - void Push(T&& t) { +private: + enum class PushMode { + Try, + Wait, + Count, + }; + + template + bool Push(T&& t) { const size_t write_index = m_write_index.load(); - // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); + if constexpr (Mode == PushMode::Try) { + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + } else if constexpr (Mode == PushMode::Wait) { + // Wait until we have free slots to write to. + while ((write_index - m_read_index.load()) == Capacity) { + std::this_thread::yield(); + } + } else { + static_assert(Mode < PushMode::Count, "Invalid PushMode."); } // Determine the position to write to. @@ -91,15 +107,26 @@ public: // Notify the consumer that we have pushed into the queue. std::scoped_lock lock{cv_mutex}; cv.notify_one(); + + return true; } - template - void Push(Args&&... args) { + template + bool Emplace(Args&&... args) { const size_t write_index = m_write_index.load(); - // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); + if constexpr (Mode == PushMode::Try) { + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + } else if constexpr (Mode == PushMode::Wait) { + // Wait until we have free slots to write to. + while ((write_index - m_read_index.load()) == Capacity) { + std::this_thread::yield(); + } + } else { + static_assert(Mode < PushMode::Count, "Invalid PushMode."); } // Determine the position to write to. @@ -114,39 +141,10 @@ public: // Notify the consumer that we have pushed into the queue. std::scoped_lock lock{cv_mutex}; cv.notify_one(); - } - - bool TryPop(T& t) { - return Pop(t); - } - - void PopWait(T& t, std::stop_token stop_token) { - Wait(stop_token); - Pop(t); - } - - T PopWait(std::stop_token stop_token) { - Wait(stop_token); - T t; - Pop(t); - return t; - } - void Clear() { - while (!Empty()) { - Pop(); - } - } - - bool Empty() const { - return m_read_index.load() == m_write_index.load(); - } - - size_t Size() const { - return m_write_index.load() - m_read_index.load(); + return true; } -private: void Pop() { const size_t read_index = m_read_index.load(); @@ -208,20 +206,20 @@ public: } template - bool TryPush(Args&&... args) { + bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::forward(args)...); + return spsc_queue.TryEmplace(std::forward(args)...); } - void Push(T&& t) { + void PushWait(T&& t) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::move(t)); + spsc_queue.PushWait(std::move(t)); } template - void Push(Args&&... args) { + void EmplaceWait(Args&&... args) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::forward(args)...); + spsc_queue.EmplaceWait(std::forward(args)...); } bool TryPop(T& t) { @@ -262,20 +260,20 @@ public: } template - bool TryPush(Args&&... args) { + bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::forward(args)...); + return spsc_queue.TryEmplace(std::forward(args)...); } - void Push(T&& t) { + void PushWait(T&& t) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::move(t)); + spsc_queue.PushWait(std::move(t)); } template - void Push(Args&&... args) { + void EmplaceWait(Args&&... args) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::forward(args)...); + spsc_queue.EmplaceWait(std::forward(args)...); } bool TryPop(T& t) { diff --git a/src/common/logging/backend.cpp b/src/common/logging/backend.cpp index e1ce9db99..f96c7c222 100644 --- a/src/common/logging/backend.cpp +++ b/src/common/logging/backend.cpp @@ -207,7 +207,7 @@ public: if (!filter.CheckMessage(log_class, log_level)) { return; } - message_queue.Push( + message_queue.EmplaceWait( CreateEntry(log_class, log_level, filename, line_num, function, std::move(message))); } diff --git a/src/video_core/gpu_thread.cpp b/src/video_core/gpu_thread.cpp index 469a59cf9..3c5317777 100644 --- a/src/video_core/gpu_thread.cpp +++ b/src/video_core/gpu_thread.cpp @@ -118,7 +118,7 @@ u64 ThreadManager::PushCommand(CommandData&& command_data, bool block) { std::unique_lock lk(state.write_lock); const u64 fence{++state.last_fence}; - state.queue.Push(std::move(command_data), fence, block); + state.queue.EmplaceWait(std::move(command_data), fence, block); if (block) { Common::CondvarWait(state.cv, lk, thread.get_stop_token(), [this, fence] { -- cgit v1.2.3 From 8c56481249ed1bc0b46bca3aec0c7e86495c5d3a Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 14:48:01 -0400 Subject: bounded_threadsafe_queue: Add producer cv to avoid busy waiting --- src/common/bounded_threadsafe_queue.h | 46 ++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index 975215863..0fb2f42d1 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -45,12 +45,12 @@ public: } void PopWait(T& t, std::stop_token stop_token) { - Wait(stop_token); + ConsumerWait(stop_token); Pop(t); } T PopWait(std::stop_token stop_token) { - Wait(stop_token); + ConsumerWait(stop_token); T t; Pop(t); return t; @@ -88,9 +88,10 @@ private: } } else if constexpr (Mode == PushMode::Wait) { // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); - } + std::unique_lock lock{producer_cv_mutex}; + producer_cv.wait(lock, [this, write_index] { + return (write_index - m_read_index.load()) < Capacity; + }); } else { static_assert(Mode < PushMode::Count, "Invalid PushMode."); } @@ -105,8 +106,8 @@ private: ++m_write_index; // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + std::scoped_lock lock{consumer_cv_mutex}; + consumer_cv.notify_one(); return true; } @@ -122,9 +123,10 @@ private: } } else if constexpr (Mode == PushMode::Wait) { // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); - } + std::unique_lock lock{producer_cv_mutex}; + producer_cv.wait(lock, [this, write_index] { + return (write_index - m_read_index.load()) < Capacity; + }); } else { static_assert(Mode < PushMode::Count, "Invalid PushMode."); } @@ -139,8 +141,8 @@ private: ++m_write_index; // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + std::scoped_lock lock{consumer_cv_mutex}; + consumer_cv.notify_one(); return true; } @@ -161,6 +163,10 @@ private: // Increment the read index. ++m_read_index; + + // Notify the producer that we have popped off the queue. + std::unique_lock lock{producer_cv_mutex}; + producer_cv.notify_one(); } bool Pop(T& t) { @@ -180,12 +186,16 @@ private: // Increment the read index. ++m_read_index; + // Notify the producer that we have popped off the queue. + std::scoped_lock lock{producer_cv_mutex}; + producer_cv.notify_one(); + return true; } - void Wait(std::stop_token stop_token) { - std::unique_lock lock{cv_mutex}; - Common::CondvarWait(cv, lock, stop_token, [this] { return !Empty(); }); + void ConsumerWait(std::stop_token stop_token) { + std::unique_lock lock{consumer_cv_mutex}; + Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); }); } alignas(128) std::atomic_size_t m_read_index{0}; @@ -193,8 +203,10 @@ private: std::array m_data; - std::condition_variable_any cv; - std::mutex cv_mutex; + std::condition_variable_any producer_cv; + std::mutex producer_cv_mutex; + std::condition_variable_any consumer_cv; + std::mutex consumer_cv_mutex; }; template -- cgit v1.2.3 From 197d7565603b2e8274f5c176f73b468ce6aa46a6 Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 15:17:21 -0400 Subject: bounded_threadsafe_queue: Refactor Pop Introduces PopModes to bring waiting logic into Pop, similar to Push. --- src/common/bounded_threadsafe_queue.h | 202 +++++++++++----------------------- 1 file changed, 62 insertions(+), 140 deletions(-) diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index 0fb2f42d1..bd87aa09b 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -22,52 +22,38 @@ class SPSCQueue { static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); public: - bool TryPush(T&& t) { - return Push(std::move(t)); - } - template bool TryEmplace(Args&&... args) { return Emplace(std::forward(args)...); } - void PushWait(T&& t) { - Push(std::move(t)); - } - template void EmplaceWait(Args&&... args) { Emplace(std::forward(args)...); } bool TryPop(T& t) { - return Pop(t); + return Pop(t); + } + + void PopWait(T& t) { + Pop(t); } void PopWait(T& t, std::stop_token stop_token) { - ConsumerWait(stop_token); - Pop(t); + Pop(t, stop_token); } - T PopWait(std::stop_token stop_token) { - ConsumerWait(stop_token); + T PopWait() { T t; - Pop(t); + Pop(t); return t; } - void Clear() { - while (!Empty()) { - Pop(); - } - } - - bool Empty() const { - return m_read_index.load() == m_write_index.load(); - } - - size_t Size() const { - return m_write_index.load() - m_read_index.load(); + T PopWait(std::stop_token stop_token) { + T t; + Pop(t, stop_token); + return t; } private: @@ -77,55 +63,27 @@ private: Count, }; - template - bool Push(T&& t) { - const size_t write_index = m_write_index.load(); - - if constexpr (Mode == PushMode::Try) { - // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { - return false; - } - } else if constexpr (Mode == PushMode::Wait) { - // Wait until we have free slots to write to. - std::unique_lock lock{producer_cv_mutex}; - producer_cv.wait(lock, [this, write_index] { - return (write_index - m_read_index.load()) < Capacity; - }); - } else { - static_assert(Mode < PushMode::Count, "Invalid PushMode."); - } - - // Determine the position to write to. - const size_t pos = write_index % Capacity; - - // Push into the queue. - m_data[pos] = std::move(t); - - // Increment the write index. - ++m_write_index; - - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{consumer_cv_mutex}; - consumer_cv.notify_one(); - - return true; - } + enum class PopMode { + Try, + Wait, + WaitWithStopToken, + Count, + }; template bool Emplace(Args&&... args) { - const size_t write_index = m_write_index.load(); + const size_t write_index = m_write_index.load(std::memory_order::relaxed); if constexpr (Mode == PushMode::Try) { // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { + if ((write_index - m_read_index.load(std::memory_order::acquire)) == Capacity) { return false; } } else if constexpr (Mode == PushMode::Wait) { // Wait until we have free slots to write to. std::unique_lock lock{producer_cv_mutex}; producer_cv.wait(lock, [this, write_index] { - return (write_index - m_read_index.load()) < Capacity; + return (write_index - m_read_index.load(std::memory_order::acquire)) < Capacity; }); } else { static_assert(Mode < PushMode::Count, "Invalid PushMode."); @@ -147,34 +105,32 @@ private: return true; } - void Pop() { - const size_t read_index = m_read_index.load(); - - // Check if the queue is empty. - if (read_index == m_write_index.load()) { - return; - } - - // Determine the position to read from. - const size_t pos = read_index % Capacity; - - // Pop the data off the queue, deleting it. - std::destroy_at(std::addressof(m_data[pos])); - - // Increment the read index. - ++m_read_index; - - // Notify the producer that we have popped off the queue. - std::unique_lock lock{producer_cv_mutex}; - producer_cv.notify_one(); - } - - bool Pop(T& t) { - const size_t read_index = m_read_index.load(); + template + bool Pop(T& t, [[maybe_unused]] std::stop_token stop_token = {}) { + const size_t read_index = m_read_index.load(std::memory_order::relaxed); - // Check if the queue is empty. - if (read_index == m_write_index.load()) { - return false; + if constexpr (Mode == PopMode::Try) { + // Check if the queue is empty. + if (read_index == m_write_index.load(std::memory_order::acquire)) { + return false; + } + } else if constexpr (Mode == PopMode::Wait) { + // Wait until the queue is not empty. + std::unique_lock lock{consumer_cv_mutex}; + consumer_cv.wait(lock, [this, read_index] { + return read_index != m_write_index.load(std::memory_order::acquire); + }); + } else if constexpr (Mode == PopMode::WaitWithStopToken) { + // Wait until the queue is not empty. + std::unique_lock lock{consumer_cv_mutex}; + Common::CondvarWait(consumer_cv, lock, stop_token, [this, read_index] { + return read_index != m_write_index.load(std::memory_order::acquire); + }); + if (stop_token.stop_requested()) { + return false; + } + } else { + static_assert(Mode < PopMode::Count, "Invalid PopMode."); } // Determine the position to read from. @@ -193,11 +149,6 @@ private: return true; } - void ConsumerWait(std::stop_token stop_token) { - std::unique_lock lock{consumer_cv_mutex}; - Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); }); - } - alignas(128) std::atomic_size_t m_read_index{0}; alignas(128) std::atomic_size_t m_write_index{0}; @@ -212,22 +163,12 @@ private: template class MPSCQueue { public: - bool TryPush(T&& t) { - std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::move(t)); - } - template bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; return spsc_queue.TryEmplace(std::forward(args)...); } - void PushWait(T&& t) { - std::scoped_lock lock{write_mutex}; - spsc_queue.PushWait(std::move(t)); - } - template void EmplaceWait(Args&&... args) { std::scoped_lock lock{write_mutex}; @@ -238,24 +179,20 @@ public: return spsc_queue.TryPop(t); } - void PopWait(T& t, std::stop_token stop_token) { - spsc_queue.PopWait(t, stop_token); + void PopWait(T& t) { + spsc_queue.PopWait(t); } - T PopWait(std::stop_token stop_token) { - return spsc_queue.PopWait(stop_token); - } - - void Clear() { - spsc_queue.Clear(); + void PopWait(T& t, std::stop_token stop_token) { + spsc_queue.PopWait(t, stop_token); } - bool Empty() { - return spsc_queue.Empty(); + T PopWait() { + return spsc_queue.PopWait(); } - size_t Size() { - return spsc_queue.Size(); + T PopWait(std::stop_token stop_token) { + return spsc_queue.PopWait(stop_token); } private: @@ -266,22 +203,12 @@ private: template class MPMCQueue { public: - bool TryPush(T&& t) { - std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::move(t)); - } - template bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; return spsc_queue.TryEmplace(std::forward(args)...); } - void PushWait(T&& t) { - std::scoped_lock lock{write_mutex}; - spsc_queue.PushWait(std::move(t)); - } - template void EmplaceWait(Args&&... args) { std::scoped_lock lock{write_mutex}; @@ -293,29 +220,24 @@ public: return spsc_queue.TryPop(t); } - void PopWait(T& t, std::stop_token stop_token) { - std::scoped_lock lock{read_mutex}; - spsc_queue.PopWait(t, stop_token); - } - - T PopWait(std::stop_token stop_token) { + void PopWait(T& t) { std::scoped_lock lock{read_mutex}; - return spsc_queue.PopWait(stop_token); + spsc_queue.PopWait(t); } - void Clear() { + void PopWait(T& t, std::stop_token stop_token) { std::scoped_lock lock{read_mutex}; - spsc_queue.Clear(); + spsc_queue.PopWait(t, stop_token); } - bool Empty() { + T PopWait() { std::scoped_lock lock{read_mutex}; - return spsc_queue.Empty(); + return spsc_queue.PopWait(); } - size_t Size() { + T PopWait(std::stop_token stop_token) { std::scoped_lock lock{read_mutex}; - return spsc_queue.Size(); + return spsc_queue.PopWait(stop_token); } private: -- cgit v1.2.3