dispenso 1.4.1
A library for task parallelism
Loading...
Searching...
No Matches
timed_task.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
14#pragma once
15
16#include <chrono>
17#include <functional>
18#include <memory>
19#include <queue>
20
21#include <dispenso/detail/timed_task_impl.h>
22#include <dispenso/priority.h>
23#include <dispenso/timing.h>
24
25namespace dispenso {
26
30enum class TimedTaskType {
31 kNormal
32 ,
33 kSteady
35};
36
42class TimedTask {
43 public:
44 TimedTask(const TimedTask&) = delete;
46 TimedTask(TimedTask&& other) : impl_(std::move(other.impl_)) {}
47
48 TimedTask& operator=(const TimedTask&) = delete;
49 TimedTask& operator=(TimedTask&& other) {
50 impl_ = std::move(other.impl_);
51 return *this;
52 }
53
58 void cancel() {
59 impl_->timesToRun.store(0, std::memory_order_release);
60 impl_->flags.fetch_or(detail::kFFlagsCancelled, std::memory_order_release);
61 }
62
68 void detach() {
69 impl_->flags.fetch_or(detail::kFFlagsDetached, std::memory_order_release);
70 }
71
77 size_t calls() const {
78 return impl_->count.load(std::memory_order_acquire);
79 }
80
89 if (!impl_ || impl_->flags.load(std::memory_order_acquire) & detail::kFFlagsDetached) {
90 return;
91 }
92 cancel();
93 while (impl_->inProgress.load(std::memory_order_acquire)) {
94 }
95 // Now we can safely destroy the underlying function. We do this here because we can't risk
96 // that func may call code in it's destructor that may no longer be relevant after this
97 // TimedTask destructor completes.
98 impl_->func = {};
99 }
100
101 private:
102 template <typename Schedulable, typename F>
103 TimedTask(
104 Schedulable& sched,
105 F&& f,
106 double nextRunAbs,
107 double period = 0.0,
108 size_t timesToRun = 1,
109 TimedTaskType type = TimedTaskType::kNormal)
110 : impl_(
111 detail::make_shared<detail::TimedTaskImpl>(
112 timesToRun,
113 nextRunAbs,
114 period,
115 std::forward<F>(f),
116 sched,
117 type == TimedTaskType::kSteady)) {}
118
119 std::shared_ptr<detail::TimedTaskImpl> impl_;
120
121 friend class TimedTaskScheduler;
122}; // namespace dispenso
123
131 public:
140 DISPENSO_DLL_ACCESS explicit TimedTaskScheduler(
141 ThreadPriority priority = ThreadPriority::kNormal);
142 DISPENSO_DLL_ACCESS ~TimedTaskScheduler();
143
148 std::lock_guard<std::mutex> lk(queueMutex_);
149 priority_ = priority;
150 }
151
166 template <typename Schedulable, typename F>
168 Schedulable& sched,
169 F&& func,
170 double nextRunAbs,
171 double period = 0.0,
172 size_t timesToRun = 1,
173 TimedTaskType type = TimedTaskType::kNormal) {
174 TimedTask task(sched, std::forward<F>(func), nextRunAbs, period, timesToRun, type);
175 addTimedTask(task.impl_);
176 return task;
177 }
178
188 template <typename Schedulable, typename Rep, typename Period, typename F>
190 schedule(Schedulable& sched, F&& func, const std::chrono::duration<Rep, Period>& timeInFuture) {
191 return schedule(sched, std::forward<F>(func), toNextRun(timeInFuture));
192 }
193
204 template <typename Schedulable, typename Clock, typename Duration, typename F>
206 Schedulable& sched,
207 F&& func,
208 const std::chrono::time_point<Clock, Duration>& nextRunTime) {
209 return schedule(sched, std::forward<F>(func), toNextRun(nextRunTime));
210 }
211
225 template <typename Schedulable, typename Rep, typename Period, typename F>
227 Schedulable& sched,
228 F&& func,
229 const std::chrono::duration<Rep, Period>& timeInFuture,
230 const std::chrono::duration<Rep, Period>& period,
231 size_t timesToRun = std::numeric_limits<size_t>::max(),
232 TimedTaskType type = TimedTaskType::kNormal) {
233 return schedule(
234 sched, std::forward<F>(func), toNextRun(timeInFuture), toPeriod(period), timesToRun, type);
235 }
236
251 template <
252 typename Schedulable,
253 typename Rep,
254 typename Period,
255 typename Clock,
256 typename Duration,
257 typename F>
259 Schedulable& sched,
260 F&& func,
261 const std::chrono::time_point<Clock, Duration>& nextRunTime,
262 const std::chrono::duration<Rep, Period>& period,
263 size_t timesToRun = std::numeric_limits<size_t>::max(),
264 TimedTaskType type = TimedTaskType::kNormal) {
265 return schedule(
266 sched, std::forward<F>(func), toNextRun(nextRunTime), toPeriod(period), timesToRun, type);
267 }
268
269 private:
270 template <class Rep, class Period>
271 static double toNextRun(const std::chrono::duration<Rep, Period>& timeInFuture) {
272 return getTime() + std::chrono::duration<double>(timeInFuture).count();
273 }
274
275 template <typename Clock, typename Duration>
276 static double toNextRun(const std::chrono::time_point<Clock, Duration>& nextRunTime) {
277 auto curTime = Clock::now();
278 return toNextRun(nextRunTime - curTime);
279 }
280
281 template <class Rep, class Period>
282 static double toPeriod(const std::chrono::duration<Rep, Period>& period) {
283 return std::chrono::duration<double>(period).count();
284 }
285 DISPENSO_DLL_ACCESS void addTimedTask(std::shared_ptr<detail::TimedTaskImpl> task);
286 void timeQueueRunLoop();
287
288 void kickOffTask(std::shared_ptr<detail::TimedTaskImpl> next, double curTime);
289
290 struct Compare {
291 bool operator()(
292 const std::shared_ptr<detail::TimedTaskImpl>& a,
293 const std::shared_ptr<detail::TimedTaskImpl>& b) const {
294 return a->nextAbsTime > b->nextAbsTime;
295 }
296 };
297
298 // TODO(bbudge): Consider lock-free priority queue implementation. I'd expect it to be minimally
299 // beneficial for this use case though... timed tasks should rarely be super-high contention.
300 std::mutex queueMutex_;
301 std::priority_queue<
302 std::shared_ptr<detail::TimedTaskImpl>,
303 std::vector<std::shared_ptr<detail::TimedTaskImpl>>,
304 Compare>
305 tasks_;
306 bool running_{true};
307 detail::EpochWaiter epoch_;
308 std::thread thread_;
309 ThreadPriority priority_;
310};
311
318
319} // namespace dispenso
TimedTask(TimedTask &&other)
Definition timed_task.h:46
size_t calls() const
Definition timed_task.h:77
void setPriority(ThreadPriority priority)
Definition timed_task.h:147
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::duration< Rep, Period > &timeInFuture, const std::chrono::duration< Rep, Period > &period, size_t timesToRun=std::numeric_limits< size_t >::max(), TimedTaskType type=TimedTaskType::kNormal)
Definition timed_task.h:226
TimedTask schedule(Schedulable &sched, F &&func, double nextRunAbs, double period=0.0, size_t timesToRun=1, TimedTaskType type=TimedTaskType::kNormal)
Definition timed_task.h:167
DISPENSO_DLL_ACCESS TimedTaskScheduler(ThreadPriority priority=ThreadPriority::kNormal)
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::time_point< Clock, Duration > &nextRunTime, const std::chrono::duration< Rep, Period > &period, size_t timesToRun=std::numeric_limits< size_t >::max(), TimedTaskType type=TimedTaskType::kNormal)
Definition timed_task.h:258
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::duration< Rep, Period > &timeInFuture)
Definition timed_task.h:190
TimedTask schedule(Schedulable &sched, F &&func, const std::chrono::time_point< Clock, Duration > &nextRunTime)
Definition timed_task.h:205
ThreadPriority
Definition priority.h:30
DISPENSO_DLL_ACCESS TimedTaskScheduler & globalTimedTaskScheduler()
DISPENSO_DLL_ACCESS double getTime()