dispenso 1.6.0
A library for task parallelism
Loading...
Searching...
No Matches
thread_pool.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
15#pragma once
16
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <deque>
#include <iterator>
#include <mutex>
#include <new>
#include <thread>
#include <utility>
#include <vector>

#include <moodycamel/concurrentqueue.h>

#include <dispenso/cpu_set.h>
#include <dispenso/detail/math.h>
#include <dispenso/detail/per_thread_info.h>
#include <dispenso/detail/thread_pool_wake.h>
#include <dispenso/platform.h>
37
38namespace dispenso {
39
namespace detail {
// Reads `ptr` with relaxed ordering, then tells TSAN that this read
// happens-after the publication of the same address.
//
// Intended as a stand-in for memory_order_consume, which compilers promote to
// acquire (an ordered load on weakly-ordered CPUs, e.g. ldar instead of ldr on
// ARM). Accesses that go *through* the returned pointer are address-dependent
// and therefore ordered by the hardware anyway. Accesses that do not depend on
// the returned pointer get no ordering from this load. The TSAN annotation
// supplies the happens-before edge that the C++ abstract machine does not grant
// a relaxed load.
template <typename T>
T* consumeLoad(std::atomic<T*>& ptr) {
  T* const loaded = ptr.load(std::memory_order_relaxed);
  DISPENSO_TSAN_ANNOTATE_HAPPENS_AFTER(&ptr);
  return loaded;
}
} // namespace detail
55
namespace detail {
// Forward declarations of the future implementation templates. ThreadPool
// declares them as friends so they can use its private scheduling members.
template <typename Result>
class FutureBase;
template <typename Result>
class FutureImplBase;
} // namespace detail
62
// Whether idle threads are woken explicitly (futex / WaitOnAddress / ulock)
// instead of relying on polling alone. Defaults to on for Windows, Linux and
// Mach-based platforms. A build may predefine the macro to override this.
#ifndef DISPENSO_WAKEUP_ENABLE
#if defined(_WIN32) || defined(__linux__) || defined(__MACH__)
#define DISPENSO_WAKEUP_ENABLE 1
#else
#define DISPENSO_WAKEUP_ENABLE 0
#endif // platform
#endif // DISPENSO_WAKEUP_ENABLE

// Poll period in microseconds, used when explicit wakeups are disabled. Idle
// threads then only notice new work when this timer fires, so it is short.
#ifndef DISPENSO_POLL_PERIOD_US
#ifdef _WIN32
#define DISPENSO_POLL_PERIOD_US 1000
#else
#define DISPENSO_POLL_PERIOD_US 200
#endif
#endif // DISPENSO_POLL_PERIOD_US

// Backstop sleep in microseconds, used when explicit wakeups are enabled.
// Normal-path latency comes from the wake system. This timeout only bounds the
// delay from rare races, e.g. a thread entering sleep right after a wakeAll
// epoch bump. It is the same on every platform.
#ifndef DISPENSO_WAKE_BACKSTOP_US
#define DISPENSO_WAKE_BACKSTOP_US 100000
#endif

constexpr bool kDefaultWakeupEnable = DISPENSO_WAKEUP_ENABLE;

// Default idle sleep length: the backstop in wake mode, the poll period otherwise.
constexpr uint32_t kDefaultSleepLenUs =
    kDefaultWakeupEnable ? DISPENSO_WAKE_BACKSTOP_US : DISPENSO_POLL_PERIOD_US;
95
// Tag type. Passing ForceQueuingTag() to a schedule overload skips the
// load-based run-inline check and always enqueues the task. The task still
// runs inline if the pool currently has zero threads.
struct ForceQueuingTag {};
102
108class DISPENSO_CACHELINE_ALIGNED ThreadPool {
109 public:
  // Constructs a pool. `n` is presumably the initial worker-thread count.
  // `poolLoadMultiplier` is presumably used to derive poolLoadFactor_, the
  // pending-work threshold above which schedule() runs tasks inline.
  // TODO confirm against the out-of-line definition.
  DISPENSO_DLL_ACCESS ThreadPool(size_t n, size_t poolLoadMultiplier = 32);
118
137 template <class Rep, class Period>
139 bool enable,
140 const std::chrono::duration<Rep, Period>& sleepDuration =
141 std::chrono::microseconds(kDefaultSleepLenUs)) {
142 setSignalingWake(
143 enable,
144 static_cast<uint32_t>(
145 std::chrono::duration_cast<std::chrono::microseconds>(sleepDuration).count()));
146 }
147
154 DISPENSO_DLL_ACCESS void resize(ssize_t n) DISPENSO_NO_THREAD_SAFETY_ANALYSIS {
155 std::lock_guard<std::mutex> lk(threadsMutex_);
156 resizeLocked(n);
157 }
158
165 ssize_t numThreads() const {
166 return numThreads_.load(std::memory_order_relaxed);
167 }
168
  // Schedules `f` on the pool. If pending work is already high (see
  // shouldRunInline()), `f` runs inline on the calling thread instead.
  template <typename F>
  DISPENSO_REQUIRES(OnceCallableFunc<F>)
  void schedule(F&& f);

  // Schedules `f` without the load check. `f` is always enqueued, except when
  // the pool has zero threads, in which case it runs inline.
  template <typename F>
  DISPENSO_REQUIRES(OnceCallableFunc<F>)
  void schedule(F&& f, ForceQueuingTag);

  // Schedules `count` tasks produced by `gen`. Presumably gen(i) is called for
  // each i in [0, count); TODO confirm against the out-of-line definition.
  template <typename Generator>
  void scheduleBulk(size_t count, Generator&& gen);

  // Destroys the pool (defined out of line).
  DISPENSO_DLL_ACCESS ~ThreadPool();
214
229 class AwakeRef {
230 public:
231 AwakeRef() = default;
233 explicit AwakeRef(ThreadPool* pool) : pool_(pool) {
234 if (pool_) {
235 pool_->keepAwakeCount_.fetch_add(1, std::memory_order_acq_rel);
236 }
237 }
238 AwakeRef(const AwakeRef&) = delete;
239 AwakeRef& operator=(const AwakeRef&) = delete;
241 AwakeRef(AwakeRef&& other) noexcept : pool_(other.pool_) {
242 other.pool_ = nullptr;
243 }
244 AwakeRef& operator=(AwakeRef&& other) noexcept {
245 reset();
246 pool_ = other.pool_;
247 other.pool_ = nullptr;
248 return *this;
249 }
250 ~AwakeRef() {
251 reset();
252 }
254 void reset() {
255 if (pool_) {
256 pool_->keepAwakeCount_.fetch_sub(1, std::memory_order_release);
257 pool_ = nullptr;
258 }
259 }
260
261 private:
262 ThreadPool* pool_ = nullptr;
263 };
264
271 return AwakeRef(this);
272 }
273
 private:
  // Per-worker bookkeeping: the worker's std::thread plus a running flag.
  // Presumably stop() clears the flag and running() polls it. All member
  // functions are defined out of line.
  class PerThreadData {
   public:
    void setThread(std::thread&& t);

    bool running();

    void stop();

    ~PerThreadData();

    // Aligns each entry to a cache line. Presumably this keeps workers'
    // running_ flags in threads_ from falsely sharing lines.
    alignas(kCacheLineSize) std::thread thread_;
    std::atomic<bool> running_{true};
  };

  // Defined out of line. Presumably blocks worker `threadIdx` until it is woken
  // or the sleep length elapses, and returns the epoch it observed.
  // TODO confirm.
  DISPENSO_DLL_ACCESS uint32_t waitOnThread(int32_t threadIdx, uint32_t priorEpoch);
