dispenso
A library for task parallelism
 
Loading...
Searching...
No Matches
timed_task.h
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
12#pragma once
13
14#include <chrono>
15#include <functional>
16#include <memory>
17#include <queue>
18
19#include <dispenso/detail/timed_task_impl.h>
20#include <dispenso/priority.h>
21#include <dispenso/timing.h>
22
23namespace dispenso {
24
28enum class TimedTaskType {
29 kNormal
30 ,
31 kSteady
33};
34
40class TimedTask {
41 public:
42 TimedTask(const TimedTask&) = delete;
43 TimedTask(TimedTask&& other) : impl_(std::move(other.impl_)) {}
44
45 TimedTask& operator=(const TimedTask&) = delete;
46 TimedTask& operator=(TimedTask&& other) {
47 impl_ = std::move(other.impl_);
48 return *this;
49 }
50
55 void cancel() {
56 impl_->timesToRun.store(0, std::memory_order_release);
57 impl_->flags.fetch_or(detail::kFFlagsCancelled, std::memory_order_release);
58 }
59
65 void detach() {
66 impl_->flags.fetch_or(detail::kFFlagsDetached, std::memory_order_release);
67 }
68
74 size_t calls() const {
75 return impl_->count.load(std::memory_order_acquire);
76 }
77
86 if (!impl_ || impl_->flags.load(std::memory_order_acquire) & detail::kFFlagsDetached) {
87 return;
88 }
89 cancel();
90 while (impl_->inProgress.load(std::memory_order_acquire)) {
91 }
92 // Now we can safely destroy the underlying function. We do this here because we can't risk
93 // that func may call code in it's destructor that may no longer be relevant after this
94 // TimedTask destructor completes.
95 impl_->func = {};
96 }
97
98 private:
99 template <typename Schedulable, typename F>
100 TimedTask(
101 Schedulable& sched,
102 F&& f,
103 double nextRunAbs,
104 double period = 0.0,
105 size_t timesToRun = 1,
106 TimedTaskType type = TimedTaskType::kNormal)
107 : impl_(
108 detail::make_shared<detail::TimedTaskImpl>(
109 timesToRun,
110 nextRunAbs,
111 period,
112 std::forward<F>(f),
113 sched,
114 type == TimedTaskType::kSteady)) {}
115
116 std::shared_ptr<detail::TimedTaskImpl> impl_;
117
118 friend class TimedTaskScheduler;
119}; // namespace dispenso
120
128 public:
137 DISPENSO_DLL_ACCESS explicit TimedTaskScheduler(
138 ThreadPriority priority = ThreadPriority::kNormal);
139 DISPENSO_DLL_ACCESS ~TimedTaskScheduler();
140
145 std::lock_guard<std::mutex> lk(queueMutex_);
146 priority_ = priority;
147 }
148
163 template <typename Schedulable, typename F>
165 Schedulable& sched,
166 F&& func,
167 double nextRunAbs,
168 double period = 0.0,
169 size_t timesToRun = 1,
170 TimedTaskType type = TimedTaskType::kNormal) {
171 TimedTask task(sched, std::forward<F>(func), nextRunAbs, period, timesToRun, type);
172 addTimedTask(task.impl_);
173 return task;
174 }
175
185 template <typename Schedulable, typename Rep, typename Period, typename F>
187 schedule(Schedulable& sched, F&& func, const std::chrono::duration<Rep, Period>& timeInFuture) {
188 return schedule(sched, std::forward<F>(func), toNextRun(timeInFuture));
189 }
190
201 template <typename Schedulable, typename Clock, typename Duration, typename F>
203 Schedulable& sched,
204 F&& func,
205 const std::chrono::time_point<Clock, Duration>& nextRunTime) {
206 return schedule(sched, std::forward<F>(func), toNextRun(nextRunTime));
207 }
208
222 template <typename Schedulable, typename Rep, typename Period, typename F>
224 Schedulable& sched,
225 F&& func,
226 const std::chrono::duration<Rep, Period>& timeInFuture,
227 const std::chrono::duration<Rep, Period>& period,
228 size_t timesToRun = std::numeric_limits<size_t>::max(),
229 TimedTaskType type = TimedTaskType::kNormal) {
230 return schedule(
231 sched, std::forward<F>(func), toNextRun(timeInFuture), toPeriod(period), timesToRun, type);
232 }
233
248 template <
249 typename Schedulable,
250 typename Rep,
251 typename Period,
252 typename Clock,
253 typename Duration,
254 typename F>
256 Schedulable& sched,
257 F&& func,
258 const std::chrono::time_point<Clock, Duration>& nextRunTime,
259 const std::chrono::duration<Rep, Period>& period,
260 size_t timesToRun = std::numeric_limits<size_t>::max(),
261 TimedTaskType type = TimedTaskType::kNormal) {
262 return schedule(
263 sched, std::forward<F>(func), toNextRun(nextRunTime), toPeriod(period), timesToRun, type);
264 }
265
266 private:
267 template <class Rep, class Period>
268 static double toNextRun(const std::chrono::duration<Rep, Period>& timeInFuture) {
269 return getTime() + std::chrono::duration<double>(timeInFuture).count();
270 }
271
272 template <typename Clock, typename Duration>
273 static double toNextRun(const std::chrono::time_point<Clock, Duration>& nextRunTime) {
274 auto curTime = Clock::now();
275 return toNextRun(nextRunTime - curTime);
276 }
277
278 template <class Rep, class Period>
279 static double toPeriod(const std::chrono::duration<Rep, Period>& period) {
280 return std::chrono::duration<double>(period).count();
281 }
282 DISPENSO_DLL_ACCESS void addTimedTask(std::shared_ptr<detail::TimedTaskImpl> task);
283 void timeQueueRunLoop();
284
285 void kickOffTask(std::shared_ptr<detail::TimedTaskImpl> next, double curTime);
286
287 struct Compare {
288 bool operator()(
289 const std::shared_ptr<detail::TimedTaskImpl>& a,
290 const std::shared_ptr<detail::TimedTaskImpl>& b) const {
291 return a->nextAbsTime > b->nextAbsTime;
292 }
293 };
294
295 // TODO(bbudge): Consider lock-free priority queue implementation. I'd expect it to be minimally
296 // beneficial for this use case though... timed tasks should rarely be super-high contention.
297 std::mutex queueMutex_;
298 std::priority_queue<
299 std::shared_ptr<detail::TimedTaskImpl>,
300 std::vector<std::shared_ptr<detail::TimedTaskImpl>>,
301 Compare>
302 tasks_;
303 bool running_{true};
304 detail::EpochWaiter epoch_;
305 std::thread thread_;
306 ThreadPriority priority_;
307};
308
314DISPENSO_DLL_ACCESS TimedTaskScheduler& globalTimedTaskScheduler();
315
316} // namespace dispenso
size_t calls() const
Definition timed_task.h:74
void setPriority(ThreadPriority priority)
Definition timed_task.h:144
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::duration< Rep, Period > &timeInFuture, const std::chrono::duration< Rep, Period > &period, size_t timesToRun=std::numeric_limits< size_t >::max(), TimedTaskType type=TimedTaskType::kNormal)
Definition timed_task.h:223
TimedTask schedule(Schedulable &sched, F &&func, double nextRunAbs, double period=0.0, size_t timesToRun=1, TimedTaskType type=TimedTaskType::kNormal)
Definition timed_task.h:164
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::time_point< Clock, Duration > &nextRunTime, const std::chrono::duration< Rep, Period > &period, size_t timesToRun=std::numeric_limits< size_t >::max(), TimedTaskType type=TimedTaskType::kNormal)
Definition timed_task.h:255
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::duration< Rep, Period > &timeInFuture)
Definition timed_task.h:187
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::time_point< Clock, Duration > &nextRunTime)
Definition timed_task.h:202
DISPENSO_DLL_ACCESS TimedTaskScheduler(ThreadPriority priority=ThreadPriority::kNormal)
ThreadPriority
Definition priority.h:29