dispenso
A library for task parallelism
 
Loading...
Searching...
No Matches
thread_pool.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
14#pragma once
15
16#include <atomic>
17#include <cassert>
18#include <condition_variable>
19#include <cstdlib>
20#include <deque>
21#include <mutex>
22#include <thread>
23
24#include <concurrentqueue.h>
25
26#include <dispenso/detail/epoch_waiter.h>
27#include <dispenso/detail/per_thread_info.h>
29#include <dispenso/platform.h>
31
32namespace dispenso {
33
// Default for whether the pool wakes workers by explicit signaling when work is scheduled
// (this value seeds enableEpochWaiter_). May be overridden at build time.
#ifndef DISPENSO_WAKEUP_ENABLE
// TODO(bbudge): For now, only enable Linux and Windows. On Mac still need to figure out how to
// wake more quickly (see e.g.
// https://developer.apple.com/library/archive/documentation/Darwin/Conceptual/KernelProgramming/scheduler/scheduler.html)
#if defined(__linux__) || defined(_WIN32)
#define DISPENSO_WAKEUP_ENABLE 1
#else
#define DISPENSO_WAKEUP_ENABLE 0
#endif // Linux or Windows
#endif // DISPENSO_WAKEUP_ENABLE

// Default sleep duration in microseconds (seeds sleepLengthUs_ and is the default argument of
// setSignalingWake). May be overridden at build time.
#ifndef DISPENSO_POLL_PERIOD_US
#if defined(_WIN32)
#define DISPENSO_POLL_PERIOD_US 1000
#elif DISPENSO_WAKEUP_ENABLE
#define DISPENSO_POLL_PERIOD_US (1 << 15) // Determined empirically good on dual Xeon Linux
#else
#define DISPENSO_POLL_PERIOD_US 200
#endif // PLATFORM
#endif // DISPENSO_POLL_PERIOD_US

constexpr uint32_t kDefaultSleepLenUs = DISPENSO_POLL_PERIOD_US;
constexpr bool kDefaultWakeupEnable = DISPENSO_WAKEUP_ENABLE;
60
67
74 public:
82 DISPENSO_DLL_ACCESS ThreadPool(size_t n, size_t poolLoadMultiplier = 32);
83
102 template <class Rep, class Period>
104 bool enable,
105 const std::chrono::duration<Rep, Period>& sleepDuration =
106 std::chrono::microseconds(kDefaultSleepLenUs)) {
107 setSignalingWake(
108 enable,
109 static_cast<uint32_t>(
110 std::chrono::duration_cast<std::chrono::microseconds>(sleepDuration).count()));
111 }
112
119 DISPENSO_DLL_ACCESS void resize(ssize_t n) {
120 std::lock_guard<std::mutex> lk(threadsMutex_);
121 resizeLocked(n);
122 }
123
130 ssize_t numThreads() const {
131 return numThreads_.load(std::memory_order_relaxed);
132 }
133
142 template <typename F>
143 void schedule(F&& f);
144
153 template <typename F>
154 void schedule(F&& f, ForceQueuingTag);
155
161 DISPENSO_DLL_ACCESS ~ThreadPool();
162
163 private:
164 class PerThreadData {
165 public:
166 void setThread(std::thread&& t);
167
168 bool running();
169
170 void stop();
171
172 ~PerThreadData();
173
174 public:
175 alignas(kCacheLineSize) std::thread thread_;
176 std::atomic<bool> running_{true};
177 };
178
179 DISPENSO_DLL_ACCESS uint32_t wait(uint32_t priorEpoch);
180 DISPENSO_DLL_ACCESS void wake();
181
182 void setSignalingWake(bool enable, uint32_t sleepDurationUs) {
183 std::lock_guard<std::mutex> lk(threadsMutex_);
184 ssize_t currentPoolSize = numThreads();
185 resizeLocked(0);
186 enableEpochWaiter_.store(enable, std::memory_order_release);
187 sleepLengthUs_.store(sleepDurationUs, std::memory_order_release);
188 resizeLocked(currentPoolSize);
189 }
190
191 DISPENSO_DLL_ACCESS void resizeLocked(ssize_t n);
192
193 void executeNext(OnceFunction work);
194
195 DISPENSO_DLL_ACCESS void threadLoop(PerThreadData& threadData);
196
197 bool tryExecuteNext();
198 bool tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token);
199
200 template <typename F>
201 void schedule(moodycamel::ProducerToken& token, F&& f);
202
203 template <typename F>
204 void schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag);
205
206 void conditionallyWake() {
207 if (enableEpochWaiter_.load(std::memory_order_acquire)) {
208 // A rare race to overwake is preferable to a race that underwakes.
209 auto queuedWork = queuedWork_.fetch_add(1, std::memory_order_acq_rel) + 1;
210 auto idle = idleButAwake_.load(std::memory_order_acquire);
211 if (idle < queuedWork) {
212 wake();
213 }
214 }
215 }
216
217 public:
218 // If we are not yet C++17, we provide aligned new/delete to avoid false sharing.
219#if __cplusplus < 201703L
220 static void* operator new(size_t sz) {
221 return detail::alignedMalloc(sz);
222 }
223 static void operator delete(void* ptr) {
224 return detail::alignedFree(ptr);
225 }
226#endif // __cplusplus
227
228 private:
229 mutable std::mutex threadsMutex_;
230 std::deque<PerThreadData> threads_;
231 size_t poolLoadMultiplier_;
232 std::atomic<ssize_t> poolLoadFactor_;
233 std::atomic<ssize_t> numThreads_;
234
235 moodycamel::ConcurrentQueue<OnceFunction> work_;
236
237 alignas(kCacheLineSize) std::atomic<ssize_t> queuedWork_{0};
238 alignas(kCacheLineSize) std::atomic<ssize_t> idleButAwake_{0};
239
240 alignas(kCacheLineSize) std::atomic<ssize_t> workRemaining_{0};
241
242 alignas(kCacheLineSize) detail::EpochWaiter epochWaiter_;
243 alignas(kCacheLineSize) std::atomic<bool> enableEpochWaiter_{kDefaultWakeupEnable};
244 std::atomic<uint32_t> sleepLengthUs_{kDefaultSleepLenUs};
245
246#if defined DISPENSO_DEBUG
247 alignas(kCacheLineSize) std::atomic<ssize_t> outstandingTaskSets_{0};
248#endif // NDEBUG
249
250 friend class ConcurrentTaskSet;
251 friend class TaskSet;
252 friend class TaskSetBase;
253};
254
260DISPENSO_DLL_ACCESS ThreadPool& globalThreadPool();
261
267DISPENSO_DLL_ACCESS void resizeGlobalThreadPool(size_t numThreads);
268
269// ----------------------------- Implementation details -------------------------------------
270
271template <typename F>
272inline void ThreadPool::schedule(F&& f) {
273 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
274 ssize_t quickLoadFactor = numThreads_.load(std::memory_order_relaxed);
276 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(this) && curWork > quickLoadFactor) ||
277 (curWork > poolLoadFactor_.load(std::memory_order_relaxed))) {
278 f();
279 } else {
280 schedule(std::forward<F>(f), ForceQueuingTag());
281 }
282}
283
284template <typename F>
286 if (auto* token =
287 static_cast<moodycamel::ProducerToken*>(detail::PerPoolPerThreadInfo::producer(this))) {
288 schedule(*token, std::forward<F>(f), ForceQueuingTag());
289 return;
290 }
291
292 if (!numThreads_.load(std::memory_order_relaxed)) {
293 f();
294 return;
295 }
296 workRemaining_.fetch_add(1, std::memory_order_release);
297 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
298 bool enqueued = work_.enqueue({std::forward<F>(f)});
299 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
300 (void)(enqueued); // unused
302
303 conditionallyWake();
304}
305
306template <typename F>
307inline void ThreadPool::schedule(moodycamel::ProducerToken& token, F&& f) {
308 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
309 ssize_t quickLoadFactor = numThreads_.load(std::memory_order_relaxed);
311 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(this) && curWork > quickLoadFactor) ||
312 (curWork > poolLoadFactor_.load(std::memory_order_relaxed))) {
313 f();
314 } else {
315 schedule(token, std::forward<F>(f), ForceQueuingTag());
316 }
317}
318
319template <typename F>
320inline void ThreadPool::schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag) {
321 if (!numThreads_.load(std::memory_order_relaxed)) {
322 f();
323 return;
324 }
325 workRemaining_.fetch_add(1, std::memory_order_release);
326 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
327 bool enqueued = work_.enqueue(token, {std::forward<F>(f)});
328 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
329 (void)(enqueued); // unused
330 assert(enqueued);
331
332 conditionallyWake();
333}
334
335inline bool ThreadPool::tryExecuteNext() {
336 OnceFunction next;
337 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
338 bool dequeued = work_.try_dequeue(next);
339 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
340 if (dequeued) {
341 executeNext(std::move(next));
342 return true;
343 }
344 return false;
345}
346
347inline bool ThreadPool::tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token) {
348 OnceFunction next;
349 if (work_.try_dequeue_from_producer(token, next)) {
350 executeNext(std::move(next));
351 return true;
352 }
353 return false;
354}
355
356inline void ThreadPool::executeNext(OnceFunction next) {
357 next();
358 workRemaining_.fetch_add(-1, std::memory_order_relaxed);
359}
360
361} // namespace dispenso
void setSignalingWake(bool enable, const std::chrono::duration< Rep, Period > &sleepDuration=std::chrono::microseconds(kDefaultSleepLenUs))
DISPENSO_DLL_ACCESS void resize(ssize_t n)
ssize_t numThreads() const
detail::OpResult< T > OpResult
Definition pipeline.h:29