290
291 void setSignalingWake(bool enable, uint32_t sleepDurationUs) DISPENSO_NO_THREAD_SAFETY_ANALYSIS {
292 std::lock_guard<std::mutex> lk(threadsMutex_);
293 ssize_t currentPoolSize = numThreads();
294 resizeLocked(0);
295 enableEpochWaiter_.store(enable, std::memory_order_release);
296 sleepLengthUs_.store(sleepDurationUs, std::memory_order_release);
297 resizeLocked(currentPoolSize);
298 }
299
  // Resize implementation. The caller must hold threadsMutex_ (resize() and
  // setSignalingWake() do).
  DISPENSO_DLL_ACCESS void resizeLocked(ssize_t n);

  // Runs `work`, then decrements workRemaining_ by one.
  void executeNext(OnceFunction work);

  // Worker loop, presumably the body of each pool thread. kUseWakeSleep
  // selects explicit-wake sleeping (true) or timed polling (false).
  // threadLoopWake and threadLoopPoll are the two instantiations.
  template <bool kUseWakeSleep>
  void threadLoopImpl(PerThreadData& threadData, int32_t ringIndex);

  void threadLoopWake(PerThreadData& threadData, int32_t ringIndex) {
    threadLoopImpl<true>(threadData, ringIndex);
  }
  void threadLoopPoll(PerThreadData& threadData, int32_t ringIndex) {
    threadLoopImpl<false>(threadData, ringIndex);
  }

  // Worker busy/idle transitions, defined out of line. Presumably they
  // maintain numNotWorking_; TODO confirm.
  void markWorkDone(bool& isWorking);
  void markIdle(bool& isWorking);

  // Each pops at most one task from its source (central queue, a specific
  // producer's sub-queue, or the per-thread rings) and runs it via
  // executeNext(). Each returns true iff a task ran.
  bool tryExecuteNext();
  bool tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token);
  bool tryExecuteNextFromRings(size_t& startRing);

  // Load-factor check shared by all inline-or-queue schedule overloads.
  DISPENSO_INLINE bool shouldRunInline();

  // Core scheduling: central queue + bulk-like wake (default, throughput-oriented).
  DISPENSO_INLINE void scheduleImpl(OnceFunction task, moodycamel::ProducerToken* token);

  // Placed scheduling: proactive wake → steal ring → central queue.
  // Higher per-call cost but better latency for individual tasks (futures, pipelines).
  DISPENSO_INLINE void scheduleImplPlaced(OnceFunction task, moodycamel::ProducerToken* token);

  // Shared body for all ForceQueuingTag overloads. kPlaced selects
  // scheduleImplPlaced (true) vs scheduleImpl (false).
  template <bool kPlaced, typename F>
  inline void forceEnqueue(F&& f, moodycamel::ProducerToken* token);

  // Same as the public schedule() overloads, but enqueue through an explicit
  // producer token instead of the calling thread's per-pool token.
  template <typename F>
  void schedule(moodycamel::ProducerToken& token, F&& f);

  template <typename F>
  void schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag);

  // Placed counterparts of the explicit-token overloads above.
  template <typename F>
  void schedulePlaced(moodycamel::ProducerToken& token, F&& f);

  template <typename F>
  void schedulePlaced(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag);

  // Placed scheduling: public-like API but private — only for internal dispenso callers.
  template <typename F>
  DISPENSO_REQUIRES(OnceCallableFunc<F>)
  void schedulePlaced(F&& f);

  template <typename F>
  DISPENSO_REQUIRES(OnceCallableFunc<F>)
  void schedulePlaced(F&& f, ForceQueuingTag);

  // Shared body for scheduleBulk/scheduleBulkPlaced. kPlaced selects
  // placed path (scheduleImplPlaced per-task) vs central queue (scheduleBulkEnqueue).
  template <bool kPlaced, typename Generator>
  void scheduleBulkImpl(size_t count, Generator&& gen);

  // Bulk placed scheduling: chunked submit through the steal-ring path. Internal only —
  // exposed via ConcurrentTaskSet's scheduleBulk under TaskCost::kHeavy routing.
  template <typename Generator>
  void scheduleBulkPlaced(size_t count, Generator&& gen);

  // Core bulk enqueue: unconditionally stage, enqueue, and wake for a chunk of tasks.
  // Caller is responsible for load factor checks. Count should be small (e.g. <= 2*numThreads).
  // When a producer token is provided, uses token-based enqueue for better throughput.
  template <typename Generator>
  void
  scheduleBulkEnqueue(size_t count, Generator&& gen, moodycamel::ProducerToken* token = nullptr);
