19#include <dispenso/detail/can_invoke.h>
20#include <dispenso/detail/per_thread_info.h>
26#if DISPENSO_HAS_CONCEPTS
33template <
typename F,
typename IntegerT>
34concept ParallelForRangeFunc = std::invocable<F, IntegerT, IntegerT>;
42template <
typename F,
typename IntegerT>
43concept ParallelForIndexFunc = std::invocable<F, IntegerT>;
51template <
typename F,
typename StateRef,
typename IntegerT>
52concept ParallelForStateRangeFunc = std::invocable<F, StateRef, IntegerT, IntegerT>;
60template <
typename F,
typename StateRef,
typename IntegerT>
61concept ParallelForStateIndexFunc = std::invocable<F, StateRef, IntegerT>;
// ParForOptions member (the struct header is missing from this extraction):
// upper bound on the number of threads parallel_for may use.  Defaults to
// effectively unbounded (int32 max).  A value of 0 disables parallelism —
// see the `!options.maxThreads` serial-fallback check in the implementation
// further down this file.
83 uint32_t
maxThreads = std::numeric_limits<int32_t>::max();
// struct ChunkedRange<IntegerT = ssize_t>: describes a half-open index range
// [start, end) plus a chunking policy (explicit chunk size, the kStatic
// sentinel, or 0 for auto).
// NOTE(review): garbled extraction — original line numbers are fused onto
// tokens, long lines are wrapped, and interior lines (struct header, tag
// types, closing braces, parts of calcChunkSize) are missing from this view.
125template <
typename IntegerT = s
size_t>
// size_type: 64-bit type matching IntegerT's signedness, used for all chunk
// arithmetic so intermediate products don't overflow the index type.
133 using size_type = std::conditional_t<std::is_signed<IntegerT>::value, int64_t, uint64_t>;
// Sentinel chunk value meaning "static chunking" (even up-front split).
137 static constexpr IntegerT kStatic = std::numeric_limits<IntegerT>::max();
// Constructors: explicit chunk size, or tag-dispatched Static/Auto policies
// (Static maps to kStatic, Auto maps to chunk == 0).
146 ChunkedRange(IntegerT s, IntegerT e, IntegerT c) : start(s), end(e), chunk(c) {}
153 ChunkedRange(IntegerT s, IntegerT e, Static) : ChunkedRange(s, e, kStatic) {}
161 ChunkedRange(IntegerT s, IntegerT e, Auto) : ChunkedRange(s, e, 0) {}
163 bool isStatic()
const {
164 return chunk == kStatic;
// isAuto() — presumably returns chunk == 0 (the Auto constructor's encoding);
// body missing from this extraction — TODO confirm against upstream source.
167 bool isAuto()
const {
// Number of indices in the half-open range.
175 size_type size()
const {
176 return static_cast<size_type
>(end) - start;
// calcChunkSize: given the number of launched tasks and whether the caller
// also works, returns {chunkSize, numChunks}.  For auto chunking it grows
// the chunk size until it reaches minChunkSize; the loop header (`do {`) and
// surrounding branch lines are missing from this extraction.
179 template <
typename OtherInt>
180 std::tuple<size_type, size_type>
181 calcChunkSize(OtherInt numLaunched,
bool oneOnCaller, size_type minChunkSize)
const {
182 size_type workingThreads =
static_cast<size_type
>(numLaunched) + size_type{oneOnCaller};
183 assert(workingThreads > 0);
// Start from up to 16 chunks per worker (load-balancing factor).
186 size_type dynFactor = std::min<size_type>(16, size() / workingThreads);
189 size_type roughChunks = dynFactor * workingThreads;
190 chunkSize = (size() + roughChunks - 1) / roughChunks;
192 }
while (chunkSize < minChunkSize);
193 return {chunkSize, (size() + chunkSize - 1) / chunkSize};
194 }
else if (chunk == kStatic) {
// Fallback: caller supplied an explicit chunk size at construction.
199 return {chunk, (size() + chunk - 1) / chunk};
214template <
typename IntegerA,
typename IntegerB>
215inline ChunkedRange<std::common_type_t<IntegerA, IntegerB>>
217 using IntegerT = std::common_type_t<IntegerA, IntegerB>;
218 return (chunking == ParForChunking::kStatic)
219 ? ChunkedRange<IntegerT>(start, end,
typename ChunkedRange<IntegerT>::Static())
220 : ChunkedRange<IntegerT>(start, end,
typename ChunkedRange<IntegerT>::Auto());
230template <
typename IntegerA,
typename IntegerB,
typename IntegerC>
231inline ChunkedRange<std::common_type_t<IntegerA, IntegerB>>
233 return ChunkedRange<std::common_type_t<IntegerA, IntegerB>>(start, end, chunkSize);
// NoOpIter: a do-nothing random-access "iterator" used (via NoOpContainer)
// so the stateless parallel_for overloads can reuse the state-based
// machinery with zero-cost dummy state.
// NOTE(review): the struct header and most operator bodies were dropped by
// this extraction; only fragments remain below.
239 using difference_type = std::ptrdiff_t;
240 using value_type = int;
241 using pointer =
int*;
242 using reference =
int&;
243 using iterator_category = std::random_access_iterator_tag;
// Dereference yields a thread-local dummy int, so writes through the
// iterator are harmless and race-free.
245 int& operator*()
const {
246 static DISPENSO_THREAD_LOCAL
int dummy = 0;
// The arithmetic/comparison operators below are all no-ops; their bodies
// are missing from this extraction.
249 NoOpIter& operator++() {
252 NoOpIter operator++(
int) {
255 NoOpIter& operator--() {
258 NoOpIter operator--(
int) {
261 NoOpIter& operator+=(difference_type) {
264 NoOpIter& operator-=(difference_type) {
267 NoOpIter operator+(difference_type)
const {
270 NoOpIter operator-(difference_type)
const {
273 difference_type operator-(
const NoOpIter&)
const {
276 bool operator==(
const NoOpIter&)
const {
279 bool operator!=(
const NoOpIter&)
const {
282 bool operator<(
const NoOpIter&)
const {
// Indexing also returns the thread-local dummy.
285 int& operator[](difference_type)
const {
286 static DISPENSO_THREAD_LOCAL
int dummy = 0;
// NoOpContainer: container facade over NoOpIter; emplace_back discards its
// argument.
291struct NoOpContainer {
292 size_t size()
const {
306 void emplace_back(
int) {}
// NoOpStateGen (presumably — struct header missing): nullary generator
// producing a dummy int "state".
315 int operator()()
const {
/**
 * Ensure that `states` holds at least `numNeeded` per-worker state objects.
 *
 * @param states              container of per-worker states (must support
 *                            size()/clear()/emplace_back())
 * @param defaultState        nullary factory invoked once per new state
 * @param numNeeded           number of states required
 * @param reuseExistingState  when false, the container is emptied first so
 *                            every state is freshly generated; when true,
 *                            existing states are kept and only the shortfall
 *                            is appended.  The container is never shrunk.
 */
template <typename StateContainer, typename StateGen>
void initStates(
    StateContainer& states,
    const StateGen& defaultState,
    size_t numNeeded,
    bool reuseExistingState) {
  if (!reuseExistingState) {
    // NOTE(review): this branch body was dropped by the extraction and is
    // reconstructed; discarding stale states is what "no reuse" implies —
    // confirm against upstream source.
    states.clear();
  }
  // Top up to the required count; never shrink an oversized container.
  for (size_t i = states.size(); i < numNeeded; ++i) {
    states.emplace_back(defaultState());
  }
}
// parallel_for_staticImpl: statically partitions `range` across up to
// `maxThreads` workers.  Tasks before `transitionIdx` receive `chunkSize`
// items; the rest receive `smallChunkSize` (one fewer) so the split stays as
// even as possible.  When `wait` is set, the calling thread executes the
// final chunk itself instead of scheduling it.
// NOTE(review): garbled extraction — signature pieces (taskSet, f,
// maxThreads, wait) and several structural lines are missing below.
343 typename StateContainer,
345void parallel_for_staticImpl(
347 StateContainer& states,
348 const StateGen& defaultState,
349 const ChunkedRange<IntegerT>& range,
353 bool reuseExistingState) {
354 using size_type =
typename ChunkedRange<IntegerT>::size_type;
// Use at most pool size (+1 if the caller also works), and never more
// threads than items.
356 size_type numThreads = std::min<size_type>(taskSet.numPoolThreads() + wait, maxThreads);
358 numThreads = std::min(numThreads, range.size());
// One state object per participating thread.
360 detail::initStates(states, defaultState,
static_cast<size_t>(numThreads), reuseExistingState);
// Ceil-division chunk size plus the task index where the chunk size steps
// down by one ("transition" point).
363 detail::staticChunkSize(
static_cast<ssize_t
>(range.size()),
static_cast<ssize_t
>(numThreads));
364 IntegerT chunkSize =
static_cast<IntegerT
>(chunking.ceilChunkSize);
366 bool perfectlyChunked =
static_cast<size_type
>(chunking.transitionTaskIndex) == numThreads;
// If waiting, the caller runs the last chunk; schedule the rest.
369 size_type numToSchedule = wait ? numThreads - 1 : numThreads;
371 if (numToSchedule > 0) {
375 size_type transitionIdx = perfectlyChunked ? numToSchedule : chunking.transitionTaskIndex;
376 IntegerT smallChunkSize =
static_cast<IntegerT
>(chunkSize - !perfectlyChunked);
378 taskSet.scheduleBulk(
379 static_cast<size_t>(numToSchedule),
380 [&, chunkSize, smallChunkSize, transitionIdx](
size_t idx) {
// Compute this task's [start, start + thisChunkSize) slice: full-size
// chunks before the transition index, small chunks after.
383 IntegerT thisChunkSize;
384 if (
static_cast<size_type
>(idx) < transitionIdx) {
385 IntegerT i =
static_cast<IntegerT
>(idx);
386 start =
static_cast<IntegerT
>(range.start +
static_cast<IntegerT
>(i * chunkSize));
387 thisChunkSize = chunkSize;
390 IntegerT ti =
static_cast<IntegerT
>(transitionIdx);
391 IntegerT ri =
static_cast<IntegerT
>(idx - transitionIdx);
392 start =
static_cast<IntegerT
>(
393 range.start +
static_cast<IntegerT
>(ti * chunkSize) +
394 static_cast<IntegerT
>(ri * smallChunkSize));
395 thisChunkSize = smallChunkSize;
397 IntegerT end =
static_cast<IntegerT
>(start + thisChunkSize);
// Each task binds its own per-thread state object by position.
399 auto stateIt = states.begin();
400 std::advance(stateIt,
static_cast<ptrdiff_t
>(idx));
402 return [it = stateIt, start, end, f]() {
403 auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
// Caller-executed tail chunk (reached when wait == true): same start
// computation as above for the last task index, ending at range.end.
411 auto stateIt = states.begin();
412 std::advance(stateIt,
static_cast<ptrdiff_t
>(numThreads - 1));
414 size_type transitionIdx = perfectlyChunked ? numThreads - 1 : chunking.transitionTaskIndex;
415 IntegerT smallChunkSize =
static_cast<IntegerT
>(chunkSize - !perfectlyChunked);
417 if (numThreads - 1 < transitionIdx) {
418 IntegerT i =
static_cast<IntegerT
>(numThreads - 1);
419 lastStart =
static_cast<IntegerT
>(range.start +
static_cast<IntegerT
>(i * chunkSize));
421 IntegerT ti =
static_cast<IntegerT
>(transitionIdx);
422 IntegerT ri =
static_cast<IntegerT
>(numThreads - 1 - transitionIdx);
423 lastStart =
static_cast<IntegerT
>(
424 range.start +
static_cast<IntegerT
>(ti * chunkSize) +
425 static_cast<IntegerT
>(ri * smallChunkSize));
427 f(*stateIt, lastStart, range.end);
// ChunkSizingResult: bundle returned by adjustChunkSizing.  The isStatic
// member (used at the return below) is missing from this extraction.
432template <
typename IntegerT>
433struct ChunkSizingResult {
434 typename ChunkedRange<IntegerT>::size_type maxThreads;
// adjustChunkSizing: clamps maxThreads to the pool size and to the number of
// minItemsPerChunk-sized chunks available, and adjusts the static/auto
// chunking decision for small ranges.
// NOTE(review): the isStatic and wait parameters plus several branch bodies
// are missing from this extraction.
450template <
typename IntegerT>
451ChunkSizingResult<IntegerT> adjustChunkSizing(
452 const ChunkedRange<IntegerT>& range,
453 typename ChunkedRange<IntegerT>::size_type maxThreads,
455 uint32_t minItemsPerChunk,
456 typename ChunkedRange<IntegerT>::size_type poolThreads,
458 using size_type =
typename ChunkedRange<IntegerT>::size_type;
// Never exceed the pool (plus the caller when waiting).
461 maxThreads = std::min<size_type>(maxThreads, poolThreads + wait);
463 if (minItemsPerChunk > 1) {
// Never use more workers than there are minimum-size chunks of work.
465 size_type maxWorkers = range.size() / minItemsPerChunk;
466 if (maxWorkers < maxThreads) {
467 maxThreads = maxWorkers;
470 if (maxThreads > 0 && range.size() / (maxThreads + wait) < minItemsPerChunk && range.isAuto()) {
473 }
else if (range.size() <= poolThreads + wait) {
475 if (range.isAuto()) {
477 }
else if (!range.isStatic()) {
478 maxThreads = range.size() - wait;
482 return {maxThreads, isStatic};
// parallel_for_dynamicImpl: dynamic load balancing.  Each worker repeatedly
// claims the next chunk index from the shared atomic `index` until
// `numChunks` is exhausted; the final chunk is clamped to `end`, and
// `exitAction` runs as each worker finishes.
// NOTE(review): signature pieces (taskSet, f, index, start/end, numToLaunch,
// wait) and several structural lines are missing from this extraction.
500 typename StateContainer,
503void parallel_for_dynamicImpl(
505 StateContainer& states,
510 typename ChunkedRange<IntegerT>::size_type chunkSize,
511 typename ChunkedRange<IntegerT>::size_type numChunks,
513 ExitAction exitAction,
515 auto worker = [start, end, &index, f, chunkSize, numChunks, exitAction](
auto& s) {
516 auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
// Claim the next chunk; the atomic is only used to hand out indices, so
// relaxed ordering suffices here.
518 auto cur = index.fetch_add(1, std::memory_order_relaxed);
519 if (cur >= numChunks) {
523 auto sidx =
static_cast<IntegerT
>(start + cur * chunkSize);
// Last chunk is clamped to `end`; all others span a full chunkSize.
524 if (cur + 1 == numChunks) {
527 f(s, sidx,
static_cast<IntegerT
>(sidx + chunkSize));
// Launch one worker per scheduled task, each bound to its own state by
// position in `states`.
532 taskSet.scheduleBulk(
static_cast<size_t>(numToLaunch), [&states, &worker](
size_t i) {
533 auto it = states.begin();
534 std::advance(it,
static_cast<ptrdiff_t
>(i));
535 return [&s = *it, worker]() { worker(s); };
// Caller participates with the final state slot (when waiting).
539 auto it = states.begin();
540 std::advance(it,
static_cast<ptrdiff_t
>(numToLaunch));
// Primary implementation: parallel_for over a ChunkedRange with per-worker
// states.  Falls back to serial execution when the pool is empty,
// parallelism is disabled (options.maxThreads == 0), the range is too small,
// or we are already inside a recursive parallel_for; otherwise dispatches to
// the static or dynamic chunking implementation.
// NOTE(review): many structural lines (signature, braces, argument lists of
// the forwarded calls) are missing from this extraction.
568 typename StateContainer,
572 StateContainer& states,
573 const StateGen& defaultState,
574 const ChunkedRange<IntegerT>& range,
584 using size_type =
typename ChunkedRange<IntegerT>::size_type;
586 uint32_t minItemsPerChunk = std::max<uint32_t>(1, options.minItemsPerChunk);
587 size_type maxThreads = std::max<int32_t>(options.maxThreads, 1);
588 bool isStatic = range.isStatic();
590 const size_type N = taskSet.numPoolThreads();
// Serial fallback: run the whole range inline with a single state.
591 if (N == 0 || !options.maxThreads || range.size() <= minItemsPerChunk ||
592 detail::PerPoolPerThreadInfo::isParForRecursive(&taskSet.pool())) {
593 detail::initStates(states, defaultState, 1, options.reuseExistingState);
594 f(*states.begin(), range.start, range.end);
602 detail::adjustChunkSizing(range, maxThreads, isStatic, minItemsPerChunk, N, options.wait);
603 maxThreads = chunkSizing.maxThreads;
604 isStatic = chunkSizing.isStatic;
// Not worth parallelizing below two workers.
607 if (maxThreads < 2) {
608 detail::initStates(states, defaultState, 1, options.reuseExistingState);
609 f(*states.begin(), range.start, range.end);
// Static chunking path.
617 detail::parallel_for_staticImpl(
623 static_cast<ssize_t
>(maxThreads),
625 options.reuseExistingState);
// Dynamic chunking path.
629 const size_type numToLaunch = std::min<size_type>(maxThreads - options.wait, N);
634 static_cast<size_t>(numToLaunch + options.wait),
635 options.reuseExistingState);
// Single non-waiting worker: just run the whole range as one task.
637 if (numToLaunch == 1 && !options.wait) {
639 [&s = states.front(), range, f = std::move(f)]() { f(s, range.start, range.end); });
643 auto chunkInfo = range.calcChunkSize(numToLaunch, options.wait, minItemsPerChunk);
644 auto chunkSize = std::get<0>(chunkInfo);
645 auto numChunks = std::get<1>(chunkInfo);
// Waiting path: the shared chunk index can live on the caller's stack;
// cache-line alignment avoids false sharing with neighbors.
648 alignas(
kCacheLineSize) std::atomic<
decltype(numChunks)> index(0);
649 detail::parallel_for_dynamicImpl(
655 static_cast<size_t>(numToLaunch),
// Non-waiting path: the index must outlive this frame, so it is placement-
// new'd into a small cache-line-sized buffer which the last finishing
// worker (detected via lastExit) deallocates.
662 using SizeType =
decltype(numChunks);
664 std::atomic<SizeType> index;
666 static_assert(
sizeof(ChunkIndex) <=
kCacheLineSize,
"ChunkIndex must fit in one cache line");
667 char* mem = allocSmallBuffer<kCacheLineSize>();
668 auto* ci =
new (mem) ChunkIndex{{0}};
669 SizeType lastExit = numChunks +
static_cast<SizeType
>(numToLaunch) - 1;
670 detail::parallel_for_dynamicImpl(
676 static_cast<size_t>(numToLaunch),
680 [ci, lastExit](
auto cur) {
681 if (cur == lastExit) {
682 deallocSmallBuffer<kCacheLineSize>(ci);
// Stateless ChunkedRange overload: adapts the user's f(i, j) onto the
// state-based implementation using NoOpContainer / NoOpStateGen dummy state
// (the int parameter of the wrapper swallows the dummy state).
// NOTE(review): parts of the signature and the trailing arguments of the
// forwarded call are missing from this extraction.
698template <
typename TaskSetT,
typename IntegerT,
typename F>
699DISPENSO_REQUIRES(ParallelForRangeFunc<F, IntegerT>)
702 const ChunkedRange<IntegerT>& range,
705 detail::NoOpContainer container;
709 detail::NoOpStateGen(),
711 [f = std::move(f)](
int ,
auto i,
auto j) { f(i, j); },
// Convenience overload: runs on the global thread pool via a locally
// constructed TaskSet and forwards to the TaskSet-based overload.
// NOTE(review): original line 728 (between the TaskSet construction and the
// forwarding call — presumably forcing blocking semantics on `options`) is
// missing from this extraction; confirm against upstream source.
724template <
typename IntegerT,
typename F>
725DISPENSO_REQUIRES(ParallelForRangeFunc<F, IntegerT>)
726void parallel_for(const ChunkedRange<IntegerT>& range, F&& f,
ParForOptions options = {}) {
727 TaskSet taskSet(globalThreadPool());
729 parallel_for(taskSet, range, std::forward<F>(f), options);
// Global-thread-pool overload of the state-based ChunkedRange parallel_for:
// constructs a TaskSet on the global pool and forwards.
// NOTE(review): the signature line and part of the body are missing from
// this extraction.
749template <
typename F,
typename IntegerT,
typename StateContainer,
typename StateGen>
751 StateContainer& states,
752 const StateGen& defaultState,
753 const ChunkedRange<IntegerT>& range,
756 TaskSet taskSet(globalThreadPool());
758 parallel_for(taskSet, states, defaultState, range, std::forward<F>(f), options);
// Integer-index overload family, SFINAE-constrained (detail::CanInvoke) for
// pre-concepts builds.  Each overload builds a ChunkedRange via
// makeChunkedRange(start, end, options.defaultChunking) and forwards to the
// range-based implementations; per-index functors f(i) are wrapped in a loop
// over the subrange [s, e).
// NOTE(review): most signatures, braces, and loop bodies are fragmentary in
// this extraction.
//
// (start, end) overload with per-index functor f(i):
776 std::enable_if_t<std::is_integral<IntegerA>::value,
bool> =
true,
777 std::enable_if_t<std::is_integral<IntegerB>::value,
bool> =
true,
778 std::enable_if_t<detail::CanInvoke<F(IntegerA)>::value,
bool> =
true>
785 using IntegerT = std::common_type_t<IntegerA, IntegerB>;
787 auto range = makeChunkedRange(start, end, options.defaultChunking);
// Wrap the per-index functor into a per-subrange functor.
791 [f = std::move(f)](IntegerT s, IntegerT e) {
792 for (IntegerT i = s; i < e; ++i) {
// (start, end) overload with subrange functor f(s, e):
805 std::enable_if_t<std::is_integral<IntegerA>::value,
bool> =
true,
806 std::enable_if_t<std::is_integral<IntegerB>::value,
bool> =
true,
807 std::enable_if_t<detail::CanInvoke<F(IntegerA, IntegerB)>::value,
bool> =
true>
813 ParForOptions options = {}) {
815 parallel_for(taskSet, range, std::forward<F>(f), options);
// Global-pool (start, end) overload:
832 std::enable_if_t<std::is_integral<IntegerA>::value,
bool> =
true,
833 std::enable_if_t<std::is_integral<IntegerB>::value,
bool> =
true>
835 TaskSet taskSet(globalThreadPool());
837 parallel_for(taskSet, start, end, std::forward<F>(f), options);
// Stateful per-index overload: functor invoked as f(state, i):
863 typename StateContainer,
865 std::enable_if_t<std::is_integral<IntegerA>::value,
bool> =
true,
866 std::enable_if_t<std::is_integral<IntegerB>::value,
bool> =
true,
868 detail::CanInvoke<F(
typename StateContainer::reference, IntegerA)>::value,
872 StateContainer& states,
873 const StateGen& defaultState,
878 using IntegerT = std::common_type_t<IntegerA, IntegerB>;
879 auto range = makeChunkedRange(start, end, options.defaultChunking);
// Wrap f(state, i) into f(state, s, e).
885 [f = std::move(f)](
auto& state, IntegerT s, IntegerT e) {
886 for (IntegerT i = s; i < e; ++i) {
// Stateful subrange overload: functor invoked as f(state, s, e):
899 typename StateContainer,
901 std::enable_if_t<std::is_integral<IntegerA>::value,
bool> =
true,
902 std::enable_if_t<std::is_integral<IntegerB>::value,
bool> =
true,
904 detail::CanInvoke<F(
typename StateContainer::reference, IntegerA, IntegerB)>::value,
908 StateContainer& states,
909 const StateGen& defaultState,
913 ParForOptions options = {}) {
915 parallel_for(taskSet, states, defaultState, range, std::forward<F>(f), options);
// Global-pool stateful (start, end) overload:
941 typename StateContainer,
943 std::enable_if_t<std::is_integral<IntegerA>::value,
bool> =
true,
944 std::enable_if_t<std::is_integral<IntegerB>::value,
bool> =
true>
946 StateContainer& states,
947 const StateGen& defaultState,
952 TaskSet taskSet(globalThreadPool());
954 parallel_for(taskSet, states, defaultState, start, end, std::forward<F>(f), options);
// NOTE(review): the four signature lines below are documentation-tooltip
// residue from the HTML/doxygen extraction of this header, not part of the
// original source; preserved here as comments for reference.
// void parallel_for(TaskSetT &taskSet, StateContainer &states, const StateGen &defaultState, const ChunkedRange< IntegerT > &range, F &&f, ParForOptions options={})
// ChunkedRange< std::common_type_t< IntegerA, IntegerB > > makeChunkedRange(IntegerA start, IntegerB end, ParForChunking chunking=ParForChunking::kStatic)
// uint32_t minItemsPerChunk
// ParForChunking defaultChunking