dispenso 1.6.0
A library for task parallelism
Loading...
Searching...
No Matches
task_set.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
15#pragma once
16
namespace dispenso {

/// Whether a task set opts in to parent-cascading cancellation. Passed as the
/// `registerForParentCancel` constructor argument of TaskSet / ConcurrentTaskSet (their
/// convenience constructors pass kOff). The registration itself is handled by TaskSetBase —
/// presumably cancelling the parent then cancels this set too; TODO confirm there.
enum class ParentCascadeCancel {
  kOff,
  kOn,
};

/// Expected per-task cost for a ConcurrentTaskSet; selects its scheduling path.
///
/// Implementation note: kHeavy routes through the pool's per-group steal rings
/// (schedulePlaced / scheduleBulkPlaced), which avoids consumer-side contention on the
/// central moodycamel queue at high core counts. kLightweight routes through the central
/// queue, which has lower per-task overhead but does not scale past ~100 contending
/// consumers.
enum class TaskCost {
  kLightweight,
  kHeavy,
};

} // namespace dispenso
48
49#include <dispenso/detail/task_set_impl.h>
50
51namespace dispenso {
52
/// Default `stealingLoadMultiplier` constructor argument for TaskSet and ConcurrentTaskSet.
constexpr ssize_t kDefaultStealingMultiplier{4};
54
66class TaskSet : public TaskSetBase {
67 public:
77 ThreadPool& p,
78 ParentCascadeCancel registerForParentCancel,
79 ssize_t stealingLoadMultiplier = kDefaultStealingMultiplier)
80 : TaskSetBase(p, registerForParentCancel, stealingLoadMultiplier),
81 token_(makeToken(p.work_)) {}
82
84 TaskSet(ThreadPool& p) : TaskSet(p, ParentCascadeCancel::kOff, kDefaultStealingMultiplier) {}
87 TaskSet(ThreadPool& p, ssize_t stealingLoadMultiplier)
88 : TaskSet(p, ParentCascadeCancel::kOff, stealingLoadMultiplier) {}
89
90 TaskSet(TaskSet&& other) = delete;
91 TaskSet& operator=(TaskSet&& other) = delete;
92
106 template <typename F>
107 DISPENSO_REQUIRES(OnceCallableFunc<F>)
108 void schedule(F&& f) {
109 if (DISPENSO_EXPECT(canceled(), false)) {
110 return;
111 }
112 if (outstandingTaskCount_.load(std::memory_order_relaxed) > taskSetLoadFactor_) {
113 f();
114 } else {
115 pool_.schedule(token_, packageTask(std::forward<F>(f)));
116 }
117 }
118
131 template <typename F>
132 DISPENSO_REQUIRES(OnceCallableFunc<F>)
133 void schedule(F&& f, ForceQueuingTag fq) {
134 pool_.schedule(token_, packageTask(std::forward<F>(f)), fq);
135 }
136
149 template <typename Generator>
150 void scheduleBulk(size_t count, Generator&& gen) {
151 scheduleBulkImpl(count, std::forward<Generator>(gen), &token_);
152 }
153
162 template <typename Generator>
163 void scheduleBulk(size_t count, Generator&& gen, ForceQueuingTag) {
164 scheduleBulkImplForceQueue(count, std::forward<Generator>(gen), &token_);
165 }
166
173 DISPENSO_DLL_ACCESS bool wait();
174
189 DISPENSO_DLL_ACCESS bool tryWait(size_t maxToExecute);
190
196 void cancel() {
197 TaskSetBase::cancel();
198 }
199
205 bool canceled() const {
206 return TaskSetBase::canceled();
207 }
208
214 wait();
215 }
216
217 private:
218 DISPENSO_DLL_ACCESS moodycamel::ProducerToken makeToken(
219 moodycamel::ConcurrentQueue<OnceFunction>& pool);
220
221 moodycamel::ProducerToken token_;
222
223 template <typename Result>
224 friend class detail::FutureBase;
225 template <typename Result>
226 friend class detail::FutureImplBase;
227};
228
241class ConcurrentTaskSet : public TaskSetBase {
242 public:
253 ThreadPool& pool,
254 ParentCascadeCancel registerForParentCancel,
255 ssize_t stealingLoadMultiplier = kDefaultStealingMultiplier,
256 TaskCost cost = TaskCost::kHeavy)
257 : TaskSetBase(pool, registerForParentCancel, stealingLoadMultiplier), cost_(cost) {}
258
261 : ConcurrentTaskSet(p, ParentCascadeCancel::kOff, kDefaultStealingMultiplier) {}
265 : ConcurrentTaskSet(p, ParentCascadeCancel::kOff, kDefaultStealingMultiplier, cost) {}
268 ConcurrentTaskSet(ThreadPool& p, ssize_t stealingLoadMultiplier)
269 : ConcurrentTaskSet(p, ParentCascadeCancel::kOff, stealingLoadMultiplier) {}
271 ConcurrentTaskSet(ThreadPool& p, TaskCost cost, ssize_t stealingLoadMultiplier)
272 : ConcurrentTaskSet(p, ParentCascadeCancel::kOff, stealingLoadMultiplier, cost) {}
273
274 ConcurrentTaskSet(ConcurrentTaskSet&& other) = delete;
275 ConcurrentTaskSet& operator=(ConcurrentTaskSet&& other) = delete;
276
297 template <typename F>
298 DISPENSO_REQUIRES(OnceCallableFunc<F>)
300 F&& f,
301 bool skipRecheck = false,
302 float poolRecursiveLoadFactor = kDefaultPoolRecursiveLoadFactor) {
303 if (cost_ == TaskCost::kHeavy) {
304 schedulePlaced(std::forward<F>(f), skipRecheck, poolRecursiveLoadFactor);
305 return;
306 }
307 // Combined inline decision (mirrors scheduleBulkImpl):
308 // 1. TaskSet-level overload
309 // 2. Pool-recursive: pool threads inline when workRemaining_ exceeds
310 // numThreads * poolRecursiveLoadFactor
311 // 3. Pool global: non-recursive callers inline at the loose poolLoadFactor_
312 // After this check, use ForceQueuingTag to skip the redundant check
313 // in ThreadPool::schedule.
314 if (outstandingTaskCount_.load(std::memory_order_relaxed) > taskSetLoadFactor_ &&
315 DISPENSO_EXPECT(!canceled(), true) && detail::PerPoolPerThreadInfo::canInlineSchedule()) {
316 detail::InlineDepthGuard depthGuard;
317 f();
318 return;
319 }
320 if (!skipRecheck) {
321 ssize_t curWork = pool_.workRemaining_.load(std::memory_order_relaxed);
322 ssize_t quickFactor =
323 static_cast<ssize_t>(static_cast<float>(pool_.numThreads()) * poolRecursiveLoadFactor);
324 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(&pool_) && curWork > quickFactor) ||
325 curWork > pool_.poolLoadFactor_.load(std::memory_order_relaxed)) {
326 if (!detail::PerPoolPerThreadInfo::canInlineSchedule()) {
327 pool_.schedule(packageTask(std::forward<F>(f)), ForceQueuingTag());
328 return;
329 }
330 detail::InlineDepthGuard depthGuard;
331 f();
332 return;
333 }
334 }
335 pool_.schedule(packageTask(std::forward<F>(f)), ForceQueuingTag());
336 }
337
350 template <typename F>
351 DISPENSO_REQUIRES(OnceCallableFunc<F>)
352 void schedule(F&& f, ForceQueuingTag fq) {
353 if (cost_ == TaskCost::kHeavy) {
354 pool_.schedulePlaced(packageTask(std::forward<F>(f)), fq);
355 return;
356 }
357 pool_.schedule(packageTask(std::forward<F>(f)), fq);
358 }
359
372 template <typename Generator>
373 void scheduleBulk(size_t count, Generator&& gen) {
374 if (cost_ == TaskCost::kHeavy) {
375 scheduleBulkImplPlaced(count, std::forward<Generator>(gen));
376 return;
377 }
378 scheduleBulkImpl(count, std::forward<Generator>(gen), nullptr);
379 }
380
389 template <typename Generator>
390 void scheduleBulk(size_t count, Generator&& gen, ForceQueuingTag) {
391 scheduleBulkImplForceQueue(count, std::forward<Generator>(gen), nullptr);
392 }
393
398 DISPENSO_DLL_ACCESS bool wait();
399
413 DISPENSO_DLL_ACCESS bool tryWait(size_t maxToExecute);
414
421 void cancel() {
422 TaskSetBase::cancel();
423 }
424
430 bool canceled() const {
431 return TaskSetBase::canceled();
432 }
433
439 wait();
440 }
441
442 private:
443 bool tryExecuteNext() {
444 return pool_.tryExecuteNext();
445 }
446
447 template <typename F>
448 DISPENSO_REQUIRES(OnceCallableFunc<F>)
449 void schedulePlaced(
450 F&& f,
451 bool skipRecheck = false,
452 float poolRecursiveLoadFactor = kDefaultPoolRecursiveLoadFactor) {
453 ssize_t placedThreshold = std::max(pool_.numThreads() + 1, taskSetLoadFactor_ / 2);
454 if (outstandingTaskCount_.load(std::memory_order_relaxed) > placedThreshold &&
455 DISPENSO_EXPECT(!canceled(), true) && detail::PerPoolPerThreadInfo::canInlineSchedule()) {
456 detail::InlineDepthGuard depthGuard;
457 f();
458 return;
459 }
460 if (!skipRecheck) {
461 ssize_t curWork = pool_.workRemaining_.load(std::memory_order_relaxed);
462 ssize_t quickFactor =
463 static_cast<ssize_t>(static_cast<float>(pool_.numThreads()) * poolRecursiveLoadFactor);
464 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(&pool_) && curWork > quickFactor) ||
465 curWork > pool_.poolLoadFactor_.load(std::memory_order_relaxed)) {
466 if (!detail::PerPoolPerThreadInfo::canInlineSchedule()) {
467 pool_.schedulePlaced(packageTask(std::forward<F>(f)), ForceQueuingTag());
468 return;
469 }
470 detail::InlineDepthGuard depthGuard;
471 f();
472 return;
473 }
474 }
475 pool_.schedulePlaced(packageTask(std::forward<F>(f)), ForceQueuingTag());
476 }
477
478 template <typename F>
479 DISPENSO_REQUIRES(OnceCallableFunc<F>)
480 void schedulePlaced(F&& f, ForceQueuingTag) {
481 pool_.schedulePlaced(packageTask(std::forward<F>(f)), ForceQueuingTag());
482 }
483
484 template <typename Result>
485 friend class detail::FutureBase;
486 template <typename Result>
487 friend class detail::FutureImplBase;
488
489 friend class detail::LimitGatedScheduler;
490
491 TaskCost cost_{TaskCost::kHeavy};
492};
493
499DISPENSO_DLL_ACCESS TaskSetBase* parentTaskSet();
500
501} // namespace dispenso
void schedule(F &&f, bool skipRecheck=false, float poolRecursiveLoadFactor=kDefaultPoolRecursiveLoadFactor)
Definition task_set.h:299
DISPENSO_DLL_ACCESS bool wait()
ConcurrentTaskSet(ThreadPool &p, ssize_t stealingLoadMultiplier)
Definition task_set.h:268
ConcurrentTaskSet(ThreadPool &pool, ParentCascadeCancel registerForParentCancel, ssize_t stealingLoadMultiplier=kDefaultStealingMultiplier, TaskCost cost=TaskCost::kHeavy)
Definition task_set.h:252
ConcurrentTaskSet(ThreadPool &p, TaskCost cost)
Definition task_set.h:264
ConcurrentTaskSet(ThreadPool &p, TaskCost cost, ssize_t stealingLoadMultiplier)
Definition task_set.h:271
void scheduleBulk(size_t count, Generator &&gen, ForceQueuingTag)
Definition task_set.h:390
void scheduleBulk(size_t count, Generator &&gen)
Definition task_set.h:373
ConcurrentTaskSet(ThreadPool &p)
Definition task_set.h:260
DISPENSO_DLL_ACCESS bool tryWait(size_t maxToExecute)
TaskSet(ThreadPool &p, ssize_t stealingLoadMultiplier)
Definition task_set.h:87
void scheduleBulk(size_t count, Generator &&gen)
Definition task_set.h:150
void schedule(F &&f)
Definition task_set.h:108
void scheduleBulk(size_t count, Generator &&gen, ForceQueuingTag)
Definition task_set.h:163
DISPENSO_DLL_ACCESS bool tryWait(size_t maxToExecute)
bool canceled() const
Definition task_set.h:205
TaskSet(ThreadPool &p, ParentCascadeCancel registerForParentCancel, ssize_t stealingLoadMultiplier=kDefaultStealingMultiplier)
Definition task_set.h:76
TaskSet(ThreadPool &p)
Definition task_set.h:84
DISPENSO_DLL_ACCESS bool wait()
DISPENSO_DLL_ACCESS TaskSetBase * parentTaskSet()