dispenso
A library for task parallelism
 
Loading...
Searching...
No Matches
graph.h
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8#pragma once
9
10#include <atomic>
11#include <deque>
12#include <limits>
13#include <memory>
14#include <type_traits>
15#include <vector>
16
17#include <dispenso/platform.h>
20/*
21Terminology
22--------------------------------------------------------------------------------
23The Node depends on the predecessor. The dependent depends on the node.
24~~~
25┌─────────────┐ ┌──────┐ ┌───────────┐
26│ predecessor │ ──▶ │ node │ ──▶ │ dependent │
27└─────────────┘ └──────┘ └───────────┘
28~~~
29
30Graph construction
31--------------------------------------------------------------------------------
32The Graph class can be used to create tasks with dependencies and execute them once.
33Graphs must not contain cycles.
34Example:
35~~~
36//
37// ┌────────────┐ ┌───────────────┐ ┌───────────────────┐
38// │ 0: r[0]+=1 │ ──▶ │ 1: r[1]=r[0]*2│ ──▶ │ 4: r[4]=r[1]+r[3] │
39// └────────────┘ └───────────────┘ └───────────────────┘
40// ▲
41// ┌────────────┐ ┌───────────────┐ │
42// │ 2: r[2]+=8 │ ──▶ │ 3: r[3]=r[2]/2│ ──────┘
43// └────────────┘ └───────────────┘
44
45std::array<float, 5> r;
46
47dispenso::Graph graph;
48
49dispenso::Node& N0 = graph.addNode([&]() { r[0] += 1; });
50dispenso::Node& N2 = graph.addNode([&]() { r[2] += 8; });
51dispenso::Node& N1 = graph.addNode([&]() { r[1] += r[0] * 2; });
52dispenso::Node& N3 = graph.addNode([&]() { r[3] += r[2] / 2; });
53dispenso::Node& N4 = graph.addNode([&]() { r[4] += r[1] + r[3]; });
54
55N4.dependsOn(N1, N3);
56N1.dependsOn(N0);
57N3.dependsOn(N2);
58
59dispenso::TaskSet taskSet(dispenso::globalThreadPool());
60dispenso::ParallelForExecutor parallelForExecutor;
61parallelForExecutor(taskSet, graph);
62~~~
63
64Partial re-evaluation
65--------------------------------------------------------------------------------
66If the graph is big, or we need to recompute only part of it, we can execute it again.
67After execution of the graph all nodes change their state from "incomplete" to
68"completed". In order to evaluate the whole graph again we can use the function `setAllNodesIncomplete`.
69Example:
70~~~
71r = {0, 0, 0, 0, 0};
72setAllNodesIncomplete(graph);
73parallelForExecutor(taskSet, graph);
74~~~
75
76The graph can be recomputed partially if we have new input data for one or several nodes in the
77graph. In order to do so we need to call the `setIncomplete()` method for every node which we need to
78recompute and then use the functor `ForwardPropagator` to mark all of their dependents as "incomplete".
79
80Example:
81~~~
82N1.setIncomplete();
83r[1] = r[4] = 0;
84ForwardPropagator forwardPropagator;
85forwardPropagator(graph);
86evaluateGraph(graph);
87~~~
88In this example only nodes 1 and 4 will be invoked.
89
90 Subgraphs
91--------------------------------------------------------------------------------
92It is possible to organize nodes into subgraphs, which can be destroyed and recreated independently. This is
93useful when the computation graph has static and dynamic parts.
94Example:
95~~~
96//
97// ∙----subgraph1---∙ ∙---subgraph2-------∙
98// ¦ ┌────────────┐ ¦ ¦ ┌───────────────┐ ¦ ┌───────────────────┐
99// ¦ │ 0: r[0]+=1 │ ──▶ │ 1: r[1]=r[0]*2│ ──▶ │ 4: r[4]=r[1]+r[3] │
100// ¦ └────────────┘ ¦ ¦ └───────────────┘ ¦ └───────────────────┘
101// ¦ ¦ ¦ ¦ ▲
102// ¦ ┌────────────┐ ¦ ¦ ┌───────────────┐ ¦ │
103// ¦ │ 2: r[2]+=8 │ ──▶ │ 3: r[3]=r[2]/2│ ──────┘
104// ¦ └────────────┘ ¦ ¦ └───────────────┘ ¦
105// ∙----------------∙ ∙-------------------∙
106std::array<float, 5> r;
107dispenso::Graph graph;
108
109dispenso::Subgraph& subgraph1 = graph.addSubgraph();
110dispenso::Subgraph& subgraph2 = graph.addSubgraph();
111
112dispenso::Node& N0 = subgraph1.addNode([&]() { r[0] += 1; });
113dispenso::Node& N2 = subgraph1.addNode([&]() { r[2] += 8; });
114dispenso::Node& N1 = subgraph2.addNode([&]() { r[1] += r[0] * 2; });
115dispenso::Node& N3 = subgraph2.addNode([&]() { r[3] += r[2] / 2; });
116dispenso::Node& N4 = graph.addNode([&]() { r[4] += r[1] + r[3]; });
117
118N4.dependsOn(N1, N3);
119N1.dependsOn(N0);
120N3.dependsOn(N2);
121
122// evaluate graph first time
123r = {0, 0, 0, 0, 0};
124dispenso::ConcurrentTaskSet concurrentTaskSet(dispenso::globalThreadPool());
125dispenso::ConcurrentTaskSetExecutor concurrentTaskSetExecutor;
126concurrentTaskSetExecutor(concurrentTaskSet, graph);
127
128// disconnect and destroy nodes of subgraph2
129// it invalidates node references/pointers of this subgraph
130subgraph2.clear();
131
132// create new nodes
133dispenso::Node& newN1 = subgraph2.addNode([&]() { r[1] += r[0] * 20; });
134dispenso::Node& newN3 = subgraph2.addNode([&]() { r[3] += r[2] / 20; });
135newN1.dependsOn(N0);
136newN3.dependsOn(N2);
137N4.dependsOn(newN1, newN3);
138
139// and re-evaluate the graph
140setAllNodesIncomplete(graph);
141concurrentTaskSetExecutor(concurrentTaskSet, graph);
142~~~
143
144Bidirectional propagation dependency
145--------------------------------------------------------------------------------
146In certain scenarios, nodes may alter the same memory. In such instances, it becomes necessary to
147compute the predecessors of the node, even if they possess a "completed" state following state
148propagation. To facilitate this process automatically, we introduce the notion of a bidirectional
149propagation dependency (`BiProp`).
150
151Example:
152~~~
153// ┌─────────────────┐
154// │ 3: m3+=b*b │
155// └─────────────────┘
156// ▲
157// ┌−-----------−−−−−−−−−│−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−┐
158// ╎ ┌───────────┐ ┌─────────────────┐ ┌────────────┐ ╎
159// ╎ │ 0: b+=5 │ ──▷ │ 1: b*=5 │ ──▷ │ 2: b/=m4 │ ╎
160// ╎ └───────────┘ └─────────────────┘ └────────────┘ ╎
161// └−−−-−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−▲−−−−−−−−−−−−┘
162// Legend: │
163// ──▶ Normal dependency ┌────────────┐
164// ──▷ Bidirectional propagation dependency │ 4: m4+=2 │
165// m4 variable modified only in node 4 └────────────┘
166
167float b, m3, m4;
168dispenso::BiPropGraph g;
169std::array<dispenso::BiPropNode*, 8> N;
170
171dispenso::BiPropNode& N0 = g.addNode([&]() { b += 5; });
172dispenso::BiPropNode& N1 = g.addNode([&]() { b *= 5; });
173dispenso::BiPropNode& N2 = g.addNode([&]() { b /= m4; });
174dispenso::BiPropNode& N3 = g.addNode([&]() { m3 += b*b; });
175dispenso::BiPropNode& N4 = g.addNode([&]() { m4 += 2; });
176
177N3.dependsOn(N1);
178N2.dependsOn(N4);
179N2.biPropDependsOn(N1);
180N1.biPropDependsOn(N0);
181
182// first execution
183b = m3 = m4 = 0.f;
184dispenso::ConcurrentTaskSet concurrentTaskSet(dispenso::globalThreadPool());
185dispenso::ConcurrentTaskSetExecutor concurrentTaskSetExecutor;
186concurrentTaskSetExecutor(concurrentTaskSet, g);
187
188N4.setIncomplete();
189// If node 4 is incomplete, then after propagation node 2 becomes incomplete. Taking into account that node 2
190// bidirectionally depends on nodes 0 and 1, they will be marked as incomplete as well.
191b = m4 = 0.f;
192ForwardPropagator forwardPropagator;
193forwardPropagator(g);
194concurrentTaskSetExecutor(concurrentTaskSet, g);
195~~~
196
197Please read tests from `graph_test.cpp` for more examples.
198*/
199
200namespace detail {
201class ExecutorBase;
202
/**
 * Type-erased invoker: treats @p ptr as the address of a callable of type F and calls it with no
 * arguments.
 *
 * @tparam F Concrete type of the callable stored at @p ptr.
 * @param ptr Address of a live object of type F.
 */
template <typename F>
void callFunctor(void* ptr) {
  F& functor = *static_cast<F*>(ptr);
  functor();
}
207
208template <typename F>
209void destroyFunctor(void* ptr) {
210 static_cast<F*>(ptr)->~F();
211 constexpr size_t kFuncSize = static_cast<size_t>(dispenso::detail::nextPow2(sizeof(F)));
213}
214
215} // namespace detail
216
217namespace dispenso {
221class Node {
222 public:
223 Node() = delete;
224 Node(const Node&) = delete;
225 Node& operator=(const Node&) = delete;
227 : numIncompletePredecessors_(other.numIncompletePredecessors_.load()),
228 numPredecessors_(other.numPredecessors_),
229 invoke_(other.invoke_),
230 destroy_(other.destroy_),
231 funcBuffer_(other.funcBuffer_),
232 dependents_(std::move(other.dependents_)) {
233 other.funcBuffer_ = nullptr;
234 }
235 ~Node() {
236 if (funcBuffer_) {
237 destroy_(funcBuffer_);
238 }
239 }
245 template <typename... Ns>
246 inline void dependsOn(Ns&... nodes) {
247 ((void)std::initializer_list<int>{(dependsOnOneNode(nodes), 0)...});
248 }
253 inline void run() const {
254 invoke_(funcBuffer_);
255 numIncompletePredecessors_.store(kCompleted, std::memory_order_release);
256 }
262 template <class F>
263 inline void forEachDependent(F&& func) const {
264 for (const Node* dependent : dependents_) {
265 func(*dependent);
266 }
267 }
273 template <class F>
274 inline void forEachDependent(F&& func) {
275 for (Node* dependent : dependents_) {
276 func(*dependent);
277 }
278 }
282 inline size_t numPredecessors() const {
283 return numPredecessors_;
284 }
291 inline bool isCompleted() const {
292 return numIncompletePredecessors_.load(std::memory_order_relaxed) == kCompleted;
293 }
301 inline bool setIncomplete() const {
302 if (numIncompletePredecessors_.load(std::memory_order_relaxed) == kCompleted) {
303 numIncompletePredecessors_.store(0, std::memory_order_relaxed);
304 return true;
305 }
306 return false;
307 }
308
314 inline void setCompleted() const {
315 numIncompletePredecessors_.store(kCompleted, std::memory_order_relaxed);
316 }
317
318 protected:
320 Node(F&& f) : numIncompletePredecessors_(0) {
321 using FNoRef = typename std::remove_reference<F>::type;
322
323 constexpr size_t kFuncSize = static_cast<size_t>(detail::nextPow2(sizeof(FNoRef)));
324 funcBuffer_ = allocSmallBuffer<kFuncSize>();
325 new (funcBuffer_) FNoRef(std::forward<F>(f));
326 invoke_ = ::detail::callFunctor<FNoRef>;
327 destroy_ = ::detail::destroyFunctor<FNoRef>;
328 }
329
330 void dependsOnOneNode(Node& node) {
331 node.dependents_.emplace_back(this);
332 numPredecessors_++;
333 }
334
335 static constexpr size_t kCompleted = std::numeric_limits<size_t>::max();
336 mutable std::atomic<size_t> numIncompletePredecessors_;
337 size_t numPredecessors_ = 0;
338
339 private:
340 using InvokerType = void (*)(void* ptr);
341
342 InvokerType invoke_;
343 InvokerType destroy_;
344 char* funcBuffer_;
345
346 std::vector<Node*> dependents_; // nodes depend on this
347
348 template <class N>
349 friend class SubgraphT;
350 friend class ::detail::ExecutorBase;
351 template <typename G>
352 friend void setAllNodesIncomplete(const G& graph);
353};
358class BiPropNode : public Node {
359 public:
360 BiPropNode() = delete;
361 BiPropNode(const BiPropNode&) = delete;
362 BiPropNode& operator=(const BiPropNode&) = delete;
364 : Node(std::move(other)), biPropSet_(std::move(other.biPropSet_)) {}
371 template <class... Ns>
372 inline void biPropDependsOn(Ns&... nodes) {
373 ((void)std::initializer_list<int>{(biPropDependsOnOneNode(nodes), 0)...});
374 }
381 inline bool isSameSet(const BiPropNode& node) const {
382 return biPropSet_ && biPropSet_ == node.biPropSet_;
383 }
384
385 private:
387 BiPropNode(T&& f) : Node(std::forward<T>(f)) {}
388 inline void removeFromBiPropSet() {
389 if (biPropSet_ != nullptr) {
390 auto it = std::find(biPropSet_->begin(), biPropSet_->end(), this);
391 if (it != biPropSet_->end()) {
392 biPropSet_->erase(it);
393 }
394 }
395 }
396
397 DISPENSO_DLL_ACCESS void biPropDependsOnOneNode(BiPropNode& node);
398
399 std::shared_ptr<std::vector<const BiPropNode*>> biPropSet_;
400
401 template <class N>
402 friend class SubgraphT;
403 friend class ::detail::ExecutorBase;
404};
405
406template <class N>
407class GraphT;
408
409template <class N>
410class DISPENSO_DLL_ACCESS SubgraphT {
411 public:
412 using NodeType = N;
413 SubgraphT() = delete;
414 SubgraphT(const SubgraphT<N>&) = delete;
415 SubgraphT<N>& operator=(const SubgraphT<N>&) = delete;
417 : graph_(other.graph_),
418 nodes_(std::move(other.nodes_)),
419 allocator_(std::move(other.allocator_)) {}
420 ~SubgraphT();
427 template <class T>
428 N& addNode(T&& f) {
429 nodes_.push_back(new (allocator_->alloc()) NodeType(std::forward<T>(f)));
430 return *nodes_.back();
431 }
435 size_t numNodes() const {
436 return nodes_.size();
437 }
443 const N& node(size_t index) const {
444 return *nodes_[index];
445 }
451 N& node(size_t index) {
452 return *nodes_[index];
453 }
459 template <class F>
460 inline void forEachNode(F&& func) const {
461 for (const N* node : nodes_) {
462 func(*node);
463 }
464 }
471 template <class F>
472 inline void forEachNode(F&& func) {
473 for (N* node : nodes_) {
474 func(*node);
475 }
476 }
481 void clear();
482
483 private:
484 using DeallocFunc = void (*)(NoLockPoolAllocator*);
485 using PoolPtr = std::unique_ptr<NoLockPoolAllocator, DeallocFunc>;
486
487 static constexpr size_t kNodeSizeP2 = detail::nextPow2(sizeof(NodeType));
488
489 explicit SubgraphT(GraphT<N>* graph) : graph_(graph), nodes_(), allocator_(getAllocator()) {}
490
491 inline void removeNodeFromBiPropSet(Node* /* node */) {}
492 void removeNodeFromBiPropSet(BiPropNode* node) {
493 node->removeFromBiPropSet();
494 }
495 void decrementDependentCounters();
496 size_t markNodesWithPredicessors();
497 void removePredecessorDependencies(size_t numGraphPredecessors);
498
499 void destroyNodes();
500
501 static PoolPtr getAllocator();
502 static void releaseAllocator(NoLockPoolAllocator* ptr);
503
504 GraphT<N>* graph_;
505#if defined(_WIN32) && !defined(__MINGW32__)
506#pragma warning(push)
507#pragma warning(disable : 4251)
508#endif
509 std::vector<N*> nodes_;
510
511 PoolPtr allocator_;
512#if defined(_WIN32) && !defined(__MINGW32__)
513#pragma warning(pop)
514#endif
515
516 template <class T>
517 friend class GraphT;
518};
519
// Container of subgraphs. Nodes added directly to the graph are stored in the first subgraph
// (subgraphs_[0]); further subgraphs are created with addSubgraph().
// Template parameter N is the node type (see the Graph / BiPropGraph aliases).
520template <class N>
521class DISPENSO_DLL_ACCESS GraphT {
522 public:
523 using NodeType = N;
524 using SubgraphType = SubgraphT<N>;
525 GraphT(const GraphT<N>&) = delete;
526 GraphT& operator=(const GraphT<N>&) = delete;
 // NOTE(review): the line that opens the following body (source lines 527-530) is not visible in
 // this listing; presumably it is the default constructor - confirm against the original header.
 // The body creates the first subgraph, bound to this graph.
531 subgraphs_.push_back(SubgraphType(this));
532 }
 // NOTE(review): source lines 533-539 are not visible in this listing - confirm whether a
 // declaration is hidden there.
 // Move assignment; defined out of line.
540 GraphT<N>& operator=(GraphT&& other) noexcept;
 // Create a node holding callable f in the first subgraph and return a reference to it.
547 template <class T>
548 N& addNode(T&& f) {
549 return subgraphs_[0].addNode(std::forward<T>(f));
550 }
 // Number of nodes in the first subgraph only (nodes of other subgraphs are not counted).
554 size_t numNodes() const {
555 return subgraphs_[0].numNodes();
556 }
 // Read-only access to a node of the first subgraph by position.
562 const N& node(size_t index) const {
563 return subgraphs_[0].node(index);
564 }
 // Mutable access to a node of the first subgraph by position.
570 N& node(size_t index) {
571 return subgraphs_[0].node(index);
572 }
 // Defined out of line; returns a reference to a subgraph of this graph.
576 SubgraphT<N>& addSubgraph();
 // Number of subgraphs, including the first one that holds the graph's own nodes.
580 size_t numSubgraphs() const {
581 return subgraphs_.size();
582 }
 // Read-only access to a subgraph by position; index 0 is the first subgraph.
588 const SubgraphT<N>& subgraph(size_t index) const {
589 return subgraphs_[index];
590 }
 // NOTE(review): the line that opens the following body (source line 596) is not visible in this
 // listing; the Doxygen index at the end of the page lists `SubgraphT< N > & subgraph(size_t index)`
 // at graph.h:596, i.e. the mutable counterpart of the accessor above.
597 return subgraphs_[index];
598 }
 // Apply func to every subgraph (read-only access).
