102 template <
class Rep,
class Period>
106 std::chrono::microseconds(kDefaultSleepLenUs)) {
120 std::lock_guard<std::mutex>
lk(threadsMutex_);
131 return numThreads_.load(std::memory_order_relaxed);
142 template <
typename F>
143 void schedule(
F&&
f);
153 template <
typename F>
164 class PerThreadData {
166 void setThread(std::thread&&
t);
175 alignas(kCacheLineSize) std::thread thread_;
176 std::atomic<bool> running_{
true};
179 DISPENSO_DLL_ACCESS uint32_t wait(uint32_t priorEpoch);
180 DISPENSO_DLL_ACCESS
void wake();
182 void setSignalingWake(
bool enable, uint32_t sleepDurationUs) {
183 std::lock_guard<std::mutex> lk(threadsMutex_);
184 ssize_t currentPoolSize = numThreads();
186 enableEpochWaiter_.store(enable, std::memory_order_release);
187 sleepLengthUs_.store(sleepDurationUs, std::memory_order_release);
188 resizeLocked(currentPoolSize);
191 DISPENSO_DLL_ACCESS
void resizeLocked(ssize_t n);
193 void executeNext(OnceFunction work);
195 DISPENSO_DLL_ACCESS
void threadLoop(PerThreadData& threadData);
197 bool tryExecuteNext();
198 bool tryExecuteNextFromProducerToken(moodycamel::ProducerToken& token);
200 template <
typename F>
201 void schedule(moodycamel::ProducerToken& token, F&& f);
203 template <
typename F>
204 void schedule(moodycamel::ProducerToken& token, F&& f, ForceQueuingTag);
206 void conditionallyWake() {
207 if (enableEpochWaiter_.load(std::memory_order_acquire)) {
209 auto queuedWork = queuedWork_.fetch_add(1, std::memory_order_acq_rel) + 1;
210 auto idle = idleButAwake_.load(std::memory_order_acquire);
211 if (idle < queuedWork) {
219#if __cplusplus < 201703L
220 static void*
operator new(
size_t sz) {
221 return detail::alignedMalloc(sz);
223 static void operator delete(
void* ptr) {
224 return detail::alignedFree(ptr);
229 mutable std::mutex threadsMutex_;
230 std::deque<PerThreadData> threads_;
231 size_t poolLoadMultiplier_;
232 std::atomic<ssize_t> poolLoadFactor_;
233 std::atomic<ssize_t> numThreads_;
235 moodycamel::ConcurrentQueue<OnceFunction> work_;
237 alignas(kCacheLineSize) std::atomic<ssize_t> queuedWork_{0};
238 alignas(kCacheLineSize) std::atomic<ssize_t> idleButAwake_{0};
240 alignas(kCacheLineSize) std::atomic<ssize_t> workRemaining_{0};
242 alignas(kCacheLineSize) detail::EpochWaiter epochWaiter_;
243 alignas(kCacheLineSize) std::atomic<bool> enableEpochWaiter_{kDefaultWakeupEnable};
244 std::atomic<uint32_t> sleepLengthUs_{kDefaultSleepLenUs};
246#if defined DISPENSO_DEBUG
247 alignas(kCacheLineSize) std::atomic<ssize_t> outstandingTaskSets_{0};
250 friend class ConcurrentTaskSet;
251 friend class TaskSet;
252 friend class TaskSetBase;
287 static_cast<moodycamel::ProducerToken*
>(detail::PerPoolPerThreadInfo::producer(
this))) {
292 if (!numThreads_.load(std::memory_order_relaxed)) {
296 workRemaining_.fetch_add(1, std::memory_order_release);
297 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
298 bool enqueued = work_.enqueue({std::forward<F>(
f)});
299 DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();