dispenso
A library for task parallelism
 
Loading...
Searching...
No Matches
graph_executor.cpp
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8#include <dispenso/graph_executor.h>
9
10namespace dispenso {
11
// SingleThreadExecutor::operator()(const G& graph): runs the graph on the
// calling thread, one frontier ("wave") of nodes at a time.
//
// NOTE(review): this listing is missing original source lines 13, 19 and 29.
// Line 13 is the function signature (presumably
// `void SingleThreadExecutor::operator()(const G& graph) {`, matching the
// explicit instantiations at the bottom of the file); lines 19 and 29 are the
// `if (...)` headers whose bodies are lines 20 and 30-32. Restore them from the
// upstream graph_executor.cpp; the code below does not compile as shown.
12template <typename G>
14 using NodeType = typename G::NodeType;
15 nodesToExecute_.clear();
16 nodesToExecuteNext_.clear();
17
// Seed the first frontier. The filter on missing line 19 is not visible;
// presumably it admits only nodes with no incomplete predecessors - confirm.
18 graph.forEachNode([&](const NodeType& node) {
20 nodesToExecute_.emplace_back(&node);
21 }
22 });
23
// Run every node of the current frontier, then collect the dependents that
// (presumably) became ready into nodesToExecuteNext_.
24 while (!nodesToExecute_.empty()) {
25 for (const Node* n : nodesToExecute_) {
26 const NodeType* node = static_cast<const NodeType*>(n);
27 node->run();
// Relaxed ordering: every decrement in this executor happens on the single
// calling thread. Line 30 appears to be the continuation of an
// `if (decNumIncompletePredecessors(` header on missing line 29 (cf. the
// ParallelForExecutor loop below); presumably it is true once `d` is ready.
28 node->forEachDependent([&](const Node& d) {
30 static_cast<const NodeType&>(d), std::memory_order_relaxed)) {
31 nodesToExecuteNext_.emplace_back(static_cast<const NodeType*>(&d));
32 }
33 });
34 }
// The dependents collected above become the next frontier.
35 nodesToExecute_.swap(nodesToExecuteNext_);
36 nodesToExecuteNext_.clear();
37 }
38}
39
// ParallelForExecutor::operator()(TaskSetT& taskSet, const G& graph): like the
// single-threaded executor, runs the graph one frontier at a time, but each
// frontier's nodes are run through dispenso::parallel_for on `taskSet`.
//
// NOTE(review): original source lines 41 (the function signature, presumably
// `void ParallelForExecutor::operator()(TaskSetT& taskSet, const G& graph) {`)
// and 47 (the `if (...)` header guarding line 48) are missing from this
// listing; restore them from upstream before treating this as compilable.
40template <typename TaskSetT, typename G>
42 using NodeType = typename G::NodeType;
43 nodesToExecute_.clear();
44 nodesToExecuteNext_.clear();
45
// Seed the first frontier (the filter on missing line 47 is not visible).
46 graph.forEachNode([&](const NodeType& node) {
48 nodesToExecute_.emplace_back(&node);
49 }
50 });
// NOTE(review): the swap after parallel_for assumes the call has finished every
// index by the time it returns - confirm against parallel_for's wait behavior.
51 while (!nodesToExecute_.empty()) {
52 dispenso::parallel_for(taskSet, size_t(0), nodesToExecute_.size(), [this](size_t i) {
53 const NodeType* node = static_cast<const NodeType*>(nodesToExecute_[i]);
54 node->run();
55
// acq_rel: several workers may decrement the same dependent's counter
// concurrently.
// NOTE(review): emplace_back below is called from parallel_for workers at the
// same time, so this relies on nodesToExecuteNext_ being a concurrency-safe
// container - confirm its declared type in graph_executor.h.
56 node->forEachDependent([&](const Node& d) {
57 if (decNumIncompletePredecessors(
58 static_cast<const NodeType&>(d), std::memory_order_acq_rel)) {
59 nodesToExecuteNext_.emplace_back(static_cast<const NodeType*>(&d));
60 }
61 });
62 });
63
// The dependents collected by the workers become the next frontier.
64 nodesToExecute_.swap(nodesToExecuteNext_);
65 nodesToExecuteNext_.clear();
66 }
67}
68
// ConcurrentTaskSetExecutor::operator()(tasks, graph, wait): schedules one task
// on `tasks` per start node; each task calls evaluateNodeConcurrently(tasks,
// node), which is defined elsewhere (presumably it continues into dependents).
// If `wait` is true the function calls tasks.wait() before returning;
// otherwise it returns right after scheduling. The scheduled lambdas capture
// `tasks` by reference and raw node pointers into `graph`, so with
// wait == false the caller must keep both `tasks` and `graph` alive until the
// scheduled work has finished.
//
// NOTE(review): original source lines 71 (the first parameter, presumably
// `ConcurrentTaskSet& tasks,` given the uses of tasks.schedule / tasks.wait)
// and 78 (the `if (...)` header guarding line 79) are missing from this
// listing; restore them from upstream.
69template <typename G>
70void ConcurrentTaskSetExecutor::operator()(
72 const G& graph,
73 bool wait) {
74 using NodeType = typename G::NodeType;
75 startNodes_.clear();
76
// Collect the start nodes (the filter on missing line 78 is not visible).
77 graph.forEachNode([&](const NodeType& node) {
79 startNodes_.emplace_back(&node);
80 }
81 });
82
83 for (const Node* n : startNodes_) {
84 const NodeType* node = static_cast<const NodeType*>(n);
85 tasks.schedule([&tasks, node]() { evaluateNodeConcurrently(tasks, node); });
86 }
87 if (wait) {
88 tasks.wait();
89 }
90}
91
/**
 * Resets the predecessor bookkeeping of every node in `graph`.
 *
 * Each node's `numIncompletePredecessors_` counter is overwritten with the
 * node's total predecessor count as reported by `numPredecessors()`.
 *
 * @param graph The graph whose nodes are reset. Only the nodes' counters are
 *              written; the graph itself is taken by const reference.
 */
