-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
#pragma once
friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more);
template<typename, typename>
friend class ::crimson::interruptible::interruptible_future_detail;
+ friend class errorator<AllowedErrors...>::parallel_for_each_state;
};
class Enabler {};
return make_ready_future<>();
}
+ template <typename Iterator, typename Func>
+ static inline errorator<AllowedErrors...>::future<>
+ parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept {
+ parallel_for_each_state* s = nullptr;
+ // Process all elements, giving each future the following treatment:
+ // - available, not failed: do nothing
+ // - available, failed: collect exception in ex
+ // - not available: collect in s (allocating it if needed)
+ for (;first != last; ++first) {
+ auto f = seastar::futurize_invoke(std::forward<Func>(func), *first);
+ if (!f.available() || f.failed()) {
+ if (!s) {
+ using itraits = std::iterator_traits<Iterator>;
+ auto n = (seastar::internal::iterator_range_estimate_vector_capacity(
+ first, last, typename itraits::iterator_category()) + 1);
+ s = new parallel_for_each_state(n);
+ }
+ s->add_future(std::move(f));
+ }
+ }
+ // If any futures were not available, hand off to parallel_for_each_state::start().
+ // Otherwise we can return a result immediately.
+ if (s) {
+ // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
+ // so this isn't a leak
+ return s->get_future();
+ }
+ return seastar::make_ready_future<>();
+ }
+
+ template <typename Container, typename Func>
+ static inline auto parallel_for_each(Container&& container, Func&& func) noexcept {
+ return parallel_for_each(
+ std::begin(container),
+ std::end(container),
+ std::forward<Func>(func));
+ }
+
private:
template <class T, class = std::void_t<T>>
class futurize {
return e.error_t<std::decay_t<ErrorT>>::to_exception_ptr();
}
+ class parallel_for_each_state final : private seastar::continuation_base<> {
+ using future_t = errorator<AllowedErrors...>::future<>;
+ std::vector<future_t> _incomplete;
+ seastar::promise<> _result;
+ std::exception_ptr _ex;
+ private:
+ void wait_for_one() noexcept {
+ while (!_incomplete.empty() && _incomplete.back().available()) {
+ if (_incomplete.back().failed()) {
+ _ex = _incomplete.back().get_exception();
+ }
+ _incomplete.pop_back();
+ }
+ if (!_incomplete.empty()) {
+ seastar::internal::set_callback(_incomplete.back(), static_cast<continuation_base<>*>(this));
+ _incomplete.pop_back();
+ return;
+ }
+ if (__builtin_expect(bool(_ex), false)) {
+ _result.set_exception(std::move(_ex));
+ } else {
+ _result.set_value();
+ }
+ delete this;
+ }
+ virtual void run_and_dispose() noexcept override {
+ if (_state.failed()) {
+ _ex = std::move(_state).get_exception();
+ }
+ _state = {};
+ wait_for_one();
+ }
+ task* waiting_task() noexcept override { return _result.waiting_task(); }
+ public:
+ parallel_for_each_state(size_t n) {
+ _incomplete.reserve(n);
+ }
+ void add_future(future_t&& f) {
+ _incomplete.push_back(std::move(f));
+ }
+ future_t get_future() {
+ auto ret = _result.get_future();
+ wait_for_one();
+ return ret;
+ }
+ };
+
// needed because of:
// * return_errorator_t::template futurize<...> in `safe_then()`,
// * conversion to `std::exception_ptr` in `future::future(ErrorT&&)`.