dispenso
A library for task parallelism
 
Loading...
Searching...
No Matches
for_each.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
15#pragma once
16
17#include <algorithm>
18
19#include <dispenso/detail/per_thread_info.h>
20#include <dispenso/task_set.h>
21
22namespace dispenso {
23
33 uint32_t maxThreads = std::numeric_limits<int32_t>::max();
42 bool wait = true;
43};
44
56template <typename TaskSetT, typename Iter, typename F>
57void for_each_n(TaskSetT& tasks, Iter start, size_t n, F&& f, ForEachOptions options = {}) {
58 // TODO(bbudge): With options.maxThreads, we might want to allow a small fanout factor in
59 // recursive case?
60 if (!n || !options.maxThreads || detail::PerPoolPerThreadInfo::isParForRecursive(&tasks.pool())) {
61 for (size_t i = 0; i < n; ++i) {
62 f(*start);
63 ++start;
64 }
65 if (options.wait) {
66 tasks.wait();
67 }
68 return;
69 }
70
71 // 0 indicates serial execution per API spec
72 int32_t maxThreads = std::max<int32_t>(options.maxThreads, 1);
73
74 ssize_t numThreads = std::min<ssize_t>(tasks.numPoolThreads() + options.wait, maxThreads);
75 // Reduce threads used if they exceed work to be done.
76 numThreads = std::min<ssize_t>(numThreads, n);
77
78 auto chunking = detail::staticChunkSize(n, numThreads);
79 size_t chunkSize = chunking.ceilChunkSize;
80
81 bool perfectlyChunked = chunking.transitionTaskIndex == numThreads;
82
83 // (!perfectlyChunked) ? chunking.transitionTaskIndex : numThreads - 1;
84 ssize_t firstLoopLen = chunking.transitionTaskIndex - perfectlyChunked;
85
86 ssize_t t;
87 for (t = 0; t < firstLoopLen; ++t) {
88 Iter next = start;
89 std::advance(next, chunkSize);
90 tasks.schedule([start, next, f]() {
91 auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
92 for (Iter it = start; it != next; ++it) {
93 f(*it);
94 }
95 });
96 start = next;
97 }
98
99 // Reduce the remaining chunk sizes by 1.
100 chunkSize -= !perfectlyChunked;
101 // Finish submitting all but the last item.
102 for (; t < numThreads - 1; ++t) {
103 Iter next = start;
104 std::advance(next, chunkSize);
105 tasks.schedule([start, next, f]() {
106 auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
107 for (Iter it = start; it != next; ++it) {
108 f(*it);
109 }
110 });
111 start = next;
112 }
113
114 Iter end = start;
115 std::advance(end, chunkSize);
116
117 if (options.wait) {
118 for (Iter it = start; it != end; ++it) {
119 f(*it);
120 }
121 tasks.wait();
122 } else {
123 tasks.schedule(
124 [start, end, f]() {
125 auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
126 for (Iter it = start; it != end; ++it) {
127 f(*it);
128 }
129 },
130 ForceQueuingTag());
131 }
132}
133
145template <typename Iter, typename F>
146void for_each_n(Iter start, size_t n, F&& f, ForEachOptions options = {}) {
147 TaskSet taskSet(globalThreadPool());
148 options.wait = true;
149 for_each_n(taskSet, start, n, std::forward<F>(f), options);
150}
151
163template <typename TaskSetT, typename Iter, typename F>
164void for_each(TaskSetT& tasks, Iter start, Iter end, F&& f, ForEachOptions options = {}) {
165 for_each_n(tasks, start, std::distance(start, end), std::forward<F>(f), options);
166}
167
179template <typename Iter, typename F>
180void for_each(Iter start, Iter end, F&& f, ForEachOptions options = {}) {
181 for_each_n(start, std::distance(start, end), std::forward<F>(f), options);
182}
183
184// TODO(bbudge): Implement ranges versions for these in C++20 (currently not in an env where this
185// can be tested)
186
187} // namespace dispenso
void for_each_n(TaskSetT &tasks, Iter start, size_t n, F &&f, ForEachOptions options={})
Definition for_each.h:57
void for_each(TaskSetT &tasks, Iter start, Iter end, F &&f, ForEachOptions options={})
Definition for_each.h:164
detail::OpResult< T > OpResult
Definition pipeline.h:29