template <typename G>
void setAllNodesIncomplete(const G& graph) {
  graph.forEachNode([](const typename G::NodeType& n) {
    const auto total = n.numPredecessors();
    // A standalone counter reset: relaxed ordering, no ordering with other
    // memory is established here.
    n.numIncompletePredecessors_.store(total, std::memory_order_relaxed);
  });
}
100
// ForwardPropagator::operator()(const G& graph): starting from every node for
// which isCompleted() is false, walks forward through dependents
// breadth-first. visited_ ensures each node is enqueued at most once, and every
// enqueued node is passed to appendGroup together with groups_ (presumably
// appending the node's group to groups_).
//
// NOTE(review): original source lines 121 (the first statement of the
// dependent callback) and 132 (the last statement of the function) are missing
// from this listing. Line 132 presumably calls
// propagateIncompleteStateBidirectionally<NodeType>(), which reads groups_ -
// confirm against upstream.
101template <typename G>
102void ForwardPropagator::operator()(const G& graph) {
103 using NodeType = typename G::NodeType;
104
// Reset scratch state left over from any previous call.
105 nodesToVisit_.clear();
106 nodesToVisitNext_.clear();
107 visited_.clear();
108 groups_.clear();
109
// The initial frontier is every node that is not yet completed.
110 graph.forEachNode([&](const NodeType& node) {
111 if (!node.isCompleted()) {
112 nodesToVisit_.emplace_back(&node);
113 visited_.insert(&node);
114 appendGroup(static_cast<const NodeType*>(&node), groups_);
115 }
116 });
117
118 while (!nodesToVisit_.empty()) {
119 for (const Node* node : nodesToVisit_) {
// The missing line 121 runs for every dependent, before the visited_ check;
// visited_.insert(...).second is true only the first time a node is seen.
120 node->forEachDependent([&](const Node& d) {
122 if (visited_.insert(static_cast<const NodeType*>(&d)).second) {
123 nodesToVisitNext_.emplace_back(static_cast<const NodeType*>(&d));
124 appendGroup(static_cast<const NodeType*>(&d), groups_);
125 }
126 });
127 }
128 nodesToVisit_.swap(nodesToVisitNext_);
129 nodesToVisitNext_.clear();
130 }
131
// NOTE(review): original line 132 is missing here.
133}
134
135template <>
136void ForwardPropagator::propagateIncompleteStateBidirectionally<Node>() {}
137template <>
138void ForwardPropagator::propagateIncompleteStateBidirectionally<BiPropNode>() {
139 nodesToVisit_.clear();
140
141 for (const std::vector<const BiPropNode*>* group : groups_) {
142 for (const dispenso::BiPropNode* gnode : *group) {
143 if (gnode->setIncomplete()) {
144 nodesToVisit_.emplace_back(gnode);
145 }
146 }
147 }
148
149 for (const Node* node : nodesToVisit_) {
150 const BiPropNode* biPropNode = static_cast<const BiPropNode*>(node);
151 biPropNode->forEachDependent([](const Node& d) { ifIncompleteAddIncompletePredecessor(d); });
152 }
153}
154
// Explicit instantiations. The templates above are defined in this .cpp rather
// than in a header, so every combination that callers may use (Graph /
// BiPropGraph, TaskSet / ConcurrentTaskSet) has to be instantiated here to be
// linkable from other translation units. DISPENSO_DLL_ACCESS is presumably the
// library's symbol-export macro - confirm in the platform header.
//
// NOTE(review): original lines 172 and 176 - the first parameter of each
// ConcurrentTaskSetExecutor instantiation, presumably `ConcurrentTaskSet& tasks,`
// - are missing from this listing; restore them from upstream.
155template DISPENSO_DLL_ACCESS void SingleThreadExecutor::operator()<Graph>(const Graph&);
156template DISPENSO_DLL_ACCESS void SingleThreadExecutor::operator()<BiPropGraph>(const BiPropGraph&);
157
158template DISPENSO_DLL_ACCESS void ParallelForExecutor::operator()<TaskSet, Graph>(
159 TaskSet&,
160 const Graph&);
161template DISPENSO_DLL_ACCESS void ParallelForExecutor::operator()<TaskSet, BiPropGraph>(
162 TaskSet&,
163 const BiPropGraph&);
164template DISPENSO_DLL_ACCESS void ParallelForExecutor::operator()<ConcurrentTaskSet, Graph>(
165 ConcurrentTaskSet& tasks,
166 const Graph& graph);
167template DISPENSO_DLL_ACCESS void ParallelForExecutor::operator()<ConcurrentTaskSet, BiPropGraph>(
168 ConcurrentTaskSet& tasks,
169 const BiPropGraph& graph);
170
171template DISPENSO_DLL_ACCESS void ConcurrentTaskSetExecutor::operator()<Graph>(
173 const Graph& graph,
174 bool wait);
175template DISPENSO_DLL_ACCESS void ConcurrentTaskSetExecutor::operator()<BiPropGraph>(
177 const BiPropGraph& graph,
178 bool wait);
179
180template DISPENSO_DLL_ACCESS void setAllNodesIncomplete<Graph>(const Graph&);
181template DISPENSO_DLL_ACCESS void setAllNodesIncomplete<BiPropGraph>(const BiPropGraph&);
182template DISPENSO_DLL_ACCESS void ForwardPropagator::operator()<Graph>(const Graph&);
183template DISPENSO_DLL_ACCESS void ForwardPropagator::operator()<BiPropGraph>(const BiPropGraph&);
184
185} // namespace dispenso
void operator()(TaskSetT &taskSet, const G &graph)
void operator()(const G &graph)
void parallel_for(TaskSetT &taskSet, StateContainer &states, const StateGen &defaultState, const ChunkedRange< IntegerT > &range, F &&f, ParForOptions options={})
detail::OpResult< T > OpResult
Definition pipeline.h:29