dispenso 1.5.1
A library for task parallelism
Loading...
Searching...
No Matches
thread_pool.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
15#pragma once
16
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdlib>
#include <deque>
#include <iterator>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
25
26#include <moodycamel/concurrentqueue.h>
27
28#include <dispenso/detail/epoch_waiter.h>
29#include <dispenso/detail/per_thread_info.h>
31#include <dispenso/platform.h>
33
34namespace dispenso {
35
// Whether worker threads may block on the epoch waiter (signaling wakeup) instead of
// purely polling.  Defaults on for platforms where the epoch waiter is supported.
#if !defined(DISPENSO_WAKEUP_ENABLE)
#if defined(_WIN32) || defined(__linux__) || defined(__MACH__)
#define DISPENSO_WAKEUP_ENABLE 1
#else
#define DISPENSO_WAKEUP_ENABLE 0
#endif // platform
#endif // DISPENSO_WAKEUP_ENABLE

// How long (microseconds) an idle worker sleeps before re-polling for work.  Shorter
// periods are used when signaling wakeup is unavailable, since polling is then the only
// way to notice new work.
#if !defined(DISPENSO_POLL_PERIOD_US)
#if defined(_WIN32)
#define DISPENSO_POLL_PERIOD_US 1000
#else
#if !(DISPENSO_WAKEUP_ENABLE)
#define DISPENSO_POLL_PERIOD_US 200
#else
#define DISPENSO_POLL_PERIOD_US (1 << 15) // Determined empirically good on dual Xeon Linux
#endif // DISPENSO_WAKEUP_ENABLE
#endif // PLATFORM
#endif // DISPENSO_POLL_PERIOD_US

// Default idle-sleep length, in microseconds (see DISPENSO_POLL_PERIOD_US above).
constexpr uint32_t kDefaultSleepLenUs = DISPENSO_POLL_PERIOD_US;

// Default for whether signaling-based wakeup is enabled (see DISPENSO_WAKEUP_ENABLE above).
constexpr bool kDefaultWakeupEnable = DISPENSO_WAKEUP_ENABLE;

/**
 * A tag type passed to schedule() to force queuing of the functor, bypassing the
 * load-based inline-execution fallback.
 **/
struct ForceQueuingTag {};
66
72class alignas(kCacheLineSize) ThreadPool {
73 public:
81 DISPENSO_DLL_ACCESS ThreadPool(size_t n, size_t poolLoadMultiplier = 32);
82
101 template <class Rep, class Period>
103 bool enable,
104 const std::chrono::duration<Rep, Period>& sleepDuration =
105 std::chrono::microseconds(kDefaultSleepLenUs)) {
106 setSignalingWake(
107 enable,
108 static_cast<uint32_t>(
109 std::chrono::duration_cast<std::chrono::microseconds>(sleepDuration).count()));
110 }
111
118 DISPENSO_DLL_ACCESS void resize(ssize_t n) {
119 std::lock_guard<std::mutex> lk(threadsMutex_);
120 resizeLocked(n);
121 }
122
129 ssize_t numThreads() const {
130 return numThreads_.load(std::memory_order_relaxed);
131 }
132
141 template <typename F>
142 DISPENSO_REQUIRES(OnceCallableFunc<F>)
143 void schedule(F&& f);
144
153 template <typename F>
154 DISPENSO_REQUIRES(OnceCallableFunc<F>)
155 void schedule(F&& f, ForceQueuingTag);
156
169 template <typename Generator>
170 void scheduleBulk(size_t count, Generator&& gen);
171
177 DISPENSO_DLL_ACCESS ~ThreadPool();
178
179 private:
180 class PerThreadData {
181 public:
182 void setThread(std::thread&& t);
183
184 bool running();
185
186 void stop();
187
188 ~PerThreadData();
189
190 public:
191 alignas(kCacheLineSize) std::thread thread_;
192 std::atomic<bool> running_{true};
193 };
194
195 DISPENSO_DLL_ACCESS uint32_t wait(uint32_t priorEpoch);
196 DISPENSO_DLL_ACCESS void wake();
197 DISPENSO_DLL_ACCESS void wakeN(ssize_t n);
198
199 void setSignalingWake(bool enable, uint32_t sleepDurationUs) {
200 std::lock_guard<std::mutex> lk(threadsMutex_);
201 ssize_t currentPoolSize = numThreads();
202 resizeLocked(0);
203 enableEpochWaiter_.store(enable, std::memory_order_release);
204 sleepLengthUs_.store(sleepDurationUs, std::memory_order_release);
205 resizeLocked(currentPoolSize);
206 }
207
208 DISPENSO_DLL_ACCESS void resizeLocked(ssize_t n);
209
210 void executeNext(OnceFunction work);
211
212 DISPENSO_DLL_ACCESS void threadLoop(PerThreadData& threadData);
213
214 bool tryExecuteNext();
215 bool tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token);
216
217 template <typename F>
218 void schedule(moodycamel::ProducerToken& token, F&& f);
219
220 template <typename F>
221 void schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag);
222
223 // Core bulk enqueue: unconditionally stage, enqueue, and wake for a chunk of tasks.
224 // Caller is responsible for load factor checks. Count should be small (e.g. <= 2*numThreads).
225 // When a producer token is provided, uses token-based enqueue for better throughput.
226 template <typename Generator>
227 void
228 scheduleBulkEnqueue(size_t count, Generator&& gen, moodycamel::ProducerToken* token = nullptr);
229
230 // The non-atomic read of two separate atomics (numSleeping_ and workRemaining_) can
231 // race, potentially causing a missed wake in rare cases. This is benign: the epoch
232 // waiter's sleep timeout (sleepLengthUs_) provides a safety net, so a missed wake only
233 // delays wakeup by up to that duration, never causes a hang. Subsequent schedule()
234 // calls will trigger wakes.
235 void conditionallyWake() {
236 if (enableEpochWaiter_.load(std::memory_order_acquire)) {
237 ssize_t sleeping = numSleeping_.load(std::memory_order_acquire);
238 if (sleeping > 0) {
239 // Only wake if there's more pending work than awake threads can handle.
240 // Don't count the caller — it may be a pool thread that will continue
241 // dequeuing its own work, not processing what it just submitted.
242 ssize_t awake = numThreads_.load(std::memory_order_relaxed) - sleeping;
243 ssize_t pending = workRemaining_.load(std::memory_order_relaxed);
244 if (pending > awake) {
245 wake();
246 }
247 }
248 }
249 }
250
251 public:
252 // If we are not yet C++17, we provide aligned new/delete to avoid false sharing.
253#if __cplusplus < 201703L
254 static void* operator new(size_t sz) {
255 return detail::alignedMalloc(sz);
256 }
257 static void operator delete(void* ptr) {
258 return detail::alignedFree(ptr);
259 }
260#endif // __cplusplus
261
262 private:
263 // Number of tasks each thread accumulates before flushing workRemaining_.
264 // This batching reduces atomic contention in threadLoop, but inflates
265 // workRemaining_ by up to kWorkBatchSize * numThreads, so poolLoadFactor_
266 // must be reduced accordingly for accurate load-shedding in schedule().
267 static constexpr int kWorkBatchSize = 8;
268
269 mutable std::mutex threadsMutex_;
270 std::deque<PerThreadData> threads_;
271 size_t poolLoadMultiplier_;
272
273 // These atomics are read frequently in the hot schedule() path, so they need
274 // cache-line alignment to avoid false sharing with the mutex/deque above.
275 alignas(kCacheLineSize) std::atomic<ssize_t> poolLoadFactor_;
276 std::atomic<ssize_t> numThreads_;
277
278 moodycamel::ConcurrentQueue<OnceFunction> work_;
279
280 alignas(kCacheLineSize) std::atomic<ssize_t> numSleeping_{0};
281
282 alignas(kCacheLineSize) std::atomic<ssize_t> workRemaining_{0};
283
284 alignas(kCacheLineSize) detail::EpochWaiter epochWaiter_;
285 alignas(kCacheLineSize) std::atomic<bool> enableEpochWaiter_{kDefaultWakeupEnable};
286 std::atomic<uint32_t> sleepLengthUs_{kDefaultSleepLenUs};
287
288#if defined DISPENSO_DEBUG
289 alignas(kCacheLineSize) std::atomic<ssize_t> outstandingTaskSets_{0};
290#endif // NDEBUG
291
292 friend class ConcurrentTaskSet;
293 friend class TaskSet;
294 friend class TaskSetBase;
295};
296
/**
 * @return A reference to the process-wide shared thread pool.
 **/
