// dispenso 1.4.1 — A library for task parallelism (thread_pool.h)
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */
#pragma once

#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdlib>
#include <deque>
#include <mutex>
#include <thread>

#include <moodycamel/concurrentqueue.h>

#include <dispenso/detail/epoch_waiter.h>
#include <dispenso/detail/per_thread_info.h>
#include <dispenso/platform.h>

33namespace dispenso {
34
#if !defined(DISPENSO_WAKEUP_ENABLE)
#if defined(_WIN32) || defined(__linux__)
#define DISPENSO_WAKEUP_ENABLE 1
#else
// TODO(bbudge): For now, only enable Linux and Windows. On Mac still need to figure out how to
// wake more quickly (see e.g.
// https://developer.apple.com/library/archive/documentation/Darwin/Conceptual/KernelProgramming/scheduler/scheduler.html)
#define DISPENSO_WAKEUP_ENABLE 0
#endif // Linux or Windows
#endif // DISPENSO_WAKEUP_ENABLE

#if !defined(DISPENSO_POLL_PERIOD_US)
#if defined(_WIN32)
#define DISPENSO_POLL_PERIOD_US 1000
#else
#if !(DISPENSO_WAKEUP_ENABLE)
#define DISPENSO_POLL_PERIOD_US 200
#else
#define DISPENSO_POLL_PERIOD_US (1 << 15) // Determined empirically good on dual Xeon Linux
#endif // DISPENSO_WAKEUP_ENABLE
#endif // PLATFORM
#endif // DISPENSO_POLL_PERIOD_US

/**
 * Default sleep length (microseconds) a worker thread uses between polls of the work queue.
 * Overridable at compile time via DISPENSO_POLL_PERIOD_US.
 **/
constexpr uint32_t kDefaultSleepLenUs = DISPENSO_POLL_PERIOD_US;

/**
 * Whether signaling-based wakeup of worker threads is enabled by default.  Overridable at
 * compile time via DISPENSO_WAKEUP_ENABLE.
 **/
constexpr bool kDefaultWakeupEnable = DISPENSO_WAKEUP_ENABLE;
61
/**
 * A tag type that can be passed to ThreadPool::schedule to request that the functor always be
 * queued for the pool, rather than potentially executed inline on the calling thread.
 **/
struct ForceQueuingTag {};
68
74class alignas(kCacheLineSize) ThreadPool {
75 public:
83 DISPENSO_DLL_ACCESS ThreadPool(size_t n, size_t poolLoadMultiplier = 32);
84
103 template <class Rep, class Period>
105 bool enable,
106 const std::chrono::duration<Rep, Period>& sleepDuration =
107 std::chrono::microseconds(kDefaultSleepLenUs)) {
108 setSignalingWake(
109 enable,
110 static_cast<uint32_t>(
111 std::chrono::duration_cast<std::chrono::microseconds>(sleepDuration).count()));
112 }
113
120 DISPENSO_DLL_ACCESS void resize(ssize_t n) {
121 std::lock_guard<std::mutex> lk(threadsMutex_);
122 resizeLocked(n);
123 }
124
131 ssize_t numThreads() const {
132 return numThreads_.load(std::memory_order_relaxed);
133 }
134
143 template <typename F>
144 DISPENSO_REQUIRES(OnceCallableFunc<F>)
145 void schedule(F&& f);
146
155 template <typename F>
156 DISPENSO_REQUIRES(OnceCallableFunc<F>)
157 void schedule(F&& f, ForceQueuingTag);
158
164 DISPENSO_DLL_ACCESS ~ThreadPool();
165
166 private:
167 class PerThreadData {
168 public:
169 void setThread(std::thread&& t);
170
171 bool running();
172
173 void stop();
174
175 ~PerThreadData();
176
177 public:
178 alignas(kCacheLineSize) std::thread thread_;
179 std::atomic<bool> running_{true};
180 };
181
182 DISPENSO_DLL_ACCESS uint32_t wait(uint32_t priorEpoch);
183 DISPENSO_DLL_ACCESS void wake();
184
185 void setSignalingWake(bool enable, uint32_t sleepDurationUs) {
186 std::lock_guard<std::mutex> lk(threadsMutex_);
187 ssize_t currentPoolSize = numThreads();
188 resizeLocked(0);
189 enableEpochWaiter_.store(enable, std::memory_order_release);
190 sleepLengthUs_.store(sleepDurationUs, std::memory_order_release);
191 resizeLocked(currentPoolSize);
192 }
193
194 DISPENSO_DLL_ACCESS void resizeLocked(ssize_t n);
195
196 void executeNext(OnceFunction work);
197
198 DISPENSO_DLL_ACCESS void threadLoop(PerThreadData& threadData);
199
200 bool tryExecuteNext();
201 bool tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token);
202
203 template <typename F>
204 void schedule(moodycamel::ProducerToken& token, F&& f);
205
206 template <typename F>
207 void schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag);
208
209 void conditionallyWake() {
210 if (enableEpochWaiter_.load(std::memory_order_acquire)) {
211 // A rare race to overwake is preferable to a race that underwakes.
212 auto queuedWork = queuedWork_.fetch_add(1, std::memory_order_acq_rel) + 1;
213 auto idle = idleButAwake_.load(std::memory_order_acquire);
214 if (idle < queuedWork) {
215 wake();
216 }
217 }
218 }
219
220 public:
221 // If we are not yet C++17, we provide aligned new/delete to avoid false sharing.
222#if __cplusplus < 201703L
223 static void* operator new(size_t sz) {
224 return detail::alignedMalloc(sz);
225 }
226 static void operator delete(void* ptr) {
227 return detail::alignedFree(ptr);
228 }
229#endif // __cplusplus
230
231 private:
232 mutable std::mutex threadsMutex_;
233 std::deque<PerThreadData> threads_;
234 size_t poolLoadMultiplier_;
235 std::atomic<ssize_t> poolLoadFactor_;
236 std::atomic<ssize_t> numThreads_;
237
238 moodycamel::ConcurrentQueue<OnceFunction> work_;
239
240 alignas(kCacheLineSize) std::atomic<ssize_t> queuedWork_{0};
241 alignas(kCacheLineSize) std::atomic<ssize_t> idleButAwake_{0};
242
243 alignas(kCacheLineSize) std::atomic<ssize_t> workRemaining_{0};
244
245 alignas(kCacheLineSize) detail::EpochWaiter epochWaiter_;
246 alignas(kCacheLineSize) std::atomic<bool> enableEpochWaiter_{kDefaultWakeupEnable};
247 std::atomic<uint32_t> sleepLengthUs_{kDefaultSleepLenUs};
248
249#if defined DISPENSO_DEBUG
250 alignas(kCacheLineSize) std::atomic<ssize_t> outstandingTaskSets_{0};
251#endif // NDEBUG
252
253 friend class ConcurrentTaskSet;
254 friend class TaskSet;
255 friend class TaskSetBase;
256};
257
263DISPENSO_DLL_ACCESS ThreadPool& globalThreadPool();
264
270DISPENSO_DLL_ACCESS void resizeGlobalThreadPool(size_t numThreads);
271
// ----------------------------- Implementation details -------------------------------------
273
274template <typename F>
275DISPENSO_REQUIRES(OnceCallableFunc<F>)
276inline void ThreadPool::schedule(F&& f) {
277 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
278 ssize_t quickLoadFactor = numThreads_.load(std::memory_order_relaxed);
279 quickLoadFactor += quickLoadFactor / 2;
280 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(this) && curWork > quickLoadFactor) ||
281 (curWork > poolLoadFactor_.load(std::memory_order_relaxed))) {
282 f();
283 } else {
284 schedule(std::forward<F>(f), ForceQueuingTag());
285 }
286}
287
288template <typename F>
289DISPENSO_REQUIRES(OnceCallableFunc<F>)
290inline void ThreadPool::schedule(F&& f, ForceQueuingTag) {
291 if (auto* token =
292 static_cast<moodycamel::ProducerToken*>(detail::PerPoolPerThreadInfo::producer(this))) {
293 schedule(*token, std::forward<F>(f), ForceQueuingTag());
294 return;
295 }
296
297 if (!numThreads_.load(std::memory_order_relaxed)) {
298 f();
299 return;
300 }
301 workRemaining_.fetch_add(1, std::memory_order_release);
302 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
303 bool enqueued = work_.enqueue({std::forward<F>(f)});
304 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
305 (void)(enqueued); // unused
306 assert(enqueued);
307
308 conditionallyWake();
309}
310
311template <typename F>
312inline void ThreadPool::schedule(moodycamel::ProducerToken& token, F&& f) {
313 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
314 ssize_t quickLoadFactor = numThreads_.load(std::memory_order_relaxed);
315 quickLoadFactor += quickLoadFactor / 2;
316 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(this) && curWork > quickLoadFactor) ||
317 (curWork > poolLoadFactor_.load(std::memory_order_relaxed))) {
318 f();
319 } else {
320 schedule(token, std::forward<F>(f), ForceQueuingTag());
321 }
322}
323
324template <typename F>
325inline void ThreadPool::schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag) {
326 if (!numThreads_.load(std::memory_order_relaxed)) {
327 f();
328 return;
329 }
330 workRemaining_.fetch_add(1, std::memory_order_release);
331 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
332 bool enqueued = work_.enqueue(token, {std::forward<F>(f)});
333 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
334 (void)(enqueued); // unused
335 assert(enqueued);
336
337 conditionallyWake();
338}
339
340inline bool ThreadPool::tryExecuteNext() {
341 OnceFunction next;
342 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
343 bool dequeued = work_.try_dequeue(next);
344 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
345 if (dequeued) {
346 executeNext(std::move(next));
347 return true;
348 }
349 return false;
350}
351
352inline bool ThreadPool::tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token) {
353 OnceFunction next;
354 if (work_.try_dequeue_from_producer(token, next)) {
355 executeNext(std::move(next));
356 return true;
357 }
358 return false;
359}
360
361inline void ThreadPool::executeNext(OnceFunction next) {
362 next();
363 workRemaining_.fetch_add(-1, std::memory_order_relaxed);
364}
365
366} // namespace dispenso
// ---------------------------------------------------------------------------
// Doxygen member cross-reference index (generated page residue, kept for reference):
//   void setSignalingWake(bool enable,
//       const std::chrono::duration<Rep, Period>& sleepDuration =
//           std::chrono::microseconds(kDefaultSleepLenUs))
//   DISPENSO_DLL_ACCESS ~ThreadPool()
//   DISPENSO_DLL_ACCESS void resize(ssize_t n)
//   ssize_t numThreads() const
//   DISPENSO_DLL_ACCESS ThreadPool(size_t n, size_t poolLoadMultiplier = 32)
//   constexpr size_t kCacheLineSize — a constant that defines a safe number of
//       bytes + alignment to avoid false sharing (platform.h:89)
//   DISPENSO_DLL_ACCESS ThreadPool& globalThreadPool()
//   DISPENSO_DLL_ACCESS void resizeGlobalThreadPool(size_t numThreads)