21#include <dispenso/detail/per_thread_info.h>
27#if DISPENSO_HAS_CONCEPTS
// Concept satisfied when a callable of type F can be invoked with the result
// of dereferencing an iterator of type Iter (i.e. the type of `*it`).
// Only declared when DISPENSO_HAS_CONCEPTS evaluates true in the preceding #if.
34template <
typename F,
typename Iter>
35concept ForEachFunc = std::invocable<F, decltype(*std::declval<Iter>())>;
// Upper bound on thread count. The field is an unsigned 32-bit value whose
// default is the largest *signed* 32-bit value (int32_t max), so the default
// fits in an int32_t without wrapping.
// NOTE(review): presumably a member of ForEachOptions (for_each_n reads
// options.maxThreads) -- the enclosing declaration is not visible here.
47 uint32_t
maxThreads = std::numeric_limits<int32_t>::max();
// Scheduling helper overload selected by a std::random_access_iterator_tag
// argument. Because the iterator supports `start + offset`, each chunk's
// [begin, end) range is computed arithmetically from its chunk index.
//
// Chunk layout used below: chunk indices below transitionIdx have size
// chunkSize; the remaining computation uses smallChunkSize.
// NOTE(review): several parameters (tasks, start, f, numThreads, chunkSize,
// options) are used in the body but their declarations are not visible here.
62template <
typename TaskSetT,
typename Iter,
typename F>
63void for_each_n_schedule(
69 ssize_t transitionIdx,
70 size_t smallChunkSize,
72 std::random_access_iterator_tag) {
// With options.wait set, one fewer chunk than numThreads is handed to the
// scheduler; otherwise all numThreads chunks are.
73 ssize_t numToSchedule = options.
wait ? numThreads - 1 : numThreads;
75 if (numToSchedule > 0) {
// The count and the lambda below are arguments to a call whose name is not
// visible here (the generic overload calls tasks.scheduleBulk at this point).
77 static_cast<size_t>(numToSchedule),
// idx is the chunk index. f is captured by reference; the iterator and the
// chunking parameters are captured by value.
78 [start, &f, chunkSize, smallChunkSize, transitionIdx](
size_t idx) {
// Work in signed arithmetic so idx can be compared with transitionIdx.
79 ssize_t sidx =
static_cast<ssize_t
>(idx);
81 ssize_t thisChunkSize;
// Chunk index below transitionIdx: a full-size chunk, preceded only by
// other full-size chunks.
82 if (sidx < transitionIdx) {
83 offset = sidx *
static_cast<ssize_t
>(chunkSize);
84 thisChunkSize =
static_cast<ssize_t
>(chunkSize);
// Presumably the else branch (the intervening line is not visible): skip all
// transitionIdx full-size chunks, then (sidx - transitionIdx) small chunks.
86 offset = transitionIdx *
static_cast<ssize_t
>(chunkSize) +
87 (sidx - transitionIdx) *
static_cast<ssize_t
>(smallChunkSize);
88 thisChunkSize =
static_cast<ssize_t
>(smallChunkSize);
// Half-open iterator range [s, e) for this chunk.
90 Iter s = start + offset;
91 Iter e = s + thisChunkSize;
// NOTE(review): recurseInfo looks like a scope object marking this thread as
// being inside a parallel-for (for_each_n checks isParForRecursive) -- confirm
// against PerPoolPerThreadInfo.
93 auto recurseInfo = PerPoolPerThreadInfo::parForRecurse();
94 for (Iter it = s; it != e; ++it) {
// Same offset/size computation as above, applied to the final chunk index.
// NOTE(review): presumably this section runs only when options.wait is set,
// since that is the case where the last chunk was not scheduled -- the
// guarding condition is not visible here.
102 ssize_t lastIdx = numThreads - 1;
104 ssize_t thisChunkSize;
105 if (lastIdx < transitionIdx) {
106 offset = lastIdx *
static_cast<ssize_t
>(chunkSize);
107 thisChunkSize =
static_cast<ssize_t
>(chunkSize);
109 offset = transitionIdx *
static_cast<ssize_t
>(chunkSize) +
110 (lastIdx - transitionIdx) *
static_cast<ssize_t
>(smallChunkSize);
111 thisChunkSize =
static_cast<ssize_t
>(smallChunkSize);
// Iterate the final chunk's half-open range directly in this function.
113 Iter lastStart = start + offset;
114 Iter lastEnd = lastStart + thisChunkSize;
115 for (Iter it = lastStart; it != lastEnd; ++it) {
// Scheduling helper overload taking an arbitrary iterator category
// (IterCategory is a template parameter). Instead of computing chunk starts
// with iterator arithmetic, it walks the range once with std::advance and
// records every chunk boundary up front.
// NOTE(review): several parameters (tasks, start, f, numThreads, chunkSize)
// are used in the body but their declarations are not visible here.
123template <
typename TaskSetT,
typename Iter,
typename F,
typename IterCategory>
124void for_each_n_schedule(
130 ssize_t transitionIdx,
131 size_t smallChunkSize,
132 const ForEachOptions& options,
// boundaries[t] is the start of chunk t and boundaries[t + 1] its end, so
// numThreads chunks need numThreads + 1 entries.
134 SmallVector<Iter, 64> boundaries;
135 boundaries.reserve(
static_cast<size_t>(numThreads) + 1);
136 boundaries.push_back(start);
137 for (ssize_t t = 0; t < numThreads; ++t) {
// Chunks below transitionIdx are chunkSize long; the rest are smallChunkSize.
138 size_t cs = (t < transitionIdx) ? chunkSize : smallChunkSize;
// Each boundary is derived from the previous one, advanced by that chunk's size.
139 Iter next = boundaries[t];
140 std::advance(next,
static_cast<ptrdiff_t
>(cs));
141 boundaries.push_back(next);
// With options.wait set, one fewer chunk than numThreads is scheduled.
144 ssize_t numToSchedule = options.wait ? numThreads - 1 : numThreads;
146 if (numToSchedule > 0) {
// scheduleBulk receives the task count and a generator lambda: given a chunk
// index it returns the task to run for that chunk. The generator holds
// boundaries and f by reference; the returned task holds its own copies of
// the two boundary iterators and of f.
147 tasks.scheduleBulk(
static_cast<size_t>(numToSchedule), [&boundaries, &f](
size_t idx) {
148 return [s = boundaries[idx], e = boundaries[idx + 1], f]() {
// NOTE(review): recurseInfo looks like a scope object marking this thread as
// being inside a parallel-for -- confirm against PerPoolPerThreadInfo.
149 auto recurseInfo = PerPoolPerThreadInfo::parForRecurse();
150 for (Iter it = s; it != e; ++it) {
// Iterate the final chunk [boundaries[numThreads - 1], boundaries[numThreads])
// directly in this function.
// NOTE(review): presumably guarded by options.wait -- condition not visible.
158 for (Iter it = boundaries[numThreads - 1]; it != boundaries[numThreads]; ++it) {
// Applies f over n elements beginning at start, using the supplied task set.
//
// tasks    task set used for scheduling; its pool() and numPoolThreads() are
//          queried below.
// start    iterator to the first element.
// n        number of elements to process.
// f        callable; constrained via ForEachFunc<F, Iter> through the
//          DISPENSO_REQUIRES macro (macro definition not visible here).
// options  ForEachOptions, defaulted; maxThreads and wait are read below.
178template <
typename TaskSetT,
typename Iter,
typename F>
179DISPENSO_REQUIRES(ForEachFunc<F, Iter>)
180void for_each_n(TaskSetT& tasks, Iter start,
size_t n, F&& f,
ForEachOptions options = {}) {
// Non-scheduling path: taken when there is nothing to do (n == 0), when
// maxThreads is 0, or when isParForRecursive reports true for this pool.
183 if (!n || !options.maxThreads || detail::PerPoolPerThreadInfo::isParForRecursive(&tasks.pool())) {
// Plain index loop over the n elements (loop body not visible here).
184 for (
size_t i = 0; i < n; ++i) {
// Converted to int32_t and clamped to at least 1.
195 int32_t maxThreads = std::max<int32_t>(options.maxThreads, 1);
// options.wait is added to the pool's thread count before capping at
// maxThreads. NOTE(review): presumably this counts the calling thread as a
// worker when waiting -- confirm.
197 ssize_t numThreads = std::min<ssize_t>(tasks.numPoolThreads() + options.wait, maxThreads);
// Never use more threads than there are elements.
199 numThreads = std::min<ssize_t>(numThreads, n);
201 auto chunking = detail::staticChunkSize(n, numThreads);
202 size_t chunkSize = chunking.ceilChunkSize;
// "Perfectly chunked" is defined here as transitionTaskIndex == numThreads.
204 bool perfectlyChunked = chunking.transitionTaskIndex == numThreads;
205 ssize_t transitionIdx = chunking.transitionTaskIndex;
// Equals chunkSize when perfectly chunked, otherwise chunkSize - 1.
206 size_t smallChunkSize = chunkSize - !perfectlyChunked;
// The trailing iterator_category tag object selects which
// for_each_n_schedule overload handles the range.
208 detail::for_each_n_schedule(
217 typename std::iterator_traits<Iter>::iterator_category{});
// Convenience overload without a caller-provided task set: constructs a local
// TaskSet on globalThreadPool() and forwards start, n, f and options to the
// task-set overload of for_each_n.
// NOTE(review): the signature line of this overload is not visible here.
231template <
typename Iter,
typename F>
232DISPENSO_REQUIRES(ForEachFunc<F, Iter>)
234 TaskSet taskSet(globalThreadPool());
236 for_each_n(taskSet, start, n, std::forward<F>(f), options);
// Applies f over the iterator range [start, end) using the supplied task set.
// The element count is obtained with std::distance(start, end) and the work
// is delegated to for_each_n with the same tasks, f and options.
250template <
typename TaskSetT,
typename Iter,
typename F>
251DISPENSO_REQUIRES(ForEachFunc<F, Iter>)
252void for_each(TaskSetT& tasks, Iter start, Iter end, F&& f,
ForEachOptions options = {}) {
// std::distance yields a signed difference; it is passed as for_each_n's
// size_t count, so callers are expected to pass end reachable from start.
253 for_each_n(tasks, start, std::distance(start, end), std::forward<F>(f), options);
// Range overload without a task-set argument: computes the element count with
// std::distance(start, end) and delegates to the for_each_n overload that
// takes no task set.
// NOTE(review): the signature line of this overload is not visible here.
267template <
typename Iter,
typename F>
268DISPENSO_REQUIRES(ForEachFunc<F, Iter>)
270 for_each_n(start, std::distance(start, end), std::forward<F>(f), options);