dispenso 1.4.1
A library for task parallelism
Loading...
Searching...
No Matches
graph.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
14#pragma once
15
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <deque>
#include <initializer_list>
#include <limits>
#include <memory>
#include <new>
#include <type_traits>
#include <vector>

#include <dispenso/platform.h>
27
28/*
29Terminology
30--------------------------------------------------------------------------------
31The Node depends on the predecessor. The dependent depends on the node.
32~~~
33┌─────────────┐ ┌──────┐ ┌───────────┐
34│ predecessor │ ──▶ │ node │ ──▶ │ dependent │
35└─────────────┘ └──────┘ └───────────┘
36~~~
37
38Graph construction
39--------------------------------------------------------------------------------
The Graph class can be used to create tasks with dependencies and execute them once.
41Graphs must not contain cycles.
42Example:
43~~~
44//
45// ┌────────────┐ ┌───────────────┐ ┌───────────────────┐
46// │ 0: r[0]+=1 │ ──▶ │ 1: r[1]=r[0]*2│ ──▶ │ 4: r[4]=r[1]+r[3] │
47// └────────────┘ └───────────────┘ └───────────────────┘
48// ▲
49// ┌────────────┐ ┌───────────────┐ │
50// │ 2: r[2]+=8 │ ──▶ │ 3: r[3]=r[2]/2│ ──────┘
51// └────────────┘ └───────────────┘
52
53std::array<float, 5> r;
54
55dispenso::Graph graph;
56
57dispenso::Node& N0 = graph.addNode([&]() { r[0] += 1; });
58dispenso::Node& N2 = graph.addNode([&]() { r[2] += 8; });
59dispenso::Node& N1 = graph.addNode([&]() { r[1] += r[0] * 2; });
60dispenso::Node& N3 = graph.addNode([&]() { r[3] += r[2] / 2; });
61dispenso::Node& N4 = graph.addNode([&]() { r[4] += r[1] + r[3]; });
62
63N4.dependsOn(N1, N3);
64N1.dependsOn(N0);
65N3.dependsOn(N2);
66
67dispenso::TaskSet taskSet(dispenso::globalThreadPool());
68dispenso::ParallelForExecutor parallelForExecutor;
69parallelForExecutor(taskSet, graph);
70~~~
71
Partial reevaluation
73--------------------------------------------------------------------------------
If the graph is big, or only part of it needs to be recomputed, it can be executed again.
After execution of the graph all nodes change their state from "incomplete" to
"completed". In order to evaluate the whole graph again, use the function `setAllNodesIncomplete`.
77Example:
78~~~
79r = {0, 0, 0, 0, 0};
80setAllNodesIncomplete(graph);
81parallelForExecutor(taskSet, graph);
82~~~
83
The graph can be recomputed partially if we have new input data for one or several nodes in the
graph. In order to do so, call the `setIncomplete()` method on every node that needs to be
recomputed, and then use the `ForwardPropagator` functor to mark all of their dependents as "incomplete".
87
88Example:
89~~~
90N1.setIncomplete();
91r[1] = r[4] = 0;
92ForwardPropagator forwardPropagator;
93forwardPropagator(graph);
parallelForExecutor(taskSet, graph);
95~~~
In this example only nodes 1 and 4 will be invoked.
97
98 Subgraphs
99--------------------------------------------------------------------------------
Nodes can be organized into subgraphs, which can be destroyed and recreated independently. This is
useful when the computation graph has static and dynamic parts.
102Example:
103~~~
104//
105// ∙----subgraph1---∙ ∙---subgraph2-------∙
106// ¦ ┌────────────┐ ¦ ¦ ┌───────────────┐ ¦ ┌───────────────────┐
107// ¦ │ 0: r[0]+=1 │ ──▶ │ 1: r[1]=r[0]*2│ ──▶ │ 4: r[4]=r[1]+r[3] │
108// ¦ └────────────┘ ¦ ¦ └───────────────┘ ¦ └───────────────────┘
109// ¦ ¦ ¦ ¦ ▲
110// ¦ ┌────────────┐ ¦ ¦ ┌───────────────┐ ¦ │
111// ¦ │ 2: r[2]+=8 │ ──▶ │ 3: r[3]=r[2]/2│ ──────┘
112// ¦ └────────────┘ ¦ ¦ └───────────────┘ ¦
113// ∙----------------∙ ∙-------------------∙
114std::array<float, 5> r;
115dispenso::Graph graph;
116
117dispenso::Subgraph& subgraph1 = graph.addSubgraph();
118dispenso::Subgraph& subgraph2 = graph.addSubgraph();
119
120dispenso::Node& N0 = subgraph1.addNode([&]() { r[0] += 1; });
121dispenso::Node& N2 = subgraph1.addNode([&]() { r[2] += 8; });
122dispenso::Node& N1 = subgraph2.addNode([&]() { r[1] += r[0] * 2; });
123dispenso::Node& N3 = subgraph2.addNode([&]() { r[3] += r[2] / 2; });
124dispenso::Node& N4 = graph.addNode([&]() { r[4] += r[1] + r[3]; });
125
126N4.dependsOn(N1, N3);
127N1.dependsOn(N0);
128N3.dependsOn(N2);
129
130// evaluate graph first time
131r = {0, 0, 0, 0, 0};
132dispenso::ConcurrentTaskSet concurrentTaskSet(dispenso::globalThreadPool());
133dispenso::ConcurrentTaskSetExecutor concurrentTaskSetExecutor;
134concurrentTaskSetExecutor(concurrentTaskSet, graph);
135
136// disconnect and destroy nodes of subgraph2
137// it invalidates node references/pointers of this subgraph
138subgraph2.clear();
139
// create new nodes
141dispenso::Node& newN1 = subgraph2.addNode([&]() { r[1] += r[0] * 20; });
142dispenso::Node& newN3 = subgraph2.addNode([&]() { r[3] += r[2] / 20; });
143newN1.dependsOn(N0);
144newN3.dependsOn(N2);
145N4.dependsOn(newN1, newN3);
146
// and reevaluate the graph
setAllNodesIncomplete(graph);
149concurrentTaskSetExecutor(concurrentTaskSet, graph);
150~~~
151
152Bidirectional propagation dependency
153--------------------------------------------------------------------------------
154In certain scenarios, nodes may alter the same memory. In such instances, it becomes necessary to
155compute the predecessors of the node, even if they possess a "completed" state following state
156propagation. To facilitate this process automatically, we introduce the notion of a bidirectional
157propagation dependency (`BiProp`).
158
159Example:
160~~~
161// ┌─────────────────┐
162// │ 3: m3+=b*b │
163// └─────────────────┘
164// ▲
165// ┌−-----------−−−−−−−−−│−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−┐
166// ╎ ┌───────────┐ ┌─────────────────┐ ┌────────────┐ ╎
167// ╎ │ 0: b+=5 │ ──▷ │ 1: b*=5 │ ──▷ │ 2: b/=m4 │ ╎
168// ╎ └───────────┘ └─────────────────┘ └────────────┘ ╎
169// └−−−-−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−▲−−−−−−−−−−−−┘
170// Legend: │
171// ──▶ Normal dependency ┌────────────┐
172// ──▷ Bidirectional propagation dependency │ 4: m4+=2 │
173// m4 variable modified only in node 4 └────────────┘
174
float b, m3, m4;
176dispenso::BiPropGraph g;
177std::array<dispenso::BiPropNode*, 8> N;
178
179dispenso::BiPropNode& N0 = g.addNode([&]() { b += 5; });
180dispenso::BiPropNode& N1 = g.addNode([&]() { b *= 5; });
181dispenso::BiPropNode& N2 = g.addNode([&]() { b /= m4; });
182dispenso::BiPropNode& N3 = g.addNode([&]() { m3 += b*b; });
183dispenso::BiPropNode& N4 = g.addNode([&]() { m4 += 2; });
184
185N3.dependsOn(N1);
186N2.dependsOn(N4);
187N2.biPropDependsOn(N1);
188N1.biPropDependsOn(N0);
189
190// first execution
191b = m3 = m4 = 0.f;
192dispenso::ConcurrentTaskSet concurrentTaskSet(dispenso::globalThreadPool());
193dispenso::ConcurrentTaskSetExecutor concurrentTaskSetExecutor;
194concurrentTaskSetExecutor(concurrentTaskSet, g);
195
N4.setIncomplete();
// If node 4 is incomplete, node 2 becomes incomplete after propagation. Because node 2
// bidirectionally depends on nodes 0 and 1, they will be marked as incomplete as well.
199b = m4 = 0.f;
200ForwardPropagator forwardPropagator;
201forwardPropagator(g);
202concurrentTaskSetExecutor(concurrentTaskSet, g);
203~~~
204
205Please read tests from `graph_test.cpp` for more examples.
206*/
207
208namespace detail {
209class ExecutorBase;
210
211template <typename F>
212void callFunctor(void* ptr) {
213 (*static_cast<F*>(ptr))();
214}
215
216template <typename F>
217void destroyFunctor(void* ptr) {
218 static_cast<F*>(ptr)->~F();
219 constexpr size_t kFuncSize = static_cast<size_t>(dispenso::detail::nextPow2(sizeof(F)));
220 dispenso::deallocSmallBuffer<kFuncSize>(ptr);
221}
222
223} // namespace detail
224
225namespace dispenso {
229class Node {
230 public:
231 Node() = delete;
232 Node(const Node&) = delete;
233 Node& operator=(const Node&) = delete;
235 Node(Node&& other) noexcept
236 : numIncompletePredecessors_(other.numIncompletePredecessors_.load()),
237 numPredecessors_(other.numPredecessors_),
238 invoke_(other.invoke_),
239 destroy_(other.destroy_),
240 funcBuffer_(other.funcBuffer_),
241 dependents_(std::move(other.dependents_)) {
242 other.funcBuffer_ = nullptr;
243 }
244 ~Node() {
245 if (funcBuffer_) {
246 destroy_(funcBuffer_);
247 }
248 }
254 template <typename... Ns>
255 inline void dependsOn(Ns&... nodes) {
256 ((void)std::initializer_list<int>{(dependsOnOneNode(nodes), 0)...});
257 }
262 inline void run() const {
263 invoke_(funcBuffer_);
264 numIncompletePredecessors_.store(kCompleted, std::memory_order_release);
265 }
271 template <class F>
272 inline void forEachDependent(F&& func) const {
273 for (const Node* dependent : dependents_) {
274 func(*dependent);
275 }
276 }
282 template <class F>
283 inline void forEachDependent(F&& func) {
284 for (Node* dependent : dependents_) {
285 func(*dependent);
286 }
287 }
291 inline size_t numPredecessors() const {
292 return numPredecessors_;
293 }
300 inline bool isCompleted() const {
301 return numIncompletePredecessors_.load(std::memory_order_relaxed) == kCompleted;
302 }
310 inline bool setIncomplete() const {
311 if (numIncompletePredecessors_.load(std::memory_order_relaxed) == kCompleted) {
312 numIncompletePredecessors_.store(0, std::memory_order_relaxed);
313 return true;
314 }
315 return false;
316 }
317
323 inline void setCompleted() const {
324 numIncompletePredecessors_.store(kCompleted, std::memory_order_relaxed);
325 }
326
327 protected:
329 template <class F, class X = std::enable_if_t<!std::is_base_of<Node, F>::value, void>>
330 Node(F&& f) : numIncompletePredecessors_(0) {
331 using FNoRef = typename std::remove_reference<F>::type;
332
333 constexpr size_t kFuncSize = static_cast<size_t>(detail::nextPow2(sizeof(FNoRef)));
334 funcBuffer_ = allocSmallBuffer<kFuncSize>();
335 new (funcBuffer_) FNoRef(std::forward<F>(f));
336 invoke_ = ::detail::callFunctor<FNoRef>;
337 destroy_ = ::detail::destroyFunctor<FNoRef>;
338 }
339
340 void dependsOnOneNode(Node& node) {
341 node.dependents_.emplace_back(this);
342 numPredecessors_++;
343 }
344
345 static constexpr size_t kCompleted = std::numeric_limits<size_t>::max();
346 mutable std::atomic<size_t> numIncompletePredecessors_;
347 size_t numPredecessors_ = 0;
348
349 private:
350 using InvokerType = void (*)(void* ptr);
351
352 InvokerType invoke_;
353 InvokerType destroy_;
354 char* funcBuffer_;
355
356 std::vector<Node*> dependents_; // nodes depend on this
357
358 template <class N>
359 friend class SubgraphT;
360 friend class ::detail::ExecutorBase;
361 template <typename G>
362 friend void setAllNodesIncomplete(const G& graph);
363};
368class BiPropNode : public Node {
369 public:
370 BiPropNode() = delete;
371 BiPropNode(const BiPropNode&) = delete;
372 BiPropNode& operator=(const BiPropNode&) = delete;
374 BiPropNode(BiPropNode&& other) noexcept
375 : Node(std::move(other)), biPropSet_(std::move(other.biPropSet_)) {}
382 template <class... Ns>
383 inline void biPropDependsOn(Ns&... nodes) {
384 ((void)std::initializer_list<int>{(biPropDependsOnOneNode(nodes), 0)...});
385 }
392 inline bool isSameSet(const BiPropNode& node) const {
393 return biPropSet_ && biPropSet_ == node.biPropSet_;
394 }
395
396 private:
397 template <class T, class X = std::enable_if_t<!std::is_base_of<BiPropNode, T>::value, void>>
398 BiPropNode(T&& f) : Node(std::forward<T>(f)) {}
399 inline void removeFromBiPropSet() {
400 if (biPropSet_ != nullptr) {
401 auto it = std::find(biPropSet_->begin(), biPropSet_->end(), this);
402 if (it != biPropSet_->end()) {
403 biPropSet_->erase(it);
404 }
405 }
406 }
407
408 DISPENSO_DLL_ACCESS void biPropDependsOnOneNode(BiPropNode& node);
409
410 std::shared_ptr<std::vector<const BiPropNode*>> biPropSet_;
411
412 template <class N>
413 friend class SubgraphT;
414 friend class ::detail::ExecutorBase;
415};
416
417template <class N>
418class GraphT;
419
425template <class N>
426class DISPENSO_DLL_ACCESS SubgraphT {
427 public:
428 using NodeType = N;
429 SubgraphT() = delete;
430 SubgraphT(const SubgraphT<N>&) = delete;
431 SubgraphT<N>& operator=(const SubgraphT<N>&) = delete;
433 SubgraphT(SubgraphT<N>&& other) noexcept
434 : graph_(other.graph_),
435 nodes_(std::move(other.nodes_)),
436 allocator_(std::move(other.allocator_)) {}
437 ~SubgraphT();
444 template <class T>
445 DISPENSO_REQUIRES(OnceCallableFunc<T>)
446 N& addNode(T&& f) {
447 nodes_.push_back(new (allocator_->alloc()) NodeType(std::forward<T>(f)));
448 return *nodes_.back();
449 }
453 size_t numNodes() const {
454 return nodes_.size();
455 }
461 const N& node(size_t index) const {
462 return *nodes_[index];
463 }
469 N& node(size_t index) {
470 return *nodes_[index];
471 }
477 template <class F>
478 inline void forEachNode(F&& func) const {
479 for (const N* node : nodes_) {
480 func(*node);
481 }
482 }
489 template <class F>
490 inline void forEachNode(F&& func) {
491 for (N* node : nodes_) {
492 func(*node);
493 }
494 }
499 void clear();
500
501 private:
502 using DeallocFunc = void (*)(NoLockPoolAllocator*);
503 using PoolPtr = std::unique_ptr<NoLockPoolAllocator, DeallocFunc>;
504
505 static constexpr size_t kNodeSizeP2 = static_cast<size_t>(detail::nextPow2(sizeof(NodeType)));
506
507 explicit SubgraphT(GraphT<N>* graph) : graph_(graph), nodes_(), allocator_(getAllocator()) {}
508
509 inline void removeNodeFromBiPropSet(Node* /* node */) {}
510 void removeNodeFromBiPropSet(BiPropNode* node) {
511 node->removeFromBiPropSet();
512 }
513 void decrementDependentCounters();
514 size_t markNodesWithPredicessors();
515 void removePredecessorDependencies(size_t numGraphPredecessors);
516
517 void destroyNodes();
518
519 static PoolPtr getAllocator();
520 static void releaseAllocator(NoLockPoolAllocator* ptr);
521
522 GraphT<N>* graph_;
523#if defined(_WIN32) && !defined(__MINGW32__)
524#pragma warning(push)
525#pragma warning(disable : 4251)
526#endif
527 std::vector<N*> nodes_;
528
529 PoolPtr allocator_;
530#if defined(_WIN32) && !defined(__MINGW32__)
531#pragma warning(pop)
532#endif
533
534 template <class T>
535 friend class GraphT;
536};
537
547template <class N>
548class DISPENSO_DLL_ACCESS GraphT {
549 public:
550 using NodeType = N;
552 GraphT(const GraphT<N>&) = delete;
553 GraphT& operator=(const GraphT<N>&) = delete;
558 subgraphs_.push_back(SubgraphType(this));
559 }
567 GraphT<N>& operator=(GraphT&& other) noexcept;
574 template <class T>
575 DISPENSO_REQUIRES(OnceCallableFunc<T>)
576 N& addNode(T&& f) {
577 return subgraphs_[0].addNode(std::forward<T>(f));
578 }
582 size_t numNodes() const {
583 return subgraphs_[0].numNodes();
584 }
590 const N& node(size_t index) const {
591 return subgraphs_[0].node(index);
592 }
598 N& node(size_t index) {
599 return subgraphs_[0].node(index);
600 }
608 size_t numSubgraphs() const {
609 return subgraphs_.size();
610 }
616 const SubgraphT<N>& subgraph(size_t index) const {
617 return subgraphs_[index];
618 }
624 SubgraphT<N>& subgraph(size_t index) {
625 return subgraphs_[index];
626 }
632 template <class F>
633 inline void forEachSubgraph(F&& func) const {
634 for (const SubgraphT<N>& subgraph : subgraphs_) {
635 func(subgraph);
636 }
637 }
643 template <class F>
644 inline void forEachSubgraph(F&& func) {
645 for (SubgraphT<N>& subgraph : subgraphs_) {
646 func(subgraph);
647 }
648 }
655 template <class F>
656 inline void forEachNode(F&& func) const {
657 for (const SubgraphT<N>& subgraph : subgraphs_) {
658 for (const N* node : subgraph.nodes_) {
659 func(*node);
660 }
661 }
662 }
668 template <class F>
669 inline void forEachNode(F&& func) {
670 for (SubgraphT<N>& subgraph : subgraphs_) {
671 for (N* node : subgraph.nodes_) {
672 func(*node);
673 }
674 }
675 }
679 inline void clear() {
680 subgraphs_.clear();
681 subgraphs_.push_back(SubgraphType(this));
682 }
686 inline void clearSubgraphs() {
687 for (SubgraphT<N>& subgraph : subgraphs_) {
688 subgraph.destroyNodes();
689 }
690 }
691
692 private:
693 static constexpr size_t kSubgraphSizeP2 =
694 static_cast<size_t>(detail::nextPow2(sizeof(SubgraphType)));
695
696#if defined(_WIN32) && !defined(__MINGW32__)
697#pragma warning(push)
698#pragma warning(disable : 4251)
699#endif
700 std::deque<SubgraphT<N>> subgraphs_;
701#if defined(_WIN32) && !defined(__MINGW32__)
702#pragma warning(pop)
703#endif
704
705 template <class T>
706 friend class SubgraphT;
707};
708
713
718} // namespace dispenso
BiPropNode(BiPropNode &&other) noexcept
Definition graph.h:374
bool isSameSet(const BiPropNode &node) const
Definition graph.h:392
void biPropDependsOn(Ns &... nodes)
Definition graph.h:383
void forEachNode(F &&func) const
Definition graph.h:656
void clear()
Definition graph.h:679
void forEachNode(F &&func)
Definition graph.h:669
void forEachSubgraph(F &&func) const
Definition graph.h:633
size_t numSubgraphs() const
Definition graph.h:608
void forEachSubgraph(F &&func)
Definition graph.h:644
N & node(size_t index)
Definition graph.h:598
const SubgraphT< N > & subgraph(size_t index) const
Definition graph.h:616
GraphT(GraphT< N > &&other)
const N & node(size_t index) const
Definition graph.h:590
SubgraphT< N > & subgraph(size_t index)
Definition graph.h:624
void clearSubgraphs()
Definition graph.h:686
SubgraphT< N > & addSubgraph()
size_t numNodes() const
Definition graph.h:582
bool isCompleted() const
Definition graph.h:300
bool setIncomplete() const
Definition graph.h:310
Node(F &&f)
Definition graph.h:330
void setCompleted() const
Definition graph.h:323
void forEachDependent(F &&func) const
Definition graph.h:272
void forEachDependent(F &&func)
Definition graph.h:283
void dependsOn(Ns &... nodes)
Definition graph.h:255
size_t numPredecessors() const
Definition graph.h:291
Node(Node &&other) noexcept
Definition graph.h:235
void run() const
Definition graph.h:262
const N & node(size_t index) const
Definition graph.h:461
void forEachNode(F &&func) const
Definition graph.h:478
N & node(size_t index)
Definition graph.h:469
size_t numNodes() const
Definition graph.h:453
void forEachNode(F &&func)
Definition graph.h:490
SubgraphT(SubgraphT< N > &&other) noexcept
Definition graph.h:433