dispenso
A library for task parallelism
 
resource_pool.h
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */

#pragma once

#include <blockingconcurrentqueue.h>
#include <dispenso/platform.h>

namespace dispenso {

template <typename T>
class ResourcePool;

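/**
 * An RAII handle to an object owned by a ResourcePool.  The handle is movable
 * but not copyable, and returns the underlying object to its pool on
 * destruction.
 */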
template <typename T>
class Resource {
 public:
  Resource(Resource&& other) : resource_(other.resource_), pool_(other.pool_) {
    other.resource_ = nullptr;
  }

  Resource& operator=(Resource&& other) {
    if (&other != this) {
      recycle();
      resource_ = other.resource_;
      pool_ = other.pool_;
      other.resource_ = nullptr;
    }
    return *this;
  }

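  /**
   * Access the underlying pooled object.
   *
   * @return a reference to the object held by this handle.
   */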
  T& get() {
    return *resource_;
  }

  ~Resource() {
    recycle();
  }

 private:
  Resource(T* res, ResourcePool<T>* pool) : resource_(res), pool_(pool) {}

  void recycle();

  T* resource_;
  ResourcePool<T>* pool_;

  friend class ResourcePool<T>;
};

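/**
 * A pool of objects of type T that may be borrowed concurrently from multiple
 * threads.  Objects are handed out via acquire() and are automatically
 * returned when the corresponding Resource<T> handle is destroyed.
 */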
template <typename T>
class ResourcePool {
 public:
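  /**
   * Construct a ResourcePool.
   *
   * @param size the number of objects to keep in the pool.
   * @param init a functor whose return value is used to construct each pooled
   *        object; it is invoked once per object.
   */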
  template <typename F>
  ResourcePool(size_t size, const F& init)
      : pool_(size),
        backingResources_(reinterpret_cast<char*>(
            detail::alignedMalloc(size * detail::alignToCacheLine(sizeof(T))))),
        size_(size) {
    char* buf = backingResources_;

    // There are three reasons we create our own buffer and use placement new:
    // 1. We want to be able to handle non-movable non-copyable objects
    //    * Note that we could do this with std::deque
    // 2. We want to minimize memory allocations, since that can be a common point of contention in
    //    multithreaded programs.
    // 3. We can easily ensure that the objects are cache aligned to help avoid false sharing.

    for (size_t i = 0; i < size; ++i) {
      pool_.enqueue(new (buf) T(init()));
      buf += detail::alignToCacheLine(sizeof(T));
    }
  }

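  /**
   * Acquire an object from the pool, blocking until one becomes available.
   *
   * @return an RAII Resource handle; destroying it returns the object to the
   *         pool.
   */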
  Resource<T> acquire() {
    T* t;
    DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
    pool_.wait_dequeue(t);
    DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
    return Resource<T>(t, this);
  }

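  /**
   * Destroy the pool and the objects it owns.  Every object must have been
   * returned to the pool (every Resource handle destroyed) before the pool is
   * destructed.
   */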
  ~ResourcePool() {
    assert(pool_.size_approx() == size_);
    for (size_t i = 0; i < size_; ++i) {
      T* t;
      DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
      pool_.wait_dequeue(t);
      DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
      t->~T();
    }
    detail::alignedFree(backingResources_);
  }

 private:
  void recycle(T* t) {
    DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
    pool_.enqueue(t);
    DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
  }

  moodycamel::BlockingConcurrentQueue<T*> pool_;
  char* backingResources_;
  size_t size_;

  friend class Resource<T>;
};

template <typename T>
void Resource<T>::recycle() {
  if (resource_) {
    pool_->recycle(resource_);
  }
}

} // namespace dispenso
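
A minimal usage sketch of the API above. Assumptions not taken from this file: the header is included as <dispenso/resource_pool.h>, and Scratch is a hypothetical pooled type invented for illustration. acquire() blocks until an object is free and hands back an RAII Resource<T> handle; destroying the handle returns the object to the pool.

// Illustrative sketch only -- not part of resource_pool.h.
#include <dispenso/resource_pool.h>

#include <cstddef>
#include <vector>

// Hypothetical pooled type: a reusable scratch buffer.
struct Scratch {
  explicit Scratch(std::size_t n) : buffer(n) {}
  std::vector<float> buffer;
};

int main() {
  // Build a pool of 4 Scratch objects; the init functor runs once per slot and
  // its result is used to construct each pooled object in cache-aligned storage.
  dispenso::ResourcePool<Scratch> pool(4, []() { return Scratch(1024); });

  {
    // Blocks until a Scratch is available.  acquire() may be called
    // concurrently from many threads; the pool is backed by a
    // moodycamel::BlockingConcurrentQueue.
    dispenso::Resource<Scratch> res = pool.acquire();
    res.get().buffer[0] = 1.0f;
  } // res goes out of scope here, recycling the Scratch back into the pool.

  // All objects are back in the pool, so ~ResourcePool() can tear them down.
  return 0;
}

Because each slot is placement-new'd into cache-line-aligned backing storage, concurrent users of different slots avoid false sharing, and only the queue itself is touched on acquire and recycle.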