#include <atomic>
#include <cstddef>
#include <iostream>
#include <numeric>
#include <sstream>
#include <vector>
/// Fixed-size scratch buffer of 1024 ints that also counts how often it has been filled.
///
/// The class does no synchronization of its own: `process()` mutates both the
/// element storage and the use counter.
class Buffer {
 public:
  /// Creates a buffer of 1024 zero-initialized elements with a use count of 0.
  Buffer() : data_(1024, 0), useCount_(0) {}

  /// Overwrites every element `i` with `value + i` and increments the use count.
  ///
  /// @param value Base value written to element 0; later elements add their index.
  void process(int value) {
    for (std::size_t i = 0; i < data_.size(); ++i) {
      data_[i] = value + static_cast<int>(i);
    }
    ++useCount_;
  }

  /// Returns the sum of all elements.
  ///
  /// @return The element sum, accumulated in `int`.
  int checksum() const {
    // The initial value is an `int` literal, so the accumulation type is `int`.
    return std::accumulate(data_.begin(), data_.end(), 0);
  }

  /// Returns how many times `process()` has been called on this buffer.
  int useCount() const {
    return useCount_;
  }

 private:
  std::vector<int> data_;  ///< Element storage; always 1024 entries.
  int useCount_;           ///< Number of completed `process()` calls.
};
/// Minimal stand-in for a database connection: an id plus a count of executed queries.
///
/// The class does no synchronization of its own; `executeQuery()` mutates the
/// query counter.
class DatabaseConnection {
 public:
  /// Creates a connection with the given id and a query count of 0.
  ///
  /// Declared `explicit` so that a plain `int` is never silently converted
  /// into a connection object.
  ///
  /// @param id Identifier reported by `connectionId()` and folded into query results.
  explicit DatabaseConnection(int id) : connectionId_(id), queryCount_(0) {}

  /// Records one more query and computes its result.
  ///
  /// @param param Query parameter.
  /// @return `connectionId() * 1000 + param`.
  int executeQuery(int param) {
    ++queryCount_;
    return connectionId_ * 1000 + param;
  }

  /// Returns the id this connection was constructed with.
  int connectionId() const {
    return connectionId_;
  }

  /// Returns how many times `executeQuery()` has been called on this connection.
  int queryCount() const {
    return queryCount_;
  }

