diff options
Diffstat (limited to 'src/common/threadsafe_queue.h')
-rw-r--r-- | src/common/threadsafe_queue.h | 27 |
1 files changed, 22 insertions, 5 deletions
diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index 8430b9778..2c8c2b90e 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -14,7 +14,7 @@ #include <utility> namespace Common { -template <typename T> +template <typename T, bool with_stop_token = false> class SPSCQueue { public: SPSCQueue() { @@ -84,7 +84,7 @@ public: void Wait() { if (Empty()) { std::unique_lock lock{cv_mutex}; - cv.wait(lock, [this]() { return !Empty(); }); + cv.wait(lock, [this] { return !Empty(); }); } } @@ -95,6 +95,19 @@ public: return t; } + T PopWait(std::stop_token stop_token) { + if (Empty()) { + std::unique_lock lock{cv_mutex}; + cv.wait(lock, stop_token, [this] { return !Empty(); }); + } + if (stop_token.stop_requested()) { + return T{}; + } + T t; + Pop(t); + return t; + } + // not thread-safe void Clear() { size.store(0); @@ -123,13 +136,13 @@ private: ElementPtr* read_ptr; std::atomic_size_t size{0}; std::mutex cv_mutex; - std::condition_variable cv; + std::conditional_t<with_stop_token, std::condition_variable_any, std::condition_variable> cv; }; // a simple thread-safe, // single reader, multiple writer queue -template <typename T> +template <typename T, bool with_stop_token = false> class MPSCQueue { public: [[nodiscard]] std::size_t Size() const { @@ -166,13 +179,17 @@ public: return spsc_queue.PopWait(); } + T PopWait(std::stop_token stop_token) { + return spsc_queue.PopWait(stop_token); + } + // not thread-safe void Clear() { spsc_queue.Clear(); } private: - SPSCQueue<T> spsc_queue; + SPSCQueue<T, with_stop_token> spsc_queue; std::mutex write_lock; }; } // namespace Common |