DISPENSO_DLL_ACCESS ThreadPool& globalThreadPool();

/**
 * Resize the process-wide shared thread pool.
 *
 * @param numThreads The new number of threads for the global pool.
 **/
DISPENSO_DLL_ACCESS void resizeGlobalThreadPool(size_t numThreads);
310
311// ----------------------------- Implementation details -------------------------------------
312
313template <typename F>
314DISPENSO_REQUIRES(OnceCallableFunc<F>)
315inline void ThreadPool::schedule(F&& f) {
316 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
317 ssize_t quickLoadFactor = numThreads_.load(std::memory_order_relaxed);
318 quickLoadFactor += quickLoadFactor / 2;
319 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(this) && curWork > quickLoadFactor) ||
320 (curWork > poolLoadFactor_.load(std::memory_order_relaxed))) {
321 f();
322 } else {
323 schedule(std::forward<F>(f), ForceQueuingTag());
324 }
325}
326
327template <typename F>
328DISPENSO_REQUIRES(OnceCallableFunc<F>)
329inline void ThreadPool::schedule(F&& f, ForceQueuingTag) {
330 if (auto* token =
331 static_cast<moodycamel::ProducerToken*>(detail::PerPoolPerThreadInfo::producer(this))) {
332 schedule(*token, std::forward<F>(f), ForceQueuingTag());
333 return;
334 }
335
336 if (!numThreads_.load(std::memory_order_relaxed)) {
337 f();
338 return;
339 }
340 workRemaining_.fetch_add(1, std::memory_order_release);
341 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
342 bool enqueued = work_.enqueue({std::forward<F>(f)});
343 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
344 (void)(enqueued); // unused
345 assert(enqueued);
346
347 conditionallyWake();
348}
349
350template <typename F>
351inline void ThreadPool::schedule(moodycamel::ProducerToken& token, F&& f) {
352 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
353 ssize_t quickLoadFactor = numThreads_.load(std::memory_order_relaxed);
354 quickLoadFactor += quickLoadFactor / 2;
355 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(this) && curWork > quickLoadFactor) ||
356 (curWork > poolLoadFactor_.load(std::memory_order_relaxed))) {
357 f();
358 } else {
359 schedule(token, std::forward<F>(f), ForceQueuingTag());
360 }
361}
362
363template <typename F>
364inline void ThreadPool::schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag) {
365 if (!numThreads_.load(std::memory_order_relaxed)) {
366 f();
367 return;
368 }
369 workRemaining_.fetch_add(1, std::memory_order_release);
370 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
371 bool enqueued = work_.enqueue(token, {std::forward<F>(f)});
372 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
373 (void)(enqueued); // unused
374 assert(enqueued);
375
376 conditionallyWake();
377}
378
379inline bool ThreadPool::tryExecuteNext() {
380 OnceFunction next;
381 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
382 bool dequeued = work_.try_dequeue(next);
383 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
384 if (dequeued) {
385 executeNext(std::move(next));
386 return true;
387 }
388 return false;
389}
390
391inline bool ThreadPool::tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token) {
392 OnceFunction next;
393 if (work_.try_dequeue_from_producer(token, next)) {
394 executeNext(std::move(next));
395 return true;
396 }
397 return false;
398}
399
400inline void ThreadPool::executeNext(OnceFunction next) {
401 next();
402 workRemaining_.fetch_add(-1, std::memory_order_relaxed);
403}
404
namespace detail {
// Generating iterator for scheduleBulkEnqueue. Produces OnceFunction objects
// on-the-fly during enqueue_bulk, avoiding the need for a staging buffer.
// moodycamel's enqueue_bulk uses single-pass input iterator semantics, so only
// dereference and increment are required — no comparison operators are provided.
template <typename Generator>
struct BulkGenIter {
  using difference_type = std::ptrdiff_t;
  using value_type = OnceFunction;
  using pointer = OnceFunction*;
  using reference = OnceFunction&;
  using iterator_category = std::input_iterator_tag;