 private:
  int connectionId_;  ///< Id supplied at construction.
  int queryCount_;    ///< Number of `executeQuery()` calls so far.
};
// Entry point: prints the results of five numbered ResourcePool examples, each in its own
// braced scope, then a completion message, and returns 0.
//
// NOTE(review): this function looks truncated in the current text. `bufferPool`,
// `connectionPool`, `permits`, `streamPool`, `poolA`, `poolB` and the index `i` are used
// without a visible declaration, and every `});` closes a call whose opening line is not
// shown. Presumably the pool constructions and the parallel-loop headers are missing —
// restore them from the original source before building.
int main() {
std::cout << "Example 1: Basic ResourcePool usage\n";
{
// Running total of the checksums produced by each operation.
// NOTE(review): atomic presumably because the (missing) enclosing loop runs concurrently.
std::atomic<int> totalChecksum(0);
// Acquire a resource from the pool, fill it from the index `i`, and add its checksum to
// the total. The resource presumably wraps a Buffer (process/checksum match that class).
auto resource = bufferPool.acquire();
resource.get().process(static_cast<int>(i));
totalChecksum.fetch_add(resource.get().checksum(), std::memory_order_relaxed);
});
std::cout << " Total checksum from 100 operations: " << totalChecksum.load() << "\n";
}
std::cout << "\nExample 2: Connection pool pattern\n";
{
// Source of connection ids: each fetch_add returns the current value and increments it,
// so ids start at 0.
std::atomic<int> nextConnectionId(0);
// NOTE(review): this `return` has no visible enclosing lambda; presumably it is the body
// of the factory passed when constructing `connectionPool` — TODO confirm.
return DatabaseConnection(nextConnectionId.fetch_add(1, std::memory_order_relaxed));
});
// Running total of all query results.
std::atomic<int> totalResult(0);
// Acquire a connection, run one query parameterized by `i`, and add its result to the total.
auto conn = connectionPool.acquire();
int result = conn.get().executeQuery(static_cast<int>(i));
totalResult.fetch_add(result, std::memory_order_relaxed);
});
std::cout << " Total query result sum: " << totalResult.load() << "\n";
std::cout << " (50 queries distributed across 3 connections)\n";
}
std::cout << "\nExample 3: Using ResourcePool to limit concurrency\n";
{
// Empty tag type. NOTE(review): presumably the element type of `permits`, so that holding
// one acts as a permit to run — TODO confirm.
struct Permit {};
// Limit reported in the output below; presumably also the size of `permits`.
constexpr size_t kMaxConcurrent = 2;
// Number of operations currently between the fetch_add and fetch_sub below.
std::atomic<int> currentActive(0);
// Largest value of `active` recorded so far.
std::atomic<int> maxObserved(0);
auto permit = permits.acquire();
// Count this operation as active; `active` includes this operation.
int active = currentActive.fetch_add(1, std::memory_order_relaxed) + 1;
// Raise maxObserved to `active` if `active` is larger. When the compare-exchange fails,
// `maxSeen` is refreshed with the current value and the condition is re-checked, so the
// loop ends once maxObserved >= active.
int maxSeen = maxObserved.load(std::memory_order_relaxed);
while (active > maxSeen &&
!maxObserved.compare_exchange_weak(maxSeen, active, std::memory_order_relaxed)) {
}
// Stand-in workload: sums 0..999.
// NOTE(review): `volatile` presumably keeps the loop from being optimized away.
volatile int work = 0;
for (int j = 0; j < 1000; ++j) {
work += j;
}
// Mark `work` and the index `i` as intentionally unused.
(void)work;
(void)i;
// This operation is no longer active.
currentActive.fetch_sub(1, std::memory_order_relaxed);
});
std::cout << " Max concurrent operations observed: " << maxObserved.load() << " (limit was "
<< kMaxConcurrent << ")\n";
}
std::cout << "\nExample 4: Resources with expensive initialization\n";
{
// Counts how many times the stream-initialization statements below run.
std::atomic<int> initCount(0);
// NOTE(review): the next four statements end in `return ss;` with no visible enclosing
// lambda; presumably they form the factory passed when constructing `streamPool`.
initCount.fetch_add(1, std::memory_order_relaxed);
std::stringstream ss;
ss.precision(10);
return ss;
});
// Acquire a stream, reset its contents to the empty string, then format a value derived
// from the index `i` into it.
auto stream = streamPool.acquire();
stream.get().str("");
stream.get() << "Value: " << i * 3.14159;
});
std::cout << " Total initializations: " << initCount.load() << " (pool size: 2)\n";
std::cout << " (Only 2 streams were created, reused for 20 operations)\n";
}
std::cout << "\nExample 5: Sequential resource acquisition pattern\n";
{
// Running total of the per-operation sums.
std::atomic<int> resultSum(0);
// Acquire from poolA first and poolB second, then add the two resource values together
// into the total.
auto resA = poolA.acquire();
auto resB = poolB.acquire();
resultSum.fetch_add(resA.get() + resB.get(), std::memory_order_relaxed);
// Mark the index `i` as intentionally unused.
(void)i;
});
std::cout << " Result sum: " << resultSum.load() << " (expected: 3000)\n";
}
std::cout << "\nAll ResourcePool examples completed successfully!\n";
return 0;
}
void parallel_for(TaskSetT &taskSet, StateContainer &states, const StateGen &defaultState, const ChunkedRange< IntegerT > &range, F &&f, ParForOptions options={})