dispenso 1.6.0
A library for task parallelism
Loading...
Searching...
No Matches
graph.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
14#pragma once
15
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <deque>
#include <initializer_list>
#include <limits>
#include <memory>
#include <new>
#include <type_traits>
#include <utility>
#include <vector>

#include <dispenso/platform.h>
28
// Inline capacity of the SmallVector each Node uses to hold pointers to its dependents.
// Defaults to 4; define the macro before including this header to override it.
#if !defined(DISPENSO_GRAPH_DEPENDENTS_INLINE_SIZE)
#define DISPENSO_GRAPH_DEPENDENTS_INLINE_SIZE 4
#endif

// The configured capacity must lie in [1, 64].
static_assert(
    DISPENSO_GRAPH_DEPENDENTS_INLINE_SIZE >= 1,
    "DISPENSO_GRAPH_DEPENDENTS_INLINE_SIZE must be between 1 and 64");
static_assert(
    DISPENSO_GRAPH_DEPENDENTS_INLINE_SIZE <= 64,
    "DISPENSO_GRAPH_DEPENDENTS_INLINE_SIZE must be between 1 and 64");
45
46/*
47Terminology
48--------------------------------------------------------------------------------
49The Node depends on the predecessor. The dependent depends on the node.
50~~~
51┌─────────────┐ ┌──────┐ ┌───────────┐
52│ predecessor │ ──▶ │ node │ ──▶ │ dependent │
53└─────────────┘ └──────┘ └───────────┘
54~~~
55
56Graph construction
57--------------------------------------------------------------------------------
58The Graph class can be used to create tasks with dependencies and execute them once.
59Graphs must not contain cycles.
60Example:
61~~~
62//
63// ┌────────────┐ ┌───────────────┐ ┌───────────────────┐
64// │ 0: r[0]+=1 │ ──▶ │ 1: r[1]=r[0]*2│ ──▶ │ 4: r[4]=r[1]+r[3] │
65// └────────────┘ └───────────────┘ └───────────────────┘
66// ▲
67// ┌────────────┐ ┌───────────────┐ │
68// │ 2: r[2]+=8 │ ──▶ │ 3: r[3]=r[2]/2│ ──────┘
69// └────────────┘ └───────────────┘
70
71std::array<float, 5> r{};
72
73dispenso::Graph graph;
74
75dispenso::Node& N0 = graph.addNode([&]() { r[0] += 1; });
76dispenso::Node& N2 = graph.addNode([&]() { r[2] += 8; });
77dispenso::Node& N1 = graph.addNode([&]() { r[1] += r[0] * 2; });
78dispenso::Node& N3 = graph.addNode([&]() { r[3] += r[2] / 2; });
79dispenso::Node& N4 = graph.addNode([&]() { r[4] += r[1] + r[3]; });
80
81N4.dependsOn(N1, N3);
82N1.dependsOn(N0);
83N3.dependsOn(N2);
84
85dispenso::TaskSet taskSet(dispenso::globalThreadPool());
86dispenso::ParallelForExecutor parallelForExecutor;
87parallelForExecutor(taskSet, graph);
88~~~
89
90Partial reevaluation
91--------------------------------------------------------------------------------
92If the graph is big, or only part of it needs to be recomputed, we can execute it again.
93After execution of the graph all nodes change their state from "incomplete" to
94"completed". In order to evaluate the whole graph again, we can use the function `setAllNodesIncomplete`.
95Example:
96~~~
97r = {0, 0, 0, 0, 0};
98setAllNodesIncomplete(graph);
99parallelForExecutor(taskSet, graph);
100~~~
101
102The graph can be recomputed partially if we have new input data for one or several nodes in the
103graph. In order to do so, we need to call the `setIncomplete()` method for every node that we need
104to recompute, and then use the `ForwardPropagator` functor to mark all of their dependents as "incomplete".
105
106Example:
107~~~
108N1.setIncomplete();
109r[1] = r[4] = 0;
110ForwardPropagator forwardPropagator;
111forwardPropagator(graph);
112parallelForExecutor(taskSet, graph);
113~~~
114In this example only nodes 1 and 4 will be invoked.
115
116Subgraphs
117--------------------------------------------------------------------------------
118It is possible to organize nodes into subgraphs, and to destroy and recreate a subgraph, when the
119computation graph has static and dynamic parts.
120Example:
121~~~
122//
123// ∙----subgraph1---∙ ∙---subgraph2-------∙
124// ¦ ┌────────────┐ ¦ ¦ ┌───────────────┐ ¦ ┌───────────────────┐
125// ¦ │ 0: r[0]+=1 │ ──▶ │ 1: r[1]=r[0]*2│ ──▶ │ 4: r[4]=r[1]+r[3] │
126// ¦ └────────────┘ ¦ ¦ └───────────────┘ ¦ └───────────────────┘
127// ¦ ¦ ¦ ¦ ▲
128// ¦ ┌────────────┐ ¦ ¦ ┌───────────────┐ ¦ │
129// ¦ │ 2: r[2]+=8 │ ──▶ │ 3: r[3]=r[2]/2│ ──────┘
130// ¦ └────────────┘ ¦ ¦ └───────────────┘ ¦
131// ∙----------------∙ ∙-------------------∙
132std::array<float, 5> r;
133dispenso::Graph graph;
134
135dispenso::Subgraph& subgraph1 = graph.addSubgraph();
136dispenso::Subgraph& subgraph2 = graph.addSubgraph();
137
138dispenso::Node& N0 = subgraph1.addNode([&]() { r[0] += 1; });
139dispenso::Node& N2 = subgraph1.addNode([&]() { r[2] += 8; });
140dispenso::Node& N1 = subgraph2.addNode([&]() { r[1] += r[0] * 2; });
141dispenso::Node& N3 = subgraph2.addNode([&]() { r[3] += r[2] / 2; });
142dispenso::Node& N4 = graph.addNode([&]() { r[4] += r[1] + r[3]; });
143
144N4.dependsOn(N1, N3);
145N1.dependsOn(N0);
146N3.dependsOn(N2);
147
148// evaluate graph first time
149r = {0, 0, 0, 0, 0};
150dispenso::ConcurrentTaskSet concurrentTaskSet(dispenso::globalThreadPool());
151dispenso::ConcurrentTaskSetExecutor concurrentTaskSetExecutor;
152concurrentTaskSetExecutor(concurrentTaskSet, graph);
153
154// disconnect and destroy nodes of subgraph2
155// it invalidates node references/pointers of this subgraph
156subgraph2.clear();
157
158// create new nodes
159dispenso::Node& newN1 = subgraph2.addNode([&]() { r[1] += r[0] * 20; });
160dispenso::Node& newN3 = subgraph2.addNode([&]() { r[3] += r[2] / 20; });
161newN1.dependsOn(N0);
162newN3.dependsOn(N2);
163N4.dependsOn(newN1, newN3);
164
165// and reevaluate the graph
166setAllNodesIncomplete(graph);
167concurrentTaskSetExecutor(concurrentTaskSet, graph);
168~~~
169
170Bidirectional propagation dependency
171--------------------------------------------------------------------------------
172In certain scenarios, nodes may alter the same memory. In such instances, it becomes necessary to
173compute the predecessors of the node, even if they possess a "completed" state following state
174propagation. To facilitate this process automatically, we introduce the notion of a bidirectional
175propagation dependency (`BiProp`).
176
177Example:
178~~~
179// ┌─────────────────┐
180// │ 3: m3+=b*b │
181// └─────────────────┘
182// ▲
183// ┌−-----------−−−−−−−−−│−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−┐
184// ╎ ┌───────────┐ ┌─────────────────┐ ┌────────────┐ ╎
185// ╎ │ 0: b+=5 │ ──▷ │ 1: b*=5 │ ──▷ │ 2: b/=m4 │ ╎
186// ╎ └───────────┘ └─────────────────┘ └────────────┘ ╎
187// └−−−-−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−▲−−−−−−−−−−−−┘
188// Legend: │
189// ──▶ Normal dependency ┌────────────┐
190// ──▷ Bidirectional propagation dependency │ 4: m4+=2 │
191// m4 variable modified only in node 4 └────────────┘
192
193float b, m3, m4;
194dispenso::BiPropGraph g;
195std::array<dispenso::BiPropNode*, 8> N;
196
197dispenso::BiPropNode& N0 = g.addNode([&]() { b += 5; });
198dispenso::BiPropNode& N1 = g.addNode([&]() { b *= 5; });
199dispenso::BiPropNode& N2 = g.addNode([&]() { b /= m4; });
200dispenso::BiPropNode& N3 = g.addNode([&]() { m3 += b*b; });
201dispenso::BiPropNode& N4 = g.addNode([&]() { m4 += 2; });
202
203N3.dependsOn(N1);
204N2.dependsOn(N4);
205N2.biPropDependsOn(N1);
206N1.biPropDependsOn(N0);
207
208// first execution
209b = m3 = m4 = 0.f;
210dispenso::ConcurrentTaskSet concurrentTaskSet(dispenso::globalThreadPool());
211dispenso::ConcurrentTaskSetExecutor concurrentTaskSetExecutor;
212concurrentTaskSetExecutor(concurrentTaskSet, g);
213
214N4.setIncomplete();
215// If node 4 is incomplete, then after propagation node 2 becomes incomplete as well. Since node 2
216// bidirectionally depends on nodes 0 and 1, they will be marked as incomplete too.
217b = m4 = 0.f;
218ForwardPropagator forwardPropagator;
219forwardPropagator(g);
220concurrentTaskSetExecutor(concurrentTaskSet, g);
221~~~
222
223Please read tests from `graph_test.cpp` for more examples.
224*/
225
226namespace detail {
227class ExecutorBase;
228
229template <typename F>
230void callFunctor(void* ptr) {
231 (*static_cast<F*>(ptr))();
232}
233
234template <typename F>
235void destroyFunctor(void* ptr) {
236 static_cast<F*>(ptr)->~F();
237 constexpr size_t kFuncSize = static_cast<size_t>(dispenso::detail::nextPow2(sizeof(F)));
238 dispenso::deallocSmallBuffer<kFuncSize>(ptr);
239}
240
241} // namespace detail
242
243namespace dispenso {
244
// Forward declaration of the executor base class that Node, BiPropNode and SubgraphT befriend.
namespace detail { class ExecutorBase; } // namespace detail
248
252class Node {
253 public:
254 Node() = delete;
255 Node(const Node&) = delete;
256 Node& operator=(const Node&) = delete;
258 Node(Node&& other) noexcept
259 : numIncompletePredecessors_(other.numIncompletePredecessors_.load()),
260 numPredecessors_(other.numPredecessors_),
261 invoke_(other.invoke_),
262 destroy_(other.destroy_),
263 funcBuffer_(other.funcBuffer_),
264 dependents_(std::move(other.dependents_)) {
265 other.funcBuffer_ = nullptr;
266 }
267 ~Node() {
268 if (funcBuffer_) {
269 destroy_(funcBuffer_);
270 }
271 }
277 template <typename... Ns>
278 inline void dependsOn(Ns&... nodes) {
279 ((void)std::initializer_list<int>{(dependsOnOneNode(nodes), 0)...});
280 }
285 inline void run() const {
286 invoke_(funcBuffer_);
287 numIncompletePredecessors_.store(kCompleted, std::memory_order_release);
288 }
294 template <class F>
295 inline void forEachDependent(F&& func) const {
296 for (const Node* dependent : dependents_) {
297 func(*dependent);
298 }
299 }
305 template <class F>
306 inline void forEachDependent(F&& func) {
307 for (Node* dependent : dependents_) {
308 func(*dependent);
309 }
310 }
314 inline size_t numPredecessors() const {
315 return numPredecessors_;
316 }
323 inline bool isCompleted() const {
324 return numIncompletePredecessors_.load(std::memory_order_relaxed) == kCompleted;
325 }
333 inline bool setIncomplete() const {
334 if (numIncompletePredecessors_.load(std::memory_order_relaxed) == kCompleted) {
335 numIncompletePredecessors_.store(0, std::memory_order_relaxed);
336 return true;
337 }
338 return false;
339 }
340
346 inline void setCompleted() const {
347 numIncompletePredecessors_.store(kCompleted, std::memory_order_relaxed);
348 }
349
350 protected:
352#if DISPENSO_HAS_CONCEPTS
353 template <class F>
354 requires(!std::derived_from<std::remove_cvref_t<F>, Node>)
355 Node(F&& f) : numIncompletePredecessors_(0) {
356#else
357 template <class F, class X = std::enable_if_t<!std::is_base_of<Node, F>::value, void>>
358 Node(F&& f) : numIncompletePredecessors_(0) {
359#endif
360 using FNoRef = typename std::remove_reference<F>::type;
361
362 constexpr size_t kFuncSize = static_cast<size_t>(detail::nextPow2(sizeof(FNoRef)));
363 funcBuffer_ = allocSmallBuffer<kFuncSize>();
364 new (funcBuffer_) FNoRef(std::forward<F>(f));
365 invoke_ = ::detail::callFunctor<FNoRef>;
366 destroy_ = ::detail::destroyFunctor<FNoRef>;
367 }
368
369 void dependsOnOneNode(Node& node) {
370 node.dependents_.emplace_back(this);
371 numPredecessors_++;
372 }
373
374 static constexpr size_t kCompleted = std::numeric_limits<size_t>::max();
375 mutable std::atomic<size_t> numIncompletePredecessors_;
376 size_t numPredecessors_ = 0;
377
378 private:
379 using InvokerType = void (*)(void* ptr);
380
381 InvokerType invoke_;
382 InvokerType destroy_;
383 char* funcBuffer_;
384
385 SmallVector<Node*, DISPENSO_GRAPH_DEPENDENTS_INLINE_SIZE> dependents_; // nodes depend on this
386
387 template <class N>
388 friend class SubgraphT;
389 friend class detail::ExecutorBase;
390 template <typename G>
391 friend void setAllNodesIncomplete(const G& graph);
392};
397class BiPropNode : public Node {
398 public:
399 BiPropNode() = delete;
400 BiPropNode(const BiPropNode&) = delete;
401 BiPropNode& operator=(const BiPropNode&) = delete;
403 BiPropNode(BiPropNode&& other) noexcept
404 : Node(std::move(other)), biPropSet_(std::move(other.biPropSet_)) {}
411 template <class... Ns>
412 inline void biPropDependsOn(Ns&... nodes) {
413 ((void)std::initializer_list<int>{(biPropDependsOnOneNode(nodes), 0)...});
414 }
421 inline bool isSameSet(const BiPropNode& node) const {
422 return biPropSet_ && biPropSet_ == node.biPropSet_;
423 }
424
425 private:
426#if DISPENSO_HAS_CONCEPTS
427 template <class T>
428 requires(!std::derived_from<std::remove_cvref_t<T>, BiPropNode>)
429 BiPropNode(T&& f) : Node(std::forward<T>(f)) {}
430#else
431 template <class T, class X = std::enable_if_t<!std::is_base_of<BiPropNode, T>::value, void>>
432 BiPropNode(T&& f) : Node(std::forward<T>(f)) {}
433#endif
434 inline void removeFromBiPropSet() {
435 if (biPropSet_ != nullptr) {
436 auto it = std::find(biPropSet_->begin(), biPropSet_->end(), this);
437 if (it != biPropSet_->end()) {
438 biPropSet_->erase(it);
439 }
440 }
441 }
442
443 DISPENSO_DLL_ACCESS void biPropDependsOnOneNode(BiPropNode& node);
444
445 std::shared_ptr<std::vector<const BiPropNode*>> biPropSet_;
446
447 template <class N>
448 friend class SubgraphT;
449 friend class detail::ExecutorBase;
450};
451
452template <class N>
453class GraphT;
454
460template <class N>
461class DISPENSO_DLL_ACCESS SubgraphT {
462 public:
463 using NodeType = N;
464 SubgraphT() = delete;
465 SubgraphT(const SubgraphT<N>&) = delete;
466 SubgraphT<N>& operator=(const SubgraphT<N>&) = delete;
468 SubgraphT(SubgraphT<N>&& other) noexcept
469 : graph_(other.graph_),
470 nodes_(std::move(other.nodes_)),
471 allocator_(std::move(other.allocator_)) {}
472 ~SubgraphT();
479 template <class T>
480 DISPENSO_REQUIRES(OnceCallableFunc<T>)
481 N& addNode(T&& f) {
482 nodes_.push_back(new (allocator_->alloc()) NodeType(std::forward<T>(f)));
483 return *nodes_.back();
484 }
488 size_t numNodes() const {
489 return nodes_.size();
490 }
499 void reserve(size_t n) {
500 nodes_.reserve(n);
501 }
507 const N& node(size_t index) const {
508 return *nodes_[index];
509 }
515 N& node(size_t index) {
516 return *nodes_[index];
517 }
523 template <class F>
524 inline void forEachNode(F&& func) const {
525 for (const N* node : nodes_) {
526 func(*node);
527 }
528 }
535 template <class F>
536 inline void forEachNode(F&& func) {
537 for (N* node : nodes_) {
538 func(*node);
539 }
540 }
545 void clear();
546
547 private:
548 using DeallocFunc = void (*)(NoLockPoolAllocator*);
549 using PoolPtr = std::unique_ptr<NoLockPoolAllocator, DeallocFunc>;
550
551 static constexpr size_t kNodeSizeP2 = static_cast<size_t>(detail::nextPow2(sizeof(NodeType)));
552
553 explicit SubgraphT(GraphT<N>* graph) : graph_(graph), nodes_(), allocator_(getAllocator()) {}
554
555 inline void removeNodeFromBiPropSet(Node* /* node */) {}
556 void removeNodeFromBiPropSet(BiPropNode* node) {
557 node->removeFromBiPropSet();
558 }
559 void decrementDependentCounters();
560 size_t markNodesWithPredicessors();
561 void removePredecessorDependencies(size_t numGraphPredecessors);
562
563 void destroyNodes();
564
565 static PoolPtr getAllocator();
566 static void releaseAllocator(NoLockPoolAllocator* ptr);
567
568 GraphT<N>* graph_;
569#if defined(_WIN32) && !defined(__MINGW32__)
570#pragma warning(push)
571#pragma warning(disable : 4251)
572#endif
573 std::vector<N*> nodes_;
574
575 PoolPtr allocator_;
576#if defined(_WIN32) && !defined(__MINGW32__)
577#pragma warning(pop)
578#endif
579
580 template <class T>
581 friend class GraphT;
582};
583
// Task graph made of one or more SubgraphT<N>, held in a std::deque. Subgraph 0 is the default
// subgraph: addNode(), numNodes(), reserve() and node() all forward to subgraphs_[0], while
// forEachNode() visits the nodes of every subgraph.
//
// NOTE(review): this listing of GraphT is incomplete. It does not show the declaration of
// `SubgraphType` (used below), the header of the constructor whose body begins at line "604",
// the move constructor GraphT(GraphT<N>&&), or addSubgraph(). Consult the original graph.h
// before editing.
593template <class N>
594class DISPENSO_DLL_ACCESS GraphT {
595 public:
596 using NodeType = N;
// Graphs are not copyable.
598 GraphT(const GraphT<N>&) = delete;
599 GraphT& operator=(const GraphT<N>&) = delete;
// Constructor body (its header is missing from this listing): creates the default subgraph 0.
604 subgraphs_.push_back(SubgraphType(this));
605 }
// Move assignment; declared here, defined out of line.
613 GraphT<N>& operator=(GraphT&& other) noexcept;
// Add a node that runs `f` to the default subgraph (subgraph 0) and return it.
620 template <class T>
621 DISPENSO_REQUIRES(OnceCallableFunc<T>)
622 N& addNode(T&& f) {
623 return subgraphs_[0].addNode(std::forward<T>(f));
624 }
// Number of nodes in the default subgraph only (not the sum over all subgraphs).
628 size_t numNodes() const {
629 return subgraphs_[0].numNodes();
630 }
// Reserve space for `n` nodes in the default subgraph.
634 void reserve(size_t n) {
635 subgraphs_[0].reserve(n);
636 }
// Access node `index` of the default subgraph (no bounds check here).
642 const N& node(size_t index) const {
643 return subgraphs_[0].node(index);
644 }
650 N& node(size_t index) {
651 return subgraphs_[0].node(index);
652 }
// Number of subgraphs, including the default one.
660 size_t numSubgraphs() const {
661 return subgraphs_.size();
662 }
// Access subgraph `index` (0 is the default subgraph; no bounds check).
668 const SubgraphT<N>& subgraph(size_t index) const {
669 return subgraphs_[index];
670 }
676 SubgraphT<N>& subgraph(size_t index) {
677 return subgraphs_[index];
678 }
// Call `func` on each subgraph, in creation order.
684 template <class F>
685 inline void forEachSubgraph(F&& func) const {
686 for (const SubgraphT<N>& subgraph : subgraphs_) {
687 func(subgraph);
688 }
689 }
695 template <class F>
696 inline void forEachSubgraph(F&& func) {
697 for (SubgraphT<N>& subgraph : subgraphs_) {
698 func(subgraph);
699 }
700 }
// Call `func` on every node of every subgraph: subgraph by subgraph, nodes in insertion order.
707 template <class F>
708 inline void forEachNode(F&& func) const {
709 for (const SubgraphT<N>& subgraph : subgraphs_) {
710 for (const N* node : subgraph.nodes_) {
711 func(*node);
712 }
713 }
714 }
720 template <class F>
721 inline void forEachNode(F&& func) {
722 for (SubgraphT<N>& subgraph : subgraphs_) {
723 for (N* node : subgraph.nodes_) {
724 func(*node);
725 }
726 }
727 }
// Destroy every subgraph, then recreate an empty default subgraph 0.
// NOTE(review): this invalidates references to existing subgraphs; node cleanup presumably
// happens in ~SubgraphT (defined out of line), so node references should be treated as
// invalid too.
731 inline void clear() {
732 subgraphs_.clear();
733 subgraphs_.push_back(SubgraphType(this));
734 }
// Call destroyNodes() on every subgraph; the subgraph objects themselves are kept.
738 inline void clearSubgraphs() {
739 for (SubgraphT<N>& subgraph : subgraphs_) {
740 subgraph.destroyNodes();
741 }
742 }
743
744 private:
// Power-of-two size bucket for a SubgraphType (its use is not visible in this listing).
745 static constexpr size_t kSubgraphSizeP2 =
746 static_cast<size_t>(detail::nextPow2(sizeof(SubgraphType)));
747
748#if defined(_WIN32) && !defined(__MINGW32__)
749#pragma warning(push)
750#pragma warning(disable : 4251)
751#endif
// std::deque::push_back does not invalidate references to existing elements. That is
// presumably why it is used here: subgraph references are handed out to callers.
752 std::deque<SubgraphT<N>> subgraphs_;
753#if defined(_WIN32) && !defined(__MINGW32__)
754#pragma warning(pop)
755#endif
756
// Subgraphs need access to graph internals.
757 template <class T>
758 friend class SubgraphT;
759};
760
765
770} // namespace dispenso
BiPropNode(BiPropNode &&other) noexcept
Definition graph.h:403
bool isSameSet(const BiPropNode &node) const
Definition graph.h:421
void biPropDependsOn(Ns &... nodes)
Definition graph.h:412
void reserve(size_t n)
Definition graph.h:634
void forEachNode(F &&func) const
Definition graph.h:708
void clear()
Definition graph.h:731
void forEachNode(F &&func)
Definition graph.h:721
void forEachSubgraph(F &&func) const
Definition graph.h:685
size_t numSubgraphs() const
Definition graph.h:660
void forEachSubgraph(F &&func)
Definition graph.h:696
N & node(size_t index)
Definition graph.h:650
const SubgraphT< N > & subgraph(size_t index) const
Definition graph.h:668
GraphT(GraphT< N > &&other)
const N & node(size_t index) const
Definition graph.h:642
SubgraphT< N > & subgraph(size_t index)
Definition graph.h:676
void clearSubgraphs()
Definition graph.h:738
SubgraphT< N > & addSubgraph()
size_t numNodes() const
Definition graph.h:628
bool isCompleted() const
Definition graph.h:323
bool setIncomplete() const
Definition graph.h:333
Node(F &&f)
Definition graph.h:358
void setCompleted() const
Definition graph.h:346
void forEachDependent(F &&func) const
Definition graph.h:295
void forEachDependent(F &&func)
Definition graph.h:306
void dependsOn(Ns &... nodes)
Definition graph.h:278
size_t numPredecessors() const
Definition graph.h:314
Node(Node &&other) noexcept
Definition graph.h:258
void run() const
Definition graph.h:285
const N & node(size_t index) const
Definition graph.h:507
void forEachNode(F &&func) const
Definition graph.h:524
N & node(size_t index)
Definition graph.h:515
size_t numNodes() const
Definition graph.h:488
void reserve(size_t n)
Definition graph.h:499
void forEachNode(F &&func)
Definition graph.h:536
SubgraphT(SubgraphT< N > &&other) noexcept
Definition graph.h:468