#include <dispenso/thread_pool.h>

#include <dispenso/detail/quanta.h>

#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <limits>

#if defined DISPENSO_DEBUG
#include <iostream>
#endif // DISPENSO_DEBUG

namespace dispenso {
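// Clamp a requested worker count to the optional DISPENSO_MAX_THREADS_PER_POOL
// environment variable. The cap is parsed once and cached for the process lifetime.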
size_t getAdjustedThreadCount(size_t requested) {
  static const size_t maxThreads = []() {
    size_t maxT = std::numeric_limits<size_t>::max();
#if defined(_WIN32) && !defined(__MINGW32__)
// MSVC flags std::getenv as unsafe (C4996); suppress the warning for this call only.
#pragma warning(push)
#pragma warning(disable : 4996)
#endif // _WIN32
    char* envThreads = std::getenv("DISPENSO_MAX_THREADS_PER_POOL");
#if defined(_WIN32) && !defined(__MINGW32__)
#pragma warning(pop)
#endif // _WIN32
    if (envThreads) {
      char* end;
      maxT = std::strtoul(envThreads, &end, 10);
#if defined DISPENSO_DEBUG
      std::cout << "DISPENSO_MAX_THREADS_PER_POOL = " << maxT << std::endl;
#endif // DISPENSO_DEBUG
    }
    return maxT;
  }();

  return std::min(requested, maxThreads);
}
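// For example, launching a program with `DISPENSO_MAX_THREADS_PER_POOL=4 ./my_app`
// (`my_app` being any binary using dispenso) caps every pool at 4 workers,
// regardless of the size requested in code.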
void ThreadPool::PerThreadData::setThread(std::thread&& t) {
  thread_ = std::move(t);
}
void ThreadPool::PerThreadData::stop() {
  running_.store(false, std::memory_order_release);
}
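// Workers that have fully backed off park on epochWaiter_. wait() blocks until
// either the epoch advances past currentEpoch or the configured sleep window
// expires; wake() bumps the epoch so parked workers re-check for work.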
uint32_t ThreadPool::wait(uint32_t currentEpoch) {
  if (sleepLengthUs_ > 0) {
    return epochWaiter_.waitFor(currentEpoch, sleepLengthUs_.load(std::memory_order_acquire));
  }
  return epochWaiter_.current();
}
void ThreadPool::wake() {
  epochWaiter_.bumpAndWake();
}
inline bool ThreadPool::PerThreadData::running() {
  return running_.load(std::memory_order_acquire);
}
ThreadPool::ThreadPool(size_t n, size_t poolLoadMultiplier)
    : poolLoadMultiplier_(poolLoadMultiplier) /* remaining member initializers elided in this excerpt */ {
  detail::registerFineSchedulerQuanta();
#if defined DISPENSO_DEBUG
  assert(poolLoadMultiplier > 0);
#endif // DISPENSO_DEBUG
  for (size_t i = 0; i < n; ++i) {
    threads_.emplace_back();
    threads_.back().setThread(std::thread([this, &back = threads_.back()]() { threadLoop(back); }));
  }
}
ThreadPool::PerThreadData::~PerThreadData() {}
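// Main loop run by each worker thread. Workers poll the shared queue, backing
// off in two stages when no work is found: after kBackoffYield consecutive
// failed dequeues they yield the CPU, and after kBackoffSleep failures they
// sleep or park on the epoch waiter. idleButAwake_ counts workers that are
// actively polling without having found work.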
void ThreadPool::threadLoop(PerThreadData& data) {
  constexpr int kBackoffYield = 50;
  constexpr int kBackoffSleep = kBackoffYield + 5;

  moodycamel::ConsumerToken ctoken(work_);
  moodycamel::ProducerToken ptoken(work_);

  detail::PerPoolPerThreadInfo::registerPool(this, &ptoken);

  uint32_t epoch = epochWaiter_.current();
  OnceFunction next;

  if (enableEpochWaiter_) {
    int failCount = 0;
    idleButAwake_.fetch_add(1, std::memory_order_acq_rel);
    while (data.running()) {
      // The TSAN annotations suppress race reports from the lock-free queue's internals.
      DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
      bool got = work_.try_dequeue(ctoken, next);
      DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();

      if (got) {
        queuedWork_.fetch_sub(1, std::memory_order_acq_rel);
        failCount = 0;
        idleButAwake_.fetch_sub(1, std::memory_order_acq_rel);
        executeNext(std::move(next));
        idleButAwake_.fetch_add(1, std::memory_order_acq_rel);
      } else {
        ++failCount;
        if (failCount > kBackoffSleep) {
          idleButAwake_.fetch_sub(1, std::memory_order_acq_rel);
          epoch = wait(epoch);
          idleButAwake_.fetch_add(1, std::memory_order_acq_rel);
        } else if (failCount > kBackoffYield) {
          std::this_thread::yield();
        }
      }
    }
    idleButAwake_.fetch_sub(1, std::memory_order_acq_rel);
  } else {
    int failCount = 0;
    while (data.running()) {
      DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
      bool got = work_.try_dequeue(ctoken, next);
      DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();

      if (got) {
        failCount = 0;
        executeNext(std::move(next));
      } else {
        ++failCount;
        if (failCount > kBackoffSleep) {
          // Deepest backoff tier; the brief sleep taken here is elided in this excerpt.
        } else if (failCount > kBackoffYield) {
          std::this_thread::yield();
        }
      }
    }
  }
}
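// Resize the pool to sn workers; callers are expected to hold threadsMutex_.
// Shrinking stops and joins the excess workers; growing spawns new workers
// running threadLoop.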
void ThreadPool::resizeLocked(ssize_t sn) {
  sn = getAdjustedThreadCount(sn);
  size_t n = static_cast<size_t>(sn);

  if (n < threads_.size()) {
    for (size_t i = n; i < threads_.size(); ++i) {
      threads_[i].stop();
    }
    while (threads_.size() > n) {
      // Wake any parked workers so they can observe the stop flag and exit.
      wake();
      threads_.back().thread_.join();
      threads_.pop_back();
    }
  } else if (n > threads_.size()) {
    for (size_t i = threads_.size(); i < n; ++i) {
      threads_.emplace_back();
      threads_.back().setThread(
          std::thread([this, &back = threads_.back()]() { threadLoop(back); }));
    }
  }
  poolLoadFactor_.store(static_cast<ssize_t>(n * poolLoadMultiplier_), std::memory_order_relaxed);
  numThreads_.store(sn, std::memory_order_relaxed);
}
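// Destruction drains the queue, stops all workers, drains again while they
// shut down, joins them, and then drains once more so no queued task is dropped.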
ThreadPool::~ThreadPool() {
  while (tryExecuteNext()) {
  }
#if defined DISPENSO_DEBUG
  // Debug-only sanity checks elided in this excerpt.
#endif // DISPENSO_DEBUG
  std::unique_lock<std::mutex> lk(threadsMutex_, std::try_to_lock);
  for (auto& t : threads_) {
    t.stop();
  }
  while (tryExecuteNext()) {
  }
  while (!threads_.empty()) {
    wake();
    threads_.back().thread_.join();
    threads_.pop_back();
  }
  while (tryExecuteNext()) {
  }
}
void resizeGlobalThreadPool(size_t numThreads) {
  globalThreadPool().resize(static_cast<ssize_t>(numThreads));
}

} // namespace dispenso
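// Illustrative usage (a sketch; `setup` stands in for any callable you want run):
//
//   dispenso::resizeGlobalThreadPool(8);           // pin the shared pool at 8 workers
//   dispenso::globalThreadPool().schedule(setup);  // enqueue work on the shared pool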