dispenso 1.4.1
A library for task parallelism
Loading...
Searching...
No Matches
graph.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
14#pragma once
15
#include <algorithm>
#include <atomic>
#include <deque>
#include <limits>
#include <memory>
#include <type_traits>
#include <vector>

#include <dispenso/platform.h>
26/*
27Terminology
28--------------------------------------------------------------------------------
29The Node depends on the predecessor. The dependent depends on the node.
30~~~
31┌─────────────┐ ┌──────┐ ┌───────────┐
32│ predecessor │ ──▶ │ node │ ──▶ │ dependent │
33└─────────────┘ └──────┘ └───────────┘
34~~~
35
36Graph construction
37--------------------------------------------------------------------------------
The Graph class can be used to create tasks with dependencies and execute them once.
Graphs must not contain cycles.
40Example:
41~~~
42//
43// ┌────────────┐ ┌───────────────┐ ┌───────────────────┐
44// │ 0: r[0]+=1 │ ──▶ │ 1: r[1]=r[0]*2│ ──▶ │ 4: r[4]=r[1]+r[3] │
45// └────────────┘ └───────────────┘ └───────────────────┘
46// ▲
47// ┌────────────┐ ┌───────────────┐ │
48// │ 2: r[2]+=8 │ ──▶ │ 3: r[3]=r[2]/2│ ──────┘
49// └────────────┘ └───────────────┘
50
std::array<float, 5> r{};  // zero-initialized: the nodes below accumulate into r with +=
52
53dispenso::Graph graph;
54
55dispenso::Node& N0 = graph.addNode([&]() { r[0] += 1; });
56dispenso::Node& N2 = graph.addNode([&]() { r[2] += 8; });
57dispenso::Node& N1 = graph.addNode([&]() { r[1] += r[0] * 2; });
58dispenso::Node& N3 = graph.addNode([&]() { r[3] += r[2] / 2; });
59dispenso::Node& N4 = graph.addNode([&]() { r[4] += r[1] + r[3]; });
60
61N4.dependsOn(N1, N3);
62N1.dependsOn(N0);
63N3.dependsOn(N2);
64
65dispenso::TaskSet taskSet(dispenso::globalThreadPool());
66dispenso::ParallelForExecutor parallelForExecutor;
67parallelForExecutor(taskSet, graph);
68~~~
69
Partial reevaluation
--------------------------------------------------------------------------------
A graph can be executed more than once, which is useful when the graph is big and only part of it
needs to be recomputed. After the graph executes, all nodes change their state from "incomplete" to
"completed". In order to evaluate the whole graph again, use the function `setAllNodesIncomplete`.
75Example:
76~~~
77r = {0, 0, 0, 0, 0};
78setAllNodesIncomplete(graph);
79parallelForExecutor(taskSet, graph);
80~~~
81
The graph can be recomputed partially if there is new input data for one or several nodes in the
graph. To do so, call the `setIncomplete()` method on every node that needs to be recomputed, and
then use the `ForwardPropagator` functor to mark all of their dependents as "incomplete".
85
86Example:
87~~~
88N1.setIncomplete();
89r[1] = r[4] = 0;
dispenso::ForwardPropagator forwardPropagator;
forwardPropagator(graph);
parallelForExecutor(taskSet, graph);
~~~
In this example only nodes 1 and 4 will be invoked.
95
Subgraphs
--------------------------------------------------------------------------------
Nodes can be organized into subgraphs. When the computation graph has static and dynamic parts,
the nodes of a dynamic subgraph can be destroyed and recreated without rebuilding the whole graph.
100Example:
101~~~
102//
103// ∙----subgraph1---∙ ∙---subgraph2-------∙
104// ¦ ┌────────────┐ ¦ ¦ ┌───────────────┐ ¦ ┌───────────────────┐
105// ¦ │ 0: r[0]+=1 │ ──▶ │ 1: r[1]=r[0]*2│ ──▶ │ 4: r[4]=r[1]+r[3] │
106// ¦ └────────────┘ ¦ ¦ └───────────────┘ ¦ └───────────────────┘
107// ¦ ¦ ¦ ¦ ▲
108// ¦ ┌────────────┐ ¦ ¦ ┌───────────────┐ ¦ │
109// ¦ │ 2: r[2]+=8 │ ──▶ │ 3: r[3]=r[2]/2│ ──────┘
110// ¦ └────────────┘ ¦ ¦ └───────────────┘ ¦
111// ∙----------------∙ ∙-------------------∙
112std::array<float, 5> r;
113dispenso::Graph graph;
114
115dispenso::Subgraph& subgraph1 = graph.addSubgraph();
116dispenso::Subgraph& subgraph2 = graph.addSubgraph();
117
118dispenso::Node& N0 = subgraph1.addNode([&]() { r[0] += 1; });
119dispenso::Node& N2 = subgraph1.addNode([&]() { r[2] += 8; });
120dispenso::Node& N1 = subgraph2.addNode([&]() { r[1] += r[0] * 2; });
121dispenso::Node& N3 = subgraph2.addNode([&]() { r[3] += r[2] / 2; });
122dispenso::Node& N4 = graph.addNode([&]() { r[4] += r[1] + r[3]; });
123
124N4.dependsOn(N1, N3);
125N1.dependsOn(N0);
126N3.dependsOn(N2);
127
128// evaluate graph first time
129r = {0, 0, 0, 0, 0};
130dispenso::ConcurrentTaskSet concurrentTaskSet(dispenso::globalThreadPool());
131dispenso::ConcurrentTaskSetExecutor concurrentTaskSetExecutor;
132concurrentTaskSetExecutor(concurrentTaskSet, graph);
133
134// disconnect and destroy nodes of subgraph2
135// it invalidates node references/pointers of this subgraph
136subgraph2.clear();
137
// create new nodes
139dispenso::Node& newN1 = subgraph2.addNode([&]() { r[1] += r[0] * 20; });
140dispenso::Node& newN3 = subgraph2.addNode([&]() { r[3] += r[2] / 20; });
141newN1.dependsOn(N0);
142newN3.dependsOn(N2);
143N4.dependsOn(newN1, newN3);
144
// and reevaluate the graph
setAllNodesIncomplete(graph);
147concurrentTaskSetExecutor(concurrentTaskSet, graph);
148~~~
149
150Bidirectional propagation dependency
151--------------------------------------------------------------------------------
In certain scenarios, nodes may modify the same memory. In such cases, the predecessors of a node
must be recomputed as well, even if they are in the "completed" state after state propagation. To
handle this automatically, we introduce the notion of a bidirectional propagation dependency
(`BiProp`).
156
157Example:
158~~~
159// ┌─────────────────┐
160// │ 3: m3+=b*b │
161// └─────────────────┘
162// ▲
163// ┌−-----------−−−−−−−−−│−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−┐
164// ╎ ┌───────────┐ ┌─────────────────┐ ┌────────────┐ ╎
165// ╎ │ 0: b+=5 │ ──▷ │ 1: b*=5 │ ──▷ │ 2: b/=m4 │ ╎
166// ╎ └───────────┘ └─────────────────┘ └────────────┘ ╎
167// └−−−-−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−▲−−−−−−−−−−−−┘
168// Legend: │
169// ──▶ Normal dependency ┌────────────┐
170// ──▷ Bidirectional propagation dependency │ 4: m4+=2 │
171// m4 variable modified only in node 4 └────────────┘
172
float b, m3, m4;
dispenso::BiPropGraph g;
176
177dispenso::BiPropNode& N0 = g.addNode([&]() { b += 5; });
178dispenso::BiPropNode& N1 = g.addNode([&]() { b *= 5; });
179dispenso::BiPropNode& N2 = g.addNode([&]() { b /= m4; });
180dispenso::BiPropNode& N3 = g.addNode([&]() { m3 += b*b; });
181dispenso::BiPropNode& N4 = g.addNode([&]() { m4 += 2; });
182
183N3.dependsOn(N1);
184N2.dependsOn(N4);
185N2.biPropDependsOn(N1);
186N1.biPropDependsOn(N0);
187
188// first execution
189b = m3 = m4 = 0.f;
190dispenso::ConcurrentTaskSet concurrentTaskSet(dispenso::globalThreadPool());
191dispenso::ConcurrentTaskSetExecutor concurrentTaskSetExecutor;
192concurrentTaskSetExecutor(concurrentTaskSet, g);
193
N4.setIncomplete();
// If node 4 is incomplete, propagation makes node 2 incomplete as well. Since node 2
// bidirectionally depends on node 1, which in turn bidirectionally depends on node 0, nodes 0 and 1
// will be marked as incomplete too.
b = m4 = 0.f;
dispenso::ForwardPropagator forwardPropagator;
forwardPropagator(g);
200concurrentTaskSetExecutor(concurrentTaskSet, g);
201~~~
202
See the tests in `graph_test.cpp` for more examples.
204*/
205
206namespace detail {
207class ExecutorBase;
208
209template <typename F>
210void callFunctor(void* ptr) {
211 (*static_cast<F*>(ptr))();
212}
213
214template <typename F>
215void destroyFunctor(void* ptr) {
216 static_cast<F*>(ptr)->~F();
217 constexpr size_t kFuncSize = static_cast<size_t>(dispenso::detail::nextPow2(sizeof(F)));
218 dispenso::deallocSmallBuffer<kFuncSize>(ptr);
219}
220
221} // namespace detail
222
223namespace dispenso {
227class Node {
228 public:
229 Node() = delete;
230 Node(const Node&) = delete;
231 Node& operator=(const Node&) = delete;
232 Node(Node&& other) noexcept
233 : numIncompletePredecessors_(other.numIncompletePredecessors_.load()),
234 numPredecessors_(other.numPredecessors_),
235 invoke_(other.invoke_),
236 destroy_(other.destroy_),
237 funcBuffer_(other.funcBuffer_),
238 dependents_(std::move(other.dependents_)) {
239 other.funcBuffer_ = nullptr;
240 }
241 ~Node() {
242 if (funcBuffer_) {
243 destroy_(funcBuffer_);
244 }
245 }
251 template <typename... Ns>
252 inline void dependsOn(Ns&... nodes) {
253 ((void)std::initializer_list<int>{(dependsOnOneNode(nodes), 0)...});
254 }
259 inline void run() const {
260 invoke_(funcBuffer_);
261 numIncompletePredecessors_.store(kCompleted, std::memory_order_release);
262 }
268 template <class F>
269 inline void forEachDependent(F&& func) const {
270 for (const Node* dependent : dependents_) {
271 func(*dependent);
272 }
273 }
279 template <class F>
280 inline void forEachDependent(F&& func) {
281 for (Node* dependent : dependents_) {
282 func(*dependent);
283 }
284 }
288 inline size_t numPredecessors() const {
289 return numPredecessors_;
290 }
297 inline bool isCompleted() const {
298 return numIncompletePredecessors_.load(std::memory_order_relaxed) == kCompleted;
299 }
307 inline bool setIncomplete() const {
308 if (numIncompletePredecessors_.load(std::memory_order_relaxed) == kCompleted) {
309 numIncompletePredecessors_.store(0, std::memory_order_relaxed);
310 return true;
311 }
312 return false;
313 }
314
320 inline void setCompleted() const {
321 numIncompletePredecessors_.store(kCompleted, std::memory_order_relaxed);
322 }
323
324 protected:
325 template <class F, class X = std::enable_if_t<!std::is_base_of<Node, F>::value, void>>
326 Node(F&& f) : numIncompletePredecessors_(0) {
327 using FNoRef = typename std::remove_reference<F>::type;
328
329 constexpr size_t kFuncSize = static_cast<size_t>(detail::nextPow2(sizeof(FNoRef)));
330 funcBuffer_ = allocSmallBuffer<kFuncSize>();
331 new (funcBuffer_) FNoRef(std::forward<F>(f));
332 invoke_ = ::detail::callFunctor<FNoRef>;
333 destroy_ = ::detail::destroyFunctor<FNoRef>;
334 }
335
336 void dependsOnOneNode(Node& node) {
337 node.dependents_.emplace_back(this);
338 numPredecessors_++;
339 }
340
341 static constexpr size_t kCompleted = std::numeric_limits<size_t>::max();
342 mutable std::atomic<size_t> numIncompletePredecessors_;
343 size_t numPredecessors_ = 0;
344
345 private:
346 using InvokerType = void (*)(void* ptr);
347
348 InvokerType invoke_;
349 InvokerType destroy_;
350 char* funcBuffer_;
351
352 std::vector<Node*> dependents_; // nodes depend on this
353
354 template <class N>
355 friend class SubgraphT;
356 friend class ::detail::ExecutorBase;
357 template <typename G>
358 friend void setAllNodesIncomplete(const G& graph);
359};
364class BiPropNode : public Node {
365 public:
366 BiPropNode() = delete;
367 BiPropNode(const BiPropNode&) = delete;
368 BiPropNode& operator=(const BiPropNode&) = delete;
369 BiPropNode(BiPropNode&& other) noexcept
370 : Node(std::move(other)), biPropSet_(std::move(other.biPropSet_)) {}
377 template <class... Ns>
378 inline void biPropDependsOn(Ns&... nodes) {
379 ((void)std::initializer_list<int>{(biPropDependsOnOneNode(nodes), 0)...});
380 }
387 inline bool isSameSet(const BiPropNode& node) const {
388 return biPropSet_ && biPropSet_ == node.biPropSet_;
389 }
390
391 private:
392 template <class T, class X = std::enable_if_t<!std::is_base_of<BiPropNode, T>::value, void>>
393 BiPropNode(T&& f) : Node(std::forward<T>(f)) {}
394 inline void removeFromBiPropSet() {
395 if (biPropSet_ != nullptr) {
396 auto it = std::find(biPropSet_->begin(), biPropSet_->end(), this);
397 if (it != biPropSet_->end()) {
398 biPropSet_->erase(it);
399 }
400 }
401 }
402
403 DISPENSO_DLL_ACCESS void biPropDependsOnOneNode(BiPropNode& node);
404
405 std::shared_ptr<std::vector<const BiPropNode*>> biPropSet_;
406
407 template <class N>
408 friend class SubgraphT;
409 friend class ::detail::ExecutorBase;
410};
411
412template <class N>
413class GraphT;
414
415template <class N>
416class DISPENSO_DLL_ACCESS SubgraphT {
417 public:
418 using NodeType = N;
419 SubgraphT() = delete;
420 SubgraphT(const SubgraphT<N>&) = delete;
421 SubgraphT<N>& operator=(const SubgraphT<N>&) = delete;
422 SubgraphT(SubgraphT<N>&& other) noexcept
423 : graph_(other.graph_),
424 nodes_(std::move(other.nodes_)),
425 allocator_(std::move(other.allocator_)) {}
426 ~SubgraphT();
433 template <class T>
434 N& addNode(T&& f) {
435 nodes_.push_back(new (allocator_->alloc()) NodeType(std::forward<T>(f)));
436 return *nodes_.back();
437 }
441 size_t numNodes() const {
442 return nodes_.size();
443 }
449 const N& node(size_t index) const {
450 return *nodes_[index];
451 }
457 N& node(size_t index) {
458 return *nodes_[index];
459 }
465 template <class F>
466 inline void forEachNode(F&& func) const {
467 for (const N* node : nodes_) {
468 func(*node);
469 }
470 }
477 template <class F>
478 inline void forEachNode(F&& func) {
479 for (N* node : nodes_) {
480 func(*node);
481 }
482 }
487 void clear();
488
489 private:
490 using DeallocFunc = void (*)(NoLockPoolAllocator*);
491 using PoolPtr = std::unique_ptr<NoLockPoolAllocator, DeallocFunc>;
492
493 static constexpr size_t kNodeSizeP2 = detail::nextPow2(sizeof(NodeType));
494
495 explicit SubgraphT(GraphT<N>* graph) : graph_(graph), nodes_(), allocator_(getAllocator()) {}
496
497 inline void removeNodeFromBiPropSet(Node* /* node */) {}
498 void removeNodeFromBiPropSet(BiPropNode* node) {
499 node->removeFromBiPropSet();
500 }
501 void decrementDependentCounters();
502 size_t markNodesWithPredicessors();
503 void removePredecessorDependencies(size_t numGraphPredecessors);
504
505 void destroyNodes();
506
507 static PoolPtr getAllocator();
508 static void releaseAllocator(NoLockPoolAllocator* ptr);
509
510 GraphT<N>* graph_;
511#if defined(_WIN32) && !defined(__MINGW32__)
512#pragma warning(push)
513#pragma warning(disable : 4251)
514#endif
515 std::vector<N*> nodes_;
516
517 PoolPtr allocator_;
518#if defined(_WIN32) && !defined(__MINGW32__)
519#pragma warning(pop)
520#endif
521
522 template <class T>
523 friend class GraphT;
524};
525
526template <class N>
527class DISPENSO_DLL_ACCESS GraphT {
528 public:
529 using NodeType = N;
531 GraphT(const GraphT<N>&) = delete;
532 GraphT& operator=(const GraphT<N>&) = delete;
537 subgraphs_.push_back(SubgraphType(this));
538 }
542 GraphT(GraphT<N>&& other);
546 GraphT<N>& operator=(GraphT&& other) noexcept;
553 template <class T>
554 N& addNode(T&& f) {
555 return subgraphs_[0].addNode(std::forward<T>(f));
556 }
560 size_t numNodes() const {
561 return subgraphs_[0].numNodes();
562 }
568 const N& node(size_t index) const {
569 return subgraphs_[0].node(index);
570 }
576 N& node(size_t index) {
577 return subgraphs_[0].node(index);
578 }
582 SubgraphT<N>& addSubgraph();
586 size_t numSubgraphs() const {
587 return subgraphs_.size();
588 }
594 const SubgraphT<N>& subgraph(size_t index) const {
595 return subgraphs_[index];
596 }
602 SubgraphT<N>& subgraph(size_t index) {
603 return subgraphs_[index];
604 }
610 template <class F>
611 inline void forEachSubgraph(F&& func) const {
612 for (const SubgraphT<N>& subgraph : subgraphs_) {
613 func(subgraph);
614 }
615 }
621 template <class F>
622 inline void forEachSubgraph(F&& func) {
623 for (SubgraphT<N>& subgraph : subgraphs_) {
624 func(subgraph);
625 }
626 }
633 template <class F>
634 inline void forEachNode(F&& func) const {
635 for (const SubgraphT<N>& subgraph : subgraphs_) {
636 for (const N* node : subgraph.nodes_) {
637 func(*node);
638 }
639 }
640 }
646 template <class F>
647 inline void forEachNode(F&& func) {
648 for (SubgraphT<N>& subgraph : subgraphs_) {
649 for (N* node : subgraph.nodes_) {
650 func(*node);
651 }
652 }
653 }
657 inline void clear() {
658 subgraphs_.clear();
659 subgraphs_.push_back(SubgraphType(this));
660 }
664 inline void clearSubgraphs() {
665 for (SubgraphT<N>& subgraph : subgraphs_) {
666 subgraph.destroyNodes();
667 }
668 }
669
670 private:
671 static constexpr size_t kSubgraphSizeP2 = detail::nextPow2(sizeof(SubgraphType));
672
673#if defined(_WIN32) && !defined(__MINGW32__)
674#pragma warning(push)
675#pragma warning(disable : 4251)
676#endif
677 std::deque<SubgraphT<N>> subgraphs_;
678#if defined(_WIN32) && !defined(__MINGW32__)
679#pragma warning(pop)
680#endif
681
682 template <class T>
683 friend class SubgraphT;
684};
685
686using Graph = GraphT<Node>;
687using BiPropGraph = GraphT<BiPropNode>;
688
689using Subgraph = SubgraphT<Node>;
690using BiPropSubgraph = SubgraphT<BiPropNode>;
691} // namespace dispenso
bool isSameSet(const BiPropNode &node) const
Definition graph.h:387
void biPropDependsOn(Ns &... nodes)
Definition graph.h:378
N & addNode(T &&f)
Definition graph.h:554
void forEachNode(F &&func) const
Definition graph.h:634
void clear()
Definition graph.h:657
void forEachNode(F &&func)
Definition graph.h:647
void forEachSubgraph(F &&func) const
Definition graph.h:611
size_t numSubgraphs() const
Definition graph.h:586
void forEachSubgraph(F &&func)
Definition graph.h:622
N & node(size_t index)
Definition graph.h:576
const SubgraphT< N > & subgraph(size_t index) const
Definition graph.h:594
const N & node(size_t index) const
Definition graph.h:568
SubgraphT< N > & subgraph(size_t index)
Definition graph.h:602
void clearSubgraphs()
Definition graph.h:664
size_t numNodes() const
Definition graph.h:560
bool isCompleted() const
Definition graph.h:297
bool setIncomplete() const
Definition graph.h:307
void setCompleted() const
Definition graph.h:320
void forEachDependent(F &&func) const
Definition graph.h:269
void forEachDependent(F &&func)
Definition graph.h:280
void dependsOn(Ns &... nodes)
Definition graph.h:252
size_t numPredecessors() const
Definition graph.h:288
void run() const
Definition graph.h:259
const N & node(size_t index) const
Definition graph.h:449
N & addNode(T &&f)
Definition graph.h:434
void forEachNode(F &&func) const
Definition graph.h:466
N & node(size_t index)
Definition graph.h:457
size_t numNodes() const
Definition graph.h:441
void forEachNode(F &&func)
Definition graph.h:478