dispenso 1.6.0
A library for task parallelism
Loading...
Searching...
No Matches
mpmc_ring_buffer.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
27#pragma once
28
29#include <atomic>
30#include <cstddef>
31#include <cstdint>
32#include <new>
33#include <type_traits>
34#include <utility>
35
36#include <dispenso/platform.h>
37#include <dispenso/util.h>
38
39namespace dispenso {
40
167template <typename T, size_t Capacity = 16, bool RoundUpToPowerOfTwo = true>
168#if DISPENSO_HAS_CONCEPTS
169 requires std::move_constructible<T> && std::is_nothrow_move_constructible_v<T>
170#endif
172 static_assert(Capacity >= 2, "MpmcRingBuffer capacity must be at least 2");
173#if !DISPENSO_HAS_CONCEPTS
174 static_assert(
175 std::is_move_constructible<T>::value,
176 "MpmcRingBuffer element type must be move-constructible");
177 static_assert(
178 std::is_nothrow_move_constructible<T>::value,
179 "MpmcRingBuffer element type must be nothrow-move-constructible");
180#endif
181
182 public:
186 using value_type = T;
187
191 using size_type = size_t;
192
203 for (size_t i = 0; i < kBufferSize; ++i) {
204 slots_[i].seq.store(i, std::memory_order_relaxed);
205 }
206 }
207
212
216 MpmcRingBuffer& operator=(const MpmcRingBuffer&) = delete;
217
222
226 MpmcRingBuffer& operator=(MpmcRingBuffer&&) = delete;
227
237 // Drain and destroy any remaining elements.
238 // At destruction time there must be no concurrent access, so we can
239 // read head/tail relaxed and walk forward, destroying elements whose
240 // sequence numbers indicate they contain data.
241 size_t head = head_.load(std::memory_order_relaxed);
242 size_t tail = tail_.load(std::memory_order_relaxed);
243 while (head != tail) {
244 size_t pos = wrapIndex(head);
245 dataPtr(slots_[pos])->~T();
246 ++head;
247 }
248 }
249
272 bool try_push(T&& item) {
273 return emplaceImpl(std::move(item));
274 }
275
286#if DISPENSO_HAS_CONCEPTS
287 bool try_push(const T& item)
288 requires std::is_nothrow_copy_constructible_v<T>
289 {
290 return emplaceImpl(item);
291 }
292#else
293 template <typename U = T, std::enable_if_t<std::is_nothrow_copy_constructible<U>::value, int> = 0>
294 bool try_push(const T& item) {
295 return emplaceImpl(item);
296 }
297#endif
298
320#if DISPENSO_HAS_CONCEPTS
321 template <typename... Args>
322 requires std::is_nothrow_constructible_v<T, Args...>
323 bool try_emplace(Args&&... args) {
324 return emplaceImpl(std::forward<Args>(args)...);
325 }
326#else
327 template <
328 typename... Args,
329 std::enable_if_t<std::is_nothrow_constructible<T, Args...>::value, int> = 0>
330 bool try_emplace(Args&&... args) {
331 return emplaceImpl(std::forward<Args>(args)...);
332 }
333#endif
334
359 bool try_pop(T& item) {
360 // This overload move-*assigns* into the caller's object. If that throws after
361 // the head_ CAS below, the slot would be left unreleased (seq stuck at
362 // head+1) and its element leaked, since the destructor only walks [head,
363 // tail). Require nothrow move-assignment so the slot-release path can never
364 // be derailed. Types that are only nothrow-move-constructible can still use
365 // try_pop() (OpResult) or try_pop_into(), which move-construct.
366 static_assert(
367 std::is_nothrow_move_assignable<T>::value,
368 "MpmcRingBuffer::try_pop(T&) requires a nothrow-move-assignable T; "
369 "use try_pop() or try_pop_into() for nothrow-move-constructible-only types");
370 size_t head = head_.load(std::memory_order_relaxed);
371 // Fast empty-check: a relaxed tail load is much cheaper than the acquire
372 // slot.seq load below (esp. on weak-memory architectures: ldr vs ldar).
373 // Callers that poll many sources for work hit this path constantly.
374 if (head == tail_.load(std::memory_order_relaxed)) {
375 return false;
376 }
377 Slot& slot = slots_[wrapIndex(head)];
378 size_t seq = slot.seq.load(std::memory_order_acquire);
379 intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(head + 1);
380 if (diff == 0) {
381 if (head_.compare_exchange_strong(head, head + 1, std::memory_order_relaxed)) {
382 T* elem = dataPtr(slot);
383 item = std::move(*elem);
384 elem->~T();
385 slot.seq.store(head + kBufferSize, std::memory_order_release);
386 return true;
387 }
388 }
389 return false;
390 }
391
413 OpResult<T> try_pop() {
414 size_t head = head_.load(std::memory_order_relaxed);
415 if (head == tail_.load(std::memory_order_relaxed)) {
416 return {};
417 }
418 Slot& slot = slots_[wrapIndex(head)];
419 size_t seq = slot.seq.load(std::memory_order_acquire);
420 intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(head + 1);
421 if (diff == 0) {
422 if (head_.compare_exchange_strong(head, head + 1, std::memory_order_relaxed)) {
423 T* elem = dataPtr(slot);
424 OpResult<T> result(std::move(*elem));
425 elem->~T();
426 slot.seq.store(head + kBufferSize, std::memory_order_release);
427 return result;
428 }
429 }
430 return {};
431 }
432
448 bool try_pop_into(T* storage) {
449 size_t head = head_.load(std::memory_order_relaxed);
450 if (head == tail_.load(std::memory_order_relaxed)) {
451 return false;
452 }
453 Slot& slot = slots_[wrapIndex(head)];
454 size_t seq = slot.seq.load(std::memory_order_acquire);
455 intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(head + 1);
456 if (diff == 0) {
457 if (head_.compare_exchange_strong(head, head + 1, std::memory_order_relaxed)) {
458 T* elem = dataPtr(slot);
459 new (storage) T(std::move(*elem));
460 elem->~T();
461 slot.seq.store(head + kBufferSize, std::memory_order_release);
462 return true;
463 }
464 }
465 return false;
466 }
467
496 size_type try_push_batch(T* items, size_type count) {
497 if (count == 0) {
498 return 0;
499 }
500 if (count > kBufferSize) {
501 count = kBufferSize;
502 }
503
504 size_t tail = tail_.load(std::memory_order_relaxed);
505
506 // Validate each slot in the reservation range.
507 size_t available = 0;
508 for (size_t i = 0; i < count; ++i) {
509 Slot& slot = slots_[wrapIndex(tail + i)];
510 size_t seq = slot.seq.load(std::memory_order_acquire);
511 intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(tail + i);
512 if (diff != 0) {
513 break;
514 }
515 ++available;
516 }
517 if (available == 0) {
518 return 0;
519 }
520
521 if (tail_.compare_exchange_strong(tail, tail + available, std::memory_order_relaxed)) {
522 for (size_t i = 0; i < available; ++i) {
523 Slot& slot = slots_[wrapIndex(tail + i)];
524 new (dataPtr(slot)) T(std::move(items[i]));
525 slot.seq.store(tail + i + 1, std::memory_order_release);
526 }
527 return available;
528 }
529
530 return 0;
531 }
532
541 bool empty() const {
542 size_t head = head_.load(std::memory_order_relaxed);
543 size_t tail = tail_.load(std::memory_order_relaxed);
544 return head == tail;
545 }
546
555 bool full() const {
556 size_t head = head_.load(std::memory_order_relaxed);
557 size_t tail = tail_.load(std::memory_order_relaxed);
558 return (tail - head) >= kBufferSize;
559 }
560
570 size_type size() const {
571 size_t head = head_.load(std::memory_order_relaxed);
572 size_t tail = tail_.load(std::memory_order_relaxed);
573 return tail - head;
574 }
575
584 static constexpr size_type capacity() noexcept {
585 return kBufferSize;
586 }
587
588 private:
589 static constexpr size_t computeBufferSize() noexcept {
590 return RoundUpToPowerOfTwo ? static_cast<size_t>(detail::nextPow2(Capacity)) : Capacity;
591 }
592
593 static constexpr size_t kBufferSize = computeBufferSize();
594 static_assert(
595 (kBufferSize & (kBufferSize - 1)) == 0 || !RoundUpToPowerOfTwo,
596 "Internal error: kBufferSize must be power of two when RoundUpToPowerOfTwo is true");
597 static constexpr bool kIsPow2 = (kBufferSize & (kBufferSize - 1)) == 0;
598 static constexpr size_t kMask = kBufferSize - 1;
599
600 static size_t wrapIndex(size_t i) {
601 return kIsPow2 ? (i & kMask) : (i % kBufferSize);
602 }
603
604 // Shared single-slot fast path for try_push() and try_emplace(): reserve one slot with a single
605 // CAS on the tail, then construct T in place from the forwarded arguments. Intentionally
606 // unconstrained -- the public overloads carry the nothrow-construction constraints; this just
607 // centralizes the (otherwise identical) algorithm so it lives in one place. Fully inlined, so
608 // the forwarding adds no runtime cost on the hot path.
609 template <typename... Args>
610 bool emplaceImpl(Args&&... args) {
611 size_t tail = tail_.load(std::memory_order_relaxed);
612 Slot& slot = slots_[wrapIndex(tail)];
613 size_t seq = slot.seq.load(std::memory_order_acquire);
614 intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(tail);
615 if (diff == 0) {
616 // ABA-free: tail_ is a monotonic 64-bit counter, so a successful CAS proves no other
617 // producer claimed this position since the load (see "Correctness & ABA-freedom" above).
618 // Fail-fast: a single attempt, no retry loop -- contention returns false, not corruption.
619 if (tail_.compare_exchange_strong(tail, tail + 1, std::memory_order_relaxed)) {
620 new (dataPtr(slot)) T(std::forward<Args>(args)...);
621 slot.seq.store(tail + 1, std::memory_order_release);
622 return true;
623 }
624 }
625 return false;
626 }
627
628 // The element buffer comes first: placing the over-aligned `data` ahead of `seq` keeps any
629 // padding T's alignment requires from landing *between* the two members, which can otherwise
630 // tip a slot that would have fit in one cache line over the boundary. The trailing
631 // alignas(kCacheLineSize) still rounds the whole slot up to a cache-line multiple to prevent
632 // false sharing between neighbors.
633 struct alignas(kCacheLineSize) Slot {
634 alignas(T) char data[sizeof(T)];
635 std::atomic<size_t> seq;
636 };
637
638 T* dataPtr(Slot& slot) {
639 return reinterpret_cast<T*>(slot.data);
640 }
641
642 const T* dataPtr(const Slot& slot) const {
643 return reinterpret_cast<const T*>(slot.data);
644 }
645
647 alignas(kCacheLineSize) std::atomic<size_t> head_{0};
648
650 alignas(kCacheLineSize) std::atomic<size_t> tail_{0};
651
653 Slot slots_[kBufferSize];
654};
655
656} // namespace dispenso
A lock-free multi-producer multi-consumer ring buffer with fixed capacity.
size_type try_push_batch(T *items, size_type count)
Attempts to push multiple elements into the buffer.
bool try_emplace(Args &&... args)
Attempts to construct an element in-place in the buffer.
bool try_push(const T &item)
Attempts to push an element into the buffer by copying.
bool try_pop(T &item)
Attempts to pop an element from the buffer.
MpmcRingBuffer()
Constructs an empty ring buffer.
bool full() const
Checks if the buffer is full.
OpResult< T > try_pop()
Attempts to pop an element from the buffer, returning an optional.
size_type size() const
Returns the approximate number of elements in the buffer.
MpmcRingBuffer(const MpmcRingBuffer &)=delete
Ring buffers are not copyable.
~MpmcRingBuffer()
Destroys the ring buffer.
static constexpr size_type capacity() noexcept
Returns the maximum number of elements the buffer can hold.
bool try_pop_into(T *storage)
Attempts to pop an element into uninitialized storage.
MpmcRingBuffer(MpmcRingBuffer &&)=delete
Ring buffers are not movable.
bool empty() const
Checks if the buffer is empty.
bool try_push(T &&item)
Attempts to push an element into the buffer by moving.
constexpr size_t kCacheLineSize
A constant that defines a safe number of bytes (and alignment) to avoid false sharing.
Definition platform.h:125