373
374 // Wake enough threads to handle pending work. Uses PoolWakeState's budget-
375 // limited cascade for efficient parallel waking.
376 // Missed wakes are benign: the EpochWaiter's sleep timeout provides a safety
377 // net, so a missed wake only delays wakeup by up to that duration.
378 void conditionallyWake() {
379 auto* ws = detail::consumeLoad(wakeState_);
380 if (enableEpochWaiter_.load(std::memory_order_acquire) && ws) {
381 int32_t sleeping = ws->totalSleeping();
382 if (sleeping > 0) {
383 ssize_t pending = workRemaining_.load(std::memory_order_relaxed);
384 ssize_t numT = numThreads_.load(std::memory_order_relaxed);
385 ssize_t awake = numT - static_cast<ssize_t>(sleeping);
386 if (pending > awake) {
387 ws->claimAndWakeOne();
388 }
389 }
390 }
391 }
392
393 public:
394 // If we are not yet C++17, we provide aligned new/delete to avoid false sharing.
395#if __cplusplus < 201703L
396 static void* operator new(size_t sz) {
397 return detail::alignedMalloc(sz);
398 }
399 static void operator delete(void* ptr) {
400 return detail::alignedFree(ptr);
401 }
402#endif // __cplusplus
403
404 private:
  // Per-thread ring buffer type for fork-join scheduling.
  // 16 slots matches kAuto's oversubscription factor and fits in one cache line group.
  using Ring = MpmcRingBuffer<OnceFunction, 16>;

  // Steal ring configuration.
  // Slots per thread: base capacity before sharing multiplier.
  static constexpr size_t kStealSlotsPerThread = 4;
  // Sharing factor: threads per steal ring (aligned with wake group size).
  // Worker t uses steal ring t / stealRingSharing_ (see scheduleImplPlaced).
#if defined(DISPENSO_TUNE_STEAL_RING_SHARING)
  static constexpr size_t kStealRingSharing = DISPENSO_TUNE_STEAL_RING_SHARING;
#else
  // Matches the wake group-size default (8). See docs/design/wake_tuning.md.
  static constexpr size_t kStealRingSharing = 8;
#endif
  // Slots per steal ring: 4 * 8 = 32 with the default settings.
  static constexpr size_t kStealRingCapacity = kStealSlotsPerThread * kStealRingSharing;
  using StealRing = MpmcRingBuffer<OnceFunction, kStealRingCapacity>;

  // Cross-ring steal gate: workers only probe other rings' has-work bitmask
  // after `failCount` consecutive empty pops on their own ring. Preserves
  // placed-scheduling locality during steady-state operation; the threshold
  // (~kSpinCheckInterval / 2) means we steal cross-ring only after sustained
  // local idle, by which point our own ring's locality is exhausted anyway.
#if defined(DISPENSO_TUNE_CROSS_RING_FAIL_THRESHOLD)
  static constexpr int kCrossRingFailThreshold = DISPENSO_TUNE_CROSS_RING_FAIL_THRESHOLD;
#else
  static constexpr int kCrossRingFailThreshold = 32;
#endif
432
433 // Enqueue a task to the central concurrent queue with optional producer token.
434 // Handles TSAN annotations. Used by scheduleBulkToRings for overflow tasks.
435 // moodycamel::ConcurrentQueue::enqueue returns false only on allocation
436 // failure; we propagate that as std::bad_alloc so callers don't silently
437 // drop work (which would deadlock TaskSet::wait via inflated outstanding
438 // counts). Out-of-memory recovery from individual small allocs is not
439 // tractable in general; throwing lets the application unwind or terminate.
440 DISPENSO_INLINE void enqueueToCentralQueue(OnceFunction task, moodycamel::ProducerToken* token) {
441 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
442 bool enqueued;
443 if (token) {
444 enqueued = work_.enqueue(*token, std::move(task));
445 } else {
446 enqueued = work_.enqueue(std::move(task));
447 }
448 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
449 if (DISPENSO_EXPECT(!enqueued, false)) {
450#if defined(__cpp_exceptions)
451 throw std::bad_alloc();
452#else
453 std::abort();
454#endif
455 }
456 // Mark queue as possibly-non-empty so spinning workers will try_dequeue.
457 centralQueueNonEmpty_.store(true, std::memory_order_relaxed);
458 }
459
  // Push task i to ring i (linear layout) for fork-join scheduling. When there
  // are more tasks than rings, contiguous blocks of tasks go to each ring instead.
  // Tasks that don't fit in their ring go to central queue via fallbackToken.
  // Handles workRemaining_ accounting and waking (one batch wake at the end).
  // outstandingTaskCount_ must be managed by the caller.
  template <typename Generator>
  void scheduleBulkToRings(size_t count, Generator&& gen, moodycamel::ProducerToken* fallbackToken);

  // At most one task per ring: task i goes to rings_[i], optionally wrapped so
  // that running it first cascades a wake to another thread group.
  template <typename Generator>
  DISPENSO_INLINE void scheduleBulkToRingsFastPath(
      size_t count,
      size_t ringCount,
      Generator&& gen,
      moodycamel::ProducerToken* fallbackToken);

  // More than one task per ring. Each ring receives a contiguous block of
  // tasksPerRing tasks; up to Ring::capacity() of them are pushed in one batch
  // and the rest go to the central queue.
  template <typename Generator>
  DISPENSO_INLINE void scheduleBulkToRingsBatched(
      size_t count,
      size_t ringCount,
      size_t tasksPerRing,
      Generator&& gen,
      moodycamel::ProducerToken* fallbackToken);

  // Shared work-finding logic for both loop variants. Checks own ring,
  // central queue, and steal ring as third tier (the steal rings are only
  // consulted when preferRing is true).
  // Returns true if work was found and executed.
  // preferRing: sticky hint — true = try ring first, false = try central queue first.
  // failCount: consecutive failures finding work — used to gate cross-ring stealing
  // so we preserve placed-scheduling locality during steady-state operation.
  DISPENSO_INLINE bool tryFindAndExecuteWork(
      Ring& myRing,
      StealRing& myStealRing,
      size_t myStealIdx,
      moodycamel::ConsumerToken& ctoken,
      bool& preferRing,
      int failCount,
      bool checkQueue = true);

  // Number of tasks each thread accumulates before flushing workRemaining_.
  // This batching reduces atomic contention in threadLoop, but inflates
  // workRemaining_ by up to kWorkBatchSize * numThreads, so poolLoadFactor_
  // must be reduced accordingly for accurate load-shedding in schedule().
  static constexpr int kWorkBatchSize = 8;

  // Minimum spinning threads before we skip waking a sleeper. If fewer than
  // this many threads are spinning, we wake a sleeper to ensure coverage.
  // Higher = more aggressive waking; lower = trust spinners more.
  // (Spinners are estimated as numNotWorking_ minus sleeping threads.)
  static constexpr int32_t kSpinnerWakeThreshold = 2;
