19#include <condition_variable>
26#include <moodycamel/concurrentqueue.h>
28#include <dispenso/detail/epoch_waiter.h>
29#include <dispenso/detail/per_thread_info.h>
// Compile-time knobs for the pool's worker-wakeup strategy.
// NOTE(review): this extract is garbled — original line numbers are fused onto
// each line and the #else/#endif lines between these directives are missing,
// so the exact branch structure below cannot be confirmed from here.
36#if !defined(DISPENSO_WAKEUP_ENABLE)
// Signaling wakeups default to on for Windows, Linux, and Mach (macOS)
// targets, and off elsewhere.
37#if defined(_WIN32) || defined(__linux__) || defined(__MACH__)
38#define DISPENSO_WAKEUP_ENABLE 1
// (an #else presumably preceded this line — lost in extraction)
40#define DISPENSO_WAKEUP_ENABLE 0
// Worker poll/sleep period in microseconds, overridable by the user.
44#if !defined(DISPENSO_POLL_PERIOD_US)
46#define DISPENSO_POLL_PERIOD_US 1000
// Candidate defaults: 200us when wakeups are disabled (pure polling needs to
// be responsive), 1<<15 (~32.8ms) otherwise — branch structure lost; verify
// against the original header.
48#if !(DISPENSO_WAKEUP_ENABLE)
49#define DISPENSO_POLL_PERIOD_US 200
51#define DISPENSO_POLL_PERIOD_US (1 << 15)
// Snapshots of the macro values as typed constants used for defaults below.
56constexpr uint32_t kDefaultSleepLenUs = DISPENSO_POLL_PERIOD_US;
58constexpr bool kDefaultWakeupEnable = DISPENSO_WAKEUP_ENABLE;
// Empty tag type for overload dispatch: the schedule(F&&, ForceQueuingTag)
// overloads visible below always enqueue the functor instead of potentially
// running it inline.
65struct ForceQueuingTag {};
// Constructs a pool with n worker threads.  poolLoadMultiplier appears to
// scale the load factor used by schedule() to decide between queuing and
// other handling (stored in poolLoadMultiplier_ / poolLoadFactor_ below).
81 DISPENSO_DLL_ACCESS
ThreadPool(
size_t n,
size_t poolLoadMultiplier = 32);
// Chrono-duration overload of setSignalingWake (signature line lost in
// extraction; the trailing tooltip text suggests
// setSignalingWake(bool enable, duration sleepDuration = ...)).
// Converts sleepDuration to whole microseconds and forwards to the
// (bool, uint32_t) overload declared further down.
101 template <
class Rep,
class Period>
104 const std::chrono::duration<Rep, Period>& sleepDuration =
105 std::chrono::microseconds(kDefaultSleepLenUs)) {
// Truncating conversion to uint32_t microseconds; durations longer than
// ~71.6 minutes would wrap — presumably never intended.
108 static_cast<uint32_t
>(
109 std::chrono::duration_cast<std::chrono::microseconds>(sleepDuration).count()));
// resize(n): changes the number of worker threads under threadsMutex_
// (body beyond the lock acquisition is missing from this extract; it
// presumably calls resizeLocked(n)).
118 DISPENSO_DLL_ACCESS
void resize(ssize_t n) {
119 std::lock_guard<std::mutex> lk(threadsMutex_);
// Fragment of numThreads() const: lock-free relaxed read of the thread count
// (its signature line is missing here).
130 return numThreads_.load(std::memory_order_relaxed);
// Public scheduling API.  DISPENSO_REQUIRES(OnceCallableFunc<F>) looks like a
// concepts-when-available constraint macro: F must be callable once with no
// arguments.
// schedule(f): may queue or handle the work based on current load (see the
// implementation fragment near the bottom of this extract).
141 template <
typename F>
142 DISPENSO_REQUIRES(OnceCallableFunc<F>)
143 void schedule(F&& f);
// schedule(f, ForceQueuingTag): always enqueues, never runs inline at the
// call site.
153 template <
typename F>
154 DISPENSO_REQUIRES(OnceCallableFunc<F>)
155 void schedule(F&& f, ForceQueuingTag);
// scheduleBulk(count, gen): enqueues count items produced by gen(i) for
// i in [0, count) — see scheduleBulk/scheduleBulkEnqueue fragments below.
169 template <
typename Generator>
170 void scheduleBulk(
size_t count, Generator&& gen);
// Per-worker bookkeeping (most members lost in extraction).  Visible:
// the thread handle setter and a running flag the worker loop presumably
// polls to know when to exit — confirm against the full header.
180 class PerThreadData {
182 void setThread(std::thread&& t);
192 std::atomic<bool> running_{
true};
// Epoch-based sleeping/wakeup primitives (likely backed by
// dispenso/detail/epoch_waiter.h included above).
// wait(priorEpoch): blocks until the epoch advances past priorEpoch and
// returns the new epoch — presumed; semantics not visible here.
195 DISPENSO_DLL_ACCESS uint32_t wait(uint32_t priorEpoch);
// wake(): wake one/all sleepers; wakeN(n): wake up to n sleepers — exact
// counts not visible in this extract.
196 DISPENSO_DLL_ACCESS
void wake();
197 DISPENSO_DLL_ACCESS
void wakeN(ssize_t n);
// Core overload: toggles signaling wakeups and sets the poll/sleep length.
// Takes threadsMutex_, publishes both settings with release stores, then
// rebuilds the pool at its current size so workers pick up the new mode.
199 void setSignalingWake(
bool enable, uint32_t sleepDurationUs) {
200 std::lock_guard<std::mutex> lk(threadsMutex_);
201 ssize_t currentPoolSize = numThreads();
203 enableEpochWaiter_.store(enable, std::memory_order_release);
204 sleepLengthUs_.store(sleepDurationUs, std::memory_order_release);
// Resize to the same size — presumably forces workers to be restarted so
// they re-read enableEpochWaiter_/sleepLengthUs_; confirm in resizeLocked.
205 resizeLocked(currentPoolSize);
// resizeLocked: resize implementation that assumes threadsMutex_ is held.
208 DISPENSO_DLL_ACCESS
void resizeLocked(ssize_t n);
// Private execution/scheduling internals.
// executeNext: runs one dequeued item and does the bookkeeping (see the
// implementation fragment below, which decrements workRemaining_).
210 void executeNext(OnceFunction work);
// threadLoop: per-worker main loop.
212 DISPENSO_DLL_ACCESS
void threadLoop(PerThreadData& threadData);
// Dequeue-and-run helpers; the token variant pops from a specific
// moodycamel producer's sub-queue.
214 bool tryExecuteNext();
215 bool tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token);
// Token-based schedule overloads mirroring the public ones; a ProducerToken
// gives cheaper, per-producer enqueue into the moodycamel queue.
217 template <
typename F>
218 void schedule(moodycamel::ProducerToken& token, F&& f);
220 template <
typename F>
221 void schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag);
// scheduleBulkEnqueue: bulk enqueue of count generated items, optionally via
// a producer token (return type line lost in extraction).
226 template <
typename Generator>
228 scheduleBulkEnqueue(
size_t count, Generator&& gen, moodycamel::ProducerToken* token =
nullptr);
// Wakes sleeping workers only when there is more pending work than awake
// threads, and only when signaling wakeups are enabled.  The body of the
// inner if (the actual wake call) is missing from this extract.
235 void conditionallyWake() {
236 if (enableEpochWaiter_.load(std::memory_order_acquire)) {
237 ssize_t sleeping = numSleeping_.load(std::memory_order_acquire);
// Relaxed reads are fine here: this is a heuristic; a stale value at worst
// delays or duplicates a wakeup.
242 ssize_t awake = numThreads_.load(std::memory_order_relaxed) - sleeping;
243 ssize_t pending = workRemaining_.load(std::memory_order_relaxed);
244 if (pending > awake) {
// Pre-C++17 only: class-specific operator new/delete routed through aligned
// allocation, because this class has alignas(kCacheLineSize) members and
// over-aligned heap allocation is only guaranteed by the language from
// C++17 (aligned new) onward.
253#if __cplusplus < 201703L
254 static void*
operator new(
size_t sz) {
255 return detail::alignedMalloc(sz);
257 static void operator delete(
void* ptr) {
258 return detail::alignedFree(ptr);
// Data members (several lines, e.g. poolLoadFactor_/workRemaining_/
// numSleeping_ referenced elsewhere in this extract, are missing here).
267 static constexpr int kWorkBatchSize = 8;
// Guards threads_ and pool resizing; mutable so const methods can lock.
269 mutable std::mutex threadsMutex_;
// deque keeps PerThreadData addresses stable across growth (workers hold
// references into it — see threadLoop taking PerThreadData&).
270 std::deque<PerThreadData> threads_;
271 size_t poolLoadMultiplier_;
276 std::atomic<ssize_t> numThreads_;
// Lock-free MPMC work queue.
278 moodycamel::ConcurrentQueue<OnceFunction> work_;
// Cache-line aligned to limit false sharing with the hot queue state above.
285 alignas(
kCacheLineSize) std::atomic<bool> enableEpochWaiter_{kDefaultWakeupEnable};
286 std::atomic<uint32_t> sleepLengthUs_{kDefaultSleepLenUs};
// Debug-only accounting of live task sets.
288#if defined DISPENSO_DEBUG
289 alignas(
kCacheLineSize) std::atomic<ssize_t> outstandingTaskSets_{0};
// Task-set types need access to the private schedule/token internals.
292 friend class ConcurrentTaskSet;
293 friend class TaskSet;
294 friend class TaskSetBase;
// ThreadPool::schedule(F&&) implementation fragment (template header and
// signature lines lost in extraction).  Heuristic: when the caller is
// already running inside this pool and queued work exceeds ~1.5x the thread
// count, or queued work exceeds the pool load factor, take one branch
// (original lines 321-322 missing — likely inline execution); otherwise
// fall through to force-queuing.  NOTE(review): which branch line 323
// belongs to cannot be confirmed from this extract.
314DISPENSO_REQUIRES(OnceCallableFunc<F>)
316 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
317 ssize_t quickLoadFactor = numThreads_.load(std::memory_order_relaxed);
// quickLoadFactor = 1.5 * numThreads (integer arithmetic).
318 quickLoadFactor += quickLoadFactor / 2;
319 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(
this) && curWork > quickLoadFactor) ||
320 (curWork > poolLoadFactor_.load(std::memory_order_relaxed))) {
323 schedule(std::forward<F>(f), ForceQueuingTag());
// ThreadPool::schedule(F&&, ForceQueuingTag) implementation fragment
// (signature and several body lines missing).  Fast path: reuse this
// thread's cached per-pool ProducerToken when one exists.
328DISPENSO_REQUIRES(OnceCallableFunc<F>)
331 static_cast<moodycamel::ProducerToken*
>(detail::PerPoolPerThreadInfo::producer(
this))) {
332 schedule(*token, std::forward<F>(f), ForceQueuingTag());
// Zero-thread pool: body of this branch is missing — presumably executes f
// inline since nothing would ever drain the queue.
336 if (!numThreads_.load(std::memory_order_relaxed)) {
// Count the work before enqueueing (release) so consumers that observe the
// queue item also observe the incremented counter.
340 workRemaining_.fetch_add(1, std::memory_order_release);
// TSAN annotations bracket the moodycamel enqueue, whose internal relaxed
// atomics otherwise produce false-positive race reports.
341 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
342 bool enqueued = work_.enqueue({std::forward<F>(f)});
343 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
// Token variant of load-aware schedule; same 1.5x-threads / load-factor
// heuristic as the tokenless overload above.  The lines between the
// condition and the force-queue call (original 357-358) are missing, so the
// branch the call belongs to cannot be confirmed here.
351inline void ThreadPool::schedule(moodycamel::ProducerToken& token, F&& f) {
352 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
353 ssize_t quickLoadFactor = numThreads_.load(std::memory_order_relaxed);
// quickLoadFactor = 1.5 * numThreads (integer arithmetic).
354 quickLoadFactor += quickLoadFactor / 2;
355 if ((detail::PerPoolPerThreadInfo::isPoolRecursive(
this) && curWork > quickLoadFactor) ||
356 (curWork > poolLoadFactor_.load(std::memory_order_relaxed))) {
359 schedule(token, std::forward<F>(f), ForceQueuingTag());
// Force-queuing enqueue through a specific producer token.  Mirrors the
// tokenless overload: zero-thread guard (branch body missing from extract),
// release-increment of the pending-work count, then a TSAN-annotated
// token-based enqueue.
364inline void ThreadPool::schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag) {
365 if (!numThreads_.load(std::memory_order_relaxed)) {
369 workRemaining_.fetch_add(1, std::memory_order_release);
370 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
371 bool enqueued = work_.enqueue(token, {std::forward<F>(f)});
372 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
// Pop-and-run one queued item; the declaration of `next` and the
// if (dequeued) / return lines are missing from this extract.
379inline bool ThreadPool::tryExecuteNext() {
// TSAN annotations suppress false positives from moodycamel's internals.
381 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
382 bool dequeued = work_.try_dequeue(next);
383 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
385 executeNext(std::move(next));
// Same, but consume only from the given producer's sub-queue — used when a
// thread wants to preferentially drain work it produced itself.
391inline bool ThreadPool::tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token) {
393 if (work_.try_dequeue_from_producer(token, next)) {
394 executeNext(std::move(next));
// Runs one item (the invocation line is missing from this extract) and
// decrements the pending-work counter; relaxed is sufficient since the
// counter is a scheduling heuristic, not a synchronization point.
400inline void ThreadPool::executeNext(OnceFunction next) {
402 workRemaining_.fetch_add(-1, std::memory_order_relaxed);
// Input-iterator adapter over a generator: dereferencing yields
// gen(index) as an OnceFunction, letting moodycamel's enqueue_bulk consume
// generated work items without materializing them in a container first.
// (Struct header, members `gen`/`index`, and several operator bodies are
// missing from this extract.)
409template <
typename Generator>
411 using difference_type = std::ptrdiff_t;
412 using value_type = OnceFunction;
413 using pointer = OnceFunction*;
414 using reference = OnceFunction&;
415 using iterator_category = std::input_iterator_tag;
// Materializes the index-th work item on demand.
419 OnceFunction operator*() {
420 return (*gen)(index);
422 BulkGenIter& operator++() {
// Post-increment: copy, advance (advance line missing), return the copy.
426 BulkGenIter operator++(
int) {
427 BulkGenIter tmp = *
this;
// Bulk enqueue of `count` generated items (signature parameter lines for
// count/gen are missing from this extract).  Counts the work up front with
// a release store, bulk-enqueues via BulkGenIter (with or without a
// producer token), then wakes up to min(sleeping, count) workers when
// signaling wakeups are enabled (the wake call itself is missing here).
434template <
typename Generator>
435void ThreadPool::scheduleBulkEnqueue(
438 moodycamel::ProducerToken* token) {
// Lazy iterator: items are produced by gen(i) as enqueue_bulk consumes them.
439 detail::BulkGenIter<typename std::remove_reference<Generator>::type> it{&gen, 0};
442 workRemaining_.fetch_add(
static_cast<ssize_t
>(count), std::memory_order_release);
// TSAN annotations suppress false positives from moodycamel's internals.
444 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
447 enqueued = work_.enqueue_bulk(*token, it, count);
449 enqueued = work_.enqueue_bulk(it, count);
451 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
456 if (enableEpochWaiter_.load(std::memory_order_acquire)) {
457 ssize_t sleeping = numSleeping_.load(std::memory_order_acquire);
// Never wake more workers than there are new items.
458 ssize_t toWake = std::min(sleeping,
static_cast<ssize_t
>(count));
// Public bulk scheduler: feeds `count` items gen(0..count-1) to the pool in
// chunks, throttled by the load factor.  Several lines (the zero-thread
// inline-execution body, `base` initialization, the over-load and
// toEnqueue==0 branch bodies, loop advance) are missing from this extract;
// comments below describe only what is visible.
465template <
typename Generator>
466void ThreadPool::scheduleBulk(
size_t count, Generator&& gen) {
471 ssize_t numPool = numThreads_.load(std::memory_order_relaxed);
// With no workers, iterate all items here — presumably running gen(i)
// inline (loop body missing).
474 for (
size_t i = 0; i < count; ++i) {
// Chunk size = 1.5 * pool size: enough to keep every worker busy with
// headroom, without flooding the queue in one shot.
481 size_t chunkSize =
static_cast<size_t>(numPool) +
static_cast<size_t>(numPool) / 2;
487 ssize_t curWork = workRemaining_.load(std::memory_order_relaxed);
488 ssize_t loadFactor = poolLoadFactor_.load(std::memory_order_relaxed);
// Pool already saturated: branch body missing — presumably run some work
// inline or yield before enqueueing more.
489 if (curWork > loadFactor) {
// Enqueue the smallest of: items left, one chunk, remaining queue room.
495 ssize_t room = loadFactor - curWork;
496 size_t toEnqueue = std::min({count - i, chunkSize,
static_cast<size_t>(room)});
497 if (toEnqueue == 0) {
// Adapter lambda offsets the generator by `base` so each chunk sees
// indices relative to the overall sequence.
501 scheduleBulkEnqueue(toEnqueue, [&gen, base](
size_t j) {
return gen(base + j); });
void setSignalingWake(bool enable, const std::chrono::duration< Rep, Period > &sleepDuration=std::chrono::microseconds(kDefaultSleepLenUs))
DISPENSO_DLL_ACCESS ~ThreadPool()
DISPENSO_DLL_ACCESS void resize(ssize_t n)
ssize_t numThreads() const
DISPENSO_DLL_ACCESS ThreadPool(size_t n, size_t poolLoadMultiplier=32)
DISPENSO_DLL_ACCESS ThreadPool & globalThreadPool()
DISPENSO_DLL_ACCESS void resizeGlobalThreadPool(size_t numThreads)