604 template <class F>
605 inline void forEachSubgraph(F&& func) const {
606 for (const SubgraphT<N>& subgraph : subgraphs_) {
607 func(subgraph);
608 }
609 }
 // Apply func to every subgraph (mutable access).
615 template <class F>
616 inline void forEachSubgraph(F&& func) {
617 for (SubgraphT<N>& subgraph : subgraphs_) {
618 func(subgraph);
619 }
620 }
 // Apply func to every node of every subgraph (read-only access), subgraph by subgraph.
627 template <class F>
628 inline void forEachNode(F&& func) const {
629 for (const SubgraphT<N>& subgraph : subgraphs_) {
630 for (const N* node : subgraph.nodes_) {
631 func(*node);
632 }
633 }
634 }
 // Apply func to every node of every subgraph (mutable access), subgraph by subgraph.
640 template <class F>
641 inline void forEachNode(F&& func) {
642 for (SubgraphT<N>& subgraph : subgraphs_) {
643 for (N* node : subgraph.nodes_) {
644 func(*node);
645 }
646 }
647 }
 // Remove all subgraphs, then re-create an empty first subgraph so that subgraphs_[0] exists again.
651 inline void clear() {
652 subgraphs_.clear();
653 subgraphs_.push_back(SubgraphType(this));
654 }
 // Call destroyNodes() on every subgraph (including the first one); the subgraph objects
 // themselves are kept.
