dispenso
A library for task parallelism
 
Loading...
Searching...
No Matches
graph.cpp
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8#include <dispenso/graph.h>
9
10#include <iterator>
11#include <mutex>
12
13namespace {
// Sentinel value written into a node's numPredecessors_ by markNodesWithPredicessors() and
// tested by removePredecessorDependencies() to recognise dependents that must be dropped.
14constexpr size_t kToDelete = std::numeric_limits<size_t>::max();
15
16void set_union(
17 std::vector<const dispenso::BiPropNode*>& s1,
18 const std::vector<const dispenso::BiPropNode*>& s2) {
19 std::vector<const dispenso::BiPropNode*> tmp(s1);
20 s1.clear();
21 std::set_union(tmp.cbegin(), tmp.cend(), s2.cbegin(), s2.cend(), std::back_inserter(s1));
22}
23
24void set_insert(std::vector<const dispenso::BiPropNode*>& s, const dispenso::BiPropNode* node) {
25 auto it = std::upper_bound(s.begin(), s.end(), node);
26 if (it == s.begin() || *(it - 1) != node) {
27 s.insert(it, node);
28 }
29}
30} // anonymous namespace
31
32namespace dispenso {
33
34void BiPropNode::biPropDependsOnOneNode(BiPropNode& node) {
35 Node::dependsOnOneNode(node);
36 if (node.biPropSet_ == nullptr && biPropSet_ == nullptr) {
37 biPropSet_ = std::make_shared<std::vector<const BiPropNode*>>();
38 set_insert(*biPropSet_, this);
39 set_insert(*biPropSet_, &node);
40 node.biPropSet_ = biPropSet_;
41 } else if (node.biPropSet_ != nullptr && biPropSet_ != nullptr) {
42 set_union(*biPropSet_, *node.biPropSet_);
43 node.biPropSet_ = biPropSet_;
44 } else if (biPropSet_ == nullptr) {
45 biPropSet_ = node.biPropSet_;
46 set_insert(*biPropSet_, this);
47 } else {
48 node.biPropSet_ = biPropSet_;
49 set_insert(*biPropSet_, &node);
50 }
51}
52
53template <class N>
// NOTE(review): listing line 54 (the function signature and opening brace) is missing from this
// extract. The body only calls helpers that are defined below as SubgraphT<N> members, so it is
// presumably a SubgraphT<N> member function -- confirm the exact signature against the original
// graph.cpp.
//
// Sequence performed by the body:
//   1. decrementDependentCounters()
//   2. markNodesWithPredicessors(), which returns a predecessor count
//   3. removePredecessorDependencies(count), only when that count is non-zero
//   4. destroyNodes()
55 decrementDependentCounters();
56 const size_t numGraphPredecessors = markNodesWithPredicessors();
57 if (numGraphPredecessors != 0) {
58 removePredecessorDependencies(numGraphPredecessors);
59 }
60 destroyNodes();
61}
62
63template <class N>
// NOTE(review): listing line 64 (the function signature and opening brace) is missing from this
// extract; presumably this is the destroyNodes() helper called at listing line 60 -- confirm
// against the original graph.cpp.
//
// Runs the destructor of every node in nodes_ explicitly, then clears allocator_ and empties
// nodes_.
65 for (NodeType* n : nodes_) {
66 n->~NodeType();
67 }
68 allocator_->clear();
69 nodes_.clear();
70}
71
72template <class N>
73SubgraphT<N>::~SubgraphT() {
74 for (NodeType* n : nodes_) {
75 n->~NodeType();
76 }
77}
78
79template <class N>
80void SubgraphT<N>::decrementDependentCounters() {
81 for (N* node : nodes_) {
82 for (Node* const dependent : node->dependents_) {
83 dependent->numPredecessors_--;
84 }
85 removeNodeFromBiPropSet(node);
86 }
87}
88
89template <class N>
90size_t SubgraphT<N>::markNodesWithPredicessors() {
91 size_t numGraphPredecessors = 0;
92 for (N* node : nodes_) {
93 if (node->numPredecessors_ != 0) {
94 numGraphPredecessors += node->numPredecessors_;
95 node->numPredecessors_ = kToDelete;
96 }
97 }
98 return numGraphPredecessors;
99}
100
101template <class N>
102void SubgraphT<N>::removePredecessorDependencies(size_t numGraphPredecessors) {
103 for (SubgraphT<N>& subgraph : graph_->subgraphs_) {
104 if (&subgraph == this) {
105 continue;
106 }
107 for (N* node : subgraph.nodes_) {
108 std::vector<Node*>& dependents = node->dependents_;
109 size_t num = dependents.size();
110 for (size_t i = 0; i < num;) {
111 if (dependents[i]->numPredecessors_ == kToDelete) {
112 dependents[i] = dependents[num - 1];
113 --num;
114 if (--numGraphPredecessors == 0) {
115 dependents.resize(num);
116 return;
117 }
118 } else {
119 i++;
120 }
121 }
122 dependents.resize(num);
123 }
124 }
125}
126
127namespace {
128constexpr size_t kMaxCache = 8;
129// Don't cache too-large allocators. This way we will have at most 8*(2**16) = 512K outstanding
130// nodes worth of memory per node type.
131// TODO(bbudge): Make these caching values macro configurable for lightweight platforms.
132constexpr size_t kMaxChunkCapacity = 1 << 16;
133
134using AlignedNodePoolPtr =
135 std::unique_ptr<NoLockPoolAllocator, detail::AlignedFreeDeleter<NoLockPoolAllocator>>;
136
137std::vector<AlignedNodePoolPtr> g_sgcache[2];
138std::mutex g_sgcacheMtx;
139
140template <class T>
141constexpr size_t kCacheIndex = size_t{std::is_same<T, BiPropNode>::value};
142
143} // namespace
144
145template <class N>
146typename SubgraphT<N>::PoolPtr SubgraphT<N>::getAllocator() {
147 AlignedNodePoolPtr ptr;
148
149 auto& cache = g_sgcache[kCacheIndex<N>];
150
151 {
152 std::lock_guard<std::mutex> lk(g_sgcacheMtx);
153 if (cache.empty()) {
154 void* alloc =
155 detail::alignedMalloc(sizeof(NoLockPoolAllocator), alignof(NoLockPoolAllocator));
156 auto* pool = new (alloc)
157 NoLockPoolAllocator(sizeof(NodeType), 128 * sizeof(NodeType), ::malloc, ::free);
158 ptr.reset(pool);
159 } else {
160 ptr = std::move(cache.back());
161 ptr->clear();
162 cache.pop_back();
163 }
164 }
165 return PoolPtr(ptr.release(), releaseAllocator);
166}
167
168template <class N>
169void SubgraphT<N>::releaseAllocator(NoLockPoolAllocator* ptr) {
170 if (!ptr) {
171 return;
172 }
173 if (ptr->totalChunkCapacity() < kMaxChunkCapacity) {
174 auto& cache = g_sgcache[kCacheIndex<N>];
175 {
176 std::lock_guard<std::mutex> lk(g_sgcacheMtx);
177 if (cache.size() < kMaxCache) {
178 cache.emplace_back(ptr);
179 return;
180 }
181 }
182 }
183 detail::AlignedFreeDeleter<NoLockPoolAllocator>()(ptr);
184}
185
186template <class N>
187GraphT<N>::GraphT(GraphT<N>&& other) : subgraphs_(std::move(other.subgraphs_)) {
188 for (SubgraphT<N>& subgraph : subgraphs_) {
189 subgraph.graph_ = this;
190 }
191}
192
193template <class N>
// NOTE(review): listing line 194 (the function signature and opening brace) is missing from this
// extract. The body move-assigns subgraphs_ from `other` and returns *this, so it is presumably
// GraphT<N>'s move assignment operator -- confirm the exact signature against the original
// graph.cpp.
//
// After taking over other's subgraphs, each subgraph's graph_ back-pointer is repointed at this
// graph.
195 subgraphs_ = std::move(other.subgraphs_);
196 for (SubgraphT<N>& subgraph : subgraphs_) {
197 subgraph.graph_ = this;
198 }
199 return *this;
200}
201
202template <class N>
// NOTE(review): listing line 203 (the function signature and opening brace) is missing from this
// extract. The cross-reference list at the end of this page names
// "SubgraphT< N > & addSubgraph()" as defined at graph.cpp:203, so this is presumably its body --
// confirm against the original graph.cpp.
//
// Appends a new SubgraphType constructed with a pointer to this graph and returns a reference
// to the element now at the back of subgraphs_.
204 subgraphs_.push_back(SubgraphType(this));
205 return subgraphs_.back();
206}
207
// Explicit instantiations of SubgraphT and GraphT for the two node types, Node and BiPropNode.
208template class DISPENSO_DLL_ACCESS SubgraphT<Node>;
209template class DISPENSO_DLL_ACCESS SubgraphT<BiPropNode>;
210template class DISPENSO_DLL_ACCESS GraphT<Node>;
211template class DISPENSO_DLL_ACCESS GraphT<BiPropNode>;
212} // namespace dispenso
const SubgraphT< N > & subgraph(size_t index) const
Definition graph.h:588
SubgraphT< N > & addSubgraph()
Definition graph.cpp:203
detail::OpResult< T > OpResult
Definition pipeline.h:29