diff options
Diffstat (limited to 'src/common')
-rw-r--r-- | src/common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/common/threadsafe_queue.h | 122 |
2 files changed, 123 insertions, 0 deletions
diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 7e83e64b0..447d7198c 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -76,6 +76,7 @@ set(HEADERS telemetry.h thread.h thread_queue_list.h + threadsafe_queue.h timer.h vector_math.h ) diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h new file mode 100644 index 000000000..a0c731e8c --- /dev/null +++ b/src/common/threadsafe_queue.h @@ -0,0 +1,122 @@ +// Copyright 2010 Dolphin Emulator Project +// Licensed under GPLv2+ +// Refer to the license.txt file included. + +#pragma once + +// a simple lockless thread-safe, +// single reader, single writer queue + +#include <algorithm> +#include <atomic> +#include <cstddef> +#include <mutex> +#include "common/common_types.h" + +namespace Common { +template <typename T, bool NeedSize = true> +class SPSCQueue { +public: + SPSCQueue() : size(0) { + write_ptr = read_ptr = new ElementPtr(); + } + ~SPSCQueue() { + // this will empty out the whole queue + delete read_ptr; + } + + u32 Size() const { + static_assert(NeedSize, "using Size() on FifoQueue without NeedSize"); + return size.load(); + } + + bool Empty() const { + return !read_ptr->next.load(); + } + T& Front() const { + return read_ptr->current; + } + template <typename Arg> + void Push(Arg&& t) { + // create the element, add it to the queue + write_ptr->current = std::forward<Arg>(t); + // set the next pointer to a new element ptr + // then advance the write pointer + ElementPtr* new_ptr = new ElementPtr(); + write_ptr->next.store(new_ptr, std::memory_order_release); + write_ptr = new_ptr; + if (NeedSize) + size++; + } + + void Pop() { + if (NeedSize) + size--; + ElementPtr* tmpptr = read_ptr; + // advance the read pointer + read_ptr = tmpptr->next.load(); + // set the next element to nullptr to stop the recursive deletion + tmpptr->next.store(nullptr); + delete tmpptr; // this also deletes the element + } + + bool Pop(T& t) { + if (Empty()) + return false; + + if (NeedSize) + size--; + + ElementPtr* tmpptr = read_ptr; + read_ptr = tmpptr->next.load(std::memory_order_acquire); + t = std::move(tmpptr->current); + tmpptr->next.store(nullptr); + delete tmpptr; + return true; + } + + // not thread-safe + void Clear() { + size.store(0); + delete read_ptr; + write_ptr = read_ptr = new ElementPtr(); + } + +private: + // stores a pointer to element + // and a pointer to the next ElementPtr + class ElementPtr { + public: + ElementPtr() : next(nullptr) {} + ~ElementPtr() { + ElementPtr* next_ptr = next.load(); + + if (next_ptr) + delete next_ptr; + } + + T current; + std::atomic<ElementPtr*> next; + }; + + ElementPtr* write_ptr; + ElementPtr* read_ptr; + std::atomic<u32> size; +}; + +// a simple thread-safe, +// single reader, multiple writer queue + +template <typename T, bool NeedSize = true> +class MPSCQueue : public SPSCQueue<T, NeedSize> { +public: + template <typename Arg> + void Push(Arg&& t) { + std::lock_guard<std::mutex> lock(write_lock); + SPSCQueue<T, NeedSize>::Push(t); + } + +private: + std::mutex write_lock; +}; +} // namespace Common |