#include <dispenso/detail/can_invoke.h>
#include <dispenso/detail/per_thread_info.h>
template <typename IntegerT = ssize_t>
struct ChunkedRange {
  struct Static {};
  struct Auto {};

  // Constructors select an explicit chunk size, static chunking, or auto chunking.
  ChunkedRange(IntegerT s, IntegerT e, IntegerT c);
  ChunkedRange(IntegerT s, IntegerT e, Static);
  ChunkedRange(IntegerT s, IntegerT e, Auto);

  using size_type = std::conditional_t<std::is_signed<IntegerT>::value, int64_t, uint64_t>;

  static constexpr IntegerT kStatic = std::numeric_limits<IntegerT>::max();
  bool isStatic() const {
    return chunk == kStatic;
  }

  bool isAuto() const {
    return chunk == 0;
  }

  size_type size() const {
    return static_cast<size_type>(end) - start;
  }
  template <typename OtherInt>
  std::tuple<size_type, size_type>
  calcChunkSize(OtherInt numLaunched, bool oneOnCaller, size_type minChunkSize) const {
    size_type workingThreads = static_cast<size_type>(numLaunched) + size_type{oneOnCaller};
    assert(workingThreads > 0);
    if (isAuto()) {
      size_type dynFactor = std::min<size_type>(16, size() / workingThreads);
      size_type chunkSize;
      do {
        size_type roughChunks = dynFactor * workingThreads;
        chunkSize = (size() + roughChunks - 1) / roughChunks;
        --dynFactor; // reconstructed: coarsen chunks until minChunkSize is met
      } while (chunkSize < minChunkSize);
      return {chunkSize, (size() + chunkSize - 1) / chunkSize};
    } else if (chunk == kStatic) {
      // Reconstructed: static chunking splits the range about evenly per working thread.
      size_type chunkSize = (size() + workingThreads - 1) / workingThreads;
      return {chunkSize, (size() + chunkSize - 1) / chunkSize};
    }
    return {chunk, (size() + chunk - 1) / chunk};
  }

  IntegerT start;
  IntegerT end;
  IntegerT chunk;
};
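// Worked example for ChunkedRange::calcChunkSize (added for illustration): with an
// auto-chunked range of 1000 items, 7 launched tasks plus the caller
// (workingThreads == 8), and minChunkSize == 1: dynFactor == min(16, 1000 / 8) == 16,
// roughChunks == 128, and chunkSize == ceil(1000 / 128) == 8, so the result is
// {8, ceil(1000 / 8) == 125}: 125 chunks of at most 8 items each.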
template <typename IntegerA, typename IntegerB>
inline ChunkedRange<std::common_type_t<IntegerA, IntegerB>>
makeChunkedRange(IntegerA start, IntegerB end, ParForChunking chunking = ParForChunking::kStatic) {
  using IntegerT = std::common_type_t<IntegerA, IntegerB>;
  return (chunking == ParForChunking::kStatic)
      ? ChunkedRange<IntegerT>(start, end, typename ChunkedRange<IntegerT>::Static())
      : ChunkedRange<IntegerT>(start, end, typename ChunkedRange<IntegerT>::Auto());
}
template <typename IntegerA, typename IntegerB, typename IntegerC>
inline ChunkedRange<std::common_type_t<IntegerA, IntegerB>>
makeChunkedRange(IntegerA start, IntegerB end, IntegerC chunkSize) {
  return ChunkedRange<std::common_type_t<IntegerA, IntegerB>>(start, end, chunkSize);
}
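// Usage sketch for makeChunkedRange (added for illustration; assumes these live in
// namespace dispenso and that ParForChunking exposes kAuto alongside kStatic):
//
//   auto staticRange = dispenso::makeChunkedRange(0, 1000);  // ~one chunk per worker
//   auto autoRange = dispenso::makeChunkedRange(0, 1000, dispenso::ParForChunking::kAuto);
//   auto fixedRange = dispenso::makeChunkedRange(0, 1000, 64);  // 64 items per chunk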
struct NoOpIter {
  int& operator*() const {
    // ...
  }
  NoOpIter& operator++() {
    // ...
  }
  NoOpIter operator++(int) {
    // ...
  }
};

struct NoOpContainer {
  size_t size() const {
    // ...
  }
  void emplace_back(int) {}
};

struct NoOpStateGen {
  int operator()() const {
    // ...
  }
};
template <
    typename TaskSetT,
    typename IntegerT,
    typename F,
    typename StateContainer,
    typename StateGen>
void parallel_for_staticImpl(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ssize_t maxThreads,
    bool wait,
    bool reuseExistingState) {
  using size_type = typename ChunkedRange<IntegerT>::size_type;

  size_type numThreads = std::min<size_type>(taskSet.numPoolThreads() + wait, maxThreads);
  numThreads = std::min(numThreads, range.size());

  if (!reuseExistingState) {
    // ...
  }
  size_t numToEmplace = states.size() < static_cast<size_t>(numThreads)
      ? static_cast<size_t>(numThreads) - states.size()
      : size_t{0};
  for (; numToEmplace--;) {
    states.emplace_back(defaultState());
  }

  auto chunking = detail::staticChunkSize(
      static_cast<ssize_t>(range.size()), static_cast<ssize_t>(numThreads));
  IntegerT chunkSize = static_cast<IntegerT>(chunking.ceilChunkSize);
  bool perfectlyChunked = static_cast<size_type>(chunking.transitionTaskIndex) == numThreads;
  size_type firstLoopLen = chunking.transitionTaskIndex - perfectlyChunked;

  auto stateIt = states.begin();
  IntegerT start = range.start;
  size_type t;
  // The first firstLoopLen tasks take ceil-sized chunks.
  for (t = 0; t < firstLoopLen; ++t) {
    IntegerT next = static_cast<IntegerT>(start + chunkSize);
    taskSet.schedule([it = stateIt++, start, next, f]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      f(*it, start, next);
    });
    start = next;
  }

  // Remaining tasks take chunks one item smaller, unless the split was perfect.
  chunkSize = static_cast<IntegerT>(chunkSize - !perfectlyChunked);
  for (; t < numThreads - 1; ++t) {
    IntegerT next = static_cast<IntegerT>(start + chunkSize);
    taskSet.schedule([it = stateIt++, start, next, f]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      f(*it, start, next);
    });
    start = next;
  }

  // The final chunk runs on the calling thread when waiting; otherwise it is queued.
  if (wait) {
    f(*stateIt, start, range.end);
    // ...
  } else {
    taskSet.schedule([stateIt, start, end = range.end, f]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      f(*stateIt, start, end);
    });
  }
}
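// Worked example for the static split (added for illustration, assuming the usual
// ceil/floor reading of staticChunkSize): for range.size() == 10 and numThreads == 4,
// ceilChunkSize == 3 and transitionTaskIndex == 2, so the first two chunks hold 3
// items and the last two hold 2, i.e. the split is [0,3) [3,6) [6,8) [8,10).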
template <
    typename TaskSetT, typename IntegerT, typename F,
    typename StateContainer, typename StateGen>
void parallel_for(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options = {}) {
  using size_type = typename ChunkedRange<IntegerT>::size_type;

  uint32_t minItemsPerChunk = std::max<uint32_t>(1, options.minItemsPerChunk);
  size_type maxThreads = std::max<int32_t>(options.maxThreads, 1);
  bool isStatic = range.isStatic();

  const size_type N = taskSet.numPoolThreads();
  // Degenerate cases (no pool threads, tiny ranges, recursive parallel_for) run serially.
  if (N == 0 || !options.maxThreads || range.size() <= minItemsPerChunk ||
      detail::PerPoolPerThreadInfo::isParForRecursive(&taskSet.pool())) {
    if (!options.reuseExistingState) {
      // ...
    }
    if (states.empty()) {
      states.emplace_back(defaultState());
    }
    f(*states.begin(), range.start, range.end);
    // ...
    return;
  }

  if (minItemsPerChunk > 1) {
    size_type maxWorkers = range.size() / minItemsPerChunk;
    if (maxWorkers < maxThreads) {
      maxThreads = static_cast<uint32_t>(maxWorkers);
    }
    if (range.size() / (maxThreads + options.wait) < minItemsPerChunk && range.isAuto()) {
      isStatic = true; // reconstructed: fall back to static chunking
    }
  } else if (range.size() <= N + options.wait) {
    if (range.isAuto()) {
      isStatic = true; // reconstructed
    } else if (!range.isStatic()) {
      maxThreads = range.size() - options.wait;
    }
  }

  if (isStatic) {
    detail::parallel_for_staticImpl(
        taskSet,
        states,
        defaultState,
        range,
        std::forward<F>(f),
        static_cast<ssize_t>(maxThreads),
        options.wait,
        options.reuseExistingState);
    return;
  }
  const size_type numToLaunch = std::min<size_type>(maxThreads - options.wait, N);

  if (!options.reuseExistingState) {
    // ...
  }
  size_t numToEmplace = static_cast<size_type>(states.size()) < (numToLaunch + options.wait)
      ? (static_cast<size_t>(numToLaunch) + options.wait) - states.size()
      : size_t{0};
  for (; numToEmplace--;) {
    states.emplace_back(defaultState());
  }

  if (numToLaunch == 1 && !options.wait) {
    taskSet.schedule(
        [&s = states.front(), range, f = std::move(f)]() { f(s, range.start, range.end); });
    return;
  }

  auto chunkInfo = range.calcChunkSize(numToLaunch, options.wait, minItemsPerChunk);
  auto chunkSize = std::get<0>(chunkInfo);
  auto numChunks = std::get<1>(chunkInfo);

  if (options.wait) {
    // Waiting: the shared chunk counter can live on the caller's stack.
    alignas(kCacheLineSize) std::atomic<decltype(numChunks)> index(0);
    auto worker = [start = range.start, end = range.end, &index, f, chunkSize, numChunks](
                      auto& s) {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      while (true) {
        auto cur = index.fetch_add(1, std::memory_order_relaxed);
        if (cur >= numChunks) {
          break;
        }
        auto sidx = static_cast<IntegerT>(start + cur * chunkSize);
        if (cur + 1 == numChunks) {
          f(s, sidx, end);
        } else {
          auto eidx = static_cast<IntegerT>(sidx + chunkSize);
          f(s, sidx, eidx);
        }
      }
    };

    auto it = states.begin();
    for (size_type i = 0; i < numToLaunch; ++i) {
      taskSet.schedule([&s = *it++, worker]() { worker(s); });
    }
    // Reconstructed: the calling thread joins in with the final state, then waits.
    worker(*it);
    taskSet.wait();
  } else {
    struct Atomic {
      Atomic() : index(0) {}
      alignas(kCacheLineSize) std::atomic<decltype(numChunks)> index;
      // Pad to a full cache line to avoid false sharing.
      char buffer[kCacheLineSize - sizeof(index)];
    };

    void* ptr = detail::alignedMalloc(sizeof(Atomic), alignof(Atomic));
    auto* atm = new (ptr) Atomic();
    std::shared_ptr<Atomic> wrapper(atm, detail::AlignedFreeDeleter<Atomic>());
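    // Because this branch does not wait for completion, the chunk counter cannot live
    // on the caller's stack: it is placement-new'd into cache-line-aligned storage and
    // owned by a shared_ptr, so the copies captured by each worker below keep it alive
    // until the last task finishes.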
    auto worker = [start = range.start,
                   end = range.end,
                   wrapper = std::move(wrapper),
                   f,
                   chunkSize,
                   numChunks](auto& s) {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      while (true) {
        auto cur = wrapper->index.fetch_add(1, std::memory_order_relaxed);
        if (cur >= numChunks) {
          break;
        }
        auto sidx = static_cast<IntegerT>(start + cur * chunkSize);
        if (cur + 1 == numChunks) {
          f(s, sidx, end);
        } else {
          auto eidx = static_cast<IntegerT>(sidx + chunkSize);
          f(s, sidx, eidx);
        }
      }
    };

    auto it = states.begin();
    for (size_type i = 0; i < numToLaunch; ++i) {
      taskSet.schedule([&s = *it++, worker]() { worker(s); }, ForceQueuingTag());
    }
  }
}
template <typename TaskSetT, typename IntegerT, typename F>
void parallel_for(
    TaskSetT& taskSet,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options = {}) {
  detail::NoOpContainer container;
  parallel_for(
      taskSet,
      container,
      detail::NoOpStateGen(),
      range,
      [f = std::move(f)](int /*state*/, auto i, auto j) { f(i, j); },
      options);
}
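// Usage sketch (added for illustration; assumes namespace dispenso, kAuto as above,
// and a hypothetical vector `data`): chunked processing on an explicit TaskSet.
//
//   std::vector<float> data(1 << 20, 1.0f);
//   dispenso::TaskSet taskSet(dispenso::globalThreadPool());
//   dispenso::parallel_for(
//       taskSet,
//       dispenso::makeChunkedRange(size_t{0}, data.size(), dispenso::ParForChunking::kAuto),
//       [&data](size_t s, size_t e) {
//         for (size_t i = s; i < e; ++i) {
//           data[i] *= 2.0f;
//         }
//       });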
template <typename IntegerT, typename F>
void parallel_for(const ChunkedRange<IntegerT>& range, F&& f, ParForOptions options = {}) {
  TaskSet taskSet(globalThreadPool());
  parallel_for(taskSet, range, std::forward<F>(f), options);
}
template <typename F, typename IntegerT, typename StateContainer, typename StateGen>
void parallel_for(
    StateContainer& states,
    const StateGen& defaultState,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options = {}) {
  TaskSet taskSet(globalThreadPool());
  parallel_for(taskSet, states, defaultState, range, std::forward<F>(f), options);
}
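// Usage sketch (added for illustration; assumes namespace dispenso; `values` is a
// hypothetical input vector): per-worker partial sums, reduced serially afterwards.
//
//   std::vector<int64_t> sums;  // receives one state per participating worker
//   dispenso::parallel_for(
//       sums,
//       []() { return int64_t{0}; },
//       dispenso::makeChunkedRange(size_t{0}, values.size()),
//       [&values](int64_t& sum, size_t s, size_t e) {
//         for (size_t i = s; i < e; ++i) {
//           sum += values[i];
//         }
//       });
//   int64_t total = std::accumulate(sums.begin(), sums.end(), int64_t{0});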
template <
    typename TaskSetT, typename IntegerA, typename IntegerB, typename F,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true,
    std::enable_if_t<detail::CanInvoke<F(IntegerA)>::value, bool> = true>
void parallel_for(TaskSetT& taskSet, IntegerA start, IntegerB end, F&& f, ParForOptions options = {}) {
  using IntegerT = std::common_type_t<IntegerA, IntegerB>;
  auto range = makeChunkedRange(start, end, options.defaultChunking);
  parallel_for(
      taskSet,
      range,
      [f = std::move(f)](IntegerT s, IntegerT e) {
        for (IntegerT i = s; i < e; ++i) {
          f(i);
        }
      },
      options);
}
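// Usage sketch (added for illustration; `out` and `in` are hypothetical arrays of
// equal length): per-index form on an explicit TaskSet.
//
//   dispenso::TaskSet taskSet(dispenso::globalThreadPool());
//   dispenso::parallel_for(taskSet, 0, 1024, [&](int i) { out[i] = 2 * in[i]; });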
template <
    typename TaskSetT, typename IntegerA, typename IntegerB, typename F,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true,
    std::enable_if_t<detail::CanInvoke<F(IntegerA, IntegerB)>::value, bool> = true>
void parallel_for(
    TaskSetT& taskSet,
    IntegerA start,
    IntegerB end,
    F&& f,
    ParForOptions options = {}) {
  auto range = makeChunkedRange(start, end, options.defaultChunking);
  parallel_for(taskSet, range, std::forward<F>(f), options);
}
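// Usage sketch (added for illustration; reuses `taskSet` from the sketch above;
// `image` and `blurRow` are hypothetical): the chunked form hands each worker a
// contiguous [s, e) sub-range, and ParForOptions can bound how finely it is split.
//
//   dispenso::ParForOptions options;
//   options.minItemsPerChunk = 16;
//   dispenso::parallel_for(
//       taskSet, 0, image.rows(),
//       [&](int s, int e) {
//         for (int r = s; r < e; ++r) {
//           blurRow(image, r);
//         }
//       },
//       options);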
template <
    typename IntegerA, typename IntegerB, typename F,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true>
void parallel_for(IntegerA start, IntegerB end, F&& f, ParForOptions options = {}) {
  TaskSet taskSet(globalThreadPool());
  parallel_for(taskSet, start, end, std::forward<F>(f), options);
}
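// Usage sketch (added for illustration; `data` is a hypothetical vector): the most
// compact form, blocking on the global thread pool.
//
//   dispenso::parallel_for(size_t{0}, data.size(), [&data](size_t i) { data[i] += 1.0f; });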
template <
    typename TaskSetT, typename IntegerA, typename IntegerB, typename F,
    typename StateContainer, typename StateGen,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true,
    std::enable_if_t<
        detail::CanInvoke<F(typename StateContainer::reference, IntegerA)>::value,
        bool> = true>
void parallel_for(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    IntegerA start,
    IntegerB end,
    F&& f,
    ParForOptions options = {}) {
  using IntegerT = std::common_type_t<IntegerA, IntegerB>;
  auto range = makeChunkedRange(start, end, options.defaultChunking);
  parallel_for(
      taskSet, states, defaultState, range,
      [f = std::move(f)](auto& state, IntegerT s, IntegerT e) {
        for (IntegerT i = s; i < e; ++i) {
          f(state, i);
        }
      },
      options);
}
template <
    typename TaskSetT, typename IntegerA, typename IntegerB, typename F,
    typename StateContainer, typename StateGen,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true,
    std::enable_if_t<
        detail::CanInvoke<F(typename StateContainer::reference, IntegerA, IntegerB)>::value,
        bool> = true>
void parallel_for(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    IntegerA start,
    IntegerB end,
    F&& f,
    ParForOptions options = {}) {
  auto range = makeChunkedRange(start, end, options.defaultChunking);
  parallel_for(taskSet, states, defaultState, range, std::forward<F>(f), options);
}
template <
    typename IntegerA, typename IntegerB, typename F,
    typename StateContainer, typename StateGen,
    std::enable_if_t<std::is_integral<IntegerA>::value, bool> = true,
    std::enable_if_t<std::is_integral<IntegerB>::value, bool> = true>
void parallel_for(
    StateContainer& states,
    const StateGen& defaultState,
    IntegerA start,
    IntegerB end,
    F&& f,
    ParForOptions options = {}) {
  TaskSet taskSet(globalThreadPool());
  parallel_for(taskSet, states, defaultState, start, end, std::forward<F>(f), options);
}
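// Usage sketch (added for illustration; `labels` is a hypothetical vector of values
// in [0, 16)): per-worker histograms filled index-by-index, merged after the call.
//
//   std::vector<std::array<int, 16>> histograms;
//   dispenso::parallel_for(
//       histograms,
//       []() { return std::array<int, 16>{}; },
//       size_t{0},
//       labels.size(),
//       [&labels](std::array<int, 16>& h, size_t i) { ++h[labels[i]]; });
//   // histograms can now be merged serially into a single result.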