dispenso 1.6.0
A library for task parallelism
Loading...
Searching...
No Matches
schedulable.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
15#pragma once
16
17#include <cassert>
18#include <chrono>
19#include <condition_variable>
20#include <mutex>
21#include <thread>
22
23#include <dispenso/detail/completion_event_impl.h>
24#include <dispenso/task_set.h>
25
26namespace dispenso {
27
35 public:
43 template <typename F>
44 DISPENSO_REQUIRES(OnceCallableFunc<F>)
45 void schedule(F&& f) const {
46 f();
47 }
48
54 template <typename F>
55 DISPENSO_REQUIRES(OnceCallableFunc<F>)
56 void schedule(F&& f, ForceQueuingTag) const {
57 f();
58 }
59};
60
61constexpr ImmediateInvoker kImmediateInvoker;
62
69 public:
77 template <typename F>
78 DISPENSO_REQUIRES(OnceCallableFunc<F>)
79 void schedule(F&& f) const {
80 schedule(std::forward<F>(f), ForceQueuingTag());
81 }
89 template <typename F>
90 DISPENSO_REQUIRES(OnceCallableFunc<F>)
91 void schedule(F&& f, ForceQueuingTag) const {
92 auto* waiter = getWaiter();
93 waiter->add();
94 std::thread thread([f = std::move(f), waiter]() {
95 // RAII so remove() runs even if f() throws. In practice an uncaught exception out of a
96 // std::thread function calls std::terminate -> abort, which kills the process before the
97 // atexit waiter could block, but the guard documents the invariant and covers exotic
98 // terminate-handler setups.
99 RemoveGuard guard{waiter};
100 f();
101 });
102 thread.detach();
103 }
104
105 private:
106 // DO NOT REMOVE WITHOUT READING THE FOLLOWING:
107 //
108 // schedule() above spawns a detached std::thread per call. On Windows shared-lib builds, if the
109 // process exits while a detached thread is mid-execution inside dispenso's code, the OS unmaps
110 // dispenso.dll's pages during DLL_PROCESS_DETACH while the thread is still executing them →
111 // EXCEPTION_ACCESS_VIOLATION (0xC0000005). On any platform, the detached thread also has a
112 // non-trivial window between Future::status notify(kReady) and the final
113 // decRefCountMaybeDestroy + thread-local-storage teardown, which extends past the user-visible
114 // future.get() return.
115 //
116 // Two mitigations, both needed:
117 // 1. pinModuleForNewThread() (schedulable.cpp, Windows-only) pins the module that contains
118 // dispenso's code so it is never unmapped while a detached thread is still executing in it.
119 // This removes the access-violation regardless of how long the thread runs, and also covers
120 // a host FreeLibrary() at runtime, which the atexit path cannot. (Only Windows eagerly
121 // unmaps a module's code under a running thread; POSIX/other loaders don't, so no analog.)
122 // 2. ThreadWaiter gives outstanding threads a BOUNDED grace period at exit to reach remove(),
123 // narrowing the post-future.get() teardown window. It is bounded (never blocks shutdown
124 // indefinitely) precisely because (1) makes a thread that runs past the deadline non-fatal.
125 // The SmallBufferAllocator controlled-leak fix (small_buffer_allocator.cpp) addresses a separate
126 // hazard (returning small buffers to a destroyed central store) and is also needed.
127 //
128 // The relevant tests are future_test_sans_exceptions and future_shared_test, the
129 // Future.AsyncNotAsyncSpecifyNewThread / NewThreadInvoker / AsyncSpecifyNewThread cases.
130 //
131 // The ThreadWaiter object itself is intentionally leaked (controlled-leak singleton in
132 // schedulable.cpp). Deleting it on shutdown would create a UAF window for any post-atexit
133 // schedule() call (external thread, static destructor) that hits a freed waiter.
134 struct ThreadWaiter {
135 int count_ = 0;
136 std::mutex mtx_;
137 std::condition_variable cond_;
138
139 void add() DISPENSO_NO_THREAD_SAFETY_ANALYSIS {
140 std::lock_guard<std::mutex> lk(mtx_);
141 ++count_;
142 }
143
144 void remove() DISPENSO_NO_THREAD_SAFETY_ANALYSIS {
145 std::lock_guard<std::mutex> lk(mtx_);
146 assert(count_ > 0 && "remove() called without matching add()");
147 if (--count_ == 0) {
148 cond_.notify_one();
149 }
150 }
151
152 void wait() {
153 std::unique_lock<std::mutex> lk(mtx_);
154 // Bounded best-effort: pinModuleForNewThread() keeps our code mapped, so a thread still
155 // running past this deadline cannot fault on unload — this only narrows the teardown window
156 // and must never wedge shutdown, so it times out rather than blocking forever.
157 cond_.wait_for(lk, std::chrono::seconds(2), [this]() { return count_ == 0; });
158 }
159 };
160
161 struct RemoveGuard {
162 ThreadWaiter* w;
163 ~RemoveGuard() {
164 w->remove();
165 }
166 };
167
168 DISPENSO_DLL_ACCESS static ThreadWaiter* getWaiter();
169};
170
171constexpr NewThreadInvoker kNewThreadInvoker;
172
173} // namespace dispenso
void schedule(F &&f) const
Definition schedulable.h:45
void schedule(F &&f) const
Definition schedulable.h:79