dispenso
A library for task parallelism
 
Loading...
Searching...
No Matches
timed_task.h
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
12#pragma once
13
14#include <chrono>
15#include <functional>
16#include <memory>
17#include <queue>
18
19#include <dispenso/detail/timed_task_impl.h>
20#include <dispenso/priority.h>
21#include <dispenso/timing.h>
22
23namespace dispenso {
24
28enum class TimedTaskType {
29 kNormal
30 ,
31 kSteady
33};
34
40class TimedTask {
41 public:
42 TimedTask(const TimedTask&) = delete;
43 TimedTask(TimedTask&& other) : impl_(std::move(other.impl_)) {}
44
45 TimedTask& operator=(const TimedTask&) = delete;
46 TimedTask& operator=(TimedTask&& other) {
47 impl_ = std::move(other.impl_);
48 return *this;
49 }
50
55 void cancel() {
56 impl_->timesToRun.store(0, std::memory_order_release);
57 impl_->flags.fetch_or(detail::kFFlagsCancelled, std::memory_order_release);
58 }
59
65 void detach() {
66 impl_->flags.fetch_or(detail::kFFlagsDetached, std::memory_order_release);
67 }
68
74 size_t calls() const {
75 return impl_->count.load(std::memory_order_acquire);
76 }
77
86 if (!impl_ || impl_->flags.load(std::memory_order_acquire) & detail::kFFlagsDetached) {
87 return;
88 }
89 cancel();
90 while (impl_->inProgress.load(std::memory_order_acquire)) {
91 }
92 // Now we can safely destroy the underlying function. We do this here because we can't risk
93 // that func may call code in it's destructor that may no longer be relevant after this
94 // TimedTask destructor completes.
95 impl_->func = {};
96 }
97
98 private:
99 template <typename Schedulable, typename F>
100 TimedTask(
102 F&& f,
103 double nextRunAbs,
104 double period = 0.0,
105 size_t timesToRun = 1,
106 TimedTaskType type = TimedTaskType::kNormal)
107 : impl_(detail::make_shared<detail::TimedTaskImpl>(
110 period,
111 std::forward<F>(f),
112 sched,
113 type == TimedTaskType::kSteady)) {}
114
115 std::shared_ptr<detail::TimedTaskImpl> impl_;
116
117 friend class TimedTaskScheduler;
118}; // namespace dispenso
119
127 public:
136 DISPENSO_DLL_ACCESS explicit TimedTaskScheduler(
137 ThreadPriority priority = ThreadPriority::kNormal);
138 DISPENSO_DLL_ACCESS ~TimedTaskScheduler();
139
144 std::lock_guard<std::mutex> lk(queueMutex_);
145 priority_ = priority;
146 }
147
162 template <typename Schedulable, typename F>
165 F&& func,
166 double nextRunAbs,
167 double period = 0.0,
168 size_t timesToRun = 1,
169 TimedTaskType type = TimedTaskType::kNormal) {
170 TimedTask task(sched, std::forward<F>(func), nextRunAbs, period, timesToRun, type);
171 addTimedTask(task.impl_);
172 return task;
173 }
174
184 template <typename Schedulable, typename Rep, typename Period, typename F>
186 schedule(Schedulable& sched, F&& func, const std::chrono::duration<Rep, Period>& timeInFuture) {
187 return schedule(sched, std::forward<F>(func), toNextRun(timeInFuture));
188 }
189
200 template <typename Schedulable, typename Clock, typename Duration, typename F>
203 F&& func,
204 const std::chrono::time_point<Clock, Duration>& nextRunTime) {
205 return schedule(sched, std::forward<F>(func), toNextRun(nextRunTime));
206 }
207
221 template <typename Schedulable, typename Rep, typename Period, typename F>
224 F&& func,
225 const std::chrono::duration<Rep, Period>& timeInFuture,
226 const std::chrono::duration<Rep, Period>& period,
227 size_t timesToRun = std::numeric_limits<size_t>::max(),
228 TimedTaskType type = TimedTaskType::kNormal) {
229 return schedule(
230 sched, std::forward<F>(func), toNextRun(timeInFuture), toPeriod(period), timesToRun, type);
231 }
232
247 template <
248 typename Schedulable,
249 typename Rep,
250 typename Period,
251 typename Clock,
252 typename Duration,
253 typename F>
256 F&& func,
257 const std::chrono::time_point<Clock, Duration>& nextRunTime,
258 const std::chrono::duration<Rep, Period>& period,
259 size_t timesToRun = std::numeric_limits<size_t>::max(),
260 TimedTaskType type = TimedTaskType::kNormal) {
261 return schedule(
262 sched, std::forward<F>(func), toNextRun(nextRunTime), toPeriod(period), timesToRun, type);
263 }
264
265 private:
266 template <class Rep, class Period>
267 static double toNextRun(const std::chrono::duration<Rep, Period>& timeInFuture) {
268 return getTime() + std::chrono::duration<double>(timeInFuture).count();
269 }
270
271 template <typename Clock, typename Duration>
272 static double toNextRun(const std::chrono::time_point<Clock, Duration>& nextRunTime) {
273 auto curTime = Clock::now();
274 return toNextRun(nextRunTime - curTime);
275 }
276
277 template <class Rep, class Period>
278 static double toPeriod(const std::chrono::duration<Rep, Period>& period) {
279 return std::chrono::duration<double>(period).count();
280 }
281 DISPENSO_DLL_ACCESS void addTimedTask(std::shared_ptr<detail::TimedTaskImpl> task);
282 void timeQueueRunLoop();
283
284 void kickOffTask(std::shared_ptr<detail::TimedTaskImpl> next, double curTime);
285
286 struct Compare {
287 bool operator()(
288 const std::shared_ptr<detail::TimedTaskImpl>& a,
289 const std::shared_ptr<detail::TimedTaskImpl>& b) const {
290 return a->nextAbsTime > b->nextAbsTime;
291 }
292 };
293
294 // TODO(bbudge): Consider lock-free priority queue implementation. I'd expect it to be minimally
295 // beneficial for this use case though... timed tasks should rarely be super-high contention.
296 std::mutex queueMutex_;
297 std::priority_queue<
298 std::shared_ptr<detail::TimedTaskImpl>,
299 std::vector<std::shared_ptr<detail::TimedTaskImpl>>,
300 Compare>
301 tasks_;
302 bool running_{true};
303 detail::EpochWaiter epoch_;
304 std::thread thread_;
305 ThreadPriority priority_;
306};
307
313DISPENSO_DLL_ACCESS TimedTaskScheduler& globalTimedTaskScheduler();
314
315} // namespace dispenso
size_t calls() const
Definition timed_task.h:74
void setPriority(ThreadPriority priority)
Definition timed_task.h:143
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::duration< Rep, Period > &timeInFuture, const std::chrono::duration< Rep, Period > &period, size_t timesToRun=std::numeric_limits< size_t >::max(), TimedTaskType type=TimedTaskType::kNormal)
Definition timed_task.h:222
TimedTask schedule(Schedulable &sched, F &&func, double nextRunAbs, double period=0.0, size_t timesToRun=1, TimedTaskType type=TimedTaskType::kNormal)
Definition timed_task.h:163
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::time_point< Clock, Duration > &nextRunTime, const std::chrono::duration< Rep, Period > &period, size_t timesToRun=std::numeric_limits< size_t >::max(), TimedTaskType type=TimedTaskType::kNormal)
Definition timed_task.h:254
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::duration< Rep, Period > &timeInFuture)
Definition timed_task.h:186
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::time_point< Clock, Duration > &nextRunTime)
Definition timed_task.h:201
detail::OpResult< T > OpResult
Definition pipeline.h:29
ThreadPriority
Definition priority.h:29