dispenso 1.4.1
A library for task parallelism
Loading...
Searching...
No Matches
for_each.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
16#pragma once
17
#include <algorithm>
#include <iterator>
#include <limits>

#include <dispenso/detail/per_thread_info.h>
#include <dispenso/task_set.h>
22
23namespace dispenso {
24
34 uint32_t maxThreads = std::numeric_limits<int32_t>::max();
43 bool wait = true;
44};
45
57template <typename TaskSetT, typename Iter, typename F>
58void for_each_n(TaskSetT& tasks, Iter start, size_t n, F&& f, ForEachOptions options = {}) {
59 // TODO(bbudge): With options.maxThreads, we might want to allow a small fanout factor in
60 // recursive case?
61 if (!n || !options.maxThreads || detail::PerPoolPerThreadInfo::isParForRecursive(&tasks.pool())) {
62 for (size_t i = 0; i < n; ++i) {
63 f(*start);
64 ++start;
65 }
66 if (options.wait) {
67 tasks.wait();
68 }
69 return;
70 }
71
72 // 0 indicates serial execution per API spec
73 int32_t maxThreads = std::max<int32_t>(options.maxThreads, 1);
74
75 ssize_t numThreads = std::min<ssize_t>(tasks.numPoolThreads() + options.wait, maxThreads);
76 // Reduce threads used if they exceed work to be done.
77 numThreads = std::min<ssize_t>(numThreads, n);
78
79 auto chunking = detail::staticChunkSize(n, numThreads);
80 size_t chunkSize = chunking.ceilChunkSize;
81
82 bool perfectlyChunked = chunking.transitionTaskIndex == numThreads;
83
84 // (!perfectlyChunked) ? chunking.transitionTaskIndex : numThreads - 1;
85 ssize_t firstLoopLen = chunking.transitionTaskIndex - perfectlyChunked;
86
87 ssize_t t;
88 for (t = 0; t < firstLoopLen; ++t) {
89 Iter next = start;
90 std::advance(next, chunkSize);
91 tasks.schedule([start, next, f]() {
92 auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
93 for (Iter it = start; it != next; ++it) {
94 f(*it);
95 }
96 });
97 start = next;
98 }
99
100 // Reduce the remaining chunk sizes by 1.
101 chunkSize -= !perfectlyChunked;
102 // Finish submitting all but the last item.
103 for (; t < numThreads - 1; ++t) {
104 Iter next = start;
105 std::advance(next, chunkSize);
106 tasks.schedule([start, next, f]() {
107 auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
108 for (Iter it = start; it != next; ++it) {
109 f(*it);
110 }
111 });
112 start = next;
113 }
114
115 Iter end = start;
116 std::advance(end, chunkSize);
117
118 if (options.wait) {
119 for (Iter it = start; it != end; ++it) {
120 f(*it);
121 }
122 tasks.wait();
123 } else {
124 tasks.schedule(
125 [start, end, f]() {
126 auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
127 for (Iter it = start; it != end; ++it) {
128 f(*it);
129 }
130 },
131 ForceQueuingTag());
132 }
133}
134
146template <typename Iter, typename F>
147void for_each_n(Iter start, size_t n, F&& f, ForEachOptions options = {}) {
148 TaskSet taskSet(globalThreadPool());
149 options.wait = true;
150 for_each_n(taskSet, start, n, std::forward<F>(f), options);
151}
152
164template <typename TaskSetT, typename Iter, typename F>
165void for_each(TaskSetT& tasks, Iter start, Iter end, F&& f, ForEachOptions options = {}) {
166 for_each_n(tasks, start, std::distance(start, end), std::forward<F>(f), options);
167}
168
180template <typename Iter, typename F>
181void for_each(Iter start, Iter end, F&& f, ForEachOptions options = {}) {
182 for_each_n(start, std::distance(start, end), std::forward<F>(f), options);
183}
184
185// TODO(bbudge): Implement ranges versions for these in C++20 (currently not in an env where this
186// can be tested)
187
188} // namespace dispenso
void for_each_n(TaskSetT &tasks, Iter start, size_t n, F &&f, ForEachOptions options={})
Definition for_each.h:58
void for_each(TaskSetT &tasks, Iter start, Iter end, F &&f, ForEachOptions options={})
Definition for_each.h:165