dispenso
A library for task parallelism
 
Loading...
Searching...
No Matches
timed_task.cpp
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8#include <dispenso/detail/quanta.h>
9#include <dispenso/timed_task.h>
10
11#include <iostream>
12
13namespace dispenso {
14
16 : thread_([this, prio]() {
17 detail::registerFineSchedulerQuanta();
18 if (!setCurrentThreadPriority(prio)) {
19 std::cerr << "Couldn't set thread priority" << std::endl;
20 }
21 timeQueueRunLoop();
22 }),
23 priority_(prio) {}
24TimedTaskScheduler::~TimedTaskScheduler() {
25 {
26 std::lock_guard<std::mutex> lk(queueMutex_);
27 running_ = false;
28 }
29 epoch_.bumpAndWake();
30 thread_.join();
31}
32
33void TimedTaskScheduler::kickOffTask(std::shared_ptr<detail::TimedTaskImpl> next, double curTime) {
34 size_t remaining = next->timesToRun.fetch_sub(1, std::memory_order_acq_rel);
35 if (remaining == 1) {
36 auto* np = next.get();
37 np->func(std::move(next));
38 } else if (remaining > 1) {
39 next->func(next);
40
41 if (next->steady) {
42 next->nextAbsTime += next->period;
43 } else {
44 next->nextAbsTime = curTime + next->period;
45 }
46 std::lock_guard<std::mutex> lk(queueMutex_);
47 tasks_.push(std::move(next));
48 }
49}
50
51constexpr double kSmallTimeBuffer = 10e-6;
52
53void TimedTaskScheduler::timeQueueRunLoop() {
54#if defined(_WIN32)
55 constexpr double kSpinYieldBuffer = 1e-3;
56 constexpr double kSpinBuffer = 100e-6;
57#else
58 constexpr double kSpinYieldBuffer = 500e-6;
59 constexpr double kSpinBuffer = 50e-6;
60#endif // platform
61 constexpr double kConvertToUs = 1e6;
62
63 uint32_t curEpoch = epoch_.current();
64
65 while (true) {
66 {
67 std::unique_lock<std::mutex> lk(queueMutex_);
68 if (priority_ != getCurrentThreadPriority()) {
69 setCurrentThreadPriority(priority_);
70 }
71
72 if (!running_) {
73 break;
74 }
75 if (tasks_.empty()) {
76 lk.unlock();
77 curEpoch = epoch_.wait(curEpoch);
78 continue;
79 }
80 }
81 double curTime = getTime();
82 double timeRemaining;
83 std::unique_lock<std::mutex> lk(queueMutex_);
84 timeRemaining = tasks_.top()->nextAbsTime - curTime;
85 if (timeRemaining < kSmallTimeBuffer) {
86 auto next = tasks_.top();
87 tasks_.pop();
88 lk.unlock();
89
90 kickOffTask(std::move(next), curTime);
91 } else if (timeRemaining < kSpinBuffer) {
92 continue;
93 } else if (timeRemaining < kSpinYieldBuffer) {
94 lk.unlock();
95 std::this_thread::yield();
96 continue;
97 } else {
98 lk.unlock();
99 curEpoch = epoch_.waitFor(
100 curEpoch, static_cast<uint64_t>((timeRemaining - kSpinBuffer) * kConvertToUs));
101 }
102 }
103}
104
105void TimedTaskScheduler::addTimedTask(std::shared_ptr<detail::TimedTaskImpl> task) {
106 double curTime = getTime();
107 double timeRemaining;
108 timeRemaining = task->nextAbsTime - curTime;
109 if (timeRemaining < kSmallTimeBuffer) {
110 kickOffTask(std::move(task), curTime);
111 } else {
112 std::lock_guard<std::mutex> lk(queueMutex_);
113 tasks_.push(std::move(task));
114 }
115 epoch_.bumpAndWake();
116}
117
118TimedTaskScheduler& globalTimedTaskScheduler() {
119 static TimedTaskScheduler scheduler;
120 return scheduler;
121}
122
123} // namespace dispenso
DISPENSO_DLL_ACCESS TimedTaskScheduler(ThreadPriority priority=ThreadPriority::kNormal)
detail::OpResult< T > OpResult
Definition pipeline.h:29
ThreadPriority
Definition priority.h:29