summaryrefslogtreecommitdiffstats
path: root/src/common/threadsafe_queue.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/common/threadsafe_queue.h122
1 files changed, 122 insertions, 0 deletions
diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h
new file mode 100644
index 000000000..a0c731e8c
--- /dev/null
+++ b/src/common/threadsafe_queue.h
@@ -0,0 +1,122 @@
+// Copyright 2010 Dolphin Emulator Project
+// Licensed under GPLv2+
+// Refer to the license.txt file included.
+
+#pragma once
+
+// a simple lockless thread-safe,
+// single reader, single writer queue
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <mutex>
+#include "common/common_types.h"
+
+namespace Common {
+template <typename T, bool NeedSize = true>
+class SPSCQueue {
+public:
+ SPSCQueue() : size(0) {
+ write_ptr = read_ptr = new ElementPtr();
+ }
+ ~SPSCQueue() {
+ // this will empty out the whole queue
+ delete read_ptr;
+ }
+
+ u32 Size() const {
+ static_assert(NeedSize, "using Size() on FifoQueue without NeedSize");
+ return size.load();
+ }
+
+ bool Empty() const {
+ return !read_ptr->next.load();
+ }
+ T& Front() const {
+ return read_ptr->current;
+ }
+ template <typename Arg>
+ void Push(Arg&& t) {
+ // create the element, add it to the queue
+ write_ptr->current = std::forward<Arg>(t);
+ // set the next pointer to a new element ptr
+ // then advance the write pointer
+ ElementPtr* new_ptr = new ElementPtr();
+ write_ptr->next.store(new_ptr, std::memory_order_release);
+ write_ptr = new_ptr;
+ if (NeedSize)
+ size++;
+ }
+
+ void Pop() {
+ if (NeedSize)
+ size--;
+ ElementPtr* tmpptr = read_ptr;
+ // advance the read pointer
+ read_ptr = tmpptr->next.load();
+ // set the next element to nullptr to stop the recursive deletion
+ tmpptr->next.store(nullptr);
+ delete tmpptr; // this also deletes the element
+ }
+
+ bool Pop(T& t) {
+ if (Empty())
+ return false;
+
+ if (NeedSize)
+ size--;
+
+ ElementPtr* tmpptr = read_ptr;
+ read_ptr = tmpptr->next.load(std::memory_order_acquire);
+ t = std::move(tmpptr->current);
+ tmpptr->next.store(nullptr);
+ delete tmpptr;
+ return true;
+ }
+
+ // not thread-safe
+ void Clear() {
+ size.store(0);
+ delete read_ptr;
+ write_ptr = read_ptr = new ElementPtr();
+ }
+
+private:
+ // stores a pointer to element
+ // and a pointer to the next ElementPtr
+ class ElementPtr {
+ public:
+ ElementPtr() : next(nullptr) {}
+ ~ElementPtr() {
+ ElementPtr* next_ptr = next.load();
+
+ if (next_ptr)
+ delete next_ptr;
+ }
+
+ T current;
+ std::atomic<ElementPtr*> next;
+ };
+
+ ElementPtr* write_ptr;
+ ElementPtr* read_ptr;
+ std::atomic<u32> size;
+};
+
+// a simple thread-safe,
+// single reader, multiple writer queue
+
+template <typename T, bool NeedSize = true>
+class MPSCQueue : public SPSCQueue<T, NeedSize> {
+public:
+ template <typename Arg>
+ void Push(Arg&& t) {
+ std::lock_guard<std::mutex> lock(write_lock);
+ SPSCQueue<T, NeedSize>::Push(t);
+ }
+
+private:
+ std::mutex write_lock;
+};
+} // namespace Common