  // Non-owning pointer to the generator; must outlive the enqueue_bulk call.
  Generator* gen;
  // Index of the task this iterator position produces.
  size_t index;
  // Dereference materializes the task for the current index by invoking the generator.
  OnceFunction operator*() {
    return (*gen)(index);
  }
  BulkGenIter& operator++() {
    ++index;
    return *this;
  }
  BulkGenIter operator++(int) {
    BulkGenIter tmp = *this;
    ++index;
    return tmp;
  }
};
} // namespace detail
433
434template <typename Generator>
435void ThreadPool::scheduleBulkEnqueue(
436 size_t count,
437 Generator&& gen,
438 moodycamel::ProducerToken* token) {
439 detail::BulkGenIter<typename std::remove_reference<Generator>::type> it{&gen, 0};
440
441 // Single atomic update + bulk enqueue
442 workRemaining_.fetch_add(static_cast<ssize_t>(count), std::memory_order_release);
443
444 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
445 bool enqueued;
446 if (token) {
447 enqueued = work_.enqueue_bulk(*token, it, count);
448 } else {
449 enqueued = work_.enqueue_bulk(it, count);
450 }
451 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
452 (void)(enqueued);
453 assert(enqueued);
454
455 // Wake appropriate threads
456 if (enableEpochWaiter_.load(std::memory_order_acquire)) {
457 ssize_t sleeping = numSleeping_.load(std::memory_order_acquire);
458 ssize_t toWake = std::min(sleeping, static_cast<ssize_t>(count));
459 if (toWake > 0) {
460 wakeN(toWake);
461 }
462 }
463}
464
465template <typename Generator>
466void ThreadPool::scheduleBulk(size_t count, Generator&& gen) {
467 if (count == 0) {
468 return;
469 }
470
471 ssize_t numPool = numThreads_.load(std::memory_order_relaxed);
472 if (!numPool) {
473 // No threads in pool - execute all inline
474 for (size_t i = 0; i < count; ++i) {
475 gen(i)();
476 }
477 return;
478 }
479
480 // Process in chunks, interleaving enqueue and inline execution based on load.
481 size_t chunkSize = static_cast<size_t>(numPool) + static_cast<size_t>(numPool) / 2;
482 if (chunkSize < 1) {
483 chunkSize = 1;
484 }
485 size_t i = 0;
486 while (i < count) {
487 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
488 ssize_t loadFactor = poolLoadFactor_.load(std::memory_order_relaxed);
489 if (curWork > loadFactor) {
490 // Over load factor - execute one task inline, then re-check
491 gen(i)();
492 ++i;
493 } else {
494 // Under load factor - enqueue a chunk
495 ssize_t room = loadFactor - curWork;
496 size_t toEnqueue = std::min({count - i, chunkSize, static_cast<size_t>(room)});
497 if (toEnqueue == 0) {
498 toEnqueue = 1;
499 }
500 size_t base = i;
501 scheduleBulkEnqueue(toEnqueue, [&gen, base](size_t j) { return gen(base + j); });
502 i += toEnqueue;
503 }
504 }
505}
506
507} // namespace dispenso
void setSignalingWake(bool enable, const std::chrono::duration< Rep, Period > &sleepDuration=std::chrono::microseconds(kDefaultSleepLenUs))
DISPENSO_DLL_ACCESS ~ThreadPool()
DISPENSO_DLL_ACCESS void resize(ssize_t n)
ssize_t numThreads() const
DISPENSO_DLL_ACCESS ThreadPool(size_t n, size_t poolLoadMultiplier=32)
constexpr size_t kCacheLineSize
A constant that defines a safe number of bytes+alignment to avoid false sharing.
Definition platform.h:97
DISPENSO_DLL_ACCESS ThreadPool & globalThreadPool()
DISPENSO_DLL_ACCESS void resizeGlobalThreadPool(size_t numThreads)