dispenso 1.4.1
A library for task parallelism
Loading...
Searching...
No Matches
for_each.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
16#pragma once
17
18#include <algorithm>
19
20#include <dispenso/detail/per_thread_info.h>
21#include <dispenso/task_set.h>
22
23namespace dispenso {
24
#if DISPENSO_HAS_CONCEPTS
/**
 * Concept satisfied when a callable of type <code>Func</code> can be invoked with the result of
 * dereferencing an iterator of type <code>Iterator</code>.  Only defined when
 * <code>DISPENSO_HAS_CONCEPTS</code> evaluates to true.
 **/
template <typename Func, typename Iterator>
concept ForEachFunc = std::invocable<Func, decltype(*std::declval<Iterator>())>;
#endif // DISPENSO_HAS_CONCEPTS
35
45 uint32_t maxThreads = std::numeric_limits<int32_t>::max();
54 bool wait = true;
55};
56
68template <typename TaskSetT, typename Iter, typename F>
69DISPENSO_REQUIRES(ForEachFunc<F, Iter>)
70void for_each_n(TaskSetT& tasks, Iter start, size_t n, F&& f, ForEachOptions options = {}) {
71 // TODO(bbudge): With options.maxThreads, we might want to allow a small fanout factor in
72 // recursive case?
73 if (!n || !options.maxThreads || detail::PerPoolPerThreadInfo::isParForRecursive(&tasks.pool())) {
74 for (size_t i = 0; i < n; ++i) {
75 f(*start);
76 ++start;
77 }
78 if (options.wait) {
79 tasks.wait();
80 }
81 return;
82 }
83
84 // 0 indicates serial execution per API spec
85 int32_t maxThreads = std::max<int32_t>(options.maxThreads, 1);
86
87 ssize_t numThreads = std::min<ssize_t>(tasks.numPoolThreads() + options.wait, maxThreads);
88 // Reduce threads used if they exceed work to be done.
89 numThreads = std::min<ssize_t>(numThreads, n);
90
91 auto chunking = detail::staticChunkSize(n, numThreads);
92 size_t chunkSize = chunking.ceilChunkSize;
93
94 bool perfectlyChunked = chunking.transitionTaskIndex == numThreads;
95
96 // (!perfectlyChunked) ? chunking.transitionTaskIndex : numThreads - 1;
97 ssize_t firstLoopLen = chunking.transitionTaskIndex - perfectlyChunked;
98
99 ssize_t t;
100 for (t = 0; t < firstLoopLen; ++t) {
101 Iter next = start;
102 std::advance(next, chunkSize);
103 tasks.schedule([start, next, f]() {
104 auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
105 for (Iter it = start; it != next; ++it) {
106 f(*it);
107 }
108 });
109 start = next;
110 }
111
112 // Reduce the remaining chunk sizes by 1.
113 chunkSize -= !perfectlyChunked;
114 // Finish submitting all but the last item.
115 for (; t < numThreads - 1; ++t) {
116 Iter next = start;
117 std::advance(next, chunkSize);
118 tasks.schedule([start, next, f]() {
119 auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
120 for (Iter it = start; it != next; ++it) {
121 f(*it);
122 }
123 });
124 start = next;
125 }
126
127 Iter end = start;
128 std::advance(end, chunkSize);
129
130 if (options.wait) {
131 for (Iter it = start; it != end; ++it) {
132 f(*it);
133 }
134 tasks.wait();
135 } else {
136 tasks.schedule(
137 [start, end, f]() {
138 auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
139 for (Iter it = start; it != end; ++it) {
140 f(*it);
141 }
142 },
143 ForceQueuingTag());
144 }
145}
146
158template <typename Iter, typename F>
159DISPENSO_REQUIRES(ForEachFunc<F, Iter>)
160void for_each_n(Iter start, size_t n, F&& f, ForEachOptions options = {}) {
161 TaskSet taskSet(globalThreadPool());
162 options.wait = true;
163 for_each_n(taskSet, start, n, std::forward<F>(f), options);
164}
165
177template <typename TaskSetT, typename Iter, typename F>
178DISPENSO_REQUIRES(ForEachFunc<F, Iter>)
179void for_each(TaskSetT& tasks, Iter start, Iter end, F&& f, ForEachOptions options = {}) {
180 for_each_n(tasks, start, std::distance(start, end), std::forward<F>(f), options);
181}
182
194template <typename Iter, typename F>
195DISPENSO_REQUIRES(ForEachFunc<F, Iter>)
196void for_each(Iter start, Iter end, F&& f, ForEachOptions options = {}) {
197 for_each_n(start, std::distance(start, end), std::forward<F>(f), options);
198}
199
200// TODO(bbudge): Implement ranges versions for these in C++20 (currently not in an env where this
201// can be tested)
202
203} // namespace dispenso