}
};
+template <typename Iterator, typename Func, typename... AllowedErrors>
+static inline typename errorator<AllowedErrors...>::template future<>
+parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept {
+ parallel_for_each_state<AllowedErrors...>* s = nullptr;
+ // Process all elements, giving each future the following treatment:
+ // - available, not failed: do nothing
+ // - available, failed: collect exception in ex
+ // - not available: collect in s (allocating it if needed)
+ for (;first != last; ++first) {
+ auto f = seastar::futurize_invoke(std::forward<Func>(func), *first);
+ if (!f.available() || f.failed()) {
+ if (!s) {
+ using itraits = std::iterator_traits<Iterator>;
+ auto n = (seastar::internal::iterator_range_estimate_vector_capacity(
+ first, last, typename itraits::iterator_category()) + 1);
+ s = new parallel_for_each_state<AllowedErrors...>(n);
+ }
+ s->add_future(std::move(f));
+ }
+ }
+ // If any futures were not available, hand off to parallel_for_each_state::start().
+ // Otherwise we can return a result immediately.
+ if (s) {
+ // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
+ // so this isn't a leak
+ return s->get_future();
+ }
+ return seastar::make_ready_future<>();
+}
+
} // namespace crimson
template <class T>
static inline constexpr bool is_error_v = std::is_base_of_v<error_t<T>, T>;
+template <typename... AllowedErrors>
+struct errorator;
+
+template <typename Iterator, typename Func, typename... AllowedErrors>
+static inline typename errorator<AllowedErrors...>::template future<>
+parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept;
+
template <class... AllowedErrors>
struct errorator {
return make_ready_future<>();
}
- template <typename Iterator, typename Func>
- static inline errorator<AllowedErrors...>::future<>
- parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept {
- parallel_for_each_state<AllowedErrors...>* s = nullptr;
- // Process all elements, giving each future the following treatment:
- // - available, not failed: do nothing
- // - available, failed: collect exception in ex
- // - not available: collect in s (allocating it if needed)
- for (;first != last; ++first) {
- auto f = seastar::futurize_invoke(std::forward<Func>(func), *first);
- if (!f.available() || f.failed()) {
- if (!s) {
- using itraits = std::iterator_traits<Iterator>;
- auto n = (seastar::internal::iterator_range_estimate_vector_capacity(
- first, last, typename itraits::iterator_category()) + 1);
- s = new parallel_for_each_state<AllowedErrors...>(n);
- }
- s->add_future(std::move(f));
- }
- }
- // If any futures were not available, hand off to parallel_for_each_state::start().
- // Otherwise we can return a result immediately.
- if (s) {
- // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
- // so this isn't a leak
- return s->get_future();
- }
- return seastar::make_ready_future<>();
- }
-
template <typename Container, typename Func>
static inline auto parallel_for_each(Container&& container, Func&& func) noexcept {
- return parallel_for_each(
+ return crimson::parallel_for_each<decltype(std::begin(container)), Func, AllowedErrors...>(
std::begin(container),
std::end(container),
std::forward<Func>(func));
}
+ template <typename Iterator, typename Func>
+ static inline errorator<AllowedErrors...>::future<>
+ parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept {
+ return crimson::parallel_for_each<Iterator, Func, AllowedErrors...>(
+ first,
+ last,
+ std::forward<Func>(func));
+ }
private:
template <class T>
class futurize {