From 4dd6bcd2063909359b08771465c3641876ddf138 Mon Sep 17 00:00:00 2001 From: Levi Behunin Date: Thu, 2 Jun 2022 01:08:18 -0600 Subject: gpu_thread: Move to bounded queue --- src/common/bounded_threadsafe_queue.h | 180 ++++++++++++++++++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 src/common/bounded_threadsafe_queue.h (limited to 'src/common') diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h new file mode 100644 index 000000000..e83064c7f --- /dev/null +++ b/src/common/bounded_threadsafe_queue.h @@ -0,0 +1,180 @@ +// SPDX-FileCopyrightText: Copyright (c) 2020 Erik Rigtorp +// SPDX-License-Identifier: MIT +#pragma once +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4324) +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Common { +namespace mpsc { +#if defined(__cpp_lib_hardware_interference_size) +constexpr size_t hardware_interference_size = std::hardware_destructive_interference_size; +#else +constexpr size_t hardware_interference_size = 64; +#endif + +template +using AlignedAllocator = std::allocator; + +template +struct Slot { + ~Slot() noexcept { + if (turn.test()) { + destroy(); + } + } + + template + void construct(Args&&... args) noexcept { + static_assert(std::is_nothrow_constructible_v, + "T must be nothrow constructible with Args&&..."); + std::construct_at(reinterpret_cast(&storage), std::forward(args)...); + } + + void destroy() noexcept { + static_assert(std::is_nothrow_destructible_v, "T must be nothrow destructible"); + std::destroy_at(reinterpret_cast(&storage)); + } + + T&& move() noexcept { + return reinterpret_cast(storage); + } + + // Align to avoid false sharing between adjacent slots + alignas(hardware_interference_size) std::atomic_flag turn{}; + struct aligned_store { + struct type { + alignas(T) unsigned char data[sizeof(T)]; + }; + }; + typename aligned_store::type storage; +}; + +template >> +class Queue { +public: + explicit Queue(const size_t capacity, const Allocator& allocator = Allocator()) + : allocator_(allocator) { + if (capacity < 1) { + throw std::invalid_argument("capacity < 1"); + } + // Ensure that the queue length is an integer power of 2 + // This is so that idx(i) can be a simple i & mask_ insted of i % capacity + // https://github.com/rigtorp/MPMCQueue/pull/36 + if (!std::has_single_bit(capacity)) { + throw std::invalid_argument("capacity must be an integer power of 2"); + } + + mask_ = capacity - 1; + + // Allocate one extra slot to prevent false sharing on the last slot + slots_ = allocator_.allocate(mask_ + 2); + // Allocators are not required to honor alignment for over-aligned types + // (see http://eel.is/c++draft/allocator.requirements#10) so we verify + // alignment here + if (reinterpret_cast(slots_) % alignof(Slot) != 0) { + allocator_.deallocate(slots_, mask_ + 2); + throw std::bad_alloc(); + } + for (size_t i = 0; i < mask_ + 1; ++i) { + std::construct_at(&slots_[i]); + } + static_assert(alignof(Slot) == hardware_interference_size, + "Slot must be aligned to cache line boundary to prevent false sharing"); + static_assert(sizeof(Slot) % hardware_interference_size == 0, + "Slot size must be a multiple of cache line size to prevent " + "false sharing between adjacent slots"); + static_assert(sizeof(Queue) % hardware_interference_size == 0, + "Queue size must be a multiple of cache line size to " + "prevent false sharing between adjacent queues"); + } + + ~Queue() noexcept { + for (size_t i = 0; i < mask_ + 1; ++i) { + slots_[i].~Slot(); + } + allocator_.deallocate(slots_, mask_ + 2); + } + + // non-copyable and non-movable + Queue(const Queue&) = delete; + Queue& operator=(const Queue&) = delete; + + void Push(const T& v) noexcept { + static_assert(std::is_nothrow_copy_constructible_v, + "T must be nothrow copy constructible"); + emplace(v); + } + + template >> + void Push(P&& v) noexcept { + emplace(std::forward

(v)); + } + + void Pop(T& v, std::stop_token stop) noexcept { + auto const tail = tail_.fetch_add(1); + auto& slot = slots_[idx(tail)]; + if (false == slot.turn.test()) { + std::unique_lock lock{cv_mutex}; + cv.wait(lock, stop, [&slot] { return slot.turn.test(); }); + } + v = slot.move(); + slot.destroy(); + slot.turn.clear(); + slot.turn.notify_one(); + } + +private: + template + void emplace(Args&&... args) noexcept { + static_assert(std::is_nothrow_constructible_v, + "T must be nothrow constructible with Args&&..."); + auto const head = head_.fetch_add(1); + auto& slot = slots_[idx(head)]; + slot.turn.wait(true); + slot.construct(std::forward(args)...); + slot.turn.test_and_set(); + cv.notify_one(); + } + + constexpr size_t idx(size_t i) const noexcept { + return i & mask_; + } + + std::conditional_t cv; + std::mutex cv_mutex; + size_t mask_; + Slot* slots_; + [[no_unique_address]] Allocator allocator_; + + // Align to avoid false sharing between head_ and tail_ + alignas(hardware_interference_size) std::atomic head_{0}; + alignas(hardware_interference_size) std::atomic tail_{0}; + + static_assert(std::is_nothrow_copy_assignable_v || std::is_nothrow_move_assignable_v, + "T must be nothrow copy or move assignable"); + + static_assert(std::is_nothrow_destructible_v, "T must be nothrow destructible"); +}; +} // namespace mpsc + +template >> +using MPSCQueue = mpsc::Queue; + +} // namespace Common + +#ifdef _MSC_VER +#pragma warning(pop) +#endif -- cgit v1.2.3