dispenso 1.6.0
A library for task parallelism
Loading...
Searching...
No Matches
chase_lev_deque.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
25#pragma once
26
27#include <atomic>
28#include <cstddef>
29#include <cstdint>
30#include <cstring>
31#include <new>
32#include <type_traits>
33#include <utility>
34
35#include <dispenso/platform.h>
37
38namespace dispenso {
39
96template <typename T, size_t Capacity = 32>
97#if DISPENSO_HAS_CONCEPTS
98 requires std::is_trivially_copyable_v<T>
99#endif
101 static_assert(Capacity >= 1, "ChaseLevDeque capacity must be at least 1");
102 static_assert((Capacity & (Capacity - 1)) == 0, "ChaseLevDeque capacity must be a power of two");
103#if !DISPENSO_HAS_CONCEPTS
104 static_assert(
105 std::is_trivially_copyable<T>::value,
106 "ChaseLevDeque requires a trivially copyable element type. "
107 "Wrap move-only payloads in a raw pointer or POD container.");
108#endif
109
110 public:
111 using value_type = T;
112 using size_type = size_t;
113
118 ChaseLevDeque() = default;
119
120 ChaseLevDeque(const ChaseLevDeque&) = delete;
121 ChaseLevDeque& operator=(const ChaseLevDeque&) = delete;
122 ChaseLevDeque(ChaseLevDeque&&) = delete;
123 ChaseLevDeque& operator=(ChaseLevDeque&&) = delete;
124
131 ~ChaseLevDeque() = default;
132
133 // ---------------------------------------------------------------------------
134 // Owner-only operations: push and pop.
135 // ---------------------------------------------------------------------------
136
147 bool try_push(const T& item) {
148 const int64_t b = bottom_.load(std::memory_order_relaxed);
149 const int64_t t = top_.load(std::memory_order_acquire);
150 if (b - t >= static_cast<int64_t>(Capacity)) {
151 return false;
152 }
153 // The slot write is ordered after a prior stealer's CAS on top_ (see try_steal):
154 // a stealer that read this slot tentatively must have CAS'd top_ before any
155 // subsequent push could wrap around to this index. TSAN doesn't model the
156 // through-CAS HB chain on non-atomic slot accesses, so annotate as benign.
157 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
158 *slotPtr(b) = item;
159 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
160 // Release on bottom_ publishes the slot write to any acquire load of bottom_
161 // (i.e., the load in try_steal).
162 bottom_.store(b + 1, std::memory_order_release);
163 return true;
164 }
165
176 bool try_pop(T& out) {
177 const int64_t b = bottom_.load(std::memory_order_relaxed) - 1;
178 bottom_.store(b, std::memory_order_relaxed);
179 // Seq-cst fence orders our bottom store before the top load, matching steal's fence.
180 std::atomic_thread_fence(std::memory_order_seq_cst);
181 int64_t t = top_.load(std::memory_order_relaxed);
182
183 if (t > b) {
184 // Empty. Restore bottom.
185 bottom_.store(b + 1, std::memory_order_relaxed);
186 return false;
187 }
188 // Read directly into out. Safe because T is trivially copyable; on CAS loss,
189 // out is unspecified per the try_* contract.
190 out = *slotPtr(b);
191 if (t < b) {
192 // Multiple elements remained; no race possible at this slot.
193 return true;
194 }
195 // t == b: last element. Race with stealers via CAS on top.
196 bottom_.store(b + 1, std::memory_order_relaxed);
197 return top_.compare_exchange_strong(
198 t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
199 }
200
208 bool try_pop_into(T* storage) {
209 const int64_t b = bottom_.load(std::memory_order_relaxed) - 1;
210 bottom_.store(b, std::memory_order_relaxed);
211 std::atomic_thread_fence(std::memory_order_seq_cst);
212 int64_t t = top_.load(std::memory_order_relaxed);
213
214 if (t > b) {
215 bottom_.store(b + 1, std::memory_order_relaxed);
216 return false;
217 }
218 if (t < b) {
219 std::memcpy(storage, slotPtr(b), sizeof(T));
220 return true;
221 }
222 bottom_.store(b + 1, std::memory_order_relaxed);
223 alignas(T) char tmp[sizeof(T)];
224 std::memcpy(tmp, slotPtr(b), sizeof(T));
225 if (!top_.compare_exchange_strong(
226 t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
227 return false;
228 }
229 std::memcpy(storage, tmp, sizeof(T));
230 return true;
231 }
232
233 // ---------------------------------------------------------------------------
234 // Stealer operations: callable from any thread, including the owner.
235 // ---------------------------------------------------------------------------
236
247 bool try_steal(T& out) {
248 int64_t t = top_.load(std::memory_order_acquire);
249 // Seq-cst fence pairs with the fence in try_pop: ensures we observe an updated
250 // bottom if the owner has decremented past us.
251 std::atomic_thread_fence(std::memory_order_seq_cst);
252 const int64_t b = bottom_.load(std::memory_order_acquire);
253 if (t >= b) {
254 return false;
255 }
256 // Read directly into out before CAS. Safe because T is trivially copyable —
257 // slot[t] is unchanged whether we win or lose. On CAS loss, out is unspecified
258 // per the try_* contract. The non-atomic slot read is ordered against the
259 // owner's earlier slot write via the chain: owner writes slot[b] → owner
260 // bottom_.store(b+1, release) → our bottom_.load(acquire) above synchronizes
261 // with that release → slot read happens-before stealer CAS. The seq_cst CAS
262 // on top_ orders us against other concurrent stealers. TSAN doesn't model
263 // this chain through non-atomic reads, so annotate as benign.
264 DISPENSO_TSAN_ANNOTATE_IGNORE_READS_BEGIN();
265 out = *slotPtr(t);
266 DISPENSO_TSAN_ANNOTATE_IGNORE_READS_END();
267 return top_.compare_exchange_strong(
268 t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
269 }
270
275 bool try_steal_into(T* storage) {
276 int64_t t = top_.load(std::memory_order_acquire);
277 std::atomic_thread_fence(std::memory_order_seq_cst);
278 const int64_t b = bottom_.load(std::memory_order_acquire);
279 if (t >= b) {
280 return false;
281 }
282 // Tentative copy via memcpy (T is trivially copyable). Discarded on CAS loss.
283 // See try_steal for the reasoning behind the TSAN annotations.
284 alignas(T) char tmp[sizeof(T)];
285 DISPENSO_TSAN_ANNOTATE_IGNORE_READS_BEGIN();
286 std::memcpy(tmp, slotPtr(t), sizeof(T));
287 DISPENSO_TSAN_ANNOTATE_IGNORE_READS_END();
288 if (!top_.compare_exchange_strong(
289 t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
290 return false;
291 }
292 std::memcpy(storage, tmp, sizeof(T));
293 return true;
294 }
295
296 // ---------------------------------------------------------------------------
297 // Observers. Snapshots only; results may be immediately stale.
298 // ---------------------------------------------------------------------------
299
301 bool empty() const {
302 const int64_t b = bottom_.load(std::memory_order_acquire);
303 const int64_t t = top_.load(std::memory_order_acquire);
304 return b <= t;
305 }
306
308 size_type size() const {
309 const int64_t b = bottom_.load(std::memory_order_acquire);
310 const int64_t t = top_.load(std::memory_order_acquire);
311 return b > t ? static_cast<size_type>(b - t) : 0;
312 }
313
315 static constexpr size_type capacity() noexcept {
316 return Capacity;
317 }
318
319 private:
320 static constexpr size_t kMask = Capacity - 1;
321 static constexpr size_t kStorageAlign =
322 (alignof(T) > kCacheLineSize) ? alignof(T) : kCacheLineSize;
323
324 T* slotPtr(int64_t index) {
325 return reinterpret_cast<T*>(&storage_[(static_cast<size_t>(index) & kMask) * sizeof(T)]);
326 }
327
331 alignas(kCacheLineSize) std::atomic<int64_t> top_{0};
332
334 alignas(kCacheLineSize) std::atomic<int64_t> bottom_{0};
335
337 alignas(kStorageAlign) char storage_[sizeof(T) * Capacity];
338};
339
340} // namespace dispenso
A lock-free SPMC bounded work-stealing deque.
bool try_pop(T &out)
Pops the most recently pushed element (LIFO, owner-only).
bool try_steal_into(T *storage)
Steals the oldest element into uninitialized storage (FIFO).
bool try_pop_into(T *storage)
Pops the most recently pushed element into uninitialized storage (LIFO, owner-only)....
static constexpr size_type capacity() noexcept
Returns the maximum number of elements the deque can hold.
bool empty() const
Returns true if the deque appears empty.
~ChaseLevDeque()=default
Destroys the deque. Trivially-copyable elements need no explicit teardown.
bool try_steal(T &out)
Steals the oldest element from the top of the deque (FIFO).
ChaseLevDeque()=default
Constructs an empty deque. Storage is uninitialized; elements are constructed in-place when pushed.
size_type size() const
Returns the apparent number of elements in the deque.
bool try_push(const T &item)
Pushes an element onto the bottom of the deque (owner-only).
constexpr size_t kCacheLineSize
A constant that defines a safe number of bytes+alignment to avoid false sharing.
Definition platform.h:125