dispenso 1.5.1
A library for task parallelism
Loading...
Searching...
No Matches
for_each.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
16#pragma once
17
#include <algorithm>
#include <iterator>
#include <limits>

#include <dispenso/detail/per_thread_info.h>
#include <dispenso/task_set.h>
24
25namespace dispenso {
26
#if DISPENSO_HAS_CONCEPTS
/**
 * Constraint satisfied when a callable of type F can be invoked with the result of dereferencing
 * an Iter, i.e. with an argument of type `decltype(*std::declval<Iter>())`.
 *
 * @tparam F The functor type handed to for_each / for_each_n.
 * @tparam Iter The iterator type whose elements are visited.
 */
template <typename F, typename Iter>
concept ForEachFunc = std::invocable<F, decltype(*std::declval<Iter>())>;
#endif // DISPENSO_HAS_CONCEPTS
37
47 uint32_t maxThreads = std::numeric_limits<int32_t>::max();
56 bool wait = true;
57};
58
59namespace detail {
60
61// Random-access iterators: compute chunk boundaries arithmetically, avoiding SmallVector.
62template <typename TaskSetT, typename Iter, typename F>
63void for_each_n_schedule(
64 TaskSetT& tasks,
65 Iter start,
66 F&& f,
67 ssize_t numThreads,
68 size_t chunkSize,
69 ssize_t transitionIdx,
70 size_t smallChunkSize,
71 const ForEachOptions& options,
72 std::random_access_iterator_tag) {
73 ssize_t numToSchedule = options.wait ? numThreads - 1 : numThreads;
74
75 if (numToSchedule > 0) {
76 tasks.scheduleBulk(
77 static_cast<size_t>(numToSchedule),
78 [start, &f, chunkSize, smallChunkSize, transitionIdx](size_t idx) {
79 ssize_t sidx = static_cast<ssize_t>(idx);
80 ssize_t offset;
81 ssize_t thisChunkSize;
82 if (sidx < transitionIdx) {
83 offset = sidx * static_cast<ssize_t>(chunkSize);
84 thisChunkSize = static_cast<ssize_t>(chunkSize);
85 } else {
86 offset = transitionIdx * static_cast<ssize_t>(chunkSize) +
87 (sidx - transitionIdx) * static_cast<ssize_t>(smallChunkSize);
88 thisChunkSize = static_cast<ssize_t>(smallChunkSize);
89 }
90 Iter s = start + offset;
91 Iter e = s + thisChunkSize;
92 return [s, e, f]() {
93 auto recurseInfo = PerPoolPerThreadInfo::parForRecurse();
94 for (Iter it = s; it != e; ++it) {
95 f(*it);
96 }
97 };
98 });
99 }
100
101 if (options.wait) {
102 ssize_t lastIdx = numThreads - 1;
103 ssize_t offset;
104 ssize_t thisChunkSize;
105 if (lastIdx < transitionIdx) {
106 offset = lastIdx * static_cast<ssize_t>(chunkSize);
107 thisChunkSize = static_cast<ssize_t>(chunkSize);
108 } else {
109 offset = transitionIdx * static_cast<ssize_t>(chunkSize) +
110 (lastIdx - transitionIdx) * static_cast<ssize_t>(smallChunkSize);
111 thisChunkSize = static_cast<ssize_t>(smallChunkSize);
112 }
113 Iter lastStart = start + offset;
114 Iter lastEnd = lastStart + thisChunkSize;
115 for (Iter it = lastStart; it != lastEnd; ++it) {
116 f(*it);
117 }
118 tasks.wait();
119 }
120}
121
122// Non-random-access iterators: pre-compute boundary iterators into SmallVector.
123template <typename TaskSetT, typename Iter, typename F, typename IterCategory>
124void for_each_n_schedule(
125 TaskSetT& tasks,
126 Iter start,
127 F&& f,
128 ssize_t numThreads,
129 size_t chunkSize,
130 ssize_t transitionIdx,
131 size_t smallChunkSize,
132 const ForEachOptions& options,
133 IterCategory) {
134 SmallVector<Iter, 64> boundaries;
135 boundaries.reserve(static_cast<size_t>(numThreads) + 1);
136 boundaries.push_back(start);
137 for (ssize_t t = 0; t < numThreads; ++t) {
138 size_t cs = (t < transitionIdx) ? chunkSize : smallChunkSize;
139 Iter next = boundaries[t];
140 std::advance(next, static_cast<ptrdiff_t>(cs));
141 boundaries.push_back(next);
142 }
143
144 ssize_t numToSchedule = options.wait ? numThreads - 1 : numThreads;
145
146 if (numToSchedule > 0) {
147 tasks.scheduleBulk(static_cast<size_t>(numToSchedule), [&boundaries, &f](size_t idx) {
148 return [s = boundaries[idx], e = boundaries[idx + 1], f]() {
149 auto recurseInfo = PerPoolPerThreadInfo::parForRecurse();
150 for (Iter it = s; it != e; ++it) {
151 f(*it);
152 }
153 };
154 });
155 }
156
157 if (options.wait) {
158 for (Iter it = boundaries[numThreads - 1]; it != boundaries[numThreads]; ++it) {
159 f(*it);
160 }
161 tasks.wait();
162 }
163}
164
165} // namespace detail
166
178template <typename TaskSetT, typename Iter, typename F>
179DISPENSO_REQUIRES(ForEachFunc<F, Iter>)
180void for_each_n(TaskSetT& tasks, Iter start, size_t n, F&& f, ForEachOptions options = {}) {
181 // TODO(bbudge): With options.maxThreads, we might want to allow a small fanout factor in
182 // recursive case?
183 if (!n || !options.maxThreads || detail::PerPoolPerThreadInfo::isParForRecursive(&tasks.pool())) {
184 for (size_t i = 0; i < n; ++i) {
185 f(*start);
186 ++start;
187 }
188 if (options.wait) {
189 tasks.wait();
190 }
191 return;
192 }
193
194 // 0 indicates serial execution per API spec
195 int32_t maxThreads = std::max<int32_t>(options.maxThreads, 1);
196
197 ssize_t numThreads = std::min<ssize_t>(tasks.numPoolThreads() + options.wait, maxThreads);
198 // Reduce threads used if they exceed work to be done.
199 numThreads = std::min<ssize_t>(numThreads, n);
200
201 auto chunking = detail::staticChunkSize(n, numThreads);
202 size_t chunkSize = chunking.ceilChunkSize;
203
204 bool perfectlyChunked = chunking.transitionTaskIndex == numThreads;
205 ssize_t transitionIdx = chunking.transitionTaskIndex;
206 size_t smallChunkSize = chunkSize - !perfectlyChunked;
207
208 detail::for_each_n_schedule(
209 tasks,
210 start,
211 std::forward<F>(f),
212 numThreads,
213 chunkSize,
214 transitionIdx,
215 smallChunkSize,
216 options,
217 typename std::iterator_traits<Iter>::iterator_category{});
218}
219
231template <typename Iter, typename F>
232DISPENSO_REQUIRES(ForEachFunc<F, Iter>)
233void for_each_n(Iter start, size_t n, F&& f, ForEachOptions options = {}) {
234 TaskSet taskSet(globalThreadPool());
235 options.wait = true;
236 for_each_n(taskSet, start, n, std::forward<F>(f), options);
237}
238
250template <typename TaskSetT, typename Iter, typename F>
251DISPENSO_REQUIRES(ForEachFunc<F, Iter>)
252void for_each(TaskSetT& tasks, Iter start, Iter end, F&& f, ForEachOptions options = {}) {
253 for_each_n(tasks, start, std::distance(start, end), std::forward<F>(f), options);
254}
255
267template <typename Iter, typename F>
268DISPENSO_REQUIRES(ForEachFunc<F, Iter>)
269void for_each(Iter start, Iter end, F&& f, ForEachOptions options = {}) {
270 for_each_n(start, std::distance(start, end), std::forward<F>(f), options);
271}
272
273// TODO(bbudge): Implement ranges versions for these in C++20 (currently not in an env where this
274// can be tested)
275
276} // namespace dispenso