From 0908b11dafd6cb68128a7cc084e66648c79dbbe4 Mon Sep 17 00:00:00 2001 From: Xuehan Xu Date: Thu, 10 Jun 2021 14:21:01 +0800 Subject: [PATCH] crimson: add parallel_for_each to errorator Signed-off-by: Xuehan Xu --- src/crimson/common/errorator.h | 90 +++++++++++++++++++++++++++++++++- 1 file changed, 88 insertions(+), 2 deletions(-) diff --git a/src/crimson/common/errorator.h b/src/crimson/common/errorator.h index 518cd8ad203c4..41183ead19154 100644 --- a/src/crimson/common/errorator.h +++ b/src/crimson/common/errorator.h @@ -1,5 +1,5 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab #pragma once @@ -713,6 +713,7 @@ private: friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more); template friend class ::crimson::interruptible::interruptible_future_detail; + friend class errorator::parallel_for_each_state; }; class Enabler {}; @@ -859,6 +860,44 @@ public: return make_ready_future<>(); } + template + static inline errorator::future<> + parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept { + parallel_for_each_state* s = nullptr; + // Process all elements, giving each future the following treatment: + // - available, not failed: do nothing + // - available, failed: collect exception in ex + // - not available: collect in s (allocating it if needed) + for (;first != last; ++first) { + auto f = seastar::futurize_invoke(std::forward(func), *first); + if (!f.available() || f.failed()) { + if (!s) { + using itraits = std::iterator_traits; + auto n = (seastar::internal::iterator_range_estimate_vector_capacity( + first, last, typename itraits::iterator_category()) + 1); + s = new parallel_for_each_state(n); + } + s->add_future(std::move(f)); + } + } + // If any futures were not available, hand off to parallel_for_each_state::start(). + // Otherwise we can return a result immediately. + if (s) { + // s->get_future() takes ownership of s (and chains it to one of the futures it contains) + // so this isn't a leak + return s->get_future(); + } + return seastar::make_ready_future<>(); + } + + template + static inline auto parallel_for_each(Container&& container, Func&& func) noexcept { + return parallel_for_each( + std::begin(container), + std::end(container), + std::forward(func)); + } + private: template > class futurize { @@ -1007,6 +1046,53 @@ private: return e.error_t>::to_exception_ptr(); } + class parallel_for_each_state final : private seastar::continuation_base<> { + using future_t = errorator::future<>; + std::vector _incomplete; + seastar::promise<> _result; + std::exception_ptr _ex; + private: + void wait_for_one() noexcept { + while (!_incomplete.empty() && _incomplete.back().available()) { + if (_incomplete.back().failed()) { + _ex = _incomplete.back().get_exception(); + } + _incomplete.pop_back(); + } + if (!_incomplete.empty()) { + seastar::internal::set_callback(_incomplete.back(), static_cast*>(this)); + _incomplete.pop_back(); + return; + } + if (__builtin_expect(bool(_ex), false)) { + _result.set_exception(std::move(_ex)); + } else { + _result.set_value(); + } + delete this; + } + virtual void run_and_dispose() noexcept override { + if (_state.failed()) { + _ex = std::move(_state).get_exception(); + } + _state = {}; + wait_for_one(); + } + task* waiting_task() noexcept override { return _result.waiting_task(); } + public: + parallel_for_each_state(size_t n) { + _incomplete.reserve(n); + } + void add_future(future_t&& f) { + _incomplete.push_back(std::move(f)); + } + future_t get_future() { + auto ret = _result.get_future(); + wait_for_one(); + return ret; + } + }; + // needed because of: // * return_errorator_t::template futurize<...> in `safe_then()`, // * conversion to `std::exception_ptr` in `future::future(ErrorT&&)`. -- 2.39.5