// Two-state option passed as the `registerForParentCancel` argument of the TaskSet and
// ConcurrentTaskSet constructors, which forward it to TaskSetBase. The shorter constructor
// overloads always pass kOff.
// NOTE(review): presumably kOn lets cancellation of a parent task set propagate to this one;
// the registration logic lives in TaskSetBase and is not visible here — confirm.
18enum class ParentCascadeCancel { kOff, kOn };
49#include <dispenso/detail/task_set_impl.h>
// Default value for the `stealingLoadMultiplier` constructor parameter of TaskSet and
// ConcurrentTaskSet; the constructors forward that parameter to TaskSetBase.
// NOTE(review): how TaskSetBase turns the multiplier into a load threshold is not visible
// here — confirm before changing this value.
53constexpr ssize_t kDefaultStealingMultiplier = 4;
78 ParentCascadeCancel registerForParentCancel,
79 ssize_t stealingLoadMultiplier = kDefaultStealingMultiplier)
80 : TaskSetBase(p, registerForParentCancel, stealingLoadMultiplier),
81 token_(makeToken(p.work_)) {}
88 :
TaskSet(p, ParentCascadeCancel::kOff, stealingLoadMultiplier) {}
106 template <
typename F>
107 DISPENSO_REQUIRES(OnceCallableFunc<F>)
109 if (DISPENSO_EXPECT(
canceled(),
false)) {
112 if (outstandingTaskCount_.load(std::memory_order_relaxed) > taskSetLoadFactor_) {
115 pool_.schedule(token_, packageTask(std::forward<F>(f)));
131 template <
typename F>
132 DISPENSO_REQUIRES(OnceCallableFunc<F>)
134 pool_.schedule(token_, packageTask(std::forward<F>(f)), fq);
149 template <
typename Generator>
151 scheduleBulkImpl(count, std::forward<Generator>(gen), &token_);
162 template <
typename Generator>
164 scheduleBulkImplForceQueue(count, std::forward<Generator>(gen), &token_);
173 DISPENSO_DLL_ACCESS
bool wait();
189 DISPENSO_DLL_ACCESS
bool tryWait(
size_t maxToExecute);
197 TaskSetBase::cancel();
206 return TaskSetBase::canceled();
// Declaration only (exported via DISPENSO_DLL_ACCESS; the definition is out of line).
// Takes a moodycamel::ConcurrentQueue<OnceFunction> by reference and returns a
// moodycamel::ProducerToken. The TaskSet constructor calls it with the pool's work_ queue
// to initialize token_.
218 DISPENSO_DLL_ACCESS moodycamel::ProducerToken makeToken(
219 moodycamel::ConcurrentQueue<OnceFunction>& pool);
// Producer token created once in the constructor via makeToken(p.work_). It is handed to
// pool_.schedule() by both schedule() overloads, and its address is passed to
// scheduleBulkImpl() / scheduleBulkImplForceQueue() by the bulk-scheduling overloads.
221 moodycamel::ProducerToken token_;
223 template <
typename Result>
224 friend class detail::FutureBase;
225 template <
typename Result>
226 friend class detail::FutureImplBase;
254 ParentCascadeCancel registerForParentCancel,
255 ssize_t stealingLoadMultiplier = kDefaultStealingMultiplier,
257 : TaskSetBase(pool, registerForParentCancel, stealingLoadMultiplier), cost_(cost) {}
265 :
ConcurrentTaskSet(p, ParentCascadeCancel::kOff, kDefaultStealingMultiplier, cost) {}
272 :
ConcurrentTaskSet(p, ParentCascadeCancel::kOff, stealingLoadMultiplier, cost) {}
297 template <
typename F>
298 DISPENSO_REQUIRES(OnceCallableFunc<F>)
301 bool skipRecheck = false,
302 float poolRecursiveLoadFactor = kDefaultPoolRecursiveLoadFactor) {
303 if (cost_ == TaskCost::kHeavy) {
304 schedulePlaced(std::forward<F>(f), skipRecheck, poolRecursiveLoadFactor);
314 if (outstandingTaskCount_.load(std::memory_order_relaxed) > taskSetLoadFactor_ &&
315 DISPENSO_EXPECT(!
canceled(),
true) && detail::PerPoolPerThreadInfo::canInlineSchedule()) {
316 detail::InlineDepthGuard depthGuard;
321 ssize_t curWork = pool_.workRemaining_.load(std::memory_order_relaxed);
322 ssize_t quickFactor =
323 static_cast<ssize_t
>(
static_cast<float>(pool_.numThreads()) * poolRecursiveLoadFactor);
324 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(&pool_) && curWork > quickFactor) ||
325 curWork > pool_.poolLoadFactor_.load(std::memory_order_relaxed)) {
326 if (!detail::PerPoolPerThreadInfo::canInlineSchedule()) {
327 pool_.schedule(packageTask(std::forward<F>(f)), ForceQueuingTag());
330 detail::InlineDepthGuard depthGuard;
335 pool_.schedule(packageTask(std::forward<F>(f)), ForceQueuingTag());
350 template <
typename F>
351 DISPENSO_REQUIRES(OnceCallableFunc<F>)
353 if (cost_ == TaskCost::kHeavy) {
354 pool_.schedulePlaced(packageTask(std::forward<F>(f)), fq);
357 pool_.schedule(packageTask(std::forward<F>(f)), fq);
372 template <
typename Generator>
374 if (cost_ == TaskCost::kHeavy) {
375 scheduleBulkImplPlaced(count, std::forward<Generator>(gen));
378 scheduleBulkImpl(count, std::forward<Generator>(gen),
nullptr);
389 template <
typename Generator>
391 scheduleBulkImplForceQueue(count, std::forward<Generator>(gen),
nullptr);
398 DISPENSO_DLL_ACCESS
bool wait();
413 DISPENSO_DLL_ACCESS
bool tryWait(
size_t maxToExecute);
422 TaskSetBase::cancel();
431 return TaskSetBase::canceled();
443 bool tryExecuteNext() {
444 return pool_.tryExecuteNext();
447 template <
typename F>
448 DISPENSO_REQUIRES(OnceCallableFunc<F>)
451 bool skipRecheck =
false,
452 float poolRecursiveLoadFactor = kDefaultPoolRecursiveLoadFactor) {
453 ssize_t placedThreshold = std::max(pool_.numThreads() + 1, taskSetLoadFactor_ / 2);
454 if (outstandingTaskCount_.load(std::memory_order_relaxed) > placedThreshold &&
455 DISPENSO_EXPECT(!
canceled(),
true) && detail::PerPoolPerThreadInfo::canInlineSchedule()) {
456 detail::InlineDepthGuard depthGuard;
461 ssize_t curWork = pool_.workRemaining_.load(std::memory_order_relaxed);
462 ssize_t quickFactor =
463 static_cast<ssize_t
>(
static_cast<float>(pool_.numThreads()) * poolRecursiveLoadFactor);
464 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(&pool_) && curWork > quickFactor) ||
465 curWork > pool_.poolLoadFactor_.load(std::memory_order_relaxed)) {
466 if (!detail::PerPoolPerThreadInfo::canInlineSchedule()) {
467 pool_.schedulePlaced(packageTask(std::forward<F>(f)), ForceQueuingTag());
470 detail::InlineDepthGuard depthGuard;
475 pool_.schedulePlaced(packageTask(std::forward<F>(f)), ForceQueuingTag());
478 template <
typename F>
479 DISPENSO_REQUIRES(OnceCallableFunc<F>)
480 void schedulePlaced(F&& f, ForceQueuingTag) {
481 pool_.schedulePlaced(packageTask(std::forward<F>(f)), ForceQueuingTag());
484 template <
typename Result>
485 friend class detail::FutureBase;
486 template <
typename Result>
487 friend class detail::FutureImplBase;
489 friend class detail::LimitGatedScheduler;
void schedule(F &&f, bool skipRecheck=false, float poolRecursiveLoadFactor=kDefaultPoolRecursiveLoadFactor)
DISPENSO_DLL_ACCESS bool wait()
ConcurrentTaskSet(ThreadPool &p, ssize_t stealingLoadMultiplier)
ConcurrentTaskSet(ThreadPool &pool, ParentCascadeCancel registerForParentCancel, ssize_t stealingLoadMultiplier=kDefaultStealingMultiplier, TaskCost cost=TaskCost::kHeavy)
ConcurrentTaskSet(ThreadPool &p, TaskCost cost)
ConcurrentTaskSet(ThreadPool &p, TaskCost cost, ssize_t stealingLoadMultiplier)
void scheduleBulk(size_t count, Generator &&gen, ForceQueuingTag)
void scheduleBulk(size_t count, Generator &&gen)
ConcurrentTaskSet(ThreadPool &p)
DISPENSO_DLL_ACCESS bool tryWait(size_t maxToExecute)
TaskSet(ThreadPool &p, ssize_t stealingLoadMultiplier)
void scheduleBulk(size_t count, Generator &&gen)
void scheduleBulk(size_t count, Generator &&gen, ForceQueuingTag)
DISPENSO_DLL_ACCESS bool tryWait(size_t maxToExecute)
TaskSet(ThreadPool &p, ParentCascadeCancel registerForParentCancel, ssize_t stealingLoadMultiplier=kDefaultStealingMultiplier)
DISPENSO_DLL_ACCESS bool wait()
DISPENSO_DLL_ACCESS TaskSetBase * parentTaskSet()