#include <dispenso/detail/can_invoke.h>
#include <dispenso/detail/per_thread_info.h>

#if DISPENSO_HAS_CONCEPTS
/**
 * Concept for a functor invocable over an index range as f(begin, end).
 **/
template <typename F, typename IntegerT>
concept ParallelForRangeFunc = std::invocable<F, IntegerT, IntegerT>;

/**
 * Concept for a functor invocable per index as f(index).
 **/
template <typename F, typename IntegerT>
concept ParallelForIndexFunc = std::invocable<F, IntegerT>;

/**
 * Concept for a functor invocable with per-worker state over an index range as
 * f(state, begin, end).
 **/
template <typename F, typename StateRef, typename IntegerT>
concept ParallelForStateRangeFunc = std::invocable<F, StateRef, IntegerT, IntegerT>;

/**
 * Concept for a functor invocable with per-worker state per index as f(state, index).
 **/
template <typename F, typename StateRef, typename IntegerT>
concept ParallelForStateIndexFunc = std::invocable<F, StateRef, IntegerT>;
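// Illustrative sketch (not from the original header): lambdas satisfying the
// concepts above; the names rangeFunc and indexFunc are hypothetical.
//
//   auto rangeFunc = [](int64_t begin, int64_t end) { /* process [begin, end) */ };
//   static_assert(ParallelForRangeFunc<decltype(rangeFunc), int64_t>);
//
//   auto indexFunc = [](int64_t i) { /* process item i */ };
//   static_assert(ParallelForIndexFunc<decltype(indexFunc), int64_t>);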
#endif // DISPENSO_HAS_CONCEPTS

/**
 * Chunking strategies available to parallel_for.
 **/
enum class ParForChunking { kStatic, kAuto };

/**
 * Options for controlling the behavior of parallel_for.
 **/
struct ParForOptions {
  /**
   * The maximum number of threads to use (the pool may supply fewer).
   **/
  uint32_t maxThreads = std::numeric_limits<int32_t>::max();
  /**
   * If true, the call returns only once all work is complete, and the calling
   * thread participates in the work.
   **/
  bool wait = true;
  /**
   * The minimum number of items to run per chunk.
   **/
  uint32_t minItemsPerChunk = 1;
  /**
   * If true, existing entries in a user-supplied state container are reused
   * rather than regenerated.
   **/
  bool reuseExistingState = false;
  /**
   * The chunking strategy used when a plain index range is supplied.
   **/
  ParForChunking defaultChunking = ParForChunking::kStatic;
};
/**
 * A range of integers [start, end) with an associated chunking strategy.
 **/
template <typename IntegerT = ssize_t>
struct ChunkedRange {
  struct Static {};
  struct Auto {};

  using size_type = std::conditional_t<std::is_signed<IntegerT>::value, int64_t, uint64_t>;

  static constexpr IntegerT kStatic = std::numeric_limits<IntegerT>::max();

  /**
   * Create a ChunkedRange with an explicit chunk size.
   **/
  ChunkedRange(IntegerT s, IntegerT e, IntegerT c) : start(s), end(e), chunk(c) {}
  /**
   * Create a ChunkedRange whose chunk size is determined statically (one even
   * split across the workers).
   **/
  ChunkedRange(IntegerT s, IntegerT e, Static) : ChunkedRange(s, e, kStatic) {}
  /**
   * Create a ChunkedRange whose chunk size is determined automatically at runtime.
   **/
  ChunkedRange(IntegerT s, IntegerT e, Auto) : ChunkedRange(s, e, 0) {}
  bool isStatic() const {
    return chunk == kStatic;
  }

  bool isAuto() const {
    return chunk == 0;
  }

  bool empty() const {
    return end <= start;
  }

  size_type size() const {
    return static_cast<size_type>(end) - start;
  }
  template <typename OtherInt>
  std::tuple<size_type, size_type>
  calcChunkSize(OtherInt numLaunched, bool oneOnCaller, size_type minChunkSize) const {
    size_type workingThreads = static_cast<size_type>(numLaunched) + size_type{oneOnCaller};
    assert(workingThreads > 0);

    if (!chunk) {
      // Auto chunking: aim for up to 16 chunks per worker, shrinking the
      // multiplier until chunks meet the minimum size.
      size_type chunkSize;
      size_type dynFactor = std::min<size_type>(16, size() / workingThreads);
      do {
        size_type roughChunks = dynFactor * workingThreads;
        chunkSize = (size() + roughChunks - 1) / roughChunks;
        --dynFactor;
      } while (chunkSize < minChunkSize);
      return {chunkSize, (size() + chunkSize - 1) / chunkSize};
    } else if (chunk == kStatic) {
      // Static chunking is normally delegated before reaching this point;
      // fall back to one even split across the working threads.
      size_type chunkSize = (size() + workingThreads - 1) / workingThreads;
      return {chunkSize, (size() + chunkSize - 1) / chunkSize};
    }
    return {chunk, (size() + chunk - 1) / chunk};
  }

  IntegerT start;
  IntegerT end;
  IntegerT chunk;
};
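// Worked example (sketch, not from the original header): an Auto range of 1000
// items with 7 launched workers plus the caller gives workingThreads = 8, so
// dynFactor = min(16, 1000 / 8) = 16, roughChunks = 128, and
// chunkSize = (1000 + 127) / 128 = 8, i.e. 125 chunks of 8 items:
//
//   ChunkedRange<int64_t> r(0, 1000, ChunkedRange<int64_t>::Auto{});
//   auto [chunkSize, numChunks] = r.calcChunkSize(7, /*oneOnCaller=*/true, 1);
//   // chunkSize == 8, numChunks == 125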
/**
 * Create a ChunkedRange from start/end with either static or automatic chunking.
 **/
template <typename IntegerA, typename IntegerB>
inline ChunkedRange<std::common_type_t<IntegerA, IntegerB>>
makeChunkedRange(IntegerA start, IntegerB end, ParForChunking chunking = ParForChunking::kStatic) {
  using IntegerT = std::common_type_t<IntegerA, IntegerB>;
  return (chunking == ParForChunking::kStatic)
      ? ChunkedRange<IntegerT>(start, end, typename ChunkedRange<IntegerT>::Static())
      : ChunkedRange<IntegerT>(start, end, typename ChunkedRange<IntegerT>::Auto());
}
/**
 * Create a ChunkedRange from start/end with an explicit chunk size.
 **/
template <typename IntegerA, typename IntegerB, typename IntegerC>
inline ChunkedRange<std::common_type_t<IntegerA, IntegerB>>
makeChunkedRange(IntegerA start, IntegerB end, IntegerC chunkSize) {
  return ChunkedRange<std::common_type_t<IntegerA, IntegerB>>(start, end, chunkSize);
}
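// Usage sketch (assumed, not from the original header; `n` is hypothetical):
//
//   auto evenSplit = makeChunkedRange(0, n);                        // static chunking
//   auto stealing  = makeChunkedRange(0, n, ParForChunking::kAuto); // auto chunking
//   auto fixed     = makeChunkedRange(0, n, 64);                    // 64 items per chunk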
namespace detail {

// No-op state machinery used by the stateless parallel_for overloads.
struct NoOpIter {
  int& operator*() const {
    static int dummy;
    return dummy;
  }
  NoOpIter& operator++() {
    return *this;
  }
  NoOpIter operator++(int) {
    return *this;
  }
};

struct NoOpContainer {
  size_t size() const {
    // Report a huge size so no state is ever emplaced.
    return std::numeric_limits<size_t>::max();
  }
  bool empty() const {
    return false;
  }
  void clear() {}
  NoOpIter begin() {
    return {};
  }
  int& front() {
    return *begin();
  }
  void emplace_back(int) {}
};

struct NoOpStateGen {
  int operator()() const {
    return 0;
  }
};
template <
    typename TaskSetT,
    typename IntegerT,
    typename F,
    typename StateContainer,
    typename StateGen>
void parallel_for_staticImpl(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ssize_t maxThreads,
    bool wait,
    bool reuseExistingState) {
  using size_type = typename ChunkedRange<IntegerT>::size_type;

  size_type numThreads = std::min<size_type>(taskSet.numPoolThreads() + wait, maxThreads);
  // Don't use more threads than there are items of work.
  numThreads = std::min(numThreads, range.size());

  if (!reuseExistingState) {
    // Ensure one state per worker without discarding existing state.
    size_t numToEmplace = states.size() < static_cast<size_t>(numThreads)
        ? static_cast<size_t>(numThreads) - states.size()
        : 0;
    for (; numToEmplace--;) {
      states.emplace_back(defaultState());
    }
  }

  auto chunking = detail::staticChunkSize(
      static_cast<ssize_t>(range.size()), static_cast<ssize_t>(numThreads));
  IntegerT chunkSize = static_cast<IntegerT>(chunking.ceilChunkSize);
  bool perfectlyChunked = static_cast<size_type>(chunking.transitionTaskIndex) == numThreads;

  // (!perfectlyChunked) ? chunking.transitionTaskIndex : numThreads - 1
  size_type firstLoopLen = chunking.transitionTaskIndex - perfectlyChunked;

  auto stateIt = states.begin();
  IntegerT start = range.start;
  size_type t;
  // Chunks before the transition index get the ceiling chunk size.
  for (t = 0; t < firstLoopLen; ++t) {
    IntegerT next = static_cast<IntegerT>(start + chunkSize);
    taskSet.schedule([it = stateIt++, start, next, f]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      f(*it, start, next);
    });
    start = next;
  }

  // Remaining chunks are one smaller unless the split was perfectly even.
  chunkSize = static_cast<IntegerT>(chunkSize - !perfectlyChunked);

  for (; t < numThreads - 1; ++t) {
    IntegerT next = static_cast<IntegerT>(start + chunkSize);
    taskSet.schedule([it = stateIt++, start, next, f]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      f(*it, start, next);
    });
    start = next;
  }

  if (wait) {
    // Run the final chunk on the calling thread, then wait for the rest.
    f(*stateIt, start, range.end);
    taskSet.wait();
  } else {
    taskSet.schedule(
        [stateIt, start, end = range.end, f]() {
          auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
          f(*stateIt, start, end);
        },
        ForceQueuingTag());
  }
}

} // namespace detail
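// Worked example (sketch): statically chunking 10 items across 4 threads gives
// ceilChunkSize = 3 and transitionTaskIndex = 2, so the chunks are [0,3), [3,6),
// [6,8), and [8,10); with wait == true the caller runs the final chunk itself.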
/**
 * Execute a loop over a ChunkedRange in parallel on the given TaskSet, with
 * one state object per worker.
 **/
template <
    typename TaskSetT,
    typename IntegerT,
    typename F,
    typename StateContainer,
    typename StateGen>
void parallel_for(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options = {}) {
  if (range.empty()) {
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }

  using size_type = typename ChunkedRange<IntegerT>::size_type;

  uint32_t minItemsPerChunk = std::max<uint32_t>(1, options.minItemsPerChunk);
  size_type maxThreads = std::max<int32_t>(options.maxThreads, 1);
  bool isStatic = range.isStatic();

  const size_type N = taskSet.numPoolThreads();
  // Run serially if there is no pool, parallelism is disabled, the range is
  // tiny, or we are already inside a parallel_for on this pool.
  if (N == 0 || !options.maxThreads || range.size() <= minItemsPerChunk ||
      detail::PerPoolPerThreadInfo::isParForRecursive(&taskSet.pool())) {
    if (!options.reuseExistingState) {
      states.clear();
    }
    if (states.empty()) {
      states.emplace_back(defaultState());
    }
    f(*states.begin(), range.start, range.end);
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }

  if (minItemsPerChunk > 1) {
    size_type maxWorkers = range.size() / minItemsPerChunk;
    if (maxWorkers < maxThreads) {
      maxThreads = static_cast<uint32_t>(maxWorkers);
    }
    if (range.size() / (maxThreads + options.wait) < minItemsPerChunk && range.isAuto()) {
      isStatic = true;
    }
  } else if (range.size() <= N + options.wait) {
    if (range.isAuto()) {
      isStatic = true;
    } else if (!range.isStatic()) {
      maxThreads = range.size() - options.wait;
    }
  }

  if (isStatic) {
    detail::parallel_for_staticImpl(
        taskSet,
        states,
        defaultState,
        range,
        std::forward<F>(f),
        static_cast<ssize_t>(maxThreads),
        options.wait,
        options.reuseExistingState);
    return;
  }

  const size_type numToLaunch = std::min<size_type>(maxThreads - options.wait, N);

  if (!options.reuseExistingState) {
    size_t numToEmplace = static_cast<size_type>(states.size()) < (numToLaunch + options.wait)
        ? (static_cast<size_t>(numToLaunch) + options.wait) - states.size()
        : 0;
    for (; numToEmplace--;) {
      states.emplace_back(defaultState());
    }
  }

  if (numToLaunch == 1 && !options.wait) {
    taskSet.schedule(
        [&s = states.front(), range, f = std::move(f)]() { f(s, range.start, range.end); });
    return;
  }

  auto chunkInfo = range.calcChunkSize(numToLaunch, options.wait, minItemsPerChunk);
  auto chunkSize = std::get<0>(chunkInfo);
  auto numChunks = std::get<1>(chunkInfo);

  if (options.wait) {
    // Workers (including the caller) grab chunks via a shared atomic index.
    alignas(kCacheLineSize) std::atomic<decltype(numChunks)> index(0);
    auto worker = [start = range.start, end = range.end, &index, f, chunkSize, numChunks](
                      auto& s) {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      while (true) {
        auto cur = index.fetch_add(1, std::memory_order_relaxed);
        if (cur >= numChunks) {
          break;
        }
        auto sidx = static_cast<IntegerT>(start + cur * chunkSize);
        if (cur + 1 == numChunks) {
          f(s, sidx, end);
        } else {
          auto eidx = static_cast<IntegerT>(sidx + chunkSize);
          f(s, sidx, eidx);
        }
      }
    };

    auto it = states.begin();
    for (size_type i = 0; i < numToLaunch; ++i) {
      taskSet.schedule([&s = *it++, worker]() { worker(s); });
    }
    worker(*it);
    taskSet.wait();
  } else {
    // The chunk index must outlive this call, so heap-allocate it with
    // cache-line alignment and share ownership with the scheduled tasks.
    struct alignas(kCacheLineSize) Atomic {
      Atomic() : index(0) {}
      std::atomic<decltype(numChunks)> index;
    };
    void* ptr = detail::alignedMalloc(sizeof(Atomic), alignof(Atomic));
    auto* atm = new (ptr) Atomic();
    std::shared_ptr<Atomic> wrapper(atm, detail::AlignedFreeDeleter<Atomic>());
    auto worker = [start = range.start,
                   end = range.end,
                   wrapper = std::move(wrapper),
                   f,
                   chunkSize,
                   numChunks](auto& s) {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      while (true) {
        auto cur = wrapper->index.fetch_add(1, std::memory_order_relaxed);
        if (cur >= numChunks) {
          break;
        }
        auto sidx = static_cast<IntegerT>(start + cur * chunkSize);
        if (cur + 1 == numChunks) {
          f(s, sidx, end);
        } else {
          auto eidx = static_cast<IntegerT>(sidx + chunkSize);
          f(s, sidx, eidx);
        }
      }
    };

    auto it = states.begin();
    for (size_type i = 0; i < numToLaunch; ++i) {
      taskSet.schedule([&s = *it++, worker]() { worker(s); }, ForceQueuingTag());
    }
  }
}
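// Usage sketch (assumed; `results` is hypothetical): one scratch vector per
// worker, filled without any locking.
//
//   std::vector<std::vector<int>> results;
//   dispenso::TaskSet taskSet(dispenso::globalThreadPool());
//   dispenso::parallel_for(
//       taskSet,
//       results,
//       []() { return std::vector<int>(); },
//       dispenso::makeChunkedRange(0, 1000, dispenso::ParForChunking::kAuto),
//       [](std::vector<int>& state, int s, int e) {
//         for (int i = s; i < e; ++i) {
//           state.push_back(i * i);
//         }
//       });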
/**
 * Execute a loop over a ChunkedRange in parallel on the given TaskSet.
 **/
template <typename TaskSetT, typename IntegerT, typename F>
DISPENSO_REQUIRES(ParallelForRangeFunc<F, IntegerT>)
void parallel_for(
    TaskSetT& taskSet,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options = {}) {
  detail::NoOpContainer container;
  parallel_for(
      taskSet,
      container,
      detail::NoOpStateGen(),
      range,
      [f = std::move(f)](int /* no-op state */, auto i, auto j) { f(i, j); },
      options);
}
/**
 * Execute a loop over a ChunkedRange in parallel on the global thread pool,
 * waiting for completion.
 **/
template <typename IntegerT, typename F>
DISPENSO_REQUIRES(ParallelForRangeFunc<F, IntegerT>)
void parallel_for(const ChunkedRange<IntegerT>& range, F&& f, ParForOptions options = {}) {
  TaskSet taskSet(globalThreadPool());
  options.wait = true;
  parallel_for(taskSet, range, std::forward<F>(f), options);
}
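// Usage sketch (assumed; `data` is hypothetical): double every element using
// the global thread pool and wait for completion.
//
//   std::vector<float> data(1 << 20);
//   dispenso::parallel_for(
//       dispenso::makeChunkedRange(size_t{0}, data.size()),
//       [&data](size_t s, size_t e) {
//         for (size_t i = s; i < e; ++i) {
//           data[i] *= 2.0f;
//         }
//       });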
/**
 * Execute a loop over a ChunkedRange in parallel on the global thread pool,
 * with one state object per worker, waiting for completion.
 **/
template <typename F, typename IntegerT, typename StateContainer, typename StateGen>
void parallel_for(
    StateContainer& states,
    const StateGen& defaultState,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options = {}) {
  TaskSet taskSet(globalThreadPool());
  options.wait = true;
  parallel_for(taskSet, states, defaultState, range, std::forward<F>(f), options);
}
template <
    typename TaskSetT,
    typename IntegerA,
    typename IntegerB,
    typename F,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true,
    std::enable_if_t<detail::CanInvoke<F(IntegerA)>::value, bool> = true>
void parallel_for(
    TaskSetT& taskSet,
    IntegerA start,
    IntegerB end,
    F&& f,
    ParForOptions options = {}) {
  using IntegerT = std::common_type_t<IntegerA, IntegerB>;
  auto range = makeChunkedRange(start, end, options.defaultChunking);
  parallel_for(
      taskSet,
      range,
      [f = std::move(f)](IntegerT s, IntegerT e) {
        for (IntegerT i = s; i < e; ++i) {
          f(i);
        }
      },
      options);
}
template <
    typename TaskSetT,
    typename IntegerA,
    typename IntegerB,
    typename F,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true,
    std::enable_if_t<detail::CanInvoke<F(IntegerA, IntegerB)>::value, bool> = true>
void parallel_for(
    TaskSetT& taskSet,
    IntegerA start,
    IntegerB end,
    F&& f,
    ParForOptions options = {}) {
  auto range = makeChunkedRange(start, end, options.defaultChunking);
  parallel_for(taskSet, range, std::forward<F>(f), options);
}
template <
    typename IntegerA,
    typename IntegerB,
    typename F,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true>
void parallel_for(IntegerA start, IntegerB end, F&& f, ParForOptions options = {}) {
  TaskSet taskSet(globalThreadPool());
  options.wait = true;
  parallel_for(taskSet, start, end, std::forward<F>(f), options);
}
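// Usage sketch (assumed; `image`, `height`, and `processRow` are hypothetical):
// the per-index form wraps the body in a range loop internally.
//
//   dispenso::parallel_for(0, height, [&](int y) { processRow(image, y); });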
template <
    typename TaskSetT,
    typename IntegerA,
    typename IntegerB,
    typename F,
    typename StateContainer,
    typename StateGen,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true,
    std::enable_if_t<
        detail::CanInvoke<F(typename StateContainer::reference, IntegerA)>::value,
        bool> = true>
void parallel_for(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    IntegerA start,
    IntegerB end,
    F&& f,
    ParForOptions options = {}) {
  using IntegerT = std::common_type_t<IntegerA, IntegerB>;
  auto range = makeChunkedRange(start, end, options.defaultChunking);
  parallel_for(
      taskSet,
      states,
      defaultState,
      range,
      [f = std::move(f)](auto& state, IntegerT s, IntegerT e) {
        for (IntegerT i = s; i < e; ++i) {
          f(state, i);
        }
      },
      options);
}
template <
    typename TaskSetT,
    typename IntegerA,
    typename IntegerB,
    typename F,
    typename StateContainer,
    typename StateGen,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true,
    std::enable_if_t<
        detail::CanInvoke<F(typename StateContainer::reference, IntegerA, IntegerB)>::value,
        bool> = true>
void parallel_for(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    IntegerA start,
    IntegerB end,
    F&& f,
    ParForOptions options = {}) {
  auto range = makeChunkedRange(start, end, options.defaultChunking);
  parallel_for(taskSet, states, defaultState, range, std::forward<F>(f), options);
}
template <
    typename IntegerA,
    typename IntegerB,
    typename F,
    typename StateContainer,
    typename StateGen,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true>
void parallel_for(
    StateContainer& states,
    const StateGen& defaultState,
    IntegerA start,
    IntegerB end,
    F&& f,
    ParForOptions options = {}) {
  TaskSet taskSet(globalThreadPool());
  options.wait = true;
  parallel_for(taskSet, states, defaultState, start, end, std::forward<F>(f), options);
}
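// Usage sketch (assumed): a simple reduction via per-worker partial sums,
// combined on the calling thread afterwards (requires <numeric>).
//
//   std::vector<int64_t> partials;
//   dispenso::parallel_for(
//       partials,
//       []() { return int64_t{0}; },
//       0, 1000000,
//       [](int64_t& partial, int i) { partial += i; });
//   int64_t total = std::accumulate(partials.begin(), partials.end(), int64_t{0});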