dispenso 1.6.0
A library for task parallelism
Loading...
Searching...
No Matches
concurrent_object_arena.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
14#pragma once
15
#include <dispenso/platform.h>

#include <atomic>
#include <cassert>
#include <cstring>
#include <memory>
#include <mutex>
#include <new>
#include <type_traits>
#include <vector>
23
namespace detail {

/**
 * Integer floor(log2(v)), usable in constant expressions.
 *
 * @param v value whose base-2 logarithm is taken.
 * @return floor(log2(v)) for v >= 1 (the index of the highest set bit); 0 for v <= 1, which
 *         covers zero and, for signed T, negative values.
 **/
template <class T>
constexpr T log2i(const T v) {
  T log2 = 0;
  // Iterate while val > 1 instead of "until val becomes 0": right-shifting a negative signed
  // value keeps it negative on mainstream compilers, so a zero-terminated loop would never end.
  for (T val = v; val > T(1); val >>= 1) {
    ++log2;
  }
  return log2;
}

} // namespace detail
35
36namespace dispenso {
56template <class T, class Index = size_t, size_t alignment = dispenso::kCacheLineSize>
58 ConcurrentObjectArena() = delete;
66 explicit ConcurrentObjectArena(const Index minBuffSize, const Index initialSize = 0)
67 : kLog2BuffSize(
68 ::detail::log2i(minBuffSize) +
69 ((Index{1} << ::detail::log2i(minBuffSize)) == minBuffSize ? 0 : 1)),
70 kBufferSize(Index{1} << kLog2BuffSize),
71 kMask((Index{1} << kLog2BuffSize) - 1),
72 pos_(0),
73 allocatedSize_(0),
74 buffers_(nullptr),
75 buffersSize_(0),
76 buffersPos_(0) {
77 allocateBuffer();
78 allocatedSize_.store(kBufferSize, std::memory_order_relaxed);
79 if (initialSize > 0) {
80 grow_by(initialSize);
81 }
82 }
83
88 : kLog2BuffSize(other.kLog2BuffSize),
89 kBufferSize(other.kBufferSize),
90 kMask(other.kMask),
91 pos_(other.pos_.load(std::memory_order_relaxed)),
92 allocatedSize_(other.allocatedSize_.load(std::memory_order_relaxed)),
93 buffersSize_(other.buffersSize_),
94 buffersPos_(other.buffersPos_) {
95 static_assert(
96 std::is_trivially_copyable<T>::value,
97 "ConcurrentObjectArena copy constructor uses memcpy; T must be trivially copyable.");
98 T** otherBuffers = other.buffers_.load(std::memory_order_acquire);
99 T** newBuffers = new T*[buffersSize_];
100 for (Index i = 0; i < buffersSize_; ++i) {
101 void* ptr = detail::alignedMalloc(kBufferSize * sizeof(T), alignment);
102#if defined(__cpp_exceptions)
103 if (ptr == nullptr)
104 throw std::bad_alloc();
105#endif // __cpp_exceptions
106 std::memcpy(ptr, otherBuffers[i], kBufferSize * sizeof(T));
107 newBuffers[i] = static_cast<T*>(ptr);
108 }
109 buffers_.store(newBuffers, std::memory_order_release);
110 }
111
116 : kLog2BuffSize(0),
117 kBufferSize(0),
118 kMask(0),
119 pos_(0),
120 allocatedSize_(0),
121 buffers_(nullptr),
122 buffersSize_(0),
123 buffersPos_(0) {
124 swap(*this, other);
125 }
126
128 T** buffers = buffers_.load(std::memory_order_acquire);
129
130 for (Index i = 0; i < buffersPos_; i++)
131 detail::alignedFree(buffers[i]);
132
133 delete[] buffers;
134
135 for (T** p : deleteLater_)
136 delete[] p;
137 }
138
142 ConcurrentObjectArena<T, Index, alignment>& operator=(
143 ConcurrentObjectArena<T, Index, alignment> const& other) {
144 ConcurrentObjectArena<T, Index, alignment> copy(other);
145 swap(*this, copy);
146 return *this;
147 }
148
152 ConcurrentObjectArena<T, Index, alignment>& operator=(
153 ConcurrentObjectArena<T, Index, alignment>&& other) noexcept {
154 swap(*this, other);
155 return *this;
156 }
166 Index grow_by(const Index delta) {
167 Index newPos;
168 Index oldPos = pos_.load(std::memory_order_relaxed);
169
170 do {
171 Index curSize = allocatedSize_.load(std::memory_order_acquire);
172
173 if (oldPos + delta >= curSize) {
174 const std::lock_guard<std::mutex> guard(resizeMutex_);
175 curSize = allocatedSize_.load(std::memory_order_relaxed);
176 while (oldPos + delta >= curSize) {
177 allocateBuffer();
178 allocatedSize_.store(curSize + kBufferSize, std::memory_order_release);
179 curSize = curSize + kBufferSize;
180 }
181 }
182
183 newPos = oldPos + delta;
184 } while (!std::atomic_compare_exchange_weak_explicit(
185 &pos_, &oldPos, newPos, std::memory_order_release, std::memory_order_relaxed));
186
187 constructObjects(oldPos, oldPos + delta);
188
189 return oldPos;
190 }
191
200 inline const T& operator[](const Index index) const {
201 const Index bufIndex = index >> kLog2BuffSize;
202 const Index i = index & kMask;
203
204 return buffers_.load(std::memory_order_acquire)[bufIndex][i];
205 }
206
215 inline T& operator[](const Index index) {
216 return const_cast<T&>(
217 const_cast<const ConcurrentObjectArena<T, Index, alignment>&>(*this)[index]);
218 }
219
225 Index size() const {
226 return pos_.load(std::memory_order_relaxed);
227 }
228
233 Index capacity() const {
234 return allocatedSize_.load(std::memory_order_relaxed);
235 }
236
241 Index numBuffers() const {
242 return buffersPos_;
243 }
244
250 const T* getBuffer(const Index index) const {
251 return buffers_.load(std::memory_order_acquire)[index];
252 }
253
259 T* getBuffer(const Index index) {
260 return buffers_.load(std::memory_order_acquire)[index];
261 }
262
268 Index getBufferSize(const Index index) const {
269 const Index numBuffs = numBuffers();
270 assert(index < numBuffs);
271
272 if (index < numBuffs - 1)
273 return kBufferSize;
274 else
275 return pos_.load(std::memory_order_relaxed) - (kBufferSize * (numBuffs - 1));
276 }
277
283 friend void swap(
286 using std::swap;
287
288 swap(lhs.kLog2BuffSize, rhs.kLog2BuffSize);
289 swap(lhs.kBufferSize, rhs.kBufferSize);
290 swap(lhs.kMask, rhs.kMask);
291
292 const Index rhs_pos = rhs.pos_.load(std::memory_order_relaxed);
293 rhs.pos_.store(lhs.pos_.load(std::memory_order_relaxed), std::memory_order_relaxed);
294 lhs.pos_.store(rhs_pos, std::memory_order_relaxed);
295
296 const Index rhs_allocatedSize = rhs.allocatedSize_.load(std::memory_order_relaxed);
297 rhs.allocatedSize_.store(
298 lhs.allocatedSize_.load(std::memory_order_relaxed), std::memory_order_relaxed);
299 lhs.allocatedSize_.store(rhs_allocatedSize, std::memory_order_relaxed);
300
301 T** const rhs_buffers = rhs.buffers_.load(std::memory_order_acquire);
302 rhs.buffers_.store(lhs.buffers_.load(std::memory_order_acquire), std::memory_order_release);
303 lhs.buffers_.store(rhs_buffers, std::memory_order_release);
304
305 swap(lhs.buffersSize_, rhs.buffersSize_);
306 swap(lhs.buffersPos_, rhs.buffersPos_);
307 swap(lhs.deleteLater_, rhs.deleteLater_);
308 }
309
310 private:
311 void allocateBuffer() {
312 void* ptr = detail::alignedMalloc(kBufferSize * sizeof(T), alignment);
313#if defined(__cpp_exceptions)
314 if (ptr == nullptr)
315 throw std::bad_alloc();
316#endif // __cpp_exceptions
317
318 if (buffersPos_ < buffersSize_) {
319 buffers_.load(std::memory_order_acquire)[buffersPos_++] = static_cast<T*>(ptr);
320 } else {
321 const Index oldBuffersSize = buffersSize_;
322 T** oldBuffers = buffers_.load(std::memory_order_acquire);
323
324 buffersSize_ = oldBuffersSize == 0 ? 2 : oldBuffersSize * 2;
325 T** newBuffers = new T*[buffersSize_];
326
327 if (oldBuffers != nullptr) {
328 std::memcpy(newBuffers, oldBuffers, sizeof(T*) * oldBuffersSize);
329 deleteLater_.push_back(oldBuffers);
330 }
331
332 newBuffers[buffersPos_++] = static_cast<T*>(ptr);
333 buffers_.store(newBuffers, std::memory_order_release);
334 }
335 }
336
337 void constructObjects(const Index beginIndex, const Index endIndex) {
338 const Index startBuffer = beginIndex >> kLog2BuffSize;
339 const Index endBuffer = endIndex >> kLog2BuffSize;
340
341 Index bufStart = beginIndex & kMask;
342 for (Index b = startBuffer; b <= endBuffer; ++b) {
343 T* buf = buffers_.load(std::memory_order_acquire)[b];
344 const Index bufEnd = b == endBuffer ? (endIndex & kMask) : kBufferSize;
345 for (Index i = bufStart; i < bufEnd; ++i)
346 new (buf + i) T();
347
348 bufStart = 0;
349 }
350 }
351 //
352 // kBufferSize = 2^kLog2BuffSize
353 // mask = 0b00011111
354 // ──┬──
355 // └─number of 1s is log2BuffSize
356 //
357 std::mutex resizeMutex_;
358
359 Index kLog2BuffSize;
360 Index kBufferSize;
361 Index kMask;
362
363 std::atomic<Index> pos_;
364 std::atomic<Index> allocatedSize_;
365
366 std::atomic<T**> buffers_;
367 Index buffersSize_;
368 Index buffersPos_;
369 std::vector<T**> deleteLater_;
370};
371
372} // namespace dispenso
Index getBufferSize(const Index index) const
const T * getBuffer(const Index index) const
ConcurrentObjectArena(const Index minBuffSize, const Index initialSize=0)
ConcurrentObjectArena(ConcurrentObjectArena< T, Index, alignment > &&other) noexcept
ConcurrentObjectArena(const ConcurrentObjectArena< T, Index, alignment > &other)
uint32_t log2(uint64_t v)
Compute log base 2 of a value (runtime).
Definition util.h:193