dispenso
A library for task parallelism
concurrent_object_arena.h
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */

#pragma once

#include <dispenso/platform.h>

#include <atomic>
#include <cassert>
#include <cstring>
#include <mutex>
#include <new>
#include <vector>

namespace detail {

// Integer log2, rounded down: returns the index of the highest set bit of v
// (e.g. log2i(8) == 3, log2i(1000) == 9).
template <class T>
constexpr T log2i(const T v) {
  T log2 = 0, val = v;
  while (val >>= 1)
    ++log2;
  return log2;
}

} // namespace detail

namespace dispenso {

// An arena of T objects that can be grown concurrently; growth never moves
// existing elements, because they live in a sequence of fixed-size buffers.
template <class T, class Index = size_t, size_t alignment = dispenso::kCacheLineSize>
class ConcurrentObjectArena {
 public:
  ConcurrentObjectArena() = delete;

  // minBuffSize is rounded up to the next power of two per-buffer capacity.
  ConcurrentObjectArena(const Index minBuffSize)
      : kLog2BuffSize(
            ::detail::log2i(minBuffSize) +
            ((Index{1} << ::detail::log2i(minBuffSize)) == minBuffSize ? 0 : 1)),
        kBufferSize(Index{1} << kLog2BuffSize),
        kMask((Index{1} << kLog2BuffSize) - 1),
        pos_(0),
        allocatedSize_(0),
        buffers_(nullptr),
        buffersSize_(0),
        buffersPos_(0) {
    allocateBuffer();
    allocatedSize_.store(kBufferSize, std::memory_order_relaxed);
  }
78
90 T** otherBuffers = other.buffers_.load(std::memory_order_acquire);
91 T** newBuffers = new T*[buffersSize_];
92 for (Index i = 0; i < buffersSize_; ++i) {
93 void* ptr = detail::alignedMalloc(kBufferSize * sizeof(T), alignment);
94#if defined(__cpp_exceptions)
95 if (ptr == nullptr)
96 throw std::bad_alloc();
97#endif // __cpp_exceptions
98 std::memcpy(ptr, otherBuffers[i], kBufferSize * sizeof(T));
99 newBuffers[i] = static_cast<T*>(ptr);
100 }
101 buffers_.store(newBuffers, std::memory_order_release);
102 }
103
108 : kLog2BuffSize(0),
109 kBufferSize(0),
110 kMask(0),
111 pos_(0),
113 buffers_(nullptr),
114 buffersSize_(0),
115 buffersPos_(0) {
116 swap(*this, other);
117 }
118
120 T** buffers = buffers_.load(std::memory_order_acquire);
121
122 for (Index i = 0; i < buffersPos_; i++)
123 detail::alignedFree(buffers[i]);
124
125 delete[] buffers;
126
127 for (T** p : deleteLater_)
128 delete[] p;
129 }

  // Copy and move assignment. (Their definitions are elided from the original
  // listing; the bodies below are a reconstruction using the copy-and-swap
  // idiom, consistent with the friend swap() declared further down.)
  ConcurrentObjectArena<T, Index, alignment>& operator=(
      ConcurrentObjectArena<T, Index, alignment> const& other) {
    ConcurrentObjectArena<T, Index, alignment> tmp(other);
    swap(*this, tmp);
    return *this;
  }

  ConcurrentObjectArena<T, Index, alignment>& operator=(
      ConcurrentObjectArena<T, Index, alignment>&& other) noexcept {
    swap(*this, other);
    return *this;
  }

  // Atomically reserves `delta` consecutive slots, constructs them, and
  // returns the index of the first reserved slot. Thread-safe.
  Index grow_by(const Index delta) {
    Index newPos;
    Index oldPos = pos_.load(std::memory_order_relaxed);

    do {
      Index curSize = allocatedSize_.load(std::memory_order_acquire);

      if (oldPos + delta >= curSize) {
        // Not enough capacity: take the resize lock and add buffers until the
        // requested range fits.
        const std::lock_guard<std::mutex> guard(resizeMutex_);
        curSize = allocatedSize_.load(std::memory_order_relaxed);
        while (oldPos + delta >= curSize) {
          allocateBuffer();
          allocatedSize_.store(curSize + kBufferSize, std::memory_order_release);
          curSize += kBufferSize;
        }
      }

      newPos = oldPos + delta;
    } while (!std::atomic_compare_exchange_weak_explicit(
        &pos_, &oldPos, newPos, std::memory_order_release, std::memory_order_relaxed));

    constructObjects(oldPos, oldPos + delta);

    return oldPos;
  }
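
  // Usage note (illustrative, not part of the original header): if two threads
  // each call grow_by(16) concurrently, the compare-exchange above hands them
  // disjoint ranges, e.g. one thread gets indices [0, 16) and the other
  // [16, 32), in either order.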
183
192 inline const T& operator[](const Index index) const {
193 const Index bufIndex = index >> kLog2BuffSize;
194 const Index i = index & kMask;
195
196 return buffers_.load(std::memory_order_acquire)[bufIndex][i];
197 }
198
207 inline T& operator[](const Index index) {
208 return const_cast<T&>(
209 const_cast<const ConcurrentObjectArena<T, Index, alignment>&>(*this)[index]);
210 }
211

  // Current number of elements in the arena.
  Index size() const {
    return pos_.load(std::memory_order_relaxed);
  }

  // Number of elements the currently allocated buffers can hold.
  Index capacity() const {
    return allocatedSize_.load(std::memory_order_relaxed);
  }

  // Number of allocated buffers.
  Index numBuffers() const {
    return buffersPos_;
  }

  // Raw access to buffer `index`.
  const T* getBuffer(const Index index) const {
    return buffers_.load(std::memory_order_acquire)[index];
  }

  T* getBuffer(const Index index) {
    return buffers_.load(std::memory_order_acquire)[index];
  }

  // Number of elements in buffer `index`; only the last buffer may be
  // partially filled.
  Index getBufferSize(const Index index) const {
    const Index numBuffs = numBuffers();
    assert(index < numBuffs);

    if (index < numBuffs - 1)
      return kBufferSize;
    else
      return pos_.load(std::memory_order_relaxed) - (kBufferSize * (numBuffs - 1));
  }

  friend void swap(
      ConcurrentObjectArena<T, Index, alignment>& lhs,
      ConcurrentObjectArena<T, Index, alignment>& rhs) noexcept {
    using std::swap;

    swap(lhs.kLog2BuffSize, rhs.kLog2BuffSize);
    swap(lhs.kBufferSize, rhs.kBufferSize);
    swap(lhs.kMask, rhs.kMask);

    const Index rhs_pos = rhs.pos_.load(std::memory_order_relaxed);
    rhs.pos_.store(lhs.pos_.load(std::memory_order_relaxed), std::memory_order_relaxed);
    lhs.pos_.store(rhs_pos, std::memory_order_relaxed);

    const Index rhs_allocatedSize = rhs.allocatedSize_.load(std::memory_order_relaxed);
    rhs.allocatedSize_.store(
        lhs.allocatedSize_.load(std::memory_order_relaxed), std::memory_order_relaxed);
    lhs.allocatedSize_.store(rhs_allocatedSize, std::memory_order_relaxed);

    T** const rhs_buffers = rhs.buffers_.load(std::memory_order_acquire);
    rhs.buffers_.store(lhs.buffers_.load(std::memory_order_acquire), std::memory_order_release);
    lhs.buffers_.store(rhs_buffers, std::memory_order_release);

    swap(lhs.buffersSize_, rhs.buffersSize_);
    swap(lhs.buffersPos_, rhs.buffersPos_);
    swap(lhs.deleteLater_, rhs.deleteLater_);
  }

 private:
  // Allocates one more raw buffer of kBufferSize objects (construction happens
  // later, in constructObjects), growing the buffer-pointer array as needed.
  // Retired pointer arrays go to deleteLater_ so concurrent readers that still
  // hold the old array remain valid until destruction.
  void allocateBuffer() {
    void* ptr = detail::alignedMalloc(kBufferSize * sizeof(T), alignment);
#if defined(__cpp_exceptions)
    if (ptr == nullptr)
      throw std::bad_alloc();
#endif // __cpp_exceptions

    if (buffersPos_ < buffersSize_) {
      buffers_.load(std::memory_order_acquire)[buffersPos_++] = static_cast<T*>(ptr);
    } else {
      const Index oldBuffersSize = buffersSize_;
      T** oldBuffers = buffers_.load(std::memory_order_acquire);

      buffersSize_ = oldBuffersSize == 0 ? 2 : oldBuffersSize * 2;
      T** newBuffers = new T*[buffersSize_];

      if (oldBuffers != nullptr) {
        std::memcpy(newBuffers, oldBuffers, sizeof(T*) * oldBuffersSize);
        deleteLater_.push_back(oldBuffers);
      }

      newBuffers[buffersPos_++] = static_cast<T*>(ptr);
      buffers_.store(newBuffers, std::memory_order_release);
    }
  }

  // Placement-new constructs the elements in [beginIndex, endIndex), walking
  // each buffer the range touches.
  void constructObjects(const Index beginIndex, const Index endIndex) {
    const Index startBuffer = beginIndex >> kLog2BuffSize;
    const Index endBuffer = endIndex >> kLog2BuffSize;

    Index bufStart = beginIndex & kMask;
    for (Index b = startBuffer; b <= endBuffer; ++b) {
      T* buf = buffers_.load(std::memory_order_acquire)[b];
      const Index bufEnd = b == endBuffer ? (endIndex & kMask) : kBufferSize;
      for (Index i = bufStart; i < bufEnd; ++i)
        new (buf + i) T();

      bufStart = 0;
    }
  }
  //
  // kBufferSize = 2^kLog2BuffSize
  // mask = 0b00011111
  //             ──┬──
  //               └─ number of 1s is kLog2BuffSize
  //
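  // For example (illustrative values, not from this header): minBuffSize = 1000
  // rounds up to kLog2BuffSize = 10, so kBufferSize = 1024 and
  // kMask = 0b1111111111; element i then lives in buffer (i >> 10) at
  // offset (i & kMask).
  //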
  std::mutex resizeMutex_;

  Index kLog2BuffSize;
  Index kBufferSize;
  Index kMask;

  std::atomic<Index> pos_; // number of reserved element slots (see grow_by)
  std::atomic<Index> allocatedSize_; // total element capacity across all buffers

  std::atomic<T**> buffers_; // array of per-buffer base pointers
  Index buffersSize_; // capacity of the buffers_ pointer array
  Index buffersPos_; // number of buffers allocated so far
  std::vector<T**> deleteLater_; // retired pointer arrays, freed in the destructor
};

} // namespace dispenso
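
A minimal usage sketch (not part of the header above; it assumes the header is
installed as <dispenso/concurrent_object_arena.h> and that dispenso is linked):
several threads reserve disjoint ranges with grow_by() and then fill them
through operator[], with no external locking.

#include <dispenso/concurrent_object_arena.h>

#include <cstdio>
#include <thread>
#include <vector>

int main() {
  // Each buffer holds at least 512 ints; growing never relocates elements.
  dispenso::ConcurrentObjectArena<int> arena(512);

  std::vector<std::thread> workers;
  for (int t = 0; t < 4; ++t) {
    workers.emplace_back([&arena, t]() {
      // Reserve 100 value-initialized slots; `start` is unique to this call.
      const auto start = arena.grow_by(100);
      for (auto i = start; i < start + 100; ++i) {
        arena[i] = t; // safe: other threads write to disjoint index ranges
      }
    });
  }
  for (auto& w : workers) {
    w.join();
  }

  std::printf(
      "size = %zu, capacity = %zu, buffers = %zu\n",
      arena.size(),
      arena.capacity(),
      arena.numBuffers());
  return 0;
}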