dispenso 1.5.1
A library for task parallelism
Loading...
Searching...
No Matches
graph.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
14#pragma once
15
#include <algorithm>
#include <atomic>
#include <deque>
#include <limits>
#include <memory>
#include <new>
#include <type_traits>
#include <utility>
#include <vector>

#include <dispenso/platform.h>
28
// Inline capacity of the SmallVector each Node uses to hold pointers to its dependents.
// It may be overridden by defining the macro before this header is included. It must lie in
// the range [1, 64].
#if !defined(DISPENSO_GRAPH_DEPENDENTS_INLINE_SIZE)
#define DISPENSO_GRAPH_DEPENDENTS_INLINE_SIZE 4
#endif

static_assert(
    !(DISPENSO_GRAPH_DEPENDENTS_INLINE_SIZE < 1 || DISPENSO_GRAPH_DEPENDENTS_INLINE_SIZE > 64),
    "DISPENSO_GRAPH_DEPENDENTS_INLINE_SIZE must be between 1 and 64");
45
46/*
47Terminology
48--------------------------------------------------------------------------------
49The Node depends on the predecessor. The dependent depends on the node.
50~~~
51┌─────────────┐ ┌──────┐ ┌───────────┐
52│ predecessor │ ──▶ │ node │ ──▶ │ dependent │
53└─────────────┘ └──────┘ └───────────┘
54~~~
55
56Graph construction
57--------------------------------------------------------------------------------
58The Graph class can be used to create tasks with dependencies and execute the graph once.
59Graphs must not contain cycles.
60Example:
61~~~
62//
63// ┌────────────┐ ┌───────────────┐ ┌───────────────────┐
64// │ 0: r[0]+=1 │ ──▶ │ 1: r[1]=r[0]*2│ ──▶ │ 4: r[4]=r[1]+r[3] │
65// └────────────┘ └───────────────┘ └───────────────────┘
66// ▲
67// ┌────────────┐ ┌───────────────┐ │
68// │ 2: r[2]+=8 │ ──▶ │ 3: r[3]=r[2]/2│ ──────┘
69// └────────────┘ └───────────────┘
70
71std::array<float, 5> r{};
72
73dispenso::Graph graph;
74
75dispenso::Node& N0 = graph.addNode([&]() { r[0] += 1; });
76dispenso::Node& N2 = graph.addNode([&]() { r[2] += 8; });
77dispenso::Node& N1 = graph.addNode([&]() { r[1] += r[0] * 2; });
78dispenso::Node& N3 = graph.addNode([&]() { r[3] += r[2] / 2; });
79dispenso::Node& N4 = graph.addNode([&]() { r[4] += r[1] + r[3]; });
80
81N4.dependsOn(N1, N3);
82N1.dependsOn(N0);
83N3.dependsOn(N2);
84
85dispenso::TaskSet taskSet(dispenso::globalThreadPool());
86dispenso::ParallelForExecutor parallelForExecutor;
87parallelForExecutor(taskSet, graph);
88~~~
89
90Partial reevaluation
91--------------------------------------------------------------------------------
92If the graph is big, or only part of it needs to be recomputed, it can be executed again.
93After execution of the graph all nodes change their state from "incomplete" to
94"completed". In order to evaluate the whole graph again, use the function `setAllNodesIncomplete`.
95Example:
96~~~
97r = {0, 0, 0, 0, 0};
98setAllNodesIncomplete(graph);
99parallelForExecutor(taskSet, graph);
100~~~
101
102The graph can be recomputed partially if there is new input data for one or several nodes in the
103graph. In order to do this, call the `setIncomplete()` method for every node that needs to be
104recomputed, and then use the `ForwardPropagator` functor to mark all of their dependents as "incomplete".
105
106Example:
107~~~
108N1.setIncomplete();
109r[1] = r[4] = 0;
110dispenso::ForwardPropagator forwardPropagator;
111forwardPropagator(graph);
112parallelForExecutor(taskSet, graph);
113~~~
114In this example only nodes 1 and 4 will be invoked.
115
116 Subgraphs
117--------------------------------------------------------------------------------
118It is possible to organize nodes into subgraphs, which can be destroyed and recreated
119independently. This is useful when the computation graph has static and dynamic parts.
120Example:
121~~~
122//
123// ∙----subgraph1---∙ ∙---subgraph2-------∙
124// ¦ ┌────────────┐ ¦ ¦ ┌───────────────┐ ¦ ┌───────────────────┐
125// ¦ │ 0: r[0]+=1 │ ──▶ │ 1: r[1]=r[0]*2│ ──▶ │ 4: r[4]=r[1]+r[3] │
126// ¦ └────────────┘ ¦ ¦ └───────────────┘ ¦ └───────────────────┘
127// ¦ ¦ ¦ ¦ ▲
128// ¦ ┌────────────┐ ¦ ¦ ┌───────────────┐ ¦ │
129// ¦ │ 2: r[2]+=8 │ ──▶ │ 3: r[3]=r[2]/2│ ──────┘
130// ¦ └────────────┘ ¦ ¦ └───────────────┘ ¦
131// ∙----------------∙ ∙-------------------∙
132std::array<float, 5> r;
133dispenso::Graph graph;
134
135dispenso::Subgraph& subgraph1 = graph.addSubgraph();
136dispenso::Subgraph& subgraph2 = graph.addSubgraph();
137
138dispenso::Node& N0 = subgraph1.addNode([&]() { r[0] += 1; });
139dispenso::Node& N2 = subgraph1.addNode([&]() { r[2] += 8; });
140dispenso::Node& N1 = subgraph2.addNode([&]() { r[1] += r[0] * 2; });
141dispenso::Node& N3 = subgraph2.addNode([&]() { r[3] += r[2] / 2; });
142dispenso::Node& N4 = graph.addNode([&]() { r[4] += r[1] + r[3]; });
143
144N4.dependsOn(N1, N3);
145N1.dependsOn(N0);
146N3.dependsOn(N2);
147
148// evaluate graph first time
149r = {0, 0, 0, 0, 0};
150dispenso::ConcurrentTaskSet concurrentTaskSet(dispenso::globalThreadPool());
151dispenso::ConcurrentTaskSetExecutor concurrentTaskSetExecutor;
152concurrentTaskSetExecutor(concurrentTaskSet, graph);
153
154// disconnect and destroy nodes of subgraph2
155// it invalidates node references/pointers of this subgraph
156subgraph2.clear();
157
158// create new nodes
159dispenso::Node& newN1 = subgraph2.addNode([&]() { r[1] += r[0] * 20; });
160dispenso::Node& newN3 = subgraph2.addNode([&]() { r[3] += r[2] / 20; });
161newN1.dependsOn(N0);
162newN3.dependsOn(N2);
163N4.dependsOn(newN1, newN3);
164
165// and reevaluate the graph
166setAllNodesIncomplete(graph);
167concurrentTaskSetExecutor(concurrentTaskSet, graph);
168~~~
169
170Bidirectional propagation dependency
171--------------------------------------------------------------------------------
172In certain scenarios, nodes may alter the same memory. In such instances, it becomes necessary to
173compute the predecessors of the node, even if they possess a "completed" state following state
174propagation. To facilitate this process automatically, we introduce the notion of a bidirectional
175propagation dependency (`BiProp`).
176
177Example:
178~~~
179// ┌─────────────────┐
180// │ 3: m3+=b*b │
181// └─────────────────┘
182// ▲
183// ┌−-----------−−−−−−−−−│−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−┐
184// ╎ ┌───────────┐ ┌─────────────────┐ ┌────────────┐ ╎
185// ╎ │ 0: b+=5 │ ──▷ │ 1: b*=5 │ ──▷ │ 2: b/=m4 │ ╎
186// ╎ └───────────┘ └─────────────────┘ └────────────┘ ╎
187// └−−−-−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−▲−−−−−−−−−−−−┘
188// Legend: │
189// ──▶ Normal dependency ┌────────────┐
190// ──▷ Bidirectional propagation dependency │ 4: m4+=2 │
191// m4 variable modified only in node 4 └────────────┘
192
193float b, m3, m4;
194dispenso::BiPropGraph g;
195std::array<dispenso::BiPropNode*, 8> N;
196
197dispenso::BiPropNode& N0 = g.addNode([&]() { b += 5; });
198dispenso::BiPropNode& N1 = g.addNode([&]() { b *= 5; });
199dispenso::BiPropNode& N2 = g.addNode([&]() { b /= m4; });
200dispenso::BiPropNode& N3 = g.addNode([&]() { m3 += b*b; });
201dispenso::BiPropNode& N4 = g.addNode([&]() { m4 += 2; });
202
203N3.dependsOn(N1);
204N2.dependsOn(N4);
205N2.biPropDependsOn(N1);
206N1.biPropDependsOn(N0);
207
208// first execution
209b = m3 = m4 = 0.f;
210dispenso::ConcurrentTaskSet concurrentTaskSet(dispenso::globalThreadPool());
211dispenso::ConcurrentTaskSetExecutor concurrentTaskSetExecutor;
212concurrentTaskSetExecutor(concurrentTaskSet, g);
213
214N4.setIncomplete();
215// if node 4 is incomplete, after propagation node 2 becomes incomplete. Since node 2 is bidirectionally
216// connected to node 1, which is in turn bidirectionally connected to node 0, both will be marked as incomplete as well
217b = m4 = 0.f;
218dispenso::ForwardPropagator forwardPropagator;
220concurrentTaskSetExecutor(concurrentTaskSet, g);
221~~~
222
223Please read tests from `graph_test.cpp` for more examples.
224*/
225
226namespace detail {
227class ExecutorBase;
228
229template <typename F>
230void callFunctor(void* ptr) {
231 (*static_cast<F*>(ptr))();
232}
233
234template <typename F>
235void destroyFunctor(void* ptr) {
236 static_cast<F*>(ptr)->~F();
237 constexpr size_t kFuncSize = static_cast<size_t>(dispenso::detail::nextPow2(sizeof(F)));
238 dispenso::deallocSmallBuffer<kFuncSize>(ptr);
239}
240
241} // namespace detail
242
243namespace dispenso {
247class Node {
248 public:
249 Node() = delete;
250 Node(const Node&) = delete;
251 Node& operator=(const Node&) = delete;
253 Node(Node&& other) noexcept
254 : numIncompletePredecessors_(other.numIncompletePredecessors_.load()),
255 numPredecessors_(other.numPredecessors_),
256 invoke_(other.invoke_),
257 destroy_(other.destroy_),
258 funcBuffer_(other.funcBuffer_),
259 dependents_(std::move(other.dependents_)) {
260 other.funcBuffer_ = nullptr;
261 }
262 ~Node() {
263 if (funcBuffer_) {
264 destroy_(funcBuffer_);
265 }
266 }
272 template <typename... Ns>
273 inline void dependsOn(Ns&... nodes) {
274 ((void)std::initializer_list<int>{(dependsOnOneNode(nodes), 0)...});
275 }
280 inline void run() const {
281 invoke_(funcBuffer_);
282 numIncompletePredecessors_.store(kCompleted, std::memory_order_release);
283 }
289 template <class F>
290 inline void forEachDependent(F&& func) const {
291 for (const Node* dependent : dependents_) {
292 func(*dependent);
293 }
294 }
300 template <class F>
301 inline void forEachDependent(F&& func) {
302 for (Node* dependent : dependents_) {
303 func(*dependent);
304 }
305 }
309 inline size_t numPredecessors() const {
310 return numPredecessors_;
311 }
318 inline bool isCompleted() const {
319 return numIncompletePredecessors_.load(std::memory_order_relaxed) == kCompleted;
320 }
328 inline bool setIncomplete() const {
329 if (numIncompletePredecessors_.load(std::memory_order_relaxed) == kCompleted) {
330 numIncompletePredecessors_.store(0, std::memory_order_relaxed);
331 return true;
332 }
333 return false;
334 }
335
341 inline void setCompleted() const {
342 numIncompletePredecessors_.store(kCompleted, std::memory_order_relaxed);
343 }
344
345 protected:
347 template <class F, class X = std::enable_if_t<!std::is_base_of<Node, F>::value, void>>
348 Node(F&& f) : numIncompletePredecessors_(0) {
349 using FNoRef = typename std::remove_reference<F>::type;
350
351 constexpr size_t kFuncSize = static_cast<size_t>(detail::nextPow2(sizeof(FNoRef)));
352 funcBuffer_ = allocSmallBuffer<kFuncSize>();
353 new (funcBuffer_) FNoRef(std::forward<F>(f));
354 invoke_ = ::detail::callFunctor<FNoRef>;
355 destroy_ = ::detail::destroyFunctor<FNoRef>;
356 }
357
358 void dependsOnOneNode(Node& node) {
359 node.dependents_.emplace_back(this);
360 numPredecessors_++;
361 }
362
363 static constexpr size_t kCompleted = std::numeric_limits<size_t>::max();
364 mutable std::atomic<size_t> numIncompletePredecessors_;
365 size_t numPredecessors_ = 0;
366
367 private:
368 using InvokerType = void (*)(void* ptr);
369
370 InvokerType invoke_;
371 InvokerType destroy_;
372 char* funcBuffer_;
373
374 SmallVector<Node*, DISPENSO_GRAPH_DEPENDENTS_INLINE_SIZE> dependents_; // nodes depend on this
375
376 template <class N>
377 friend class SubgraphT;
378 friend class ::detail::ExecutorBase;
379 template <typename G>
380 friend void setAllNodesIncomplete(const G& graph);
381};
386class BiPropNode : public Node {
387 public:
388 BiPropNode() = delete;
389 BiPropNode(const BiPropNode&) = delete;
390 BiPropNode& operator=(const BiPropNode&) = delete;
392 BiPropNode(BiPropNode&& other) noexcept
393 : Node(std::move(other)), biPropSet_(std::move(other.biPropSet_)) {}
400 template <class... Ns>
401 inline void biPropDependsOn(Ns&... nodes) {
402 ((void)std::initializer_list<int>{(biPropDependsOnOneNode(nodes), 0)...});
403 }
410 inline bool isSameSet(const BiPropNode& node) const {
411 return biPropSet_ && biPropSet_ == node.biPropSet_;
412 }
413
414 private:
415 template <class T, class X = std::enable_if_t<!std::is_base_of<BiPropNode, T>::value, void>>
416 BiPropNode(T&& f) : Node(std::forward<T>(f)) {}
417 inline void removeFromBiPropSet() {
418 if (biPropSet_ != nullptr) {
419 auto it = std::find(biPropSet_->begin(), biPropSet_->end(), this);
420 if (it != biPropSet_->end()) {
421 biPropSet_->erase(it);
422 }
423 }
424 }
425
426 DISPENSO_DLL_ACCESS void biPropDependsOnOneNode(BiPropNode& node);
427
428 std::shared_ptr<std::vector<const BiPropNode*>> biPropSet_;
429
430 template <class N>
431 friend class SubgraphT;
432 friend class ::detail::ExecutorBase;
433};
434
// Forward declaration: SubgraphT (declared next) keeps a back-pointer to its owning GraphT.
template <typename N>
class GraphT;
437
443template <class N>
444class DISPENSO_DLL_ACCESS SubgraphT {
445 public:
446 using NodeType = N;
447 SubgraphT() = delete;
448 SubgraphT(const SubgraphT<N>&) = delete;
449 SubgraphT<N>& operator=(const SubgraphT<N>&) = delete;
451 SubgraphT(SubgraphT<N>&& other) noexcept
452 : graph_(other.graph_),
453 nodes_(std::move(other.nodes_)),
454 allocator_(std::move(other.allocator_)) {}
455 ~SubgraphT();
462 template <class T>
463 DISPENSO_REQUIRES(OnceCallableFunc<T>)
464 N& addNode(T&& f) {
465 nodes_.push_back(new (allocator_->alloc()) NodeType(std::forward<T>(f)));
466 return *nodes_.back();
467 }
471 size_t numNodes() const {
472 return nodes_.size();
473 }
482 void reserve(size_t n) {
483 nodes_.reserve(n);
484 }
490 const N& node(size_t index) const {
491 return *nodes_[index];
492 }
498 N& node(size_t index) {
499 return *nodes_[index];
500 }
506 template <class F>
507 inline void forEachNode(F&& func) const {
508 for (const N* node : nodes_) {
509 func(*node);
510 }
511 }
518 template <class F>
519 inline void forEachNode(F&& func) {
520 for (N* node : nodes_) {
521 func(*node);
522 }
523 }
528 void clear();
529
530 private:
531 using DeallocFunc = void (*)(NoLockPoolAllocator*);
532 using PoolPtr = std::unique_ptr<NoLockPoolAllocator, DeallocFunc>;
533
534 static constexpr size_t kNodeSizeP2 = static_cast<size_t>(detail::nextPow2(sizeof(NodeType)));
535
536 explicit SubgraphT(GraphT<N>* graph) : graph_(graph), nodes_(), allocator_(getAllocator()) {}
537
538 inline void removeNodeFromBiPropSet(Node* /* node */) {}
539 void removeNodeFromBiPropSet(BiPropNode* node) {
540 node->removeFromBiPropSet();
541 }
542 void decrementDependentCounters();
543 size_t markNodesWithPredicessors();
544 void removePredecessorDependencies(size_t numGraphPredecessors);
545
546 void destroyNodes();
547
548 static PoolPtr getAllocator();
549 static void releaseAllocator(NoLockPoolAllocator* ptr);
550
551 GraphT<N>* graph_;
552#if defined(_WIN32) && !defined(__MINGW32__)
553#pragma warning(push)
554#pragma warning(disable : 4251)
555#endif
556 std::vector<N*> nodes_;
557
558 PoolPtr allocator_;
559#if defined(_WIN32) && !defined(__MINGW32__)
560#pragma warning(pop)
561#endif
562
563 template <class T>
564 friend class GraphT;
565};
566
// GraphT<N>: owns a list of subgraphs (std::deque<SubgraphT<N>>). Subgraph 0 is the default
// subgraph that addNode(), numNodes() and node() operate on.
// NOTE(review): several source lines of this class are missing from this listing. These include
// the start of the constructor whose body appears below, the declaration of SubgraphType
// (presumably an alias for SubgraphT<N>), the move constructor and addSubgraph(). The class is
// incomplete as displayed.
576template <class N>
577class DISPENSO_DLL_ACCESS GraphT {
578 public:
579 using NodeType = N;
581 GraphT(const GraphT<N>&) = delete;
582 GraphT& operator=(const GraphT<N>&) = delete;
 // NOTE(review): tail of a constructor (presumably the default one) that creates the default
 // subgraph at index 0.
587 subgraphs_.push_back(SubgraphType(this));
588 }
596 GraphT<N>& operator=(GraphT&& other) noexcept;
 // Adds a node built from `f` to the default subgraph (index 0).
603 template <class T>
604 DISPENSO_REQUIRES(OnceCallableFunc<T>)
605 N& addNode(T&& f) {
606 return subgraphs_[0].addNode(std::forward<T>(f));
607 }
 // Number of nodes in the default subgraph only (not across all subgraphs).
611 size_t numNodes() const {
612 return subgraphs_[0].numNodes();
613 }
 // Node `index` of the default subgraph.
619 const N& node(size_t index) const {
620 return subgraphs_[0].node(index);
621 }
627 N& node(size_t index) {
628 return subgraphs_[0].node(index);
629 }
637 size_t numSubgraphs() const {
638 return subgraphs_.size();
639 }
 // Subgraph accessors; `index` is not bounds-checked.
645 const SubgraphT<N>& subgraph(size_t index) const {
646 return subgraphs_[index];
647 }
653 SubgraphT<N>& subgraph(size_t index) {
654 return subgraphs_[index];
655 }
661 template <class F>
662 inline void forEachSubgraph(F&& func) const {
663 for (const SubgraphT<N>& subgraph : subgraphs_) {
664 func(subgraph);
665 }
666 }
672 template <class F>
673 inline void forEachSubgraph(F&& func) {
674 for (SubgraphT<N>& subgraph : subgraphs_) {
675 func(subgraph);
676 }
677 }
 // Visits every node of every subgraph: subgraphs in order, nodes in insertion order.
684 template <class F>
685 inline void forEachNode(F&& func) const {
686 for (const SubgraphT<N>& subgraph : subgraphs_) {
687 for (const N* node : subgraph.nodes_) {
688 func(*node);
689 }
690 }
691 }
697 template <class F>
698 inline void forEachNode(F&& func) {
699 for (SubgraphT<N>& subgraph : subgraphs_) {
700 for (N* node : subgraph.nodes_) {
701 func(*node);
702 }
703 }
704 }
 // Destroys every subgraph, then recreates a single empty default subgraph bound to this graph.
708 inline void clear() {
709 subgraphs_.clear();
710 subgraphs_.push_back(SubgraphType(this));
711 }
 // Calls destroyNodes() on every subgraph. The subgraph objects themselves are kept.
715 inline void clearSubgraphs() {
716 for (SubgraphT<N>& subgraph : subgraphs_) {
717 subgraph.destroyNodes();
718 }
719 }
720
721 private:
722 static constexpr size_t kSubgraphSizeP2 =
723 static_cast<size_t>(detail::nextPow2(sizeof(SubgraphType)));
724
725#if defined(_WIN32) && !defined(__MINGW32__)
726#pragma warning(push)
727#pragma warning(disable : 4251)
728#endif
 // std::deque::push_back does not invalidate references to existing elements. References
 // returned by subgraph() therefore presumably stay valid while more subgraphs are appended.
729 std::deque<SubgraphT<N>> subgraphs_;
730#if defined(_WIN32) && !defined(__MINGW32__)
731#pragma warning(pop)
732#endif
733
734 template <class T>
735 friend class SubgraphT;
736};
737
742
747} // namespace dispenso
BiPropNode(BiPropNode &&other) noexcept
Definition graph.h:392
bool isSameSet(const BiPropNode &node) const
Definition graph.h:410
void biPropDependsOn(Ns &... nodes)
Definition graph.h:401
void forEachNode(F &&func) const
Definition graph.h:685
void clear()
Definition graph.h:708
void forEachNode(F &&func)
Definition graph.h:698
void forEachSubgraph(F &&func) const
Definition graph.h:662
size_t numSubgraphs() const
Definition graph.h:637
void forEachSubgraph(F &&func)
Definition graph.h:673
N & node(size_t index)
Definition graph.h:627
const SubgraphT< N > & subgraph(size_t index) const
Definition graph.h:645
GraphT(GraphT< N > &&other)
const N & node(size_t index) const
Definition graph.h:619
SubgraphT< N > & subgraph(size_t index)
Definition graph.h:653
void clearSubgraphs()
Definition graph.h:715
SubgraphT< N > & addSubgraph()
size_t numNodes() const
Definition graph.h:611
bool isCompleted() const
Definition graph.h:318
bool setIncomplete() const
Definition graph.h:328
Node(F &&f)
Definition graph.h:348
void setCompleted() const
Definition graph.h:341
void forEachDependent(F &&func) const
Definition graph.h:290
void forEachDependent(F &&func)
Definition graph.h:301
void dependsOn(Ns &... nodes)
Definition graph.h:273
size_t numPredecessors() const
Definition graph.h:309
Node(Node &&other) noexcept
Definition graph.h:253
void run() const
Definition graph.h:280
const N & node(size_t index) const
Definition graph.h:490
void forEachNode(F &&func) const
Definition graph.h:507
N & node(size_t index)
Definition graph.h:498
size_t numNodes() const
Definition graph.h:471
void reserve(size_t n)
Definition graph.h:482
void forEachNode(F &&func)
Definition graph.h:519
SubgraphT(SubgraphT< N > &&other) noexcept
Definition graph.h:451