dispenso 1.4.1
A library for task parallelism
Loading...
Searching...
No Matches
timed_task.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
14#pragma once
15
16#include <chrono>
17#include <functional>
18#include <memory>
19#include <queue>
20
21#include <dispenso/detail/timed_task_impl.h>
22#include <dispenso/priority.h>
23#include <dispenso/timing.h>
24
25namespace dispenso {
26
30enum class TimedTaskType {
31 kNormal
32 ,
33 kSteady
35};
36
42class TimedTask {
43 public:
44 TimedTask(const TimedTask&) = delete;
45 TimedTask(TimedTask&& other) : impl_(std::move(other.impl_)) {}
46
47 TimedTask& operator=(const TimedTask&) = delete;
48 TimedTask& operator=(TimedTask&& other) {
49 impl_ = std::move(other.impl_);
50 return *this;
51 }
52
57 void cancel() {
58 impl_->timesToRun.store(0, std::memory_order_release);
59 impl_->flags.fetch_or(detail::kFFlagsCancelled, std::memory_order_release);
60 }
61
67 void detach() {
68 impl_->flags.fetch_or(detail::kFFlagsDetached, std::memory_order_release);
69 }
70
76 size_t calls() const {
77 return impl_->count.load(std::memory_order_acquire);
78 }
79
88 if (!impl_ || impl_->flags.load(std::memory_order_acquire) & detail::kFFlagsDetached) {
89 return;
90 }
91 cancel();
92 while (impl_->inProgress.load(std::memory_order_acquire)) {
93 }
94 // Now we can safely destroy the underlying function. We do this here because we can't risk
95 // that func may call code in it's destructor that may no longer be relevant after this
96 // TimedTask destructor completes.
97 impl_->func = {};
98 }
99
100 private:
101 template <typename Schedulable, typename F>
102 TimedTask(
103 Schedulable& sched,
104 F&& f,
105 double nextRunAbs,
106 double period = 0.0,
107 size_t timesToRun = 1,
108 TimedTaskType type = TimedTaskType::kNormal)
109 : impl_(
110 detail::make_shared<detail::TimedTaskImpl>(
111 timesToRun,
112 nextRunAbs,
113 period,
114 std::forward<F>(f),
115 sched,
116 type == TimedTaskType::kSteady)) {}
117
118 std::shared_ptr<detail::TimedTaskImpl> impl_;
119
120 friend class TimedTaskScheduler;
121}; // namespace dispenso
122
130 public:
139 DISPENSO_DLL_ACCESS explicit TimedTaskScheduler(
140 ThreadPriority priority = ThreadPriority::kNormal);
141 DISPENSO_DLL_ACCESS ~TimedTaskScheduler();
142
147 std::lock_guard<std::mutex> lk(queueMutex_);
148 priority_ = priority;
149 }
150
165 template <typename Schedulable, typename F>
167 Schedulable& sched,
168 F&& func,
169 double nextRunAbs,
170 double period = 0.0,
171 size_t timesToRun = 1,
172 TimedTaskType type = TimedTaskType::kNormal) {
173 TimedTask task(sched, std::forward<F>(func), nextRunAbs, period, timesToRun, type);
174 addTimedTask(task.impl_);
175 return task;
176 }
177
187 template <typename Schedulable, typename Rep, typename Period, typename F>
189 schedule(Schedulable& sched, F&& func, const std::chrono::duration<Rep, Period>& timeInFuture) {
190 return schedule(sched, std::forward<F>(func), toNextRun(timeInFuture));
191 }
192
203 template <typename Schedulable, typename Clock, typename Duration, typename F>
205 Schedulable& sched,
206 F&& func,
207 const std::chrono::time_point<Clock, Duration>& nextRunTime) {
208 return schedule(sched, std::forward<F>(func), toNextRun(nextRunTime));
209 }
210
224 template <typename Schedulable, typename Rep, typename Period, typename F>
226 Schedulable& sched,
227 F&& func,
228 const std::chrono::duration<Rep, Period>& timeInFuture,
229 const std::chrono::duration<Rep, Period>& period,
230 size_t timesToRun = std::numeric_limits<size_t>::max(),
231 TimedTaskType type = TimedTaskType::kNormal) {
232 return schedule(
233 sched, std::forward<F>(func), toNextRun(timeInFuture), toPeriod(period), timesToRun, type);
234 }
235
250 template <
251 typename Schedulable,
252 typename Rep,
253 typename Period,
254 typename Clock,
255 typename Duration,
256 typename F>
258 Schedulable& sched,
259 F&& func,
260 const std::chrono::time_point<Clock, Duration>& nextRunTime,
261 const std::chrono::duration<Rep, Period>& period,
262 size_t timesToRun = std::numeric_limits<size_t>::max(),
263 TimedTaskType type = TimedTaskType::kNormal) {
264 return schedule(
265 sched, std::forward<F>(func), toNextRun(nextRunTime), toPeriod(period), timesToRun, type);
266 }
267
268 private:
269 template <class Rep, class Period>
270 static double toNextRun(const std::chrono::duration<Rep, Period>& timeInFuture) {
271 return getTime() + std::chrono::duration<double>(timeInFuture).count();
272 }
273
274 template <typename Clock, typename Duration>
275 static double toNextRun(const std::chrono::time_point<Clock, Duration>& nextRunTime) {
276 auto curTime = Clock::now();
277 return toNextRun(nextRunTime - curTime);
278 }
279
280 template <class Rep, class Period>
281 static double toPeriod(const std::chrono::duration<Rep, Period>& period) {
282 return std::chrono::duration<double>(period).count();
283 }
284 DISPENSO_DLL_ACCESS void addTimedTask(std::shared_ptr<detail::TimedTaskImpl> task);
285 void timeQueueRunLoop();
286
287 void kickOffTask(std::shared_ptr<detail::TimedTaskImpl> next, double curTime);
288
289 struct Compare {
290 bool operator()(
291 const std::shared_ptr<detail::TimedTaskImpl>& a,
292 const std::shared_ptr<detail::TimedTaskImpl>& b) const {
293 return a->nextAbsTime > b->nextAbsTime;
294 }
295 };
296
297 // TODO(bbudge): Consider lock-free priority queue implementation. I'd expect it to be minimally
298 // beneficial for this use case though... timed tasks should rarely be super-high contention.
299 std::mutex queueMutex_;
300 std::priority_queue<
301 std::shared_ptr<detail::TimedTaskImpl>,
302 std::vector<std::shared_ptr<detail::TimedTaskImpl>>,
303 Compare>
304 tasks_;
305 bool running_{true};
306 detail::EpochWaiter epoch_;
307 std::thread thread_;
308 ThreadPriority priority_;
309};
310
316DISPENSO_DLL_ACCESS TimedTaskScheduler& globalTimedTaskScheduler();
317
318} // namespace dispenso
size_t calls() const
Definition timed_task.h:76
void setPriority(ThreadPriority priority)
Definition timed_task.h:146
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::duration< Rep, Period > &timeInFuture, const std::chrono::duration< Rep, Period > &period, size_t timesToRun=std::numeric_limits< size_t >::max(), TimedTaskType type=TimedTaskType::kNormal)
Definition timed_task.h:225
TimedTask schedule(Schedulable &sched, F &&func, double nextRunAbs, double period=0.0, size_t timesToRun=1, TimedTaskType type=TimedTaskType::kNormal)
Definition timed_task.h:166
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::time_point< Clock, Duration > &nextRunTime, const std::chrono::duration< Rep, Period > &period, size_t timesToRun=std::numeric_limits< size_t >::max(), TimedTaskType type=TimedTaskType::kNormal)
Definition timed_task.h:257
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::duration< Rep, Period > &timeInFuture)
Definition timed_task.h:189
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::time_point< Clock, Duration > &nextRunTime)
Definition timed_task.h:204
DISPENSO_DLL_ACCESS TimedTaskScheduler(ThreadPriority priority=ThreadPriority::kNormal)
ThreadPriority
Definition priority.h:30