507
  // Serializes resize() and setSignalingWake(). Presumably also guards
  // threads_ inside resizeLocked().
  mutable std::mutex threadsMutex_;
  std::deque<PerThreadData> threads_;
  // Constructor argument; presumably used to derive poolLoadFactor_.
  size_t poolLoadMultiplier_;

  // These atomics are read frequently in the hot schedule() path, so they need
  // cache-line alignment to avoid false sharing with the mutex/deque above.
  alignas(kCacheLineSize) std::atomic<ssize_t> poolLoadFactor_;
  std::atomic<ssize_t> numThreads_;

  // Central MPMC task queue. Pushed by enqueueToCentralQueue(); popped by
  // tryFindAndExecuteWork() and the tryExecuteNext*() helpers.
  moodycamel::ConcurrentQueue<OnceFunction> work_;

  // Approximate flag indicating the central queue may have work. Set (store
  // true) after enqueue, cleared (store false) when try_dequeue finds the queue
  // empty. Used to gate try_dequeue in tryFindAndExecuteWork: a relaxed load
  // replaces the expensive try_dequeue CAS when the queue is known empty,
  // eliminating CAS contention from idle spinning threads. No atomic RMW —
  // only plain stores and loads — so no contention on the flag itself.
  // Brief false-negatives (flag cleared while an enqueue is in flight) are
  // bounded to one spin iteration and self-correcting.
  alignas(kCacheLineSize) std::atomic<bool> centralQueueNonEmpty_{false};

  // Refcount of outstanding AwakeRef handles. When > 0, worker threads skip
  // the sleep transition at the end of their spin window and continue
  // spinning. Bumped by AwakeRef ctor, decremented by dtor. Workers read with
  // relaxed ordering — a brief stale-true read just costs one extra spin
  // iteration; a brief stale-false read is bounded by the spin window and
  // self-corrects on the next iteration.
  alignas(kCacheLineSize) std::atomic<int32_t> keepAwakeCount_{0};

  // Approximate count of scheduled-but-unfinished tasks. Drives shouldRunInline()
  // and the wake heuristics. See kWorkBatchSize for why it can run high.
  alignas(kCacheLineSize) std::atomic<ssize_t> workRemaining_{0};

  // Wake mode (true) vs poll mode, and the idle sleep length in microseconds.
  // Both are changed only by setSignalingWake(), while the pool is shrunk to
  // zero threads.
  alignas(kCacheLineSize) std::atomic<bool> enableEpochWaiter_{kDefaultWakeupEnable};
  std::atomic<uint32_t> sleepLengthUs_{kDefaultSleepLenUs};

  // Per-thread wake infrastructure: EpochWaiters, sleep masks, budget cascade.
  // Atomic pointer — schedule paths load with relaxed ordering.
  //
  // Retired PoolWakeState objects are kept alive for the lifetime of the pool
  // (grow-only graveyard); they are intentionally NOT freed at resize. schedule()
  // reads wakeState_ lock-free (no threadsMutex_) and dereferences it (e.g.
  // ws->totalSleeping()), so freeing a retired generation while a concurrent
  // schedule() may still hold that pointer is a use-after-free. resize() joins all
  // *pool* threads before swapping wakeState_, but external (non-pool) schedule()
  // callers race the swap, so a retired object must outlive any such in-flight
  // reader. Without a safe-reclamation protocol, never freeing during operation is
  // the only correct option (an earlier bounded variant that freed old
  // generations was reverted after TSAN caught exactly this free-vs-read race).
  //
  // Cost: one PoolWakeState (~O(numThreads)) is retained per resize() until the
  // pool is destroyed. resize() is expected to be rare, so this is bounded in
  // practice; only a process performing hundreds of thousands of resizes would
  // accumulate meaningful memory. See docs/design/roadmap.md ("Bounded
  // PoolWakeState reclamation") for the planned asymmetric-fence / hazard-pointer
  // scheme to bound this without taxing the schedule() hot path.
  std::atomic<detail::PoolWakeState*> wakeState_{nullptr};
  std::vector<decltype(detail::makeAligned<detail::PoolWakeState>(0))> wakeStateGraveyard_;

  // Per-thread rings for fork-join scheduling. ConcurrentObjectArena provides
  // stable pointers (grow-only, never freed), eliminating the need for a resize
  // lock on the schedule path. Threads check own ring first in the steal order.
  ConcurrentObjectArena<Ring> rings_;
  // Published with release only after the rings exist; readers that index
  // rings_ load it with acquire.
  std::atomic<size_t> numRings_{0};

  // Steal rings for non-locality work distribution.
  // Populated by schedule() (both proactive wake and no-sleeper paths).
  // Consumed in tryFindAndExecuteWork (third tier) and outer thread loop.
  // NOTE(review): in this header only scheduleImplPlaced's proactive-wake path
  // pushes to steal rings; confirm where the no-sleeper path does.
  //
  // stealRingSharing_: threads per steal ring (default kStealRingSharing).
  // Ring capacity = kStealSlotsPerThread * kStealRingSharing.
  ConcurrentObjectArena<StealRing> stealRings_;
  std::atomic<size_t> numStealRings_{0};
  size_t stealRingSharing_{kStealRingSharing};

  // Sparse hint for which steal rings have work. Bit i set means
  // stealRings_[i] may have work. Set on push (idempotent fetch_or); lazily
  // cleared by consumers that find a ring empty (e.g. the cross-ring steal in
  // tryFindAndExecuteWork clears the bit after a failed try_pop). False
  // positives are benign (just a wasted try_pop). False negatives CAN occur:
  // if a producer pushes (and sets the bit) between a consumer's failed
  // try_pop and its fetch_and, the bit ends up clear while the ring holds
  // work. The work is not lost. tryFindAndExecuteWork polls a thread's own
  // steal ring without consulting this mask. The work is only hidden from
  // cross-ring stealers until the next push sets the bit again.
  // Pools with >64 steal rings (512+ threads at kStealRingSharing=8) degrade
  // gracefully: rings beyond index 63 still receive work via try_push, but
  // aren't tracked in this bitmask, so cross-ring stealing falls back to
  // local-ring-only polling for those rings.
  static constexpr size_t kMaxStealRings = 64;
  alignas(kCacheLineSize) std::atomic<uint64_t> stealRingsWithWork_{0};

  // Threads not currently in their inner work loop (spinning or sleeping).
  // Incremented when a thread exhausts work (exits inner loop with nothing found).
  // Decremented when a thread finds work (enters inner loop) or at thread exit.
  // Updated at burst boundaries (not per-task), so contention is low.
  // Used by schedule paths to skip wake calls when spinners exist.
  alignas(kCacheLineSize) std::atomic<int32_t> numNotWorking_{0};