658 inline void clearSubgraphs() {
659 for (SubgraphT<N>& subgraph : subgraphs_) {
660 subgraph.destroyNodes();
661 }
662 }
663
664 private:
 // Size of a subgraph object rounded up with nextPow2; not referenced by the code visible here.
665 static constexpr size_t kSubgraphSizeP2 = detail::nextPow2(sizeof(SubgraphType));
666
 // MSVC warning 4251 is disabled around the standard-library member of this exported class.
667#if defined(_WIN32) && !defined(__MINGW32__)
668#pragma warning(push)
669#pragma warning(disable : 4251)
670#endif
 // subgraphs_[0] holds the nodes added directly through GraphT::addNode.
 // NOTE(review): presumably a deque is used so that references to existing subgraphs stay valid
 // when new ones are appended - confirm.
671 std::deque<SubgraphT<N>> subgraphs_;
672#if defined(_WIN32) && !defined(__MINGW32__)
673#pragma warning(pop)
674#endif
675
676 template <class T>
677 friend class SubgraphT;
678};
679
// Graph types for plain nodes and for nodes with bidirectional propagation dependencies.
680using Graph = GraphT<Node>;
681using BiPropGraph = GraphT<BiPropNode>;
682
// Matching subgraph types.
683using Subgraph = SubgraphT<Node>;
684using BiPropSubgraph = SubgraphT<BiPropNode>;
685} // namespace dispenso
bool isSameSet(const BiPropNode &node) const
Definition graph.h:381
void biPropDependsOn(Ns &... nodes)
Definition graph.h:372
N & addNode(T &&f)
Definition graph.h:548
void forEachNode(F &&func) const
Definition graph.h:628
void clear()
Definition graph.h:651
void forEachNode(F &&func)
Definition graph.h:641
void forEachSubgraph(F &&func) const
Definition graph.h:605
size_t numSubgraphs() const
Definition graph.h:580
void forEachSubgraph(F &&func)
Definition graph.h:616
N & node(size_t index)
Definition graph.h:570
const SubgraphT< N > & subgraph(size_t index) const
Definition graph.h:588
const N & node(size_t index) const
Definition graph.h:562
SubgraphT< N > & subgraph(size_t index)
Definition graph.h:596
void clearSubgraphs()
Definition graph.h:658
size_t numNodes() const
Definition graph.h:554
bool isCompleted() const
Definition graph.h:291
bool setIncomplete() const
Definition graph.h:301
void setCompleted() const
Definition graph.h:314
void forEachDependent(F &&func) const
Definition graph.h:263
void forEachDependent(F &&func)
Definition graph.h:274
void dependsOn(Ns &... nodes)
Definition graph.h:246
size_t numPredecessors() const
Definition graph.h:282
void run() const
Definition graph.h:253
const N & node(size_t index) const
Definition graph.h:443
N & addNode(T &&f)
Definition graph.h:428
void forEachNode(F &&func) const
Definition graph.h:460
N & node(size_t index)
Definition graph.h:451
size_t numNodes() const
Definition graph.h:435
void forEachNode(F &&func)
Definition graph.h:472
detail::OpResult< T > OpResult
Definition pipeline.h:29