22#include "detail/can_invoke.h"
23#include "detail/par_for_stripe.h"
24#include "detail/per_thread_info.h"
28#if DISPENSO_HAS_CONCEPTS
// Constraint for range-style callables: satisfied when F can be invoked with
// two IntegerT arguments. The range-based overloads in this file apply it via
// DISPENSO_REQUIRES.
35template <
typename F,
typename IntegerT>
36concept ParallelForRangeFunc = std::invocable<F, IntegerT, IntegerT>;
// Constraint for per-index callables: satisfied when F can be invoked with a
// single IntegerT argument.
44template <
typename F,
typename IntegerT>
45concept ParallelForIndexFunc = std::invocable<F, IntegerT>;
// Constraint for stateful range callables: satisfied when F can be invoked
// with a StateRef followed by two IntegerT arguments.
53template <
typename F,
typename StateRef,
typename IntegerT>
54concept ParallelForStateRangeFunc = std::invocable<F, StateRef, IntegerT, IntegerT>;
// Constraint for stateful per-index callables: satisfied when F can be invoked
// with a StateRef followed by one IntegerT argument.
62template <
typename F,
typename StateRef,
typename IntegerT>
63concept ParallelForStateIndexFunc = std::invocable<F, StateRef, IntegerT>;
// Deprecated enumerator: the attached message directs callers to
// ParForChunking::kAdaptive and announces removal in 2.0. The value assigned
// after '=' is on a line that is not visible in this listing.
86 kAuto DISPENSO_DEPRECATED(
"Use ParForChunking::kAdaptive (kAuto will be removed in 2.0).") =
// Thread-count limit option. Stored as uint32_t but defaulted to the largest
// int32_t value; the visible consumer in this file passes it through
// std::max<int32_t>(options.maxThreads, 1).
99 uint32_t
maxThreads = std::numeric_limits<int32_t>::max();
// Template header of the chunked range type (the struct line itself is not
// visible in this listing).
// NOTE(review): the default argument appears as `s` followed by `size_t`; this
// looks like `ssize_t` split by the listing extraction -- confirm against the
// upstream header.
168template <
typename IntegerT = s
size_t>
// Type used for size arithmetic: int64_t when IntegerT is signed, uint64_t
// otherwise.
176 using size_type = std::conditional_t<std::is_signed<IntegerT>::value, int64_t, uint64_t>;
// Sentinel chunk value (the largest IntegerT). isStatic() compares chunk
// against it.
180 static constexpr IntegerT kStatic = std::numeric_limits<IntegerT>::max();
// Range [s, e) with an explicit chunk value c.
189 ChunkedRange(IntegerT s, IntegerT e, IntegerT c) : start(s), end(e), chunk(c) {}
// Static tag: delegates with chunk set to the kStatic sentinel.
196 ChunkedRange(IntegerT s, IntegerT e, Static) : ChunkedRange(s, e, kStatic) {}
// Auto tag: delegates with chunk set to 0.
204 ChunkedRange(IntegerT s, IntegerT e, Auto) : ChunkedRange(s, e, 0) {}
// True when chunk holds the kStatic sentinel, i.e. the range was built with
// the Static tag (or an explicit chunk equal to kStatic).
206 bool isStatic()
const {
207 return chunk == kStatic;
// NOTE(review): body is not visible in this listing. Presumably tests
// chunk == 0, which is the value the Auto-tag constructor stores -- confirm.
210 bool isAuto()
const {
// Number of items in the range: end - start, with end converted to size_type
// before the subtraction.
218 size_type size()
const {
219 return static_cast<size_type
>(end) - start;
// Computes a chunk size and the resulting chunk count for this range, returned
// as a {chunkSize, numChunks} tuple. Several lines of this function (one
// parameter, the declaration of chunkSize, the loop head) are missing from
// this listing.
222 template <
typename OtherInt>
223 std::tuple<size_type, size_type> calcChunkSize(
224 OtherInt numLaunched,
226 size_type minChunkSize,
227 uint32_t granularity = 1,
228 size_type maxDynFactor = 16)
const {
// Number of threads that will process chunks: the launched tasks plus
// oneOnCaller converted to size_type (its declaration is not visible here).
229 size_type workingThreads =
static_cast<size_type
>(numLaunched) + size_type{oneOnCaller};
230 assert(workingThreads > 0);
// Chunks per thread: at most maxDynFactor, and at most the number of items
// available per working thread.
233 size_type dynFactor = std::min<size_type>(maxDynFactor, size() / workingThreads);
236 size_type roughChunks = dynFactor * workingThreads;
// Ceiling division of the item count by the target chunk count.
237 chunkSize = (size() + roughChunks - 1) / roughChunks;
238 if (granularity > 1) {
// Round the chunk size up to the next multiple of granularity.
240 chunkSize = ((chunkSize + granularity - 1) / granularity) * granularity;
// Loop condition: repeat while the chunk is smaller than minChunkSize (the
// loop head and the per-iteration adjustment are not visible here).
243 }
while (chunkSize < minChunkSize);
// Second element is the chunk count: ceil(size() / chunkSize).
244 return {chunkSize, (size() + chunkSize - 1) / chunkSize};
245 }
else if (chunk == kStatic) {
// NOTE(review): the lines between the kStatic test and this return are not
// visible. This return uses the stored chunk value directly and reports
// ceil(size() / chunk) chunks -- confirm which branch it belongs to.
250 return {chunk, (size() + chunk - 1) / chunk};
// Factory producing a ChunkedRange over the common type of IntegerA and
// IntegerB, with the chunking mode chosen from a ParForChunking value. (The
// line carrying the function name and parameters is not visible here.)
265template <
typename IntegerA,
typename IntegerB>
266inline ChunkedRange<std::common_type_t<IntegerA, IntegerB>>
268 using IntegerT = std::common_type_t<IntegerA, IntegerB>;
// kStatic selects the Static tag constructor; every other value selects Auto.
269 return (chunking == ParForChunking::kStatic)
270 ? ChunkedRange<IntegerT>(start, end,
typename ChunkedRange<IntegerT>::Static())
271 : ChunkedRange<IntegerT>(start, end,
typename ChunkedRange<IntegerT>::Auto());
// Factory taking an explicit chunk size. The element type is the common type
// of IntegerA and IntegerB only; IntegerC does not take part in it, and
// chunkSize is passed to the three-argument constructor. (The line carrying
// the function name and parameters is not visible here.)
281template <
typename IntegerA,
typename IntegerB,
typename IntegerC>
282inline ChunkedRange<std::common_type_t<IntegerA, IntegerB>>
284 return ChunkedRange<std::common_type_t<IntegerA, IntegerB>>(start, end, chunkSize);
// Iterator traits of a placeholder iterator over int: random-access category
// with ptrdiff_t differences. The struct header and all operator bodies are
// missing from this listing, so only the signatures are described below.
290 using difference_type = std::ptrdiff_t;
291 using value_type = int;
292 using pointer =
int*;
293 using reference =
int&;
294 using iterator_category = std::random_access_iterator_tag;
// Dereference: declares a function-local static int marked
// DISPENSO_THREAD_LOCAL and initialised to 0; presumably that is the object
// returned by reference -- confirm.
296 int& operator*()
const {
297 static DISPENSO_THREAD_LOCAL
int dummy = 0;
// Increment / decrement in prefix and postfix forms.
300 NoOpIter& operator++() {
303 NoOpIter operator++(
int) {
306 NoOpIter& operator--() {
309 NoOpIter operator--(
int) {
// Arithmetic operators: the offset parameter is unnamed, so the bodies cannot
// depend on its value.
312 NoOpIter& operator+=(difference_type) {
315 NoOpIter& operator-=(difference_type) {
318 NoOpIter operator+(difference_type)
const {
321 NoOpIter operator-(difference_type)
const {
// Distance and comparisons: the other iterator is unnamed, so the results
// cannot depend on it.
324 difference_type operator-(
const NoOpIter&)
const {
327 bool operator==(
const NoOpIter&)
const {
330 bool operator!=(
const NoOpIter&)
const {
333 bool operator<(
const NoOpIter&)
const {
// Subscript: same thread-local dummy pattern as operator*, index ignored.
336 int& operator[](difference_type)
const {
337 static DISPENSO_THREAD_LOCAL
int dummy = 0;
// Placeholder state container: the stateless parallel_for overload in this
// file passes an instance of it where a real state container would go. Most
// of its members are missing from this listing.
342struct NoOpContainer {
343 size_t size()
const {
// Accepts an int and discards it: the body is empty.
357 void emplace_back(
int) {}
// NOTE(review): the header of the type owning this call operator is not
// visible; presumably detail::NoOpStateGen, which the stateless overload
// passes as the state generator -- confirm.
366 int operator()()
const {
// Rounds size down to a multiple of granularity. granularity <= 1 is handled
// by an early branch whose body is not visible in this listing.
372template <
typename IntegerT>
373inline IntegerT roundDownToGranularity(IntegerT size, uint32_t granularity) {
374 if (granularity <= 1) {
// Arithmetic is done on the unsigned counterpart of IntegerT and the result is
// converted back at the end.
377 using U =
typename std::make_unsigned<IntegerT>::type;
// Integer division truncates, so multiplying back gives the largest multiple
// of granularity that does not exceed a non-negative size.
378 return static_cast<IntegerT
>((
static_cast<U
>(size) / granularity) * granularity);
// Rounds size up to a multiple of granularity. granularity <= 1 is handled by
// an early branch whose body is not visible in this listing.
382template <
typename IntegerT>
383inline IntegerT roundUpToGranularity(IntegerT size, uint32_t granularity) {
384 if (granularity <= 1) {
// Arithmetic is done on the unsigned counterpart of IntegerT and the result is
// converted back at the end.
387 using U =
typename std::make_unsigned<IntegerT>::type;
// Adding granularity - 1 before the truncating division turns it into a
// ceiling division; multiplying back yields the rounded-up multiple.
388 return static_cast<IntegerT
>(
389 ((
static_cast<U
>(size) + granularity - 1) / granularity) * granularity);
// Grows a state container by appending defaultState() results until it holds
// numNeeded elements. The function-name line and the numNeeded parameter are
// missing from this listing; callers in this file invoke it as
// detail::initStates(states, defaultState, count, options.reuseExistingState).
395template <
typename StateContainer,
typename StateGen>
397 StateContainer& states,
398 const StateGen& defaultState,
400 bool reuseExistingState) {
// The action taken when existing state must not be reused is on a line that is
// not visible here.
401 if (!reuseExistingState) {
// Start from the current size so elements already present are kept.
404 for (
size_t i = states.size(); i < numNeeded; ++i) {
405 states.emplace_back(defaultState());
// Result bundle of adjustChunkSizing. Only maxThreads is visible in this
// listing; adjustChunkSizing returns {maxThreads, isStatic} and the caller
// also reads an isStatic member.
409template <
typename IntegerT>
410struct ChunkSizingResult {
411 typename ChunkedRange<IntegerT>::size_type maxThreads;
// Lowers the thread count (and possibly the static flag) for a range based on
// the pool size and the minimum items per chunk, returning
// {maxThreads, isStatic}. Two parameters used below (isStatic and wait) are
// declared on lines missing from this listing, as are several branch bodies.
416template <
typename IntegerT>
417ChunkSizingResult<IntegerT> adjustChunkSizing(
418 const ChunkedRange<IntegerT>& range,
419 typename ChunkedRange<IntegerT>::size_type maxThreads,
421 uint32_t minItemsPerChunk,
422 typename ChunkedRange<IntegerT>::size_type poolThreads,
424 using size_type =
typename ChunkedRange<IntegerT>::size_type;
// Never use more than poolThreads + 1 threads.
426 maxThreads = std::min<size_type>(maxThreads, poolThreads + 1);
428 if (minItemsPerChunk > 1) {
// Upper bound on workers so that each can receive minItemsPerChunk items.
429 size_type maxWorkers = range.size() / minItemsPerChunk;
430 if (maxWorkers < maxThreads) {
431 maxThreads = maxWorkers;
// Auto ranges whose per-thread share is still below the minimum take a branch
// whose body is not visible here.
433 if (maxThreads > 0 && range.size() / (maxThreads + wait) < minItemsPerChunk && range.isAuto()) {
// Otherwise: ranges with no more items than poolThreads + wait.
436 }
else if (range.size() <= poolThreads + wait) {
437 if (range.isAuto()) {
// Neither auto nor static (an explicit chunk value): thread count becomes the
// item count minus wait.
439 }
else if (!range.isStatic()) {
440 maxThreads = range.size() - wait;
444 return {maxThreads, isStatic};
// Result bundle of computeGranularity. Only granularity is visible in this
// listing; computeGranularity returns {granularity, trimmedEnd, hasTail} and
// the caller reads members with those names.
448template <
typename IntegerT>
449struct GranularityInfo {
450 uint32_t granularity;
// Derives the effective granularity for a range and, when it exceeds 1, an end
// position pulled back by the remainder; returns
// {granularity, trimmedEnd, hasTail}. Some lines are missing from this listing.
455template <
typename IntegerT>
456GranularityInfo<IntegerT> computeGranularity(
457 const ChunkedRange<IntegerT>& range,
458 uint32_t requested) {
459 using size_type =
typename ChunkedRange<IntegerT>::size_type;
// Ranges with chunk == 0 or chunk == kStatic use the requested granularity,
// floored at 1; the alternative for other chunk values is not visible here.
460 uint32_t granularity = (range.chunk == 0 || range.chunk == ChunkedRange<IntegerT>::kStatic)
461 ? std::max<uint32_t>(1, requested)
464 IntegerT trimmedEnd = range.end;
465 bool hasTail =
false;
466 if (granularity > 1) {
// Items left over beyond the last full multiple of granularity.
467 size_type rem = range.size() % granularity;
// Pull the end back by the remainder (the guard around this assignment and the
// hasTail update are on lines not visible here).
469 trimmedEnd =
static_cast<IntegerT
>(range.end -
static_cast<IntegerT
>(rem));
473 return {granularity, trimmedEnd, hasTail};
// Dispatcher for the adaptive path in which the calling thread also takes
// part: it schedules numToLaunch tasks and then runs one more worker itself.
// Several parameters and the initStripeState arguments are missing from this
// listing.
477template <
typename TaskSetT,
typename IntegerT,
typename F,
typename StateContainer>
478void parallel_for_adaptiveWaitDispatch(
480 StateContainer& states,
481 const ChunkedRange<IntegerT>& parRange,
484 uint32_t minItemsPerChunk,
485 uint32_t granularity) {
486 using size_type =
typename ChunkedRange<IntegerT>::size_type;
// Workers = scheduled tasks plus the caller.
487 size_type numStripeWorkers =
static_cast<size_type
>(numToLaunch) + 1;
// Chunk size computed with the caller counted as a worker and a maxDynFactor
// of 64 instead of the default 16.
488 auto adaptiveChunkInfo =
489 parRange.calcChunkSize(numToLaunch,
true, minItemsPerChunk, granularity, 64);
490 auto adaptiveChunkSize = std::get<0>(adaptiveChunkInfo);
492 detail::StripeState<IntegerT> stripeState;
493 detail::initStripeState(
497 static_cast<uint32_t
>(numStripeWorkers),
498 static_cast<IntegerT
>(adaptiveChunkSize),
500 auto stateBegin = states.begin();
// Per-worker body. recurseInfo is kept alive for the duration of the call;
// presumably a scope guard marking this thread as inside a parallel_for --
// confirm against PerPoolPerThreadInfo.
501 auto worker = [&stripeState, &f](
auto& userState, uint32_t myIdx) {
502 auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
503 detail::runStripeWorker(stripeState, myIdx, userState, f);
505 if (numToLaunch > 0) {
// The generator is called with each task index and returns the closure to
// run; task idx is bound to the idx-th element of states.
506 taskSet.scheduleBulk(numToLaunch, [stateBegin, worker](
size_t idx) {
507 auto stateIt = stateBegin;
508 std::advance(stateIt,
static_cast<ptrdiff_t
>(idx));
509 uint32_t myIdx =
static_cast<uint32_t
>(idx);
510 return [&userState = *stateIt, myIdx, worker]() { worker(userState, myIdx); };
// The caller runs as the last worker, using the state element at index
// numToLaunch.
513 auto callerIt = states.begin();
514 std::advance(callerIt,
static_cast<ptrdiff_t
>(numToLaunch));
515 worker(*callerIt,
static_cast<uint32_t
>(numToLaunch));
525#include "detail/par_for_dynamic.h"
526#include "detail/par_for_static.h"
// Core stateful overload over a ChunkedRange (its template header, parameter
// list and several statements are only partly visible in this listing).
// Visible flow: derive granularity, then choose between a serial call, the
// static implementation, and the adaptive / dynamic dispatchers.
553 typename StateContainer,
557 StateContainer& states,
558 const StateGen& defaultState,
559 const ChunkedRange<IntegerT>& range,
569 using size_type =
typename ChunkedRange<IntegerT>::size_type;
// Effective granularity plus the end of the granularity-aligned part of range.
571 auto granInfo = detail::computeGranularity(range, options.granularity);
572 uint32_t granularity = granInfo.granularity;
573 IntegerT trimmedEnd = granInfo.trimmedEnd;
574 bool hasTail = granInfo.hasTail;
// Runs f over the leftover items [trimmedEnd, range.end) with the first state
// element.
576 auto runTail = [&]() {
578 f(*states.begin(), trimmedEnd, range.end);
// Both limits are floored at 1.
582 uint32_t minItemsPerChunk = std::max<uint32_t>(1, options.minItemsPerChunk);
583 size_type maxThreads = std::max<int32_t>(options.maxThreads, 1);
584 bool isStatic = range.isStatic();
// Range handed to the parallel paths: a copy of range ending at trimmedEnd.
586 ChunkedRange<IntegerT> parRange = range;
587 parRange.end = trimmedEnd;
589 const size_type N = taskSet.numPoolThreads();
// Serial path: the trimmed range is empty, the pool has no threads, or
// isParForRecursive reports true for this pool. A single state is prepared and
// f is called once over the whole original range.
593 if (parRange.empty() || N == 0 ||
594 detail::PerPoolPerThreadInfo::isParForRecursive(&taskSet.pool())) {
595 detail::initStates(states, defaultState, 1, options.reuseExistingState);
596 f(*states.begin(), range.start, range.end);
// Let the helper lower the thread count / static flag for this range.
604 detail::adjustChunkSizing(parRange, maxThreads, isStatic, minItemsPerChunk, N, options.wait);
605 maxThreads = chunkSizing.maxThreads;
606 isStatic = chunkSizing.isStatic;
// Fewer than two threads left: same serial call as above.
608 if (maxThreads < 2) {
609 detail::initStates(states, defaultState, 1, options.reuseExistingState);
610 f(*states.begin(), range.start, range.end);
// Static scheduling path (most arguments are not visible here).
618 detail::parallel_for_staticImpl(
624 static_cast<ssize_t
>(maxThreads),
626 options.reuseExistingState,
// Tasks to schedule: maxThreads minus options.wait, capped at the pool size.
632 const size_type numToLaunch = std::min<size_type>(maxThreads - options.wait, N);
637 static_cast<size_t>(numToLaunch + options.wait),
638 options.reuseExistingState);
// chunk == 0 is the value stored by the Auto-tag constructor.
640 bool useAdaptive = range.chunk == 0;
642 auto chunkInfo = parRange.calcChunkSize(numToLaunch, options.wait, minItemsPerChunk, granularity);
643 auto chunkSize = std::get<0>(chunkInfo);
644 auto numChunks = std::get<1>(chunkInfo);
// Adaptive chunking with options.wait set uses the stripe-based dispatcher.
646 if (useAdaptive && options.wait) {
647 detail::parallel_for_adaptiveWaitDispatch(
652 static_cast<size_t>(numToLaunch),
// Shared atomic chunk counter for the dynamic implementation, aligned to
// kCacheLineSize and starting at 0.
660 alignas(
kCacheLineSize) std::atomic<
decltype(numChunks)> index(0);
661 detail::parallel_for_dynamicImpl(
667 static_cast<size_t>(numToLaunch),
// No-wait dispatcher (the condition selecting it is not visible in this
// listing).
675 detail::parallel_for_dynamicNoWaitDispatch(
680 static_cast<size_t>(numToLaunch),
// Stateless range overload taking a task set. It supplies a NoOpContainer and
// a NoOpStateGen in place of real per-thread state. (The function-name line
// and parts of the forwarded call are not visible in this listing.)
697template <
typename TaskSetT,
typename IntegerT,
typename F>
698DISPENSO_REQUIRES(ParallelForRangeFunc<F, IntegerT>)
701 const ChunkedRange<IntegerT>& range,
704 detail::NoOpContainer container;
708 detail::NoOpStateGen(),
// Adapter: takes the int state argument, ignores it, and calls the user
// functor with the two range bounds only.
710 [f = std::move(f)](
int ,
auto i,
auto j) { f(i, j); },
// Stateless range overload without a task set: builds a TaskSet on
// globalThreadPool() and forwards to the task-set overload. One statement
// between the two visible ones is missing from this listing.
723template <
typename IntegerT,
typename F>
724DISPENSO_REQUIRES(ParallelForRangeFunc<F, IntegerT>)
725void parallel_for(const ChunkedRange<IntegerT>& range, F&& f,
ParForOptions options = {}) {
726 TaskSet taskSet(globalThreadPool());
728 parallel_for(taskSet, range, std::forward<F>(f), options);
// Stateful range overload without a task set: builds a TaskSet on
// globalThreadPool() and forwards states, defaultState, range and f to the
// task-set overload. (The function-name line and some parameters are not
// visible in this listing.)
748template <
typename F,
typename IntegerT,
typename StateContainer,
typename StateGen>
750 StateContainer& states,
751 const StateGen& defaultState,
752 const ChunkedRange<IntegerT>& range,
755 TaskSet taskSet(globalThreadPool());
757 parallel_for(taskSet, states, defaultState, range, std::forward<F>(f), options);
// Per-index overload taking a task set and (start, end) bounds. Two
// alternative template headers are visible: a concepts-based one and an
// enable_if-based one; both require integral IntegerA / IntegerB and an F
// invocable with an IntegerA. (The preprocessor lines separating them and the
// function signature are not visible in this listing.)
770#if DISPENSO_HAS_CONCEPTS
771template <
typename TaskSetT, std::
integral IntegerA, std::
integral IntegerB,
typename F>
772 requires std::invocable<F, IntegerA>
779 std::enable_if_t<std::is_integral<IntegerA>::value,
bool> =
true,
780 std::enable_if_t<std::is_integral<IntegerB>::value,
bool> =
true,
781 std::enable_if_t<detail::CanInvoke<F(IntegerA)>::value,
bool> =
true>
789 using IntegerT = std::common_type_t<IntegerA, IntegerB>;
// The chunking mode comes from options.defaultChunking.
791 auto range = makeChunkedRange(start, end, options.defaultChunking);
// Adapter from a per-index functor to a range functor: loops over [s, e). The
// loop body is not visible here; presumably it calls f(i) -- confirm.
795 [f = std::move(f)](IntegerT s, IntegerT e) {
796 for (IntegerT i = s; i < e; ++i) {
// Range-functor overload taking a task set and (start, end) bounds. Both
// visible template headers (concepts-based and enable_if-based) require
// integral IntegerA / IntegerB and an F invocable with (IntegerA, IntegerB).
// The visible body forwards a `range` object to the ChunkedRange overload; the
// line that builds it is not visible in this listing.
804#if DISPENSO_HAS_CONCEPTS
805template <
typename TaskSetT, std::
integral IntegerA, std::
integral IntegerB,
typename F>
806 requires std::invocable<F, IntegerA, IntegerB>
813 std::enable_if_t<std::is_integral<IntegerA>::value,
bool> =
true,
814 std::enable_if_t<std::is_integral<IntegerB>::value,
bool> =
true,
815 std::enable_if_t<detail::CanInvoke<F(IntegerA, IntegerB)>::value,
bool> =
true>
822 ParForOptions options = {}) {
824 parallel_for(taskSet, range, std::forward<F>(f), options);
// (start, end) overload without a task set: builds a TaskSet on
// globalThreadPool() and forwards to the task-set overload. Only the
// integral-type constraints on IntegerA / IntegerB are visible here; the
// signature line is missing from this listing.
837#if DISPENSO_HAS_CONCEPTS
838template <std::
integral IntegerA, std::
integral IntegerB,
typename F>
844 std::enable_if_t<std::is_integral<IntegerA>::value,
bool> =
true,
845 std::enable_if_t<std::is_integral<IntegerB>::value,
bool> =
true>
848 TaskSet taskSet(globalThreadPool());
850 parallel_for(taskSet, start, end, std::forward<F>(f), options);
// Stateful per-index overload with (start, end) bounds. Both visible
// constraint sets require integral IntegerA / IntegerB and an F invocable with
// (StateContainer::reference, IntegerA). Large parts of the header and
// signature are missing from this listing.
871#if DISPENSO_HAS_CONCEPTS
874 std::integral IntegerA,
875 std::integral IntegerB,
877 typename StateContainer,
879 requires std::invocable<F, typename StateContainer::reference, IntegerA>
886 typename StateContainer,
888 std::enable_if_t<std::is_integral<IntegerA>::value,
bool> =
true,
889 std::enable_if_t<std::is_integral<IntegerB>::value,
bool> =
true,
891 detail::CanInvoke<F(
typename StateContainer::reference, IntegerA)>::value,
896 StateContainer& states,
897 const StateGen& defaultState,
902 using IntegerT = std::common_type_t<IntegerA, IntegerB>;
// The chunking mode comes from options.defaultChunking.
903 auto range = makeChunkedRange(start, end, options.defaultChunking);
// Adapter from a per-index functor to a range functor: receives the state by
// reference and loops over [s, e). The loop body is not visible here;
// presumably it calls f(state, i) -- confirm.
909 [f = std::move(f)](
auto& state, IntegerT s, IntegerT e) {
910 for (IntegerT i = s; i < e; ++i) {
// Stateful range-functor overload with (start, end) bounds. Both visible
// constraint sets require integral IntegerA / IntegerB and an F invocable with
// (StateContainer::reference, IntegerA, IntegerB). The visible body forwards a
// `range` object to the ChunkedRange overload; the line that builds it is not
// visible in this listing.
918#if DISPENSO_HAS_CONCEPTS
921 std::integral IntegerA,
922 std::integral IntegerB,
924 typename StateContainer,
926 requires std::invocable<F, typename StateContainer::reference, IntegerA, IntegerB>
933 typename StateContainer,
935 std::enable_if_t<std::is_integral<IntegerA>::value,
bool> =
true,
936 std::enable_if_t<std::is_integral<IntegerB>::value,
bool> =
true,
938 detail::CanInvoke<F(
typename StateContainer::reference, IntegerA, IntegerB)>::value,
943 StateContainer& states,
944 const StateGen& defaultState,
948 ParForOptions options = {}) {
950 parallel_for(taskSet, states, defaultState, range, std::forward<F>(f), options);
// Stateful (start, end) overload without a task set: builds a TaskSet on
// globalThreadPool() and forwards every argument to the task-set overload.
// Only the integral-type constraints are visible; most of the header and
// signature are missing from this listing.
972#if DISPENSO_HAS_CONCEPTS
974 std::integral IntegerA,
975 std::integral IntegerB,
977 typename StateContainer,
984 typename StateContainer,
986 std::enable_if_t<std::is_integral<IntegerA>::value,
bool> =
true,
987 std::enable_if_t<std::is_integral<IntegerB>::value,
bool> =
true>
990 StateContainer& states,
991 const StateGen& defaultState,
996 TaskSet taskSet(globalThreadPool());
998 parallel_for(taskSet, states, defaultState, start, end, std::forward<F>(f), options);
void parallel_for(TaskSetT &taskSet, StateContainer &states, const StateGen &defaultState, const ChunkedRange< IntegerT > &range, F &&f, ParForOptions options={})
ChunkedRange< std::common_type_t< IntegerA, IntegerB > > makeChunkedRange(IntegerA start, IntegerB end, ParForChunking chunking=ParForChunking::kStatic)
uint32_t minItemsPerChunk
ParForChunking defaultChunking