#if defined DISPENSO_DEBUG
  alignas(kCacheLineSize) std::atomic<ssize_t> outstandingTaskSets_{0};
#endif // DISPENSO_DEBUG

  // Task sets and futures reach the private scheduling paths (schedulePlaced,
  // explicit-token overloads, bulk-to-rings, etc.).
  friend class ConcurrentTaskSet;
  friend class TaskSet;
  friend class TaskSetBase;

  template <typename Result>
  friend class detail::FutureBase;
  template <typename Result>
  friend class detail::FutureImplBase;
613};
614
// Returns the library's global ThreadPool (defined out of line). Presumably a
// lazily created, process-wide instance; TODO confirm.
DISPENSO_DLL_ACCESS ThreadPool& globalThreadPool();

// Changes the thread count of the pool returned by globalThreadPool().
// Presumably equivalent to globalThreadPool().resize(numThreads); TODO confirm.
DISPENSO_DLL_ACCESS void resizeGlobalThreadPool(size_t numThreads);
628
629// ----------------------------- Implementation details -------------------------------------
630
631DISPENSO_INLINE bool ThreadPool::shouldRunInline() {
632 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
633 ssize_t quickLoadFactor = numThreads_.load(std::memory_order_relaxed);
634 quickLoadFactor += quickLoadFactor / 2;
635 return (detail::PerPoolPerThreadInfo::isPoolRecursive(this) && curWork > quickLoadFactor) ||
636 (curWork > poolLoadFactor_.load(std::memory_order_relaxed));
637}
638
639template <bool kPlaced, typename F>
640inline void ThreadPool::forceEnqueue(F&& f, moodycamel::ProducerToken* token) {
641 if (!numThreads_.load(std::memory_order_relaxed)) {
642 f();
643 return;
644 }
645 workRemaining_.fetch_add(1, std::memory_order_release);
646 if (kPlaced) {
647 scheduleImplPlaced({std::forward<F>(f)}, token);
648 } else {
649 scheduleImpl({std::forward<F>(f)}, token);
650 }
651}
652
653template <typename F>
654DISPENSO_REQUIRES(OnceCallableFunc<F>)
655inline void ThreadPool::schedule(F&& f) {
656 if (shouldRunInline()) {
657 f();
658 } else {
659 schedule(std::forward<F>(f), ForceQueuingTag());
660 }
661}
662
663template <typename F>
664DISPENSO_REQUIRES(OnceCallableFunc<F>)
665inline void ThreadPool::schedule(F&& f, ForceQueuingTag) {
666 auto* token =
667 static_cast<moodycamel::ProducerToken*>(detail::PerPoolPerThreadInfo::producer(this));
668 forceEnqueue<false>(std::forward<F>(f), token);
669}
670
671template <typename F>
672inline void ThreadPool::schedule(moodycamel::ProducerToken& token, F&& f) {
673 if (shouldRunInline()) {
674 f();
675 } else {
676 schedule(token, std::forward<F>(f), ForceQueuingTag());
677 }
678}
679
680template <typename F>
681inline void ThreadPool::schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag) {
682 forceEnqueue<false>(std::forward<F>(f), &token);
683}
684
685template <typename F>
686DISPENSO_REQUIRES(OnceCallableFunc<F>)
687inline void ThreadPool::schedulePlaced(F&& f) {
688 if (shouldRunInline()) {
689 f();
690 } else {
691 schedulePlaced(std::forward<F>(f), ForceQueuingTag());
692 }
693}
694
695template <typename F>
696DISPENSO_REQUIRES(OnceCallableFunc<F>)
697inline void ThreadPool::schedulePlaced(F&& f, ForceQueuingTag) {
698 auto* token =
699 static_cast<moodycamel::ProducerToken*>(detail::PerPoolPerThreadInfo::producer(this));
700 forceEnqueue<true>(std::forward<F>(f), token);
701}
702
703template <typename F>
704inline void ThreadPool::schedulePlaced(moodycamel::ProducerToken& token, F&& f) {
705 if (shouldRunInline()) {
706 f();
707 } else {
708 schedulePlaced(token, std::forward<F>(f), ForceQueuingTag());
709 }
710}
711
712template <typename F>
713inline void ThreadPool::schedulePlaced(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag) {
714 forceEnqueue<true>(std::forward<F>(f), &token);
715}
716
717DISPENSO_INLINE void ThreadPool::scheduleImpl(OnceFunction task, moodycamel::ProducerToken* token) {
718 enqueueToCentralQueue(std::move(task), token);
719
720 // Wake when pending work exceeds awake threads. Each schedule()
721 // call wakes at most one sleeper — matching baseline's wake-per-task
722 // cadence but using per-thread futexes. The one-at-a-time approach
723 // avoids thundering herd on the central queue while still waking
724 // threads proportionally to submitted work over a burst.
725 auto* ws = detail::consumeLoad(wakeState_);
726 if (enableEpochWaiter_.load(std::memory_order_acquire) && ws) {
727 int32_t sleeping = ws->totalSleeping();
728 if (sleeping > 0) {
729 ssize_t pending = workRemaining_.load(std::memory_order_relaxed);
730 ssize_t numT = numThreads_.load(std::memory_order_relaxed);
731 ssize_t awake = numT - static_cast<ssize_t>(sleeping);
732 if (pending > awake) {
733 ws->claimAndWakeOne();
734 }
735 }
736 }
737}
738
739DISPENSO_INLINE void ThreadPool::scheduleImplPlaced(
740 OnceFunction task,
741 moodycamel::ProducerToken* token) {
742 // Proactive wake: claim a sleeping thread and push to its steal ring.
743 auto* ws = detail::consumeLoad(wakeState_);
744 if (enableEpochWaiter_.load(std::memory_order_acquire) && ws) {
745 int32_t sleeping = ws->totalSleeping();
746 if (sleeping > 0 &&
747 numNotWorking_.load(std::memory_order_relaxed) - sleeping < kSpinnerWakeThreshold) {
748 int32_t wokeThread = ws->claimAndWakeOne();
749 if (wokeThread >= 0) {
750 size_t stealIdx = static_cast<size_t>(wokeThread) / stealRingSharing_;
751 if (stealIdx < numStealRings_.load(std::memory_order_relaxed) &&
752 stealRings_[stealIdx].try_push(std::move(task))) {
753 if (stealIdx < kMaxStealRings) {
754 stealRingsWithWork_.fetch_or(uint64_t{1} << stealIdx, std::memory_order_release);
755 }
756 return;
757 }
758 }
759 }
760 }
761
762 // Central queue fallback.
763 enqueueToCentralQueue(std::move(task), token);
764
765 conditionallyWake();
766}
767
768inline bool ThreadPool::tryExecuteNext() {
769 OnceFunction next;
770 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
771 bool dequeued = work_.try_dequeue(next);
772 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
773 if (dequeued) {
774 executeNext(std::move(next));
775 return true;
776 }
777 return false;
778}
779
780inline bool ThreadPool::tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token) {
781 OnceFunction next;
782 if (work_.try_dequeue_from_producer(token, next)) {
783 executeNext(std::move(next));
784 return true;
785 }
786 return false;
787}
788
789inline bool ThreadPool::tryExecuteNextFromRings(size_t& startRing) {
790 OnceFunction task;
791 // Acquire pairs with resizeLocked's numRings_.store(release), which is published
792 // only after the new rings are fully constructed in the arena. A relaxed load
793 // could observe the grown count without the rings' construction being visible,
794 // letting us index a not-yet-constructed ring (UB; SIGILL on weak-memory targets
795 // like arm64).
796 size_t n = numRings_.load(std::memory_order_acquire);
797 for (size_t i = 0; i < n; ++i) {
798 size_t idx = (startRing + i) % n;
799 if (rings_[idx].try_pop(task)) {
800 startRing = idx;
801 executeNext(std::move(task));
802 return true;
803 }
804 }
805 startRing = 0;
806 return false;
807}
808
809inline void ThreadPool::executeNext(OnceFunction next) {
810 next();
811 workRemaining_.fetch_add(-1, std::memory_order_relaxed);
812}
813
// Looks for one task and runs it. The search order depends on `preferRing`:
//   true:  own fork-join ring, central queue, own steal ring, then (after
//          kCrossRingFailThreshold consecutive failures) another steal ring;
//   false: central queue, then own fork-join ring.
// `preferRing` is updated to the source that last produced work. The central
// queue is skipped when `checkQueue` is false or centralQueueNonEmpty_ is clear.
//
// Tasks found here are invoked directly via task(), not executeNext(), so
// workRemaining_ is NOT decremented here. Presumably the calling thread loop
// flushes it in batches (see kWorkBatchSize); TODO confirm.
//
// NOTE(review): when preferRing is false, neither myStealRing nor other steal
// rings are checked. A task that scheduleImplPlaced hands to this thread's steal
// ring is then only found here after preferRing flips back to true (which
// requires a successful own-ring pop), or by the outer thread loop. Confirm
// this is intended.
DISPENSO_INLINE bool ThreadPool::tryFindAndExecuteWork(
    Ring& myRing,
    StealRing& myStealRing,
    size_t myStealIdx,
    moodycamel::ConsumerToken& ctoken,
    bool& preferRing,
    int failCount,
    bool checkQueue) {
  OnceFunction task;

  if (preferRing) {
    bool fromRing = myRing.try_pop(task);
    if (fromRing) {
      task();
      return true;
    }
    // The flag gates the comparatively expensive try_dequeue.
    if (checkQueue && centralQueueNonEmpty_.load(std::memory_order_relaxed)) {
      DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
      bool got = work_.try_dequeue(ctoken, task);
      DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
      if (got) {
        // Work came from the queue: check the queue first next time.
        preferRing = false;
        task();
        return true;
      }
      // Empty on observation; clear flag (relaxed, plain store).
      centralQueueNonEmpty_.store(false, std::memory_order_relaxed);
    }
    // empty() pre-check: presumably cheaper than a failing try_pop.
    if (!myStealRing.empty() && myStealRing.try_pop(task)) {
      task();
      return true;
    }
    // Cross-ring stealing only after sustained local failure, to preserve
    // placed-scheduling locality.
    if (failCount >= kCrossRingFailThreshold) {
      uint64_t mask = stealRingsWithWork_.load(std::memory_order_acquire);
      if (mask != 0) {
        // Our own steal ring was already tried above; exclude it.
        if (myStealIdx < kMaxStealRings) {
          mask &= ~(uint64_t{1} << myStealIdx);
        }
        if (mask != 0) {
          // Index of the lowest set bit: the lowest-numbered flagged ring.
          int target = detail::countTrailingZeros(mask);
          if (stealRings_[static_cast<size_t>(target)].try_pop(task)) {
            task();
            return true;
          }
          // Observed empty: lazily clear its hint bit. This can race with a
          // concurrent push and leave the bit clear while the ring has work.
          stealRingsWithWork_.fetch_and(~(uint64_t{1} << target), std::memory_order_relaxed);
        }
      }
    }
  } else {
    if (checkQueue && centralQueueNonEmpty_.load(std::memory_order_relaxed)) {
      DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
      bool got = work_.try_dequeue(ctoken, task);
      DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
      if (got) {
        task();
        return true;
      }
      centralQueueNonEmpty_.store(false, std::memory_order_relaxed);
    }
    bool fromRing = myRing.try_pop(task);
    if (fromRing) {
      // Work came from the ring: check the ring first next time.
      preferRing = true;
      task();
      return true;
    }
  }

  return false;
}
883
884template <typename Generator>
885DISPENSO_INLINE void ThreadPool::scheduleBulkToRingsFastPath(
886 size_t count,
887 size_t ringCount,
888 Generator&& gen,
889 moodycamel::ProducerToken* fallbackToken) {
890#if !defined(DISPENSO_DISABLE_CASCADE_WAKERANGE)
891 // Pattern C cascade: wrap each cascade-host thread's task in a lambda
892 // that wakes its target group BEFORE running user work. Producer issues
893 // one wake-all on the seed group; the woken threads cascade in parallel
894 // to all other groups, then run their own user work.
895 auto* wsCascade = detail::consumeLoad(wakeState_);
896 bool useCascade = enableEpochWaiter_.load(std::memory_order_acquire) && wsCascade &&
897 wsCascade->totalSleeping() > 0;
898 for (size_t ring = 0; ring < count && ring < ringCount; ++ring) {
899 OnceFunction task = gen(ring);
900 int32_t target = useCascade
901 ? wsCascade->cascadeTargetFor(static_cast<int32_t>(ring), static_cast<int32_t>(count))
902 : -1;
903 if (target >= 0) {
904 OnceFunction wrapped = [wsCascade, target, inner = std::move(task)]() mutable {
905 wsCascade->cascadeWake(target);
906 inner();
907 };
908 if (!rings_[ring].try_push(std::move(wrapped))) {
909 enqueueToCentralQueue(std::move(wrapped), fallbackToken);
910 }
911 } else {
912 if (!rings_[ring].try_push(std::move(task))) {
913 enqueueToCentralQueue(std::move(task), fallbackToken);
914 }
915 }
916 }
917#else
918 for (size_t ring = 0; ring < count && ring < ringCount; ++ring) {
919 OnceFunction task = gen(ring);
920 if (!rings_[ring].try_push(std::move(task))) {
921 enqueueToCentralQueue(std::move(task), fallbackToken);
922 }
923 }
924#endif
925}
926
927template <typename Generator>
928DISPENSO_INLINE void ThreadPool::scheduleBulkToRingsBatched(
929 size_t count,
930 size_t ringCount,
931 size_t tasksPerRing,
932 Generator&& gen,
933 moodycamel::ProducerToken* fallbackToken) {
934 constexpr size_t kMaxStage = Ring::capacity();
935 size_t taskIdx = 0;
936 for (size_t ring = 0; ring < ringCount && taskIdx < count; ++ring) {
937 size_t blockEnd = std::min(taskIdx + tasksPerRing, count);
938 size_t blockSize = blockEnd - taskIdx;
939
940 size_t toStage = std::min(blockSize, kMaxStage);
941 OnceFunction staged[kMaxStage];
942 for (size_t j = 0; j < toStage; ++j) {
943 staged[j] = gen(taskIdx + j);
944 }
945
946 size_t pushed = rings_[ring].try_push_batch(staged, toStage);
947
948 for (size_t j = pushed; j < toStage; ++j) {
949 enqueueToCentralQueue(std::move(staged[j]), fallbackToken);
950 }
951
952 for (size_t j = taskIdx + toStage; j < blockEnd; ++j) {
953 enqueueToCentralQueue(gen(j), fallbackToken);
954 }
955 taskIdx += blockSize;
956 }
957}
958
959template <typename Generator>
960void ThreadPool::scheduleBulkToRings(
961 size_t count,
962 Generator&& gen,
963 moodycamel::ProducerToken* fallbackToken) {
964 if (count == 0) {
965 return;
966 }
967 assert(count <= numRings_.load(std::memory_order_relaxed));
968
969 workRemaining_.fetch_add(static_cast<ssize_t>(count), std::memory_order_release);
970
971 // Acquire: see tryExecuteNextFromRings. Pairs with the release store in
972 // resizeLocked so we observe the freshly-constructed rings, not merely the
973 // updated count.
974 size_t ringCount = numRings_.load(std::memory_order_acquire);
975 size_t tasksPerRing = (count + ringCount - 1) / ringCount;
976
977 if (tasksPerRing <= 1) {
978 scheduleBulkToRingsFastPath(count, ringCount, std::forward<Generator>(gen), fallbackToken);
979 } else {
980 scheduleBulkToRingsBatched(
981 count, ringCount, tasksPerRing, std::forward<Generator>(gen), fallbackToken);
982 }
983
984 auto* ws = detail::consumeLoad(wakeState_);
985 if (enableEpochWaiter_.load(std::memory_order_acquire) && ws) {
986#if !defined(DISPENSO_DISABLE_CASCADE_WAKERANGE)
987 ws->cascadeWakeSeed(static_cast<int32_t>(count));
988#else
989 ws->wakeRange(static_cast<int32_t>(count));
990#endif
991 }
992}
993
994namespace detail {
995// Generating iterator for scheduleBulkEnqueue. Produces OnceFunction objects
996// on-the-fly during enqueue_bulk, avoiding the need for a staging buffer.
997// moodycamel's enqueue_bulk uses single-pass input iterator semantics.
998template <typename Generator>
999struct BulkGenIter {
1000 using difference_type = std::ptrdiff_t;
1001 using value_type = OnceFunction;
1002 using pointer = OnceFunction*;
1003 using reference = OnceFunction&;
1004 using iterator_category = std::input_iterator_tag;
1005
1006 Generator* gen;
1007 size_t index;
1008 OnceFunction operator*() {
1009 return (*gen)(index);
1010 }
1011 BulkGenIter& operator++() {
1012 ++index;
1013 return *this;
1014 }
1015 BulkGenIter operator++(int) {
1016 BulkGenIter tmp = *this;
1017 ++index;
1018 return tmp;
1019 }
1020};
1021} // namespace detail
1022
1023template <typename Generator>
1024void ThreadPool::scheduleBulkEnqueue(
1025 size_t count,
1026 Generator&& gen,
1027 moodycamel::ProducerToken* token) {
1028 detail::BulkGenIter<typename std::remove_reference<Generator>::type> it{&gen, 0};
1029
1030 // Single atomic update + bulk enqueue
1031 workRemaining_.fetch_add(static_cast<ssize_t>(count), std::memory_order_release);
1032
1033 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
1034 bool enqueued;
1035 if (token) {
1036 enqueued = work_.enqueue_bulk(*token, it, count);
1037 } else {
1038 enqueued = work_.enqueue_bulk(it, count);
1039 }
1040 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
1041 if (DISPENSO_EXPECT(!enqueued, false)) {
1042 workRemaining_.fetch_sub(static_cast<ssize_t>(count), std::memory_order_relaxed);
1043#if defined(__cpp_exceptions)
1044 throw std::bad_alloc();
1045#else
1046 std::abort();
1047#endif
1048 }
1049 // Mark queue as possibly-non-empty so spinning workers will try_dequeue.
1050 centralQueueNonEmpty_.store(true, std::memory_order_relaxed);
1051
1052 // Wake appropriate threads. Cap by actual sleeping count to avoid over-waking.
1053 // Spinning threads (numNotWorking - totalSleeping) will find enqueued work
1054 // naturally, so only wake enough sleepers to cover the deficit beyond the
1055 // spinner threshold.
1056 auto* ws = detail::consumeLoad(wakeState_);
1057 if (enableEpochWaiter_.load(std::memory_order_acquire) && ws) {
1058 int32_t sleeping = ws->totalSleeping();
1059 if (sleeping > 0) {
1060 int32_t notWorking = numNotWorking_.load(std::memory_order_relaxed);
1061 int32_t spinning = std::max(int32_t{0}, notWorking - sleeping);
1062 // Only count spinners beyond the threshold as "covering" tasks
1063 int32_t effectiveSpinners = std::max(int32_t{0}, spinning - kSpinnerWakeThreshold + 1);
1064 int32_t toWake = std::max(int32_t{0}, static_cast<int32_t>(count) - effectiveSpinners);
1065 toWake = std::min(toWake, sleeping);
1066 if (toWake <= ws->branchFactor()) {
1067 // Small N: direct claim+wake, no cascade overhead.
1068 for (int32_t i = 0; i < toWake; ++i) {
1069 if (ws->claimAndWakeOne() < 0) {
1070 break;
1071 }
1072 }
1073 } else {
1074 // Large N: Pattern C cascade. Producer wakes seed g0; cascade-host
1075 // threads wake their target groups in parallel as they spin up.
1076 // Central-queue work is found by the standard tryFindAndExecuteWork
1077 // loop on each woken thread — no per-thread ring pre-staging needed.
1078 ws->cascadeWakeSeed(toWake);
1079 }
1080 }
1081 }
1082}
1083
1084template <bool kPlaced, typename Generator>
1085void ThreadPool::scheduleBulkImpl(size_t count, Generator&& gen) {
1086 if (count == 0) {
1087 return;
1088 }
1089
1090 ssize_t numPool = numThreads_.load(std::memory_order_relaxed);
1091 if (!numPool) {
1092 for (size_t i = 0; i < count; ++i) {
1093 gen(i)();
1094 }
1095 return;
1096 }
1097
1098 // Process in chunks, interleaving enqueue and inline execution based on load.
1099 size_t chunkSize = static_cast<size_t>(numPool) + static_cast<size_t>(numPool) / 2;
1100 size_t i = 0;
1101 while (i < count) {
1102 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
1103 ssize_t loadFactor = poolLoadFactor_.load(std::memory_order_relaxed);
1104 if (curWork > loadFactor) {
1105 gen(i)();
1106 ++i;
1107 } else {
1108 ssize_t room = loadFactor - curWork;
1109 size_t toEnqueue = std::min({count - i, chunkSize, static_cast<size_t>(room)});
1110 if (toEnqueue == 0) {
1111 toEnqueue = 1;
1112 }
1113 size_t base = i;
1114 if (kPlaced) {
1115 workRemaining_.fetch_add(static_cast<ssize_t>(toEnqueue), std::memory_order_release);
1116 for (size_t j = 0; j < toEnqueue; ++j) {
1117 scheduleImplPlaced({gen(base + j)}, nullptr);
1118 }
1119 } else {
1120 scheduleBulkEnqueue(toEnqueue, [&gen, base](size_t j) { return gen(base + j); });
1121 }
1122 i += toEnqueue;
1123 }
1124 }
1125}
1126
1127template <typename Generator>
1128void ThreadPool::scheduleBulk(size_t count, Generator&& gen) {
1129 scheduleBulkImpl<false>(count, std::forward<Generator>(gen));
1130}
1131
1132template <typename Generator>
1133void ThreadPool::scheduleBulkPlaced(size_t count, Generator&& gen) {
1134 scheduleBulkImpl<true>(count, std::forward<Generator>(gen));
1135}
1136
1137} // namespace dispenso
static constexpr size_type capacity() noexcept
Returns the maximum number of elements the buffer can hold.
AwakeRef(AwakeRef &&other) noexcept
Move-construct, transferring the keep-awake reference from other.
AwakeRef(ThreadPool *pool)
Acquire a keep-awake reference on pool, incrementing its keep-awake count.
void reset()
Release the held keep-awake reference, if any.
void setSignalingWake(bool enable, const std::chrono::duration< Rep, Period > &sleepDuration=std::chrono::microseconds(kDefaultSleepLenUs))
DISPENSO_DLL_ACCESS ~ThreadPool()
ssize_t numThreads() const
DISPENSO_DLL_ACCESS void resize(ssize_t n) DISPENSO_NO_THREAD_SAFETY_ANALYSIS
void scheduleBulk(size_t count, Generator &&gen)
DISPENSO_DLL_ACCESS ThreadPool(size_t n, size_t poolLoadMultiplier=32)
constexpr size_t kCacheLineSize
A constant that defines a safe number of bytes+alignment to avoid false sharing.
Definition platform.h:125
DISPENSO_DLL_ACCESS ThreadPool & globalThreadPool()
DISPENSO_DLL_ACCESS void resizeGlobalThreadPool(size_t numThreads)