dispenso
A library for task parallelism
 
Loading...
Searching...
No Matches
task_set.cpp
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8#include "task_set.h"
9
10#include <cstdio>
11
12namespace dispenso {
13
14namespace detail {
15// 64 depth is pretty ridiculous, but try not to step on anyone's feet.
16constexpr int32_t kMaxTasksStackSize = 64;
17
18DISPENSO_THREAD_LOCAL TaskSetBase* g_taskStack[kMaxTasksStackSize];
19DISPENSO_THREAD_LOCAL int32_t g_taskStackSize = 0;
20
21void pushThreadTaskSet(TaskSetBase* t) {
22#ifndef NDEBUG
23 if (g_taskStackSize < 0 || g_taskStackSize >= kMaxTasksStackSize) {
24 fprintf(stderr, "TaskSet parent stack index is invalid when pushing: %d\n", g_taskStackSize);
25 std::abort();
26 }
27#endif // NDEBUG
28 g_taskStack[g_taskStackSize++] = t;
29}
30void popThreadTaskSet() {
31#ifndef NDEBUG
32 if (g_taskStackSize <= 0) {
33 fprintf(stderr, "TaskSet parent stack index is invalid when popping: %d\n", g_taskStackSize);
34 std::abort();
35 }
36#endif // NDEBUG
37 --g_taskStackSize;
38}
39} // namespace detail
40
41TaskSetBase* parentTaskSet() {
42 using namespace detail;
43
44#ifndef NDEBUG
45 if (g_taskStackSize < 0 || g_taskStackSize >= kMaxTasksStackSize) {
46 fprintf(stderr, "TaskSet parent stack index is invalid when accessing: %d\n", g_taskStackSize);
47 std::abort();
48 }
49#endif // NDEBUG
50
51 return g_taskStackSize ? g_taskStack[g_taskStackSize - 1] : nullptr;
52}
53
54void TaskSetBase::trySetCurrentException() {
55#if defined(__cpp_exceptions)
56 auto status = kUnset;
57 if (guardException_.compare_exchange_strong(status, kSetting, std::memory_order_acq_rel)) {
58 exception_ = std::current_exception();
59 guardException_.store(kSet, std::memory_order_release);
60 canceled_.store(true, std::memory_order_release);
61 }
62#endif // __cpp_exceptions
63}
64
65inline bool TaskSetBase::testAndResetException() {
66#if defined(__cpp_exceptions)
67 if (guardException_.load(std::memory_order_acquire) == kSet) {
68 auto exception = std::move(exception_);
69 guardException_.store(kUnset, std::memory_order_release);
70 std::rethrow_exception(exception);
71 }
72#endif // __cpp_exceptions
73 return canceled_.load(std::memory_order_acquire);
74}
75
77 // Steal work until our set is unblocked. Note that this is not the
78 // fastest possible way to unblock the current set, but it will alleviate
79 // deadlock, and should provide decent throughput for all waiters.
80
81 // The deadlock scenario mentioned goes as follows: N threads in the
82 // ThreadPool. Each thread is running code that is using TaskSets. No
83 // progress could be made without stealing.
84 while (outstandingTaskCount_.load(std::memory_order_acquire)) {
85 if (!pool_.tryExecuteNext()) {
86 std::this_thread::yield();
87 }
88 }
89
90 return testAndResetException();
91}
92
94 while (outstandingTaskCount_.load(std::memory_order_acquire) && maxToExecute--) {
95 if (!pool_.tryExecuteNext()) {
96 break;
97 }
98 }
99
100 // Must check completion prior to checking exceptions, otherwise there could be a case where
101 // exceptions are checked, then an exception is propagated, and then we return whether all items
102 // have been completed, thus dropping the exception.
103 if (outstandingTaskCount_.load(std::memory_order_acquire)) {
104 return false;
105 }
106
107 return !testAndResetException();
108}
109
110moodycamel::ProducerToken TaskSet::makeToken(moodycamel::ConcurrentQueue<OnceFunction>& pool) {
111 return moodycamel::ProducerToken(pool);
112}
113
115 // Steal work until our set is unblocked.
116 // The deadlock scenario mentioned goes as follows: N threads in the
117 // ThreadPool. Each thread is running code that is using TaskSets. No
118 // progress could be made without stealing.
119 while (pool_.tryExecuteNextFromProducerToken(token_)) {
120 }
121
122 while (outstandingTaskCount_.load(std::memory_order_acquire)) {
123 if (!pool_.tryExecuteNext()) {
124 std::this_thread::yield();
125 }
126 }
127
128 return testAndResetException();
129}
130
132 ssize_t maxToExe = static_cast<ssize_t>(maxToExecute);
133 while (outstandingTaskCount_.load(std::memory_order_acquire) && maxToExe--) {
134 if (!pool_.tryExecuteNextFromProducerToken(token_)) {
135 break;
136 }
137 }
138
139 // Must check completion prior to checking exceptions, otherwise there could be a case where
140 // exceptions are checked, then an exception is propagated, and then we return whether all items
141 // have been completed, thus dropping the exception.
142
143 maxToExe = std::max<ssize_t>(0, maxToExe);
144
145 while (outstandingTaskCount_.load(std::memory_order_acquire) && maxToExe--) {
146 if (!pool_.tryExecuteNext()) {
147 std::this_thread::yield();
148 }
149 }
150
151 if (outstandingTaskCount_.load(std::memory_order_acquire)) {
152 return false;
153 }
154
155 return !testAndResetException();
156}
157
158} // namespace dispenso
DISPENSO_DLL_ACCESS bool tryWait(size_t maxToExecute)
Definition task_set.cpp:93
DISPENSO_DLL_ACCESS bool wait()
Definition task_set.cpp:76
DISPENSO_DLL_ACCESS bool tryWait(size_t maxToExecute)
Definition task_set.cpp:131
DISPENSO_DLL_ACCESS bool wait()
Definition task_set.cpp:114
detail::OpResult< T > OpResult
Definition pipeline.h:29