8#include <dispenso/detail/quanta.h>
9#include <dispenso/timed_task.h>
// Fragment of a startup sequence; the enclosing function header is not part of this listing.
// NOTE(review): presumably the body of the scheduler's worker thread, started from the
// TimedTaskScheduler constructor - confirm against the full file.
// Calls detail::registerFineSchedulerQuanta(), then tries to apply `prio` to the current thread.
17 detail::registerFineSchedulerQuanta();
18 if (!setCurrentThreadPriority(
prio)) {
// A false-like result is only reported on std::cerr; no other handling is visible here.
19 std::cerr <<
"Couldn't set thread priority" << std::endl;
// Destructor. The visible part acquires queueMutex_ through a std::lock_guard.
// NOTE(review): the statements executed under the lock and the rest of the teardown are not
// shown in this listing (the embedded line numbers skip 25 and 27-32).
24TimedTaskScheduler::~TimedTaskScheduler() {
26 std::lock_guard<std::mutex> lk(queueMutex_);
// Invokes the task's function and, when more runs remain, reschedules the task.
//   next    - the task; the shared_ptr is taken by value and moved onward.
//   curTime - time supplied by the caller; used by one of the two rescheduling forms below.
// NOTE(review): the embedded line numbers skip 35, 39-41, 43, 45 and 48-50, so the first
// condition and the test that selects between the two nextAbsTime updates are not visible here.
33void TimedTaskScheduler::kickOffTask(std::shared_ptr<detail::TimedTaskImpl> next,
double curTime) {
// fetch_sub returns the value held BEFORE the decrement, so `remaining` counts this run too.
34 size_t remaining = next->timesToRun.fetch_sub(1, std::memory_order_acq_rel);
// The raw pointer is taken first because `next` is moved from inside the call's argument list.
36 auto* np = next.get();
37 np->func(std::move(next));
38 }
else if (remaining > 1) {
// Rescheduling form 1: advance the previous deadline by exactly one period.
42 next->nextAbsTime += next->period;
// Rescheduling form 2: start the next period from the caller-supplied current time.
44 next->nextAbsTime = curTime + next->period;
// Push the task onto tasks_ while holding queueMutex_.
46 std::lock_guard<std::mutex> lk(queueMutex_);
47 tasks_.push(std::move(next));
// Threshold compared against (nextAbsTime - current time): a task with less time remaining than
// this is kicked off right away instead of being waited on.
// NOTE(review): presumably expressed in seconds (i.e. 10 microseconds), since time differences
// are multiplied by 1e6 to obtain the timed-wait argument elsewhere in this file - confirm.
51constexpr double kSmallTimeBuffer = 10e-6;
// Scheduler loop body: inspects the task at the top of tasks_ and, depending on how much time
// remains until its nextAbsTime, either kicks it off, takes the short-wait branch, yields, or
// performs a timed wait on epoch_.
// NOTE(review): many lines are missing from this listing (loop header, inner scopes, unlock
// calls, exit condition); the comments below describe only what is visible.
53void TimedTaskScheduler::timeQueueRunLoop() {
// Two alternative pairs of thresholds are declared with the same names.
// NOTE(review): presumably selected by a preprocessor conditional on embedded lines 54/57/60,
// which are not shown - confirm which platform gets which pair.
55 constexpr double kSpinYieldBuffer = 1e-3;
56 constexpr double kSpinBuffer = 100e-6;
58 constexpr double kSpinYieldBuffer = 500e-6;
59 constexpr double kSpinBuffer = 50e-6;
// Factor applied to a time difference before it is passed to epoch_.waitFor.
61 constexpr double kConvertToUs = 1e6;
63 uint32_t curEpoch = epoch_.current();
// Under queueMutex_: re-apply priority_ if the thread's current priority differs from it.
67 std::unique_lock<std::mutex> lk(queueMutex_);
68 if (priority_ != getCurrentThreadPriority()) {
69 setCurrentThreadPriority(priority_);
// Untimed wait on epoch_; the condition guarding it is not visible.
// NOTE(review): presumably taken when tasks_ is empty - confirm.
77 curEpoch = epoch_.wait(curEpoch);
81 double curTime = getTime();
// NOTE(review): a second `lk` is declared here, so the earlier one presumably lives in an
// inner scope whose braces are missing from this listing.
83 std::unique_lock<std::mutex> lk(queueMutex_);
84 timeRemaining = tasks_.top()->nextAbsTime - curTime;
// Due now (within kSmallTimeBuffer): copy the top task's shared_ptr and run it.
85 if (timeRemaining < kSmallTimeBuffer) {
86 auto next = tasks_.top();
90 kickOffTask(std::move(next), curTime);
91 }
else if (timeRemaining < kSpinBuffer) {
// Body of this branch (embedded line 92) is not shown.
93 }
else if (timeRemaining < kSpinYieldBuffer) {
// Inside the spin-yield window: give up the time slice rather than blocking on epoch_.
95 std::this_thread::yield();
// Otherwise block on epoch_ with a timeout of (timeRemaining - kSpinBuffer) scaled by
// kConvertToUs and truncated to uint64_t.
99 curEpoch = epoch_.waitFor(
100 curEpoch,
static_cast<uint64_t
>((timeRemaining - kSpinBuffer) * kConvertToUs));
// Adds a task to the scheduler.
//   task - the task to schedule; the shared_ptr is taken by value and moved onward.
// A task whose nextAbsTime is less than kSmallTimeBuffer away from the current time is kicked
// off directly from this call; the visible alternative pushes it onto tasks_ under queueMutex_
// and calls epoch_.bumpAndWake().
// NOTE(review): embedded lines 111, 114 and 116 are missing, so the `else` and the exact scope
// of the lock and of bumpAndWake() cannot be confirmed from this listing.
105void TimedTaskScheduler::addTimedTask(std::shared_ptr<detail::TimedTaskImpl> task) {
106 double curTime = getTime();
107 double timeRemaining;
108 timeRemaining = task->nextAbsTime - curTime;
109 if (timeRemaining < kSmallTimeBuffer) {
110 kickOffTask(std::move(task), curTime);
112 std::lock_guard<std::mutex> lk(queueMutex_);
113 tasks_.push(std::move(task));
115 epoch_.bumpAndWake();
// Accessor for a process-wide scheduler: declares a function-local static TimedTaskScheduler
// constructed with its default arguments.
// NOTE(review): the return statement is not shown in this listing; presumably it returns
// `scheduler` by reference, matching the declared return type - confirm.
118TimedTaskScheduler& globalTimedTaskScheduler() {
119 static TimedTaskScheduler scheduler;
DISPENSO_DLL_ACCESS TimedTaskScheduler(ThreadPriority priority=ThreadPriority::kNormal)
detail::OpResult< T > OpResult