19#include <condition_variable>
25#include <moodycamel/concurrentqueue.h>
27#include <dispenso/detail/epoch_waiter.h>
28#include <dispenso/detail/per_thread_info.h>
35#if !defined(DISPENSO_WAKEUP_ENABLE)
36#if defined(_WIN32) || defined(__linux__)
37#define DISPENSO_WAKEUP_ENABLE 1
42#define DISPENSO_WAKEUP_ENABLE 0
46#if !defined(DISPENSO_POLL_PERIOD_US)
48#define DISPENSO_POLL_PERIOD_US 1000
50#if !(DISPENSO_WAKEUP_ENABLE)
51#define DISPENSO_POLL_PERIOD_US 200
53#define DISPENSO_POLL_PERIOD_US (1 << 15)
// Default sleep length, in microseconds, taken from the DISPENSO_POLL_PERIOD_US macro.
// Used below as the default sleepDuration argument and as the initial value of sleepLengthUs_.
58constexpr uint32_t kDefaultSleepLenUs = DISPENSO_POLL_PERIOD_US;
// Default for wake signaling, taken from the DISPENSO_WAKEUP_ENABLE macro (0 or 1).
// Used below as the initial value of enableEpochWaiter_.
60constexpr bool kDefaultWakeupEnable = DISPENSO_WAKEUP_ENABLE;
// Empty tag type. Passing an instance selects the schedule() overloads that take a
// ForceQueuingTag parameter.
67struct ForceQueuingTag {};
// Constructor declaration (the definition is not in this listing).
//   n                  - presumably the number of worker threads to create; confirm against
//                        the definition.
//   poolLoadMultiplier - defaults to 32. NOTE(review): presumably scales the load threshold
//                        kept in poolLoadFactor_; confirm against the definition.
83 DISPENSO_DLL_ACCESS
ThreadPool(
size_t n,
size_t poolLoadMultiplier = 32);
// Fragment of a member template parameterized on a std::chrono duration's Rep/Period;
// presumably the duration-taking setSignalingWake overload (the signature index at the end
// of this listing shows a matching declaration).
// sleepDuration defaults to kDefaultSleepLenUs microseconds. The visible lines convert it to
// a microsecond count with duration_cast and narrow that count to uint32_t.
// NOTE(review): original lines 104-105 and 108-109 are absent from this listing, so the
// function name line and the call that receives the converted value are not visible here.
103 template <
class Rep,
class Period>
106 const std::chrono::duration<Rep, Period>& sleepDuration =
107 std::chrono::microseconds(kDefaultSleepLenUs)) {
110 static_cast<uint32_t
>(
111 std::chrono::duration_cast<std::chrono::microseconds>(sleepDuration).count()));
// resize(n): acquires threadsMutex_ via a lock_guard before doing anything else.
// NOTE(review): the rest of the body is absent from this listing; presumably it forwards
// to resizeLocked(n) while the lock is held - confirm.
120 DISPENSO_DLL_ACCESS
void resize(ssize_t n) {
121 std::lock_guard<std::mutex> lk(threadsMutex_);
// Returns the current value of numThreads_ using a relaxed atomic load.
// NOTE(review): the enclosing signature is absent from this listing; presumably this is the
// body of numThreads() - confirm.
132 return numThreads_.load(std::memory_order_relaxed);
// Schedules the callable f on this pool. F must satisfy OnceCallableFunc<F>.
// f is taken as a forwarding reference.
// NOTE(review): the visible part of the out-of-class definition checks the outstanding work
// count against load thresholds before forwarding to the ForceQueuingTag overload, so this
// overload presumably may run f without queuing it - confirm.
143 template <
typename F>
144 DISPENSO_REQUIRES(OnceCallableFunc<F>)
145 void schedule(F&& f);
// Overload of schedule() selected by passing a ForceQueuingTag as the second argument.
// F must satisfy OnceCallableFunc<F>.
// The visible part of the out-of-class definition increments workRemaining_ and enqueues f
// onto work_ (or delegates to the ProducerToken overload when a token is available).
155 template <
typename F>
156 DISPENSO_REQUIRES(OnceCallableFunc<F>)
157 void schedule(F&& f, ForceQueuingTag);
// Nested class holding per-thread state; the pool stores these in the threads_ deque and
// threadLoop() receives one by reference.
// NOTE(review): several original lines of this class (including its closing brace) are
// absent from this listing, so only two members are visible here.
167 class PerThreadData {
// Accepts a std::thread by rvalue reference (definition not in this listing).
169 void setThread(std::thread&& t);
// Atomic flag, initialized to true.
179 std::atomic<bool> running_{
true};
// wait(): takes a previously observed epoch value and returns a uint32_t.
// NOTE(review): presumably blocks until woken or timed out and returns the new epoch
// (dispenso/detail/epoch_waiter.h is included above) - confirm against the definition.
182 DISPENSO_DLL_ACCESS uint32_t wait(uint32_t priorEpoch);
// wake(): takes no arguments. NOTE(review): presumably wakes a thread blocked in wait() -
// confirm against the definition.
183 DISPENSO_DLL_ACCESS
void wake();
// Reconfigures wake signaling and the sleep length.
//   enable          - value stored into enableEpochWaiter_.
//   sleepDurationUs - value stored into sleepLengthUs_ (microseconds, per the name).
// Runs entirely under threadsMutex_. Records the current thread count first, and after the
// two release-stores calls resizeLocked() with that recorded count.
// NOTE(review): original line 188 (between capturing the size and the stores) and the
// closing brace are absent from this listing.
185 void setSignalingWake(
bool enable, uint32_t sleepDurationUs) {
186 std::lock_guard<std::mutex> lk(threadsMutex_);
187 ssize_t currentPoolSize = numThreads();
189 enableEpochWaiter_.store(enable, std::memory_order_release);
190 sleepLengthUs_.store(sleepDurationUs, std::memory_order_release);
191 resizeLocked(currentPoolSize);
// Resize helper (definition not in this listing). The visible call site in
// setSignalingWake() invokes it while holding threadsMutex_.
// NOTE(review): the name suggests callers must always hold threadsMutex_ - confirm.
194 DISPENSO_DLL_ACCESS
void resizeLocked(ssize_t n);
// Takes a OnceFunction by value. The visible part of the definition decrements
// workRemaining_; presumably it also invokes the function - confirm.
196 void executeNext(OnceFunction work);
// Takes the PerThreadData for one thread by reference (definition not in this listing).
// NOTE(review): presumably the main loop run by each worker thread - confirm.
198 DISPENSO_DLL_ACCESS
void threadLoop(PerThreadData& threadData);
// Attempts to dequeue one item from work_ and pass it to executeNext().
// NOTE(review): presumably returns true when an item was dequeued - the return statements
// are absent from this listing; confirm.
200 bool tryExecuteNext();
// Same idea, but dequeues only from the given producer token's sub-queue
// (work_.try_dequeue_from_producer).
201 bool tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token);
// Token-based counterpart of schedule(F&&): the visible part of its definition applies the
// same load-threshold checks and then forwards to the ForceQueuingTag overload below.
203 template <
typename F>
204 void schedule(moodycamel::ProducerToken& token, F&& f);
// Token-based counterpart of schedule(F&&, ForceQueuingTag): the visible part of its
// definition increments workRemaining_ and enqueues f onto work_ using the given token.
206 template <
typename F>
207 void schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag);
// Does its work only when enableEpochWaiter_ is set.
// Increments queuedWork_ and compares the post-increment value with idleButAwake_; the
// inner branch is taken when idleButAwake_ is smaller than the queued-work count.
// NOTE(review): the statements inside that branch and the closing braces are absent from
// this listing; presumably the branch calls wake() - confirm.
209 void conditionallyWake() {
210 if (enableEpochWaiter_.load(std::memory_order_acquire)) {
212 auto queuedWork = queuedWork_.fetch_add(1, std::memory_order_acq_rel) + 1;
213 auto idle = idleButAwake_.load(std::memory_order_acquire);
214 if (idle < queuedWork) {
// Compiled only when __cplusplus < 201703L.
// Class-level allocation functions that route through detail::alignedMalloc and
// detail::alignedFree.
// NOTE(review): presumably needed because plain `new` before C++17 does not honor the
// over-alignment requested by the alignas(kCacheLineSize) members - confirm.
// NOTE(review): closing braces and the matching #endif are absent from this listing.
222#if __cplusplus < 201703L
223 static void*
operator new(
size_t sz) {
224 return detail::alignedMalloc(sz);
226 static void operator delete(
void* ptr) {
227 return detail::alignedFree(ptr);
// Mutex taken by resize() and setSignalingWake() in the visible code.
232 mutable std::mutex threadsMutex_;
// One PerThreadData entry per thread record.
233 std::deque<PerThreadData> threads_;
// NOTE(review): presumably the constructor's poolLoadMultiplier argument - confirm.
234 size_t poolLoadMultiplier_;
// Threshold that the schedule() overloads compare against the outstanding work count.
235 std::atomic<ssize_t> poolLoadFactor_;
// Thread count; read with relaxed loads in the scheduling paths.
236 std::atomic<ssize_t> numThreads_;
// Queue of pending OnceFunction work items.
238 moodycamel::ConcurrentQueue<OnceFunction> work_;
// Wake-signaling switch, aligned to kCacheLineSize; starts at kDefaultWakeupEnable and is
// updated by setSignalingWake().
246 alignas(
kCacheLineSize) std::atomic<bool> enableEpochWaiter_{kDefaultWakeupEnable};
// Sleep length in microseconds; starts at kDefaultSleepLenUs and is updated by
// setSignalingWake().
247 std::atomic<uint32_t> sleepLengthUs_{kDefaultSleepLenUs};
// Present only when DISPENSO_DEBUG is defined; counter starting at 0.
// NOTE(review): the matching #endif is absent from this listing.
249#if defined DISPENSO_DEBUG
250 alignas(
kCacheLineSize) std::atomic<ssize_t> outstandingTaskSets_{0};
// Task-set classes are granted access to the pool's private members.
253 friend class ConcurrentTaskSet;
254 friend class TaskSet;
255 friend class TaskSetBase;
// Fragment, presumably the out-of-class definition of schedule(F&& f); the template header
// and signature lines are absent from this listing.
// Reads the outstanding work count, then builds a "quick" threshold of 1.5x the thread count
// (integer arithmetic). The branch is taken when either
//   (a) PerPoolPerThreadInfo::isPoolRecursive(this) is true and outstanding work exceeds
//       the quick threshold, or
//   (b) outstanding work exceeds poolLoadFactor_.
// NOTE(review): the statements inside that branch are absent from this listing; presumably
// f is invoked directly instead of being queued - confirm.
// The last visible line forwards f to the ForceQueuingTag overload (presumably the
// fall-through path).
275DISPENSO_REQUIRES(OnceCallableFunc<F>)
277 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
278 ssize_t quickLoadFactor = numThreads_.load(std::memory_order_relaxed);
279 quickLoadFactor += quickLoadFactor / 2;
280 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(
this) && curWork > quickLoadFactor) ||
281 (curWork > poolLoadFactor_.load(std::memory_order_relaxed))) {
284 schedule(std::forward<F>(f), ForceQueuingTag());
// Fragment, presumably the out-of-class definition of schedule(F&& f, ForceQueuingTag);
// the template header, signature and several body lines are absent from this listing.
// The value from PerPoolPerThreadInfo::producer(this) is cast to a ProducerToken pointer;
// the visible call then schedules through *token using the token-based overload.
// NOTE(review): the body of the `numThreads_ == 0` branch is not visible; presumably f is
// run inline and the function returns - confirm.
// Visible queuing path: workRemaining_ is incremented (release) and then f is enqueued onto
// work_, with the enqueue bracketed by TSAN ignore-writes annotations.
// NOTE(review): how `enqueued` is used afterwards is not visible in this listing.
289DISPENSO_REQUIRES(OnceCallableFunc<F>)
292 static_cast<moodycamel::ProducerToken*
>(detail::PerPoolPerThreadInfo::producer(
this))) {
293 schedule(*token, std::forward<F>(f), ForceQueuingTag());
297 if (!numThreads_.load(std::memory_order_relaxed)) {
301 workRemaining_.fetch_add(1, std::memory_order_release);
302 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
303 bool enqueued = work_.enqueue({std::forward<F>(f)});
304 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
// Token-based schedule(): same load check as the tokenless overload.
// Builds a "quick" threshold of 1.5x the thread count (integer arithmetic) and takes the
// branch when either the pool is recursive (per PerPoolPerThreadInfo::isPoolRecursive) with
// outstanding work above that threshold, or outstanding work exceeds poolLoadFactor_.
// NOTE(review): the template header, the statements inside the branch and the closing
// braces are absent from this listing; presumably the branch runs f inline - confirm.
// The last visible line forwards to the token-based ForceQueuingTag overload.
312inline void ThreadPool::schedule(moodycamel::ProducerToken& token, F&& f) {
313 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
314 ssize_t quickLoadFactor = numThreads_.load(std::memory_order_relaxed);
315 quickLoadFactor += quickLoadFactor / 2;
316 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(
this) && curWork > quickLoadFactor) ||
317 (curWork > poolLoadFactor_.load(std::memory_order_relaxed))) {
320 schedule(token, std::forward<F>(f), ForceQueuingTag());
// Token-based schedule() with ForceQueuingTag.
// NOTE(review): the body of the `numThreads_ == 0` branch is absent from this listing;
// presumably f is run inline and the function returns - confirm.
// Visible queuing path: workRemaining_ is incremented (release) and then f is enqueued onto
// work_ through the caller-supplied producer token, with the enqueue bracketed by TSAN
// ignore-writes annotations.
// NOTE(review): how `enqueued` is used afterwards is not visible in this listing.
325inline void ThreadPool::schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag) {
326 if (!numThreads_.load(std::memory_order_relaxed)) {
330 workRemaining_.fetch_add(1, std::memory_order_release);
331 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
332 bool enqueued = work_.enqueue(token, {std::forward<F>(f)});
333 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
// Tries to pull one item from work_ into `next` (the dequeue is bracketed by TSAN
// ignore-writes annotations) and hands it to executeNext().
// NOTE(review): the declaration of `next`, the check on `dequeued` and the return
// statements are absent from this listing; presumably executeNext() runs only when the
// dequeue succeeded and the function returns that success flag - confirm.
340inline bool ThreadPool::tryExecuteNext() {
342 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
343 bool dequeued = work_.try_dequeue(next);
344 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
346 executeNext(std::move(next));
// Tries to dequeue one item that was enqueued through `token`; on success the item is
// passed to executeNext().
// NOTE(review): the declaration of `next` and the return statements are absent from this
// listing; presumably returns true on success and false otherwise - confirm.
352inline bool ThreadPool::tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token) {
354 if (work_.try_dequeue_from_producer(token, next)) {
355 executeNext(std::move(next));
// Takes ownership of one work item by value.
// The visible statement decrements workRemaining_ with a relaxed atomic add of -1.
// NOTE(review): original line 362 is absent from this listing; presumably it invokes `next`
// before the counter is decremented - confirm.
361inline void ThreadPool::executeNext(OnceFunction next) {
363 workRemaining_.fetch_add(-1, std::memory_order_relaxed);
void setSignalingWake(bool enable, const std::chrono::duration< Rep, Period > &sleepDuration=std::chrono::microseconds(kDefaultSleepLenUs))
DISPENSO_DLL_ACCESS ~ThreadPool()
DISPENSO_DLL_ACCESS void resize(ssize_t n)
ssize_t numThreads() const
DISPENSO_DLL_ACCESS ThreadPool(size_t n, size_t poolLoadMultiplier=32)
DISPENSO_DLL_ACCESS ThreadPool & globalThreadPool()
DISPENSO_DLL_ACCESS void resizeGlobalThreadPool(size_t numThreads)