dispenso
A library for task parallelism
 
thread_pool.cpp
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */

#include <dispenso/detail/quanta.h>
#include <dispenso/thread_pool.h>

#if defined DISPENSO_DEBUG
#include <iostream>
#endif // DISPENSO_DEBUG

namespace dispenso {

namespace {
size_t getAdjustedThreadCount(size_t requested) {
  static const size_t maxThreads = []() {
    size_t maxT = std::numeric_limits<size_t>::max();

#if defined(_WIN32) && !defined(__MINGW32__)
#pragma warning(push)
#pragma warning(disable : 4996)
#endif

    char* envThreads = std::getenv("DISPENSO_MAX_THREADS_PER_POOL");

#if defined(_WIN32) && !defined(__MINGW32__)
#pragma warning(pop)
#endif

    if (envThreads) {
      char* end = nullptr;
      maxT = std::strtoul(envThreads, &end, 10);
#if defined DISPENSO_DEBUG
      std::cout << "DISPENSO_MAX_THREADS_PER_POOL = " << maxT << std::endl;
#endif // DISPENSO_DEBUG
    }
    return maxT;
  }();

  return std::min(requested, maxThreads);
}
} // namespace
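
// Note on getAdjustedThreadCount(): every pool's size is capped by the
// DISPENSO_MAX_THREADS_PER_POOL environment variable, parsed once per process.
// Illustrative sketch (not part of this file): with
// DISPENSO_MAX_THREADS_PER_POOL=4 exported,
//
//   dispenso::ThreadPool pool(16); // only 4 worker threads are spawned
//
// Beware that an unparseable value makes std::strtoul return 0, capping every
// pool at zero threads (tasks then run inline on the submitting thread).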

void ThreadPool::PerThreadData::setThread(std::thread&& t) {
  thread_ = std::move(t);
}

void ThreadPool::PerThreadData::stop() {
  running_.store(false, std::memory_order_release);
}

uint32_t ThreadPool::wait(uint32_t currentEpoch) {
  if (sleepLengthUs_ > 0) {
    return epochWaiter_.waitFor(currentEpoch, sleepLengthUs_.load(std::memory_order_acquire));
  } else {
    return epochWaiter_.current();
  }
}

void ThreadPool::wake() {
  epochWaiter_.bumpAndWake();
}

inline bool ThreadPool::PerThreadData::running() {
  return running_.load(std::memory_order_acquire);
}

ThreadPool::ThreadPool(size_t n, size_t poolLoadMultiplier)
    : poolLoadMultiplier_(poolLoadMultiplier),
      poolLoadFactor_(static_cast<ssize_t>(getAdjustedThreadCount(n) * poolLoadMultiplier)),
      numThreads_(static_cast<ssize_t>(getAdjustedThreadCount(n))) {
  detail::registerFineSchedulerQuanta();
#if defined DISPENSO_DEBUG
  assert(poolLoadMultiplier > 0);
#endif // DISPENSO_DEBUG
  for (size_t i = 0; i < static_cast<size_t>(numThreads_); ++i) {
    threads_.emplace_back();
    threads_.back().setThread(std::thread([this, &back = threads_.back()]() { threadLoop(back); }));
  }
}
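
// Illustrative construction sketch (not part of this file). The header
// declares a default poolLoadMultiplier of 32:
//
//   dispenso::ThreadPool pool(8);       // 8 workers, load factor 8 * 32
//   dispenso::ThreadPool bursty(8, 64); // same workers, load factor 8 * 64
//
// The load factor is presumably used by the scheduling path (not shown in this
// file) to decide when queued work should instead run inline on the caller.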

ThreadPool::PerThreadData::~PerThreadData() {}

void ThreadPool::threadLoop(PerThreadData& data) {
  constexpr int kBackoffYield = 50;
  constexpr int kBackoffSleep = kBackoffYield + 5;

  moodycamel::ConsumerToken ctoken(work_);
  moodycamel::ProducerToken ptoken(work_);

  OnceFunction next;

  int failCount = 0;
  detail::PerPoolPerThreadInfo::registerPool(this, &ptoken);
  uint32_t epoch = epochWaiter_.current();

  if (enableEpochWaiter_) {
    bool idle = true;
    idleButAwake_.fetch_add(1, std::memory_order_acq_rel);

    while (data.running()) {
      while (true) {
        DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
        bool got = work_.try_dequeue(ctoken, next);
        DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
        if (!got) {
          break;
        }
        queuedWork_.fetch_sub(1, std::memory_order_acq_rel);
        if (idle) {
          idle = false;
          idleButAwake_.fetch_sub(1, std::memory_order_acq_rel);
        }
        executeNext(std::move(next));
        failCount = 0;
      }

      if (!idle) {
        idle = true;
        idleButAwake_.fetch_add(1, std::memory_order_acq_rel);
      }

      ++failCount;

      detail::cpuRelax();
      if (failCount > kBackoffSleep) {
        idleButAwake_.fetch_sub(1, std::memory_order_acq_rel);
        epoch = wait(epoch);
        idleButAwake_.fetch_add(1, std::memory_order_acq_rel);
      } else if (failCount > kBackoffYield) {
        std::this_thread::yield();
      }
    }
    idleButAwake_.fetch_sub(1, std::memory_order_acq_rel);
  } else {
    while (data.running()) {
      while (true) {
        DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
        bool got = work_.try_dequeue(ctoken, next);
        DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
        if (!got) {
          break;
        }
        executeNext(std::move(next));
        failCount = 0;
      }

      ++failCount;

      detail::cpuRelax();
      if (failCount > kBackoffSleep) {
        epoch = wait(epoch);
      } else if (failCount > kBackoffYield) {
        std::this_thread::yield();
      }
    }
  }
}
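
// Backoff summary for threadLoop(): each empty pass over the queue increments
// failCount and calls detail::cpuRelax(). With kBackoffYield = 50 and
// kBackoffSleep = 55, passes 1-50 merely spin, passes 51-55 additionally yield
// the timeslice, and from pass 56 onward the worker parks on the epoch waiter
// until wake() bumps the epoch or the sleep interval elapses.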

void ThreadPool::resizeLocked(ssize_t sn) {
  sn = getAdjustedThreadCount(sn);

  assert(sn >= 0);
  size_t n = static_cast<size_t>(sn);

  if (n < threads_.size()) {
    for (size_t i = n; i < threads_.size(); ++i) {
      threads_[i].stop();
    }

    while (threads_.size() > n) {
      wake();
      threads_.back().thread_.join();
      threads_.pop_back();
    }

  } else if (n > threads_.size()) {
    for (size_t i = threads_.size(); i < n; ++i) {
      threads_.emplace_back();
      threads_.back().setThread(
          std::thread([this, &back = threads_.back()]() { threadLoop(back); }));
    }
  }
  poolLoadFactor_.store(static_cast<ssize_t>(n * poolLoadMultiplier_), std::memory_order_relaxed);
  numThreads_.store(sn, std::memory_order_relaxed);

  if (!sn) {
    // The pool will run future tasks inline since we have no threads, but we still need to empty
    // the current set of tasks.
    while (tryExecuteNext()) {
    }
  }
}
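
// Illustrative resize sketch (not part of this file), assuming the public
// resize() takes threadsMutex_ and forwards here:
//
//   pool.resize(0); // joins all workers and drains the queue; future tasks
//                   // run inline on the submitting thread
//   pool.resize(4); // spins worker threads back up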

ThreadPool::~ThreadPool() {
#if defined DISPENSO_DEBUG
  assert(outstandingTaskSets_.load(std::memory_order_acquire) == 0);
#endif // DISPENSO_DEBUG

  // Strictly speaking, it is unnecessary to lock this in the destructor; however, it could be a
  // useful diagnostic to learn that the mutex is already locked when we reach this point.
  std::unique_lock<std::mutex> lk(threadsMutex_, std::try_to_lock);
  assert(lk.owns_lock());
  for (auto& t : threads_) {
    t.stop();
    wake();
  }

  while (tryExecuteNext()) {
  }

  while (!threads_.empty()) {
    wake();
    threads_.back().thread_.join();
    threads_.pop_back();
  }

  while (tryExecuteNext()) {
  }
}
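
// Destruction note: the destructor stops and joins every worker, draining the
// queue both before and after the joins so that tasks enqueued by other tasks
// still get executed. Destroying a pool with outstanding task sets is a bug,
// which the debug-only assert above is intended to catch.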

ThreadPool& globalThreadPool() {
  // It should be illegal to access globalThreadPool after exiting main.
  // We default to hardware threads minus one because the calling thread is usually also involved
  // in the computation.
  static ThreadPool pool(std::thread::hardware_concurrency() - 1);
  return pool;
}
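
// Illustrative usage sketch (not part of this file), assuming the schedule()
// API declared in the ThreadPool header; doWork is a placeholder:
//
//   dispenso::globalThreadPool().schedule([]() { doWork(); });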

void resizeGlobalThreadPool(size_t numThreads) {
  globalThreadPool().resize(static_cast<ssize_t>(numThreads));
}

} // namespace dispenso