dispenso 1.5.1
A library for task parallelism
Loading...
Searching...
No Matches
task_set.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
15#pragma once
16
namespace dispenso {
/**
 * Option passed to the TaskSet / ConcurrentTaskSet constructors as
 * <code>registerForParentCancel</code> and forwarded by them to TaskSetBase.
 *
 * The convenience constructors of both task-set classes pass <code>kOff</code>.
 *
 * NOTE(review): going by the name, <code>kOn</code> makes a task set register with its parent so
 * that cancelling the parent also cancels this set; the mechanism lives in TaskSetBase and is not
 * visible in this header -- confirm there.
 */
enum class ParentCascadeCancel {
  kOff, ///< Value used by the convenience constructors.
  kOn ///< Opt in (presumably: register for cancellation cascading from the parent).
};
} // namespace dispenso
20
21#include <dispenso/detail/task_set_impl.h>
22
23namespace dispenso {
24
/**
 * Default for the <code>stealingLoadMultiplier</code> argument of the TaskSet and
 * ConcurrentTaskSet constructors, which forward it to TaskSetBase.
 *
 * NOTE(review): presumably this scales the outstanding-task threshold
 * (<code>taskSetLoadFactor_</code>) above which <code>schedule()</code> runs a functor inline
 * instead of queuing it; the computation is in TaskSetBase -- confirm there.
 */
constexpr ssize_t kDefaultStealingMultiplier = 4;
26
38class TaskSet : public TaskSetBase {
39 public:
49 ThreadPool& p,
50 ParentCascadeCancel registerForParentCancel,
51 ssize_t stealingLoadMultiplier = kDefaultStealingMultiplier)
52 : TaskSetBase(p, registerForParentCancel, stealingLoadMultiplier),
53 token_(makeToken(p.work_)) {}
54
56 TaskSet(ThreadPool& p) : TaskSet(p, ParentCascadeCancel::kOff, kDefaultStealingMultiplier) {}
59 TaskSet(ThreadPool& p, ssize_t stealingLoadMultiplier)
60 : TaskSet(p, ParentCascadeCancel::kOff, stealingLoadMultiplier) {}
61
62 TaskSet(TaskSet&& other) = delete;
63 TaskSet& operator=(TaskSet&& other) = delete;
64
78 template <typename F>
79 DISPENSO_REQUIRES(OnceCallableFunc<F>)
80 void schedule(F&& f) {
81 if (DISPENSO_EXPECT(canceled(), false)) {
82 return;
83 }
84 if (outstandingTaskCount_.load(std::memory_order_relaxed) > taskSetLoadFactor_) {
85 f();
86 } else {
87 pool_.schedule(token_, packageTask(std::forward<F>(f)));
88 }
89 }
90
103 template <typename F>
104 DISPENSO_REQUIRES(OnceCallableFunc<F>)
105 void schedule(F&& f, ForceQueuingTag fq) {
106 pool_.schedule(token_, packageTask(std::forward<F>(f)), fq);
107 }
108
121 template <typename Generator>
122 void scheduleBulk(size_t count, Generator&& gen) {
123 scheduleBulkImpl(count, std::forward<Generator>(gen), &token_);
124 }
125
132 DISPENSO_DLL_ACCESS bool wait();
133
148 DISPENSO_DLL_ACCESS bool tryWait(size_t maxToExecute);
149
155 void cancel() {
156 TaskSetBase::cancel();
157 }
158
164 bool canceled() const {
165 return TaskSetBase::canceled();
166 }
167
173 wait();
174 }
175
176 private:
177 DISPENSO_DLL_ACCESS moodycamel::ProducerToken makeToken(
178 moodycamel::ConcurrentQueue<OnceFunction>& pool);
179
180 moodycamel::ProducerToken token_;
181
182 template <typename Result>
183 friend class detail::FutureBase;
184};
185
198class ConcurrentTaskSet : public TaskSetBase {
199 public:
209 ThreadPool& pool,
210 ParentCascadeCancel registerForParentCancel,
211 ssize_t stealingLoadMultiplier = kDefaultStealingMultiplier)
212 : TaskSetBase(pool, registerForParentCancel, stealingLoadMultiplier) {}
213
216 : ConcurrentTaskSet(p, ParentCascadeCancel::kOff, kDefaultStealingMultiplier) {}
219 ConcurrentTaskSet(ThreadPool& p, ssize_t stealingLoadMultiplier)
220 : ConcurrentTaskSet(p, ParentCascadeCancel::kOff, stealingLoadMultiplier) {}
221
222 ConcurrentTaskSet(ConcurrentTaskSet&& other) = delete;
223 ConcurrentTaskSet& operator=(ConcurrentTaskSet&& other) = delete;
224
241 template <typename F>
242 DISPENSO_REQUIRES(OnceCallableFunc<F>)
243 void schedule(F&& f, bool skipRecheck = false) {
244 if (outstandingTaskCount_.load(std::memory_order_relaxed) > taskSetLoadFactor_ &&
245 DISPENSO_EXPECT(!canceled(), true)) {
246 f();
247 } else if (skipRecheck) {
248 pool_.schedule(packageTask(std::forward<F>(f)), ForceQueuingTag());
249 } else {
250 pool_.schedule(packageTask(std::forward<F>(f)));
251 }
252 }
253
266 template <typename F>
267 DISPENSO_REQUIRES(OnceCallableFunc<F>)
268 void schedule(F&& f, ForceQueuingTag fq) {
269 pool_.schedule(packageTask(std::forward<F>(f)), fq);
270 }
271
284 template <typename Generator>
285 void scheduleBulk(size_t count, Generator&& gen) {
286 scheduleBulkImpl(count, std::forward<Generator>(gen), nullptr);
287 }
288
293 DISPENSO_DLL_ACCESS bool wait();
294
308 DISPENSO_DLL_ACCESS bool tryWait(size_t maxToExecute);
309
316 void cancel() {
317 TaskSetBase::cancel();
318 }
319
325 bool canceled() const {
326 return TaskSetBase::canceled();
327 }
328
334 wait();
335 }
336
337 private:
338 bool tryExecuteNext() {
339 return pool_.tryExecuteNext();
340 }
341
342 template <typename Result>
343 friend class detail::FutureBase;
344
345 friend class detail::LimitGatedScheduler;
346};
347
353DISPENSO_DLL_ACCESS TaskSetBase* parentTaskSet();
354
355} // namespace dispenso
void schedule(F &&f, bool skipRecheck=false)
Definition task_set.h:243
DISPENSO_DLL_ACCESS bool wait()
ConcurrentTaskSet(ThreadPool &p, ssize_t stealingLoadMultiplier)
Definition task_set.h:219
ConcurrentTaskSet(ThreadPool &pool, ParentCascadeCancel registerForParentCancel, ssize_t stealingLoadMultiplier=kDefaultStealingMultiplier)
Definition task_set.h:208
void scheduleBulk(size_t count, Generator &&gen)
Definition task_set.h:285
ConcurrentTaskSet(ThreadPool &p)
Definition task_set.h:215
DISPENSO_DLL_ACCESS bool tryWait(size_t maxToExecute)
TaskSet(ThreadPool &p, ssize_t stealingLoadMultiplier)
Definition task_set.h:59
void scheduleBulk(size_t count, Generator &&gen)
Definition task_set.h:122
void schedule(F &&f)
Definition task_set.h:80
DISPENSO_DLL_ACCESS bool tryWait(size_t maxToExecute)
bool canceled() const
Definition task_set.h:164
TaskSet(ThreadPool &p, ParentCascadeCancel registerForParentCancel, ssize_t stealingLoadMultiplier=kDefaultStealingMultiplier)
Definition task_set.h:48
TaskSet(ThreadPool &p)
Definition task_set.h:56
DISPENSO_DLL_ACCESS bool wait()
DISPENSO_DLL_ACCESS TaskSetBase * parentTaskSet()