dispenso 1.4.1
A library for task parallelism
concurrent_object_arena.h
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */

/**
 * @file concurrent_object_arena.h
 * A concurrently growable arena of default-constructed objects with stable addresses.
 **/

#pragma once

#include <dispenso/platform.h>

#include <atomic>
#include <cassert>
#include <cstring>
#include <mutex>
#include <new>
#include <vector>

namespace detail {

// Computes floor(log2(v)) for v >= 1 (returns 0 for v == 0 as well).
template <class T>
constexpr T log2i(const T v) {
  T log2 = 0, val = v;
  while (val >>= 1)
    ++log2;
  return log2;
}

} // namespace detail
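
// Quick sanity check of the rounding behavior (illustrative addition, not in the
// original header): log2i truncates, e.g. log2i(8) == 3 and log2i(9) == 3.
static_assert(detail::log2i(8u) == 3u && detail::log2i(9u) == 3u, "log2i rounds down");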

namespace dispenso {

/**
 * A concurrently growable arena of default-constructed objects. Elements are
 * stored in fixed-size, aligned buffers, so existing elements never move when
 * the arena grows; multiple threads may append concurrently via grow_by().
 **/
template <class T, class Index = size_t, size_t alignment = dispenso::kCacheLineSize>
struct ConcurrentObjectArena {
  ConcurrentObjectArena() = delete;

  /**
   * Constructs an arena whose buffers each hold at least minBuffSize objects
   * (rounded up to the next power of two).
   **/
  explicit ConcurrentObjectArena(const Index minBuffSize)
      : kLog2BuffSize(
            ::detail::log2i(minBuffSize) +
            ((Index{1} << ::detail::log2i(minBuffSize)) == minBuffSize ? 0 : 1)),
        kBufferSize(Index{1} << kLog2BuffSize),
        kMask((Index{1} << kLog2BuffSize) - 1),
        pos_(0),
        allocatedSize_(0),
        buffers_(nullptr),
        buffersSize_(0),
        buffersPos_(0) {
    allocateBuffer();
    allocatedSize_.store(kBufferSize, std::memory_order_relaxed);
  }
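
  // Worked example (illustrative values): with minBuffSize == 100, log2i(100) == 6
  // and (1 << 6) != 100, so kLog2BuffSize == 7, kBufferSize == 128 (the next power
  // of two >= 100), and kMask == 127 (0b0111'1111). Element index i then lives in
  // buffer (i >> kLog2BuffSize) at slot (i & kMask).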

  /**
   * Copy constructor. Makes a deep, bitwise copy of the other arena's buffers
   * (the element type is copied with memcpy).
   **/
  ConcurrentObjectArena(const ConcurrentObjectArena<T, Index, alignment>& other)
      : kLog2BuffSize(other.kLog2BuffSize),
        kBufferSize(other.kBufferSize),
        kMask(other.kMask),
        pos_(other.pos_.load(std::memory_order_relaxed)),
        allocatedSize_(other.allocatedSize_.load(std::memory_order_relaxed)),
        buffersSize_(other.buffersSize_),
        buffersPos_(other.buffersPos_) {
    T** otherBuffers = other.buffers_.load(std::memory_order_acquire);
    T** newBuffers = new T*[buffersSize_];
    // Only the first buffersPos_ entries of the pointer array refer to allocated
    // buffers; copy exactly those.
    for (Index i = 0; i < buffersPos_; ++i) {
      void* ptr = detail::alignedMalloc(kBufferSize * sizeof(T), alignment);
#if defined(__cpp_exceptions)
      if (ptr == nullptr)
        throw std::bad_alloc();
#endif // __cpp_exceptions
      std::memcpy(ptr, otherBuffers[i], kBufferSize * sizeof(T));
      newBuffers[i] = static_cast<T*>(ptr);
    }
    buffers_.store(newBuffers, std::memory_order_release);
  }

  /**
   * Move constructor. Leaves other in a valid, empty state.
   **/
  ConcurrentObjectArena(ConcurrentObjectArena<T, Index, alignment>&& other) noexcept
      : kLog2BuffSize(0),
        kBufferSize(0),
        kMask(0),
        pos_(0),
        allocatedSize_(0),
        buffers_(nullptr),
        buffersSize_(0),
        buffersPos_(0) {
    swap(*this, other);
  }

  ~ConcurrentObjectArena() {
    T** buffers = buffers_.load(std::memory_order_acquire);

    for (Index i = 0; i < buffersPos_; i++)
      detail::alignedFree(buffers[i]);

    delete[] buffers;

    // Retired pointer arrays from past growths are freed only here, so readers
    // that captured an old array remain valid for the arena's lifetime. Note
    // that element destructors are not invoked.
    for (T** p : deleteLater_)
      delete[] p;
  }

  /**
   * Copy assignment operator (copy-and-swap).
   **/
  ConcurrentObjectArena<T, Index, alignment>& operator=(
      ConcurrentObjectArena<T, Index, alignment> const& other) {
    ConcurrentObjectArena<T, Index, alignment> tmp(other);
    swap(*this, tmp);
    return *this;
  }

  /**
   * Move assignment operator.
   **/
  ConcurrentObjectArena<T, Index, alignment>& operator=(
      ConcurrentObjectArena<T, Index, alignment>&& other) noexcept {
    swap(*this, other);
    return *this;
  }

  /**
   * Atomically reserves delta consecutive elements, default-constructs them,
   * and returns the index of the first one. Safe to call from multiple threads.
   **/
  Index grow_by(const Index delta) {
    Index newPos;
    Index oldPos = pos_.load(std::memory_order_relaxed);

    do {
      Index curSize = allocatedSize_.load(std::memory_order_acquire);

      if (oldPos + delta >= curSize) {
        const std::lock_guard<std::mutex> guard(resizeMutex_);
        curSize = allocatedSize_.load(std::memory_order_relaxed);
        // Another thread may have grown the arena while we waited for the lock;
        // allocate only as many additional buffers as are still needed.
        while (oldPos + delta >= curSize) {
          allocateBuffer();
          allocatedSize_.store(curSize + kBufferSize, std::memory_order_release);
          curSize = curSize + kBufferSize;
        }
      }

      newPos = oldPos + delta;
    } while (!std::atomic_compare_exchange_weak_explicit(
        &pos_, &oldPos, newPos, std::memory_order_release, std::memory_order_relaxed));

    constructObjects(oldPos, oldPos + delta);

    return oldPos;
  }
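
  // Typical use (illustrative): many threads may call grow_by() concurrently; each
  // call gets its own contiguous, default-constructed block, e.g.
  //   Index first = arena.grow_by(16);
  //   for (Index i = first; i < first + 16; ++i)
  //     arena[i] = makeValue(i);  // makeValue is a hypothetical helper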

  /**
   * Returns a const reference to the element at index. The index must refer to
   * an element that has already been created (index < size()).
   **/
  inline const T& operator[](const Index index) const {
    const Index bufIndex = index >> kLog2BuffSize;
    const Index i = index & kMask;

    return buffers_.load(std::memory_order_acquire)[bufIndex][i];
  }

  /**
   * Returns a mutable reference to the element at index.
   **/
  inline T& operator[](const Index index) {
    return const_cast<T&>(
        const_cast<const ConcurrentObjectArena<T, Index, alignment>&>(*this)[index]);
  }

  /**
   * Returns the number of elements currently in the arena.
   **/
  Index size() const {
    return pos_.load(std::memory_order_relaxed);
  }

  /**
   * Returns the number of elements the currently allocated buffers can hold.
   **/
  Index capacity() const {
    return allocatedSize_.load(std::memory_order_relaxed);
  }

  /**
   * Returns the number of allocated buffers.
   **/
  Index numBuffers() const {
    return buffersPos_;
  }

  /**
   * Returns a const pointer to the buffer at index (0 <= index < numBuffers()).
   **/
  const T* getBuffer(const Index index) const {
    return buffers_.load(std::memory_order_acquire)[index];
  }

  /**
   * Returns a mutable pointer to the buffer at index.
   **/
  T* getBuffer(const Index index) {
    return buffers_.load(std::memory_order_acquire)[index];
  }

  /**
   * Returns the number of live elements in the buffer at index: full buffers
   * report kBufferSize, while the last buffer reports only the used portion.
   **/
  Index getBufferSize(const Index index) const {
    const Index numBuffs = numBuffers();
    assert(index < numBuffs);

    if (index < numBuffs - 1)
      return kBufferSize;
    else
      return pos_.load(std::memory_order_relaxed) - (kBufferSize * (numBuffs - 1));
  }
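
  // Illustrative per-buffer scan (assumes no concurrent grow_by during the scan;
  // `process` is a hypothetical callable):
  //   for (Index b = 0; b < arena.numBuffers(); ++b) {
  //     T* buf = arena.getBuffer(b);
  //     for (Index i = 0; i < arena.getBufferSize(b); ++i)
  //       process(buf[i]);
  //   }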

  /**
   * Swaps two arenas. Not intended to run concurrently with other operations on
   * either arena (the non-atomic members are swapped without synchronization).
   **/
  friend void swap(
      ConcurrentObjectArena<T, Index, alignment>& lhs,
      ConcurrentObjectArena<T, Index, alignment>& rhs) noexcept {
    using std::swap;

    swap(lhs.kLog2BuffSize, rhs.kLog2BuffSize);
    swap(lhs.kBufferSize, rhs.kBufferSize);
    swap(lhs.kMask, rhs.kMask);

    const Index rhs_pos = rhs.pos_.load(std::memory_order_relaxed);
    rhs.pos_.store(lhs.pos_.load(std::memory_order_relaxed), std::memory_order_relaxed);
    lhs.pos_.store(rhs_pos, std::memory_order_relaxed);

    const Index rhs_allocatedSize = rhs.allocatedSize_.load(std::memory_order_relaxed);
    rhs.allocatedSize_.store(
        lhs.allocatedSize_.load(std::memory_order_relaxed), std::memory_order_relaxed);
    lhs.allocatedSize_.store(rhs_allocatedSize, std::memory_order_relaxed);

    T** const rhs_buffers = rhs.buffers_.load(std::memory_order_acquire);
    rhs.buffers_.store(lhs.buffers_.load(std::memory_order_acquire), std::memory_order_release);
    lhs.buffers_.store(rhs_buffers, std::memory_order_release);

    swap(lhs.buffersSize_, rhs.buffersSize_);
    swap(lhs.buffersPos_, rhs.buffersPos_);
    swap(lhs.deleteLater_, rhs.deleteLater_);
  }

 private:
  // Allocates one more aligned buffer of kBufferSize objects (raw storage only;
  // elements are constructed later by constructObjects). Called with resizeMutex_
  // held, except from the constructor.
  void allocateBuffer() {
    void* ptr = detail::alignedMalloc(kBufferSize * sizeof(T), alignment);
#if defined(__cpp_exceptions)
    if (ptr == nullptr)
      throw std::bad_alloc();
#endif // __cpp_exceptions

    if (buffersPos_ < buffersSize_) {
      buffers_.load(std::memory_order_acquire)[buffersPos_++] = static_cast<T*>(ptr);
    } else {
      // The pointer array is full: double it, publish the new array, and retire
      // the old one to deleteLater_ (concurrent readers may still be using it).
      const Index oldBuffersSize = buffersSize_;
      T** oldBuffers = buffers_.load(std::memory_order_acquire);

      buffersSize_ = oldBuffersSize == 0 ? 2 : oldBuffersSize * 2;
      T** newBuffers = new T*[buffersSize_];

      if (oldBuffers != nullptr) {
        std::memcpy(newBuffers, oldBuffers, sizeof(T*) * oldBuffersSize);
        deleteLater_.push_back(oldBuffers);
      }

      newBuffers[buffersPos_++] = static_cast<T*>(ptr);
      buffers_.store(newBuffers, std::memory_order_release);
    }
  }

  // Default-constructs elements in [beginIndex, endIndex), spanning buffer
  // boundaries as needed.
  void constructObjects(const Index beginIndex, const Index endIndex) {
    const Index startBuffer = beginIndex >> kLog2BuffSize;
    const Index endBuffer = endIndex >> kLog2BuffSize;

    Index bufStart = beginIndex & kMask;
    for (Index b = startBuffer; b <= endBuffer; ++b) {
      T* buf = buffers_.load(std::memory_order_acquire)[b];
      const Index bufEnd = b == endBuffer ? (endIndex & kMask) : kBufferSize;
      for (Index i = bufStart; i < bufEnd; ++i)
        new (buf + i) T();

      bufStart = 0;
    }
  }

  //
  // kBufferSize = 2^kLog2BuffSize
  // mask = 0b00011111
  //             ──┬──
  //               └─ number of 1s is log2BuffSize
  //
  std::mutex resizeMutex_;

  Index kLog2BuffSize;
  Index kBufferSize;
  Index kMask;

  std::atomic<Index> pos_; // number of elements handed out so far
  std::atomic<Index> allocatedSize_; // total capacity across allocated buffers

  std::atomic<T**> buffers_; // published array of buffer pointers
  Index buffersSize_; // capacity of the buffers_ pointer array
  Index buffersPos_; // number of buffers actually allocated
  std::vector<T**> deleteLater_; // retired pointer arrays, freed in the destructor
};

} // namespace dispenso
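
/*
 * Usage sketch (illustrative, not part of the original header). The element type
 * Item, the lambda appendChunk, and the constant kChunk are hypothetical; the
 * default Index (size_t) and alignment template arguments are assumed.
 *
 *   #include <dispenso/concurrent_object_arena.h>
 *   #include <thread>
 *
 *   struct Item { int value = 0; };
 *
 *   int main() {
 *     // Buffers will each hold at least 1000 Items (rounded up to 1024).
 *     dispenso::ConcurrentObjectArena<Item> arena(1000);
 *
 *     constexpr size_t kChunk = 256;
 *     auto appendChunk = [&] {
 *       // Each call reserves its own contiguous, default-constructed block.
 *       size_t first = arena.grow_by(kChunk);
 *       for (size_t i = first; i < first + kChunk; ++i) {
 *         arena[i].value = static_cast<int>(i);
 *       }
 *     };
 *
 *     std::thread t1(appendChunk), t2(appendChunk);
 *     t1.join();
 *     t2.join();
 *
 *     // Two chunks were appended; element addresses stay stable as the arena grows.
 *     return arena.size() == 2 * kChunk ? 0 : 1;
 *   }
 */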