// Compile-time contract: the Capacity template argument must be at least 2.
172 static_assert(Capacity >= 2,
"MpmcRingBuffer capacity must be at least 2");
// The lines following this directive are compiled only when
// DISPENSO_HAS_CONCEPTS evaluates to false.
// NOTE(review): the matching #endif is not visible in this excerpt.
173#if !DISPENSO_HAS_CONCEPTS
// Condition and diagnostic of a check that T is move-constructible.
// NOTE(review): the opening of this statement is not visible here
// (presumably a static_assert) -- confirm.
175 std::is_move_constructible<T>::value,
176 "MpmcRingBuffer element type must be move-constructible");
// Condition and diagnostic of a stricter check: move-constructing a T must
// also be non-throwing.
178 std::is_nothrow_move_constructible<T>::value,
179 "MpmcRingBuffer element type must be nothrow-move-constructible");
// Element type alias: the template parameter T.
186 using value_type = T;
// Size type alias: size_t.
191 using size_type = size_t;
// Give every slot an initial sequence number equal to its own index. With
// seq == i, a producer whose tail position is i computes diff == 0 for that
// slot (see the seq - tail comparison in emplaceImpl).
// NOTE(review): relaxed stores are used; presumably nothing else can observe
// the buffer while this loop runs -- confirm against the enclosing function,
// whose header is not visible in this excerpt.
203 for (
size_t i = 0; i < kBufferSize; ++i) {
204 slots_[i].seq.store(i, std::memory_order_relaxed);
// Read both cursors once, then visit positions starting at head for as long
// as head differs from tail, running T's destructor in place on the element
// stored in each visited slot.
// NOTE(review): the statement that advances head and the closing braces are
// not visible in this excerpt.
241 size_t head = head_.load(std::memory_order_relaxed);
242 size_t tail = tail_.load(std::memory_order_relaxed);
243 while (head != tail) {
244 size_t pos = wrapIndex(head);
245 dataPtr(slots_[pos])->~T();
// Hand the argument to emplaceImpl as an rvalue, so the element in the slot
// is constructed from std::move(item). The return value is emplaceImpl's bool.
273 return emplaceImpl(std::move(item));
// Concepts build: the declaration below is constrained so that it is usable
// only when T is nothrow-copy-constructible.
286#if DISPENSO_HAS_CONCEPTS
288 requires std::is_nothrow_copy_constructible_v<T>
// item is passed on without std::move, i.e. as an lvalue.
290 return emplaceImpl(item);
// The same nothrow-copy-constructible constraint, expressed with
// std::enable_if_t on a defaulted template parameter U = T.
// NOTE(review): presumably the non-concepts branch of the conditional above;
// the #else/#endif lines are not visible in this excerpt.
293 template <typename U = T, std::enable_if_t<std::is_nothrow_copy_constructible<U>::value,
int> = 0>
295 return emplaceImpl(item);
// Concepts build: accept any argument pack from which a T can be constructed
// without throwing.
320#if DISPENSO_HAS_CONCEPTS
321 template <
typename... Args>
322 requires std::is_nothrow_constructible_v<T, Args...>
// Perfect-forward the pack; emplaceImpl constructs the element directly in
// the slot storage from these arguments.
324 return emplaceImpl(std::forward<Args>(args)...);
// The same nothrow-constructible constraint written with std::enable_if_t.
// NOTE(review): presumably the non-concepts branch; the start of this
// template parameter list and the #else/#endif lines are not visible here.
329 std::enable_if_t<std::is_nothrow_constructible<T, Args...>::value,
int> = 0>
331 return emplaceImpl(std::forward<Args>(args)...);
// Condition and diagnostic of a compile-time check: this overload move-assigns
// into `item` (see the assignment below), so T must be nothrow-move-assignable.
// NOTE(review): the opening of this statement is not visible in this excerpt.
367 std::is_nothrow_move_assignable<T>::value,
368 "MpmcRingBuffer::try_pop(T&) requires a nothrow-move-assignable T; "
369 "use try_pop() or try_pop_into() for nothrow-move-constructible-only types");
// Start from a relaxed read of the consumer cursor.
370 size_t head = head_.load(std::memory_order_relaxed);
// Branch taken when both cursors are equal.
// NOTE(review): presumably the empty-buffer early-out; the body is not
// visible in this excerpt.
374 if (head == tail_.load(std::memory_order_relaxed)) {
377 Slot& slot = slots_[wrapIndex(head)];
// Acquire load; it pairs with the release store a producer performs on seq
// after constructing the element (emplaceImpl stores tail + 1).
378 size_t seq = slot.seq.load(std::memory_order_acquire);
// diff compares the slot's sequence with head + 1, i.e. the value a producer
// stores after filling the slot for position head.
// NOTE(review): how each sign of diff is handled is not visible here.
379 intptr_t diff =
static_cast<intptr_t
>(seq) -
static_cast<intptr_t
>(head + 1);
// Claim position head by advancing the consumer cursor. If the exchange
// fails, compare_exchange_strong writes the current cursor value into head.
381 if (head_.compare_exchange_strong(head, head + 1, std::memory_order_relaxed)) {
382 T* elem = dataPtr(slot);
// Move the stored element into the caller's object.
383 item = std::move(*elem);
// Release store handing the slot back to producers: head + kBufferSize is the
// position at which this slot is next used, and emplaceImpl compares seq
// against its tail position.
385 slot.seq.store(head + kBufferSize, std::memory_order_release);
// Start from a relaxed read of the consumer cursor.
414 size_t head = head_.load(std::memory_order_relaxed);
// Branch taken when both cursors are equal.
// NOTE(review): presumably the empty-buffer early-out; body not visible here.
415 if (head == tail_.load(std::memory_order_relaxed)) {
418 Slot& slot = slots_[wrapIndex(head)];
// Acquire load; it pairs with the release store a producer performs on seq
// after constructing the element (emplaceImpl stores tail + 1).
419 size_t seq = slot.seq.load(std::memory_order_acquire);
// diff compares the slot's sequence with head + 1, the value a producer
// stores after filling the slot for position head.
// NOTE(review): how each sign of diff is handled is not visible here.
420 intptr_t diff =
static_cast<intptr_t
>(seq) -
static_cast<intptr_t
>(head + 1);
// Claim position head by advancing the consumer cursor. If the exchange
// fails, compare_exchange_strong writes the current cursor value into head.
422 if (head_.compare_exchange_strong(head, head + 1, std::memory_order_relaxed)) {
423 T* elem = dataPtr(slot);
// Build the OpResult<T> from the stored element, passed as an rvalue.
424 OpResult<T> result(std::move(*elem));
// Release store handing the slot back to producers for position
// head + kBufferSize.
426 slot.seq.store(head + kBufferSize, std::memory_order_release);
// Start from a relaxed read of the consumer cursor.
449 size_t head = head_.load(std::memory_order_relaxed);
// Branch taken when both cursors are equal.
// NOTE(review): presumably the empty-buffer early-out; body not visible here.
450 if (head == tail_.load(std::memory_order_relaxed)) {
453 Slot& slot = slots_[wrapIndex(head)];
// Acquire load; it pairs with the release store a producer performs on seq
// after constructing the element (emplaceImpl stores tail + 1).
454 size_t seq = slot.seq.load(std::memory_order_acquire);
// diff compares the slot's sequence with head + 1, the value a producer
// stores after filling the slot for position head.
// NOTE(review): how each sign of diff is handled is not visible here.
455 intptr_t diff =
static_cast<intptr_t
>(seq) -
static_cast<intptr_t
>(head + 1);
// Claim position head by advancing the consumer cursor. If the exchange
// fails, compare_exchange_strong writes the current cursor value into head.
457 if (head_.compare_exchange_strong(head, head + 1, std::memory_order_relaxed)) {
458 T* elem = dataPtr(slot);
// Move-construct a new T at the address `storage` with placement new; no
// assignment is involved, so only move construction is required of T here.
// NOTE(review): `storage` is declared outside this excerpt; presumably it
// must point to uninitialized memory suitably sized and aligned for T.
459 new (storage) T(std::move(*elem));
// Release store handing the slot back to producers for position
// head + kBufferSize.
461 slot.seq.store(head + kBufferSize, std::memory_order_release);
// A request for more elements than the ring has slots takes a separate
// branch. NOTE(review): the branch body is not visible in this excerpt.
500 if (count > kBufferSize) {
504 size_t tail = tail_.load(std::memory_order_relaxed);
// Probe up to `count` consecutive slots starting at tail. For each slot,
// diff compares its sequence with the position it would be claimed at
// (tail + i), mirroring the seq - tail comparison in emplaceImpl.
// NOTE(review): how `available` is updated and when the scan stops is not
// visible in this excerpt.
507 size_t available = 0;
508 for (
size_t i = 0; i < count; ++i) {
509 Slot& slot = slots_[wrapIndex(tail + i)];
510 size_t seq = slot.seq.load(std::memory_order_acquire);
511 intptr_t diff =
static_cast<intptr_t
>(seq) -
static_cast<intptr_t
>(tail + i);
// Branch taken when `available` is zero. NOTE(review): body not visible here.
517 if (available == 0) {
// Reserve positions [tail, tail + available) with a single compare-exchange
// on the producer cursor.
521 if (tail_.compare_exchange_strong(tail, tail + available, std::memory_order_relaxed)) {
// Fill each reserved slot by move-constructing from items[i], then publish it
// with a release store of position + 1 -- the value the pop paths compare
// the sequence against (head + 1).
522 for (
size_t i = 0; i < available; ++i) {
523 Slot& slot = slots_[wrapIndex(tail + i)];
524 new (dataPtr(slot)) T(std::move(items[i]));
525 slot.seq.store(tail + i + 1, std::memory_order_release);
// Relaxed reads of both cursors. The two loads are separate operations, so
// together they do not form an atomic snapshot of the pair.
// NOTE(review): how the two values are combined is not visible in this excerpt.
542 size_t head = head_.load(std::memory_order_relaxed);
543 size_t tail = tail_.load(std::memory_order_relaxed);
// Reports whether the distance from head to tail has reached kBufferSize.
// The subtraction is unsigned, so it remains correct when the cursors wrap
// around size_t. head and tail are read by two separate relaxed loads, so
// under concurrent use the result reflects a momentary observation only.
556 size_t head = head_.load(std::memory_order_relaxed);
557 size_t tail = tail_.load(std::memory_order_relaxed);
558 return (tail - head) >= kBufferSize;
// Relaxed reads of both cursors. The two loads are separate operations, so
// together they do not form an atomic snapshot of the pair.
// NOTE(review): how the two values are combined is not visible in this excerpt.
571 size_t head = head_.load(std::memory_order_relaxed);
572 size_t tail = tail_.load(std::memory_order_relaxed);
// Compile-time slot count: detail::nextPow2(Capacity) when
// RoundUpToPowerOfTwo is set, otherwise Capacity unchanged.
// NOTE(review): detail::nextPow2 is defined elsewhere; presumably it returns
// the smallest power of two that is >= its argument -- confirm.
589 static constexpr size_t computeBufferSize() noexcept {
590 return RoundUpToPowerOfTwo ?
static_cast<size_t>(detail::nextPow2(Capacity)) : Capacity;
// Number of slots in the ring, fixed at compile time.
593 static constexpr size_t kBufferSize = computeBufferSize();
// Condition and diagnostic of a compile-time sanity check: when rounding was
// requested, kBufferSize must satisfy the single-bit (power-of-two) test.
// NOTE(review): the opening of this statement is not visible in this excerpt.
595 (kBufferSize & (kBufferSize - 1)) == 0 || !RoundUpToPowerOfTwo,
596 "Internal error: kBufferSize must be power of two when RoundUpToPowerOfTwo is true");
// True when kBufferSize is a power of two; selects the masking path in
// wrapIndex.
597 static constexpr bool kIsPow2 = (kBufferSize & (kBufferSize - 1)) == 0;
// kBufferSize - 1; wrapIndex applies it as an index mask only when kIsPow2
// is true.
598 static constexpr size_t kMask = kBufferSize - 1;
// Map a cursor position onto a slot index in [0, kBufferSize).
// Power-of-two sizes use a bit mask; any other size falls back to modulo.
// NOTE(review): on the modulo path the mapping is not continuous where the
// size_t position wraps around to zero, because 2^N is not a multiple of a
// non-power-of-two kBufferSize. Presumably unreachable in practice with a
// 64-bit size_t -- confirm for targets with a narrower size_t.
600 static size_t wrapIndex(
size_t i) {
601 return kIsPow2 ? (i & kMask) : (i % kBufferSize);
// Shared single-element producer path used by the push/emplace entry points:
// constructs a T from `args` in the slot at the current tail position once
// that position has been claimed.
// NOTE(review): the handling of the non-claimable cases (and the return
// statements) is not visible in this excerpt.
609 template <
typename... Args>
610 bool emplaceImpl(Args&&... args) {
611 size_t tail = tail_.load(std::memory_order_relaxed);
612 Slot& slot = slots_[wrapIndex(tail)];
// Acquire load; it pairs with the release store the pop paths perform when
// they hand a slot back (seq = head + kBufferSize).
613 size_t seq = slot.seq.load(std::memory_order_acquire);
// diff relates the slot's sequence to this producer's position.
// NOTE(review): how each sign of diff is handled is not visible here.
614 intptr_t diff =
static_cast<intptr_t
>(seq) -
static_cast<intptr_t
>(tail);
// Claim the position by advancing the producer cursor. If the exchange
// fails, compare_exchange_strong writes the current cursor value into tail.
619 if (tail_.compare_exchange_strong(tail, tail + 1, std::memory_order_relaxed)) {
// Construct the element in place, then publish it: the release store of
// tail + 1 makes the constructed element visible to a consumer whose acquire
// load of seq observes that value.
620 new (dataPtr(slot)) T(std::forward<Args>(args)...);
621 slot.seq.store(tail + 1, std::memory_order_release);
// One ring entry: raw storage for a single T plus the sequence counter that
// producers and consumers use to hand the slot back and forth.
// The struct is aligned to kCacheLineSize, so sizeof(Slot) is a multiple of
// that value. NOTE(review): presumably this keeps neighbouring slots on
// separate cache lines -- confirm against the definition of kCacheLineSize.
633 struct alignas(kCacheLineSize) Slot {
// Uninitialized bytes sized and aligned for a T; elements are created here
// with placement new and destroyed with an explicit destructor call.
634 alignas(T)
char data[
sizeof(T)];
// Sequence number of the slot; written with release stores and read with
// acquire loads by the push and pop paths.
635 std::atomic<size_t> seq;
// View a slot's raw byte storage as a T*. The pointer refers to a live
// object only between the placement new that constructs the element and the
// operation that removes it.
// NOTE(review): the cast is not passed through std::launder; confirm this is
// acceptable for the language standards the project targets.
638 T* dataPtr(Slot& slot) {
639 return reinterpret_cast<T*
>(slot.data);
// Const overload: same storage, read-only view.
642 const T* dataPtr(
const Slot& slot)
const {
643 return reinterpret_cast<const T*
>(slot.data);
// The ring storage: kBufferSize slots held inline as an array member, so the
// slot array itself needs no separate allocation.
653 Slot slots_[kBufferSize];