dispenso 1.5.1
A library for task parallelism
Loading...
Searching...
No Matches
spsc_ring_buffer.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
21#pragma once
22
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <new>
#include <type_traits>
#include <utility>

#include <dispenso/platform.h>
#include <dispenso/util.h>
31
32namespace dispenso {
33
34// TODO(bbudge): Add kDynamicCapacity specialization that heap-allocates storage
35// with runtime-determined capacity.
36
110template <typename T, size_t Capacity = 16, bool RoundUpToPowerOfTwo = true>
112 static_assert(Capacity >= 1, "SPSCRingBuffer capacity must be at least 1");
113 static_assert(
114 std::is_move_constructible<T>::value,
115 "SPSCRingBuffer element type must be move-constructible");
116
117 public:
121 using value_type = T;
122
126 using size_type = size_t;
127
137 SPSCRingBuffer() = default;
138
147
152 SPSCRingBuffer& operator=(const SPSCRingBuffer&) = delete;
153
162
167 SPSCRingBuffer& operator=(SPSCRingBuffer&&) = delete;
168
179 // Destroy any remaining elements
180 size_t head = head_.load(std::memory_order_relaxed);
181 size_t tail = tail_.load(std::memory_order_relaxed);
182 while (head != tail) {
183 elementAt(head)->~T();
184 head = increment(head);
185 }
186 }
187
212 bool try_push(T&& item) {
213 const size_t currentTail = tail_.load(std::memory_order_relaxed);
214 const size_t nextTail = increment(currentTail);
215
216 // Check if buffer is full
217 if (nextTail == head_.load(std::memory_order_acquire)) {
218 return false;
219 }
220
221 // Construct element in-place
222 new (elementAt(currentTail)) T(std::move(item));
223 tail_.store(nextTail, std::memory_order_release);
224 return true;
225 }
226
241 bool try_push(const T& item) {
242 const size_t currentTail = tail_.load(std::memory_order_relaxed);
243 const size_t nextTail = increment(currentTail);
244
245 // Check if buffer is full
246 if (nextTail == head_.load(std::memory_order_acquire)) {
247 return false;
248 }
249
250 // Construct element in-place via copy
251 new (elementAt(currentTail)) T(item);
252 tail_.store(nextTail, std::memory_order_release);
253 return true;
254 }
255
277 template <typename... Args>
278 bool try_emplace(Args&&... args) {
279 const size_t currentTail = tail_.load(std::memory_order_relaxed);
280 const size_t nextTail = increment(currentTail);
281
282 // Check if buffer is full
283 if (nextTail == head_.load(std::memory_order_acquire)) {
284 return false;
285 }
286
287 // Construct element in-place
288 new (elementAt(currentTail)) T(std::forward<Args>(args)...);
289 tail_.store(nextTail, std::memory_order_release);
290 return true;
291 }
292
317 bool try_pop(T& item) {
318 const size_t currentHead = head_.load(std::memory_order_relaxed);
319
320 // Check if buffer is empty
321 if (currentHead == tail_.load(std::memory_order_acquire)) {
322 return false;
323 }
324
325 T* elem = elementAt(currentHead);
326 item = std::move(*elem);
327 elem->~T();
328 head_.store(increment(currentHead), std::memory_order_release);
329 return true;
330 }
331
358 OpResult<T> try_pop() {
359 const size_t currentHead = head_.load(std::memory_order_relaxed);
360
361 // Check if buffer is empty
362 if (currentHead == tail_.load(std::memory_order_acquire)) {
363 return {};
364 }
365
366 T* elem = elementAt(currentHead);
367 OpResult<T> result(std::move(*elem));
368 elem->~T();
369 head_.store(increment(currentHead), std::memory_order_release);
370 return result;
371 }
372
398 bool try_pop_into(T* storage) {
399 const size_t currentHead = head_.load(std::memory_order_relaxed);
400
401 // Check if buffer is empty
402 if (currentHead == tail_.load(std::memory_order_acquire)) {
403 return false;
404 }
405
406 T* elem = elementAt(currentHead);
407 new (storage) T(std::move(*elem));
408 elem->~T();
409 head_.store(increment(currentHead), std::memory_order_release);
410 return true;
411 }
412
437 template <typename InputIt>
438 size_type try_push_batch(InputIt first, InputIt last) {
439 const size_t currentTail = tail_.load(std::memory_order_relaxed);
440 const size_t currentHead = head_.load(std::memory_order_acquire);
441
442 // Calculate available space (actual capacity is kBufferSize - 1)
443 size_t available;
444 if (currentTail >= currentHead) {
445 // Tail is ahead of or at head: available = capacity - (tail - head)
446 available = (kBufferSize - 1) - (currentTail - currentHead);
447 } else {
448 // Tail has wrapped: available = head - tail - 1
449 available = currentHead - currentTail - 1;
450 }
451
452 if (available == 0) {
453 return 0;
454 }
455
456 // Push as many items as possible
457 size_t count = 0;
458 size_t tailPos = currentTail;
459 for (; first != last && count < available; ++first, ++count) {
460 new (elementAt(tailPos)) T(std::move(*first));
461 tailPos = increment(tailPos);
462 }
463
464 if (count > 0) {
465 tail_.store(tailPos, std::memory_order_release);
466 }
467 return count;
468 }
469
495 template <typename OutputIt>
496 size_type try_pop_batch(OutputIt dest, size_type maxCount) {
497 const size_t currentHead = head_.load(std::memory_order_relaxed);
498 const size_t currentTail = tail_.load(std::memory_order_acquire);
499
500 // Calculate available items
501 size_t available;
502 if (currentTail >= currentHead) {
503 available = currentTail - currentHead;
504 } else {
505 available = kBufferSize - currentHead + currentTail;
506 }
507
508 if (available == 0) {
509 return 0;
510 }
511
512 // Pop as many items as requested and available
513 size_t count = std::min(available, maxCount);
514 size_t headPos = currentHead;
515 for (size_t i = 0; i < count; ++i, ++dest) {
516 T* elem = elementAt(headPos);
517 *dest = std::move(*elem);
518 elem->~T();
519 headPos = increment(headPos);
520 }
521
522 if (count > 0) {
523 head_.store(headPos, std::memory_order_release);
524 }
525 return count;
526 }
527
539 bool empty() const {
540 return head_.load(std::memory_order_acquire) == tail_.load(std::memory_order_acquire);
541 }
542
554 bool full() const {
555 return increment(tail_.load(std::memory_order_acquire)) ==
556 head_.load(std::memory_order_acquire);
557 }
558
572 size_type size() const {
573 const size_t head = head_.load(std::memory_order_acquire);
574 const size_t tail = tail_.load(std::memory_order_acquire);
575 // Handle wrap-around: if tail < head, we've wrapped
576 return (tail >= head) ? (tail - head) : (kBufferSize - head + tail);
577 }
578
591 static constexpr size_type capacity() noexcept {
592 return kBufferSize - 1;
593 }
594
595 private:
602 static constexpr size_t computeBufferSize() noexcept {
603 return RoundUpToPowerOfTwo ? static_cast<size_t>(detail::nextPow2(Capacity + 1)) : Capacity + 1;
604 }
605
613 static constexpr size_t kBufferSize = computeBufferSize();
614
620 static constexpr bool kIsPowerOfTwo = (kBufferSize & (kBufferSize - 1)) == 0;
621
627 static constexpr size_t kMask = kBufferSize - 1;
628
638 static constexpr size_t increment(size_t index) noexcept {
639 return kIsPowerOfTwo ? ((index + 1) & kMask) : ((index + 1) % kBufferSize);
640 }
641
645 T* elementAt(size_t index) {
646 return reinterpret_cast<T*>(&storage_[index * sizeof(T)]);
647 }
648
649 const T* elementAt(size_t index) const {
650 return reinterpret_cast<const T*>(&storage_[index * sizeof(T)]);
651 }
652
654 alignas(kCacheLineSize) std::atomic<size_t> head_{0};
655
657 alignas(kCacheLineSize) std::atomic<size_t> tail_{0};
658
661 alignas(T) char storage_[sizeof(T) * kBufferSize];
662};
663
664} // namespace dispenso
A lock-free single-producer single-consumer ring buffer with fixed capacity.
size_type try_push_batch(InputIt first, InputIt last)
Attempts to push multiple elements into the buffer.
bool try_pop_into(T *storage)
Attempts to pop an element into uninitialized storage.
bool try_push(const T &item)
Attempts to push an element into the buffer by copying.
SPSCRingBuffer(SPSCRingBuffer &&)=delete
Ring buffers are not movable.
bool try_push(T &&item)
Attempts to push an element into the buffer by moving.
~SPSCRingBuffer()
Destroys the ring buffer.
SPSCRingBuffer()=default
Constructs an empty ring buffer.
bool try_emplace(Args &&... args)
Attempts to construct an element in-place in the buffer.
static constexpr size_type capacity() noexcept
Returns the maximum number of elements the buffer can hold.
bool try_pop(T &item)
Attempts to pop an element from the buffer.
size_type size() const
Returns the current number of elements in the buffer.
size_type try_pop_batch(OutputIt dest, size_type maxCount)
Attempts to pop multiple elements from the buffer.
OpResult< T > try_pop()
Attempts to pop an element from the buffer, returning an optional.
SPSCRingBuffer(const SPSCRingBuffer &)=delete
Ring buffers are not copyable.
bool empty() const
Checks if the buffer is empty.
bool full() const
Checks if the buffer is full.
constexpr size_t kCacheLineSize
A constant giving a safe size and alignment, in bytes, for avoiding false sharing.
Definition platform.h:97