#include <dispenso/detail/can_invoke.h>
#include <dispenso/detail/per_thread_info.h>
namespace dispenso {

// Chunking strategy.  kStatic is typically a bit faster when every iteration
// costs about the same; kAuto load balances better when costs vary.
enum class ParForChunking { kStatic, kAuto };

// Options to control the behavior of parallel_for.
struct ParForOptions {
  // The maximum number of threads to use.  Zero forces the loop to run
  // serially on the calling thread.
  uint32_t maxThreads = std::numeric_limits<int32_t>::max();
  // The minimum number of items per chunk; chunks are never made smaller.
  uint32_t minItemsPerChunk = 1;
  // If true, the calling thread also works on chunks and parallel_for blocks
  // until the loop is complete.
  bool wait = true;
  // If true, an already-populated state container is reused rather than
  // cleared and regenerated.
  bool reuseExistingState = false;
  // The chunking strategy used by the overloads that take raw (start, end)
  // bounds instead of a ChunkedRange.
  ParForChunking defaultChunking = ParForChunking::kStatic;
};
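// Example (a sketch): allow at most 4 workers and return to the caller without
// blocking; completion is then governed by the TaskSet passed to parallel_for.
//
//   dispenso::ParForOptions options;
//   options.maxThreads = 4;
//   options.wait = false;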
// A ChunkedRange describes a half-open interval [start, end) plus a chunking
// policy: a fixed chunk size, static chunking, or automatic chunking.
template <typename IntegerT = ssize_t>
struct ChunkedRange {
  // Tag types selecting static or automatic chunking.
  struct Static {};
  struct Auto {};

  using size_type = std::conditional_t<std::is_signed<IntegerT>::value, int64_t, uint64_t>;

  // Sentinel chunk value denoting static chunking.
  static constexpr IntegerT kStatic = std::numeric_limits<IntegerT>::max();
  ChunkedRange(IntegerT s, IntegerT e, IntegerT c) : start(s), end(e), chunk(c) {}
  ChunkedRange(IntegerT s, IntegerT e, Static) : ChunkedRange(s, e, kStatic) {}
  ChunkedRange(IntegerT s, IntegerT e, Auto) : ChunkedRange(s, e, 0) {}
  bool isStatic() const {
    return chunk == kStatic;
  }

  bool isAuto() const {
    return chunk == 0;
  }

  size_type size() const {
    return static_cast<size_type>(end) - start;
  }
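  // Given the number of workers launched into the pool, whether the calling
  // thread also participates, and a lower bound on chunk size, compute the
  // pair {chunkSize, numChunks} for this range.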
  template <typename OtherInt>
  std::tuple<size_type, size_type>
  calcChunkSize(OtherInt numLaunched, bool oneOnCaller, size_type minChunkSize) const {
    size_type workingThreads = static_cast<size_type>(numLaunched) + size_type{oneOnCaller};
    assert(workingThreads > 0);
    if (!chunk) {
      // Auto chunking: target several chunks per worker so that faster workers
      // can grab extra chunks, shrinking the factor until each chunk holds at
      // least minChunkSize items.
      size_type chunkSize;
      size_type dynFactor = std::min<size_type>(16, size() / workingThreads);
      do {
        size_type roughChunks = dynFactor * workingThreads;
        chunkSize = (size() + roughChunks - 1) / roughChunks;
        --dynFactor;
      } while (chunkSize < minChunkSize);
      return {chunkSize, (size() + chunkSize - 1) / chunkSize};
    } else if (chunk == kStatic) {
      // Static chunking: divide the range evenly, one chunk per worker.
      size_type chunkSize = (size() + workingThreads - 1) / workingThreads;
      return {chunkSize, (size() + chunkSize - 1) / chunkSize};
    }
    // Explicit caller-specified chunk size.
    return {chunk, (size() + chunk - 1) / chunk};
  }

  IntegerT start;
  IntegerT end;
  IntegerT chunk;
};
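// Worked example of auto chunking (illustration only): for a range of 1000
// items, calcChunkSize(7, /*oneOnCaller=*/true, /*minChunkSize=*/1) gives
// workingThreads = 8, dynFactor = std::min(16, 1000 / 8) = 16, roughChunks =
// 128, and chunkSize = (1000 + 127) / 128 = 8, which already satisfies
// minChunkSize, so the result is {8, (1000 + 7) / 8} = {8, 125}: 125 chunks of
// 8 items each.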
// Creates a ChunkedRange with either static or automatic chunking.
template <typename IntegerA, typename IntegerB>
inline ChunkedRange<std::common_type_t<IntegerA, IntegerB>>
makeChunkedRange(IntegerA start, IntegerB end, ParForChunking chunking = ParForChunking::kStatic) {
  using IntegerT = std::common_type_t<IntegerA, IntegerB>;
  return (chunking == ParForChunking::kStatic)
      ? ChunkedRange<IntegerT>(start, end, typename ChunkedRange<IntegerT>::Static())
      : ChunkedRange<IntegerT>(start, end, typename ChunkedRange<IntegerT>::Auto());
}
// Creates a ChunkedRange with a fixed, caller-specified chunk size.
template <typename IntegerA, typename IntegerB, typename IntegerC>
inline ChunkedRange<std::common_type_t<IntegerA, IntegerB>>
makeChunkedRange(IntegerA start, IntegerB end, IntegerC chunkSize) {
  using IntegerT = std::common_type_t<IntegerA, IntegerB>;
  return ChunkedRange<IntegerT>(start, end, static_cast<IntegerT>(chunkSize));
}
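// Usage sketch: both calls below describe the half-open range [0, 1000).
//
//   auto balanced = dispenso::makeChunkedRange(0, 1000, dispenso::ParForChunking::kAuto);
//   auto fixed = dispenso::makeChunkedRange(0, 1000, 100);  // ten chunks of 100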
namespace detail {

// No-op state machinery backing the stateless parallel_for overloads: a
// "container" of a single dummy int state that ignores writes.
struct NoOpIter {
  int& operator*() const {
    static int dummy;
    return dummy;
  }
  NoOpIter& operator++() { return *this; }
  NoOpIter operator++(int) { return *this; }
};

struct NoOpContainer {
  size_t size() const { return 0; }
  bool empty() const { return true; }
  NoOpIter begin() { return {}; }
  int& front() { return *begin(); }
  void clear() {}
  void emplace_back(int) {}
};

struct NoOpStateGen {
  int operator()() const { return 0; }
};
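// Static scheduling implementation: the range is split up front into nearly
// equal chunks, one per worker.  The first tasks receive the ceiling of
// size / numThreads items and the remainder one fewer.  When wait is true, the
// calling thread runs the final chunk inline rather than scheduling it.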
template <
    typename TaskSetT,
    typename IntegerT,
    typename F,
    typename StateContainer,
    typename StateGen>
void parallel_for_staticImpl(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ssize_t maxThreads,
    bool wait,
    bool reuseExistingState) {
  using size_type = typename ChunkedRange<IntegerT>::size_type;
  size_type numThreads = std::min<size_type>(taskSet.numPoolThreads() + wait, maxThreads);
  // Never use more threads than there are items to process.
  numThreads = std::min(numThreads, range.size());

  if (!reuseExistingState) {
    states.clear();
  }
  size_t numToEmplace = states.size() < static_cast<size_t>(numThreads)
      ? static_cast<size_t>(numThreads) - states.size()
      : 0;
  for (; numToEmplace--;) {
    states.emplace_back(defaultState());
  }
  auto chunking = detail::staticChunkSize(
      static_cast<ssize_t>(range.size()), static_cast<ssize_t>(numThreads));
  IntegerT chunkSize = static_cast<IntegerT>(chunking.ceilChunkSize);
  // If the division is exact, every task gets the ceiling chunk size.
  bool perfectlyChunked = static_cast<size_type>(chunking.transitionTaskIndex) == numThreads;

  // The first firstLoopLen tasks receive the ceiling chunk size; the rest
  // receive one fewer item.
  size_type firstLoopLen = chunking.transitionTaskIndex - perfectlyChunked;

  auto stateIt = states.begin();
  IntegerT start = range.start;
  size_type t;
  for (t = 0; t < firstLoopLen; ++t) {
    IntegerT next = static_cast<IntegerT>(start + chunkSize);
    taskSet.schedule([it = stateIt++, start, next, f]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      f(*it, start, next);
    });
    start = next;
  }

  chunkSize = static_cast<IntegerT>(chunkSize - !perfectlyChunked);
  for (; t < numThreads - 1; ++t) {
    IntegerT next = static_cast<IntegerT>(start + chunkSize);
    taskSet.schedule([it = stateIt++, start, next, f]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      f(*it, start, next);
    });
    start = next;
  }

  if (wait) {
    // The calling thread works on the final chunk, then waits for the rest.
    f(*stateIt, start, range.end);
    taskSet.wait();
  } else {
    taskSet.schedule(
        [stateIt, start, end = range.end, f]() {
          auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
          f(*stateIt, start, end);
        },
        ForceQueuingTag());
  }
}

} // namespace detail
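// The main stateful parallel_for: invokes f(state, chunkStart, chunkEnd) over
// chunks of <range>, handing each worker one element of <states> (generated
// via defaultState() as needed).  An empty pool, a zero maxThreads, a small
// range, and recursive invocation all fall back to serial execution on the
// calling thread; static ranges dispatch to parallel_for_staticImpl; auto and
// fixed-size chunking are load balanced through a shared atomic chunk index.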
template <
    typename TaskSetT,
    typename IntegerT,
    typename F,
    typename StateContainer,
    typename StateGen>
void parallel_for(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options = {}) {
  using size_type = typename ChunkedRange<IntegerT>::size_type;

  uint32_t minItemsPerChunk = std::max<uint32_t>(1, options.minItemsPerChunk);
  size_type maxThreads = std::max<int32_t>(options.maxThreads, 1);
  bool isStatic = range.isStatic();

  const size_type N = taskSet.numPoolThreads();
  if (N == 0 || !options.maxThreads || range.size() <= minItemsPerChunk ||
      detail::PerPoolPerThreadInfo::isParForRecursive(&taskSet.pool())) {
    // Serial fallback on the calling thread.
    if (!options.reuseExistingState) {
      states.clear();
    }
    if (states.empty()) {
      states.emplace_back(defaultState());
    }
    f(*states.begin(), range.start, range.end);
    return;
  }
  if (minItemsPerChunk > 1) {
    size_type maxWorkers = range.size() / minItemsPerChunk;
    if (maxWorkers < maxThreads) {
      maxThreads = static_cast<uint32_t>(maxWorkers);
    }
    if (range.size() / (maxThreads + options.wait) < minItemsPerChunk && range.isAuto()) {
      isStatic = true;
    }
  } else if (range.size() <= N + options.wait) {
    if (range.isAuto()) {
      isStatic = true;
    } else if (!range.isStatic()) {
      maxThreads = range.size() - options.wait;
    }
  }

  if (isStatic) {
    detail::parallel_for_staticImpl(
        taskSet,
        states,
        defaultState,
        range,
        std::forward<F>(f),
        static_cast<ssize_t>(maxThreads),
        options.wait,
        options.reuseExistingState);
    return;
  }
  const size_type numToLaunch = std::min<size_type>(maxThreads - options.wait, N);

  if (!options.reuseExistingState) {
    states.clear();
  }
  size_t numToEmplace = static_cast<size_type>(states.size()) < (numToLaunch + options.wait)
      ? (static_cast<size_t>(numToLaunch) + options.wait) - states.size()
      : 0;
  for (; numToEmplace--;) {
    states.emplace_back(defaultState());
  }

  if (numToLaunch == 1 && !options.wait) {
    taskSet.schedule(
        [&s = states.front(), range, f = std::move(f)]() { f(s, range.start, range.end); });
    return;
  }

  auto chunkInfo = range.calcChunkSize(numToLaunch, options.wait, minItemsPerChunk);
  auto chunkSize = std::get<0>(chunkInfo);
  auto numChunks = std::get<1>(chunkInfo);
  if (options.wait) {
    // The caller participates and outlives the workers, so the shared chunk
    // index can live on this stack frame.
    alignas(kCacheLineSize) std::atomic<decltype(numChunks)> index(0);
    auto worker = [start = range.start, end = range.end, &index, f, chunkSize, numChunks](
                      auto& s) {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      while (true) {
        auto cur = index.fetch_add(1, std::memory_order_relaxed);
        if (cur >= numChunks) {
          break;
        }
        auto sidx = static_cast<IntegerT>(start + cur * chunkSize);
        if (cur + 1 == numChunks) {
          // The final chunk absorbs any remainder, ending exactly at end.
          f(s, sidx, end);
        } else {
          auto eidx = static_cast<IntegerT>(sidx + chunkSize);
          f(s, sidx, eidx);
        }
      }
    };

    auto it = states.begin();
    for (size_type i = 0; i < numToLaunch; ++i) {
      taskSet.schedule([&s = *it++, worker]() { worker(s); });
    }
    // The calling thread works too, then blocks until all chunks are done.
    worker(*it);
    taskSet.wait();
  } else {
    // Non-blocking case: the chunk index must outlive this call, so it is
    // heap-allocated (cache-line aligned to avoid false sharing) and shared
    // with the workers via shared_ptr.
    struct alignas(kCacheLineSize) Atomic {
      Atomic() : index(0) {}
      std::atomic<decltype(numChunks)> index;
    };
    void* ptr = detail::alignedMalloc(sizeof(Atomic), alignof(Atomic));
    auto* atm = new (ptr) Atomic();
    std::shared_ptr<Atomic> wrapper(atm, detail::AlignedFreeDeleter<Atomic>());

    auto worker = [start = range.start,
                   end = range.end,
                   wrapper = std::move(wrapper),
                   f,
                   chunkSize,
                   numChunks](auto& s) {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      while (true) {
        auto cur = wrapper->index.fetch_add(1, std::memory_order_relaxed);
        if (cur >= numChunks) {
          break;
        }
        auto sidx = static_cast<IntegerT>(start + cur * chunkSize);
        if (cur + 1 == numChunks) {
          f(s, sidx, end);
        } else {
          auto eidx = static_cast<IntegerT>(sidx + chunkSize);
          f(s, sidx, eidx);
        }
      }
    };

    auto it = states.begin();
    for (size_type i = 0; i < numToLaunch; ++i) {
      taskSet.schedule([&s = *it++, worker]() { worker(s); }, ForceQueuingTag());
    }
  }
}
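// Usage sketch for the stateless chunked overload below (process() is a
// placeholder for user code):
//
//   dispenso::TaskSet tasks(dispenso::globalThreadPool());
//   dispenso::parallel_for(
//       tasks,
//       dispenso::makeChunkedRange(0, 1024, dispenso::ParForChunking::kAuto),
//       [](int s, int e) {
//         for (int i = s; i < e; ++i) {
//           process(i);
//         }
//       });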
// Stateless chunked parallel_for: f is invoked as f(chunkStart, chunkEnd).
template <typename TaskSetT, typename IntegerT, typename F>
void parallel_for(
    TaskSetT& taskSet,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options = {}) {
  detail::NoOpContainer container;
  parallel_for(
      taskSet,
      container,
      detail::NoOpStateGen(),
      range,
      [f = std::move(f)](int /*state*/, auto i, auto j) { f(i, j); },
      options);
}
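// As above, but runs on the global thread pool; the call does not return
// until the loop is complete.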
template <typename IntegerT, typename F>
void parallel_for(const ChunkedRange<IntegerT>& range, F&& f, ParForOptions options = {}) {
  TaskSet taskSet(globalThreadPool());
  parallel_for(taskSet, range, std::forward<F>(f), options);
}
// Stateful chunked parallel_for on the global thread pool.
template <
    typename F,
    typename IntegerT,
    typename StateContainer,
    typename StateGen>
void parallel_for(
    StateContainer& states,
    const StateGen& defaultState,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options = {}) {
  TaskSet taskSet(globalThreadPool());
  parallel_for(taskSet, states, defaultState, range, std::forward<F>(f), options);
}
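// Index-based overloads: given integral (start, end) bounds, a ChunkedRange is
// built using options.defaultChunking, and the loop over each chunk is
// synthesized around the per-index functor when F is invocable with a single
// index.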
template <
    typename TaskSetT,
    typename IntegerA,
    typename IntegerB,
    typename F,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true,
    std::enable_if_t<detail::CanInvoke<F(IntegerA)>::value, bool> = true>
void parallel_for(TaskSetT& taskSet, IntegerA start, IntegerB end, F&& f, ParForOptions options = {}) {
  using IntegerT = std::common_type_t<IntegerA, IntegerB>;
  auto range = makeChunkedRange(start, end, options.defaultChunking);
  parallel_for(
      taskSet,
      range,
      [f = std::move(f)](IntegerT s, IntegerT e) {
        for (IntegerT i = s; i < e; ++i) {
          f(i);
        }
      },
      options);
}
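// Usage sketch (process() is a placeholder for user code):
//
//   dispenso::TaskSet tasks(dispenso::globalThreadPool());
//   dispenso::parallel_for(tasks, 0, 1024, [](int i) { process(i); });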
// As above, but for functors invocable as f(chunkStart, chunkEnd).
template <
    typename TaskSetT,
    typename IntegerA,
    typename IntegerB,
    typename F,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true,
    std::enable_if_t<detail::CanInvoke<F(IntegerA, IntegerB)>::value, bool> = true>
void parallel_for(
    TaskSetT& taskSet,
    IntegerA start,
    IntegerB end,
    F&& f,
    ParForOptions options = {}) {
  auto range = makeChunkedRange(start, end, options.defaultChunking);
  parallel_for(taskSet, range, std::forward<F>(f), options);
}
// Index-based form on the global thread pool; blocks until complete.
template <
    typename IntegerA,
    typename IntegerB,
    typename F,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true>
void parallel_for(IntegerA start, IntegerB end, F&& f, ParForOptions options = {}) {
  TaskSet taskSet(globalThreadPool());
  parallel_for(taskSet, start, end, std::forward<F>(f), options);
}
// Stateful index-based overload: f is invoked as f(state, i).
template <
    typename TaskSetT,
    typename IntegerA,
    typename IntegerB,
    typename F,
    typename StateContainer,
    typename StateGen,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true,
    std::enable_if_t<
        detail::CanInvoke<F(typename StateContainer::reference, IntegerA)>::value,
        bool> = true>
void parallel_for(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    IntegerA start,
    IntegerB end,
    F&& f,
    ParForOptions options = {}) {
  using IntegerT = std::common_type_t<IntegerA, IntegerB>;
  auto range = makeChunkedRange(start, end, options.defaultChunking);
  parallel_for(
      taskSet, states, defaultState, range,
      [f = std::move(f)](auto& state, IntegerT s, IntegerT e) {
        for (IntegerT i = s; i < e; ++i) {
          f(state, i);
        }
      },
      options);
}
// Stateful index-based overload for functors invocable as f(state, i, j).
template <
    typename TaskSetT,
    typename IntegerA,
    typename IntegerB,
    typename F,
    typename StateContainer,
    typename StateGen,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true,
    std::enable_if_t<
        detail::CanInvoke<F(typename StateContainer::reference, IntegerA, IntegerB)>::value,
        bool> = true>
void parallel_for(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    IntegerA start,
    IntegerB end,
    F&& f,
    ParForOptions options = {}) {
  auto range = makeChunkedRange(start, end, options.defaultChunking);
  parallel_for(taskSet, states, defaultState, range, std::forward<F>(f), options);
}
// Stateful index-based form on the global thread pool; blocks until complete.
template <
    typename IntegerA,
    typename IntegerB,
    typename F,
    typename StateContainer,
    typename StateGen,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true>
void parallel_for(
    StateContainer& states,
    const StateGen& defaultState,
    IntegerA start,
    IntegerB end,
    F&& f,
    ParForOptions options = {}) {
  TaskSet taskSet(globalThreadPool());
  parallel_for(taskSet, states, defaultState, start, end, std::forward<F>(f), options);
}

} // namespace dispenso
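// End-to-end usage sketch of the stateful form: per-worker partial sums,
// reduced after the loop (data is a placeholder std::vector<int>):
//
//   std::vector<int64_t> sums;
//   dispenso::parallel_for(
//       sums,
//       []() { return int64_t{0}; },
//       size_t{0},
//       data.size(),
//       [&data](int64_t& sum, size_t i) { sum += data[i]; });
//   int64_t total = std::accumulate(sums.begin(), sums.end(), int64_t{0});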