]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: add parallel_for_each to errorator
authorXuehan Xu <xxhdx1985126@gmail.com>
Thu, 10 Jun 2021 06:21:01 +0000 (14:21 +0800)
committerXuehan Xu <xxhdx1985126@gmail.com>
Wed, 16 Jun 2021 09:04:10 +0000 (17:04 +0800)
Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
src/crimson/common/errorator.h

index 518cd8ad203c419283c649856eae1fc93c9da77c..41183ead19154913962bfe4914903e612ce17989 100644 (file)
@@ -1,5 +1,5 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
 
 #pragma once
 
@@ -713,6 +713,7 @@ private:
     friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more);
     template<typename, typename>
     friend class ::crimson::interruptible::interruptible_future_detail;
+    friend class errorator<AllowedErrors...>::parallel_for_each_state;
   };
 
   class Enabler {};
@@ -859,6 +860,44 @@ public:
     return make_ready_future<>();
   }
 
+  template <typename Iterator, typename Func>
+  static inline errorator<AllowedErrors...>::future<>
+  parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept {
+    parallel_for_each_state* s = nullptr;
+    // Process all elements, giving each future the following treatment:
+    //   - available, not failed: do nothing
+    //   - available, failed: collect exception in ex
+    //   - not available: collect in s (allocating it if needed)
+    for (;first != last; ++first) {
+      auto f = seastar::futurize_invoke(std::forward<Func>(func), *first);
+      if (!f.available() || f.failed()) {
+        if (!s) {
+          using itraits = std::iterator_traits<Iterator>;
+          auto n = (seastar::internal::iterator_range_estimate_vector_capacity(
+                first, last, typename itraits::iterator_category()) + 1);
+          s = new parallel_for_each_state(n);
+        }
+        s->add_future(std::move(f));
+      }
+    }
+    // If any futures were not available, hand off to parallel_for_each_state::start().
+    // Otherwise we can return a result immediately.
+    if (s) {
+      // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
+      // so this isn't a leak
+      return s->get_future();
+    }
+    return seastar::make_ready_future<>();
+  }
+
+  template <typename Container, typename Func>
+  static inline auto parallel_for_each(Container&& container, Func&& func) noexcept {
+    return parallel_for_each(
+        std::begin(container),
+        std::end(container),
+        std::forward<Func>(func));
+  }
+
 private:
   template <class T, class = std::void_t<T>>
   class futurize {
@@ -1007,6 +1046,53 @@ private:
     return e.error_t<std::decay_t<ErrorT>>::to_exception_ptr();
   }
 
+  class parallel_for_each_state final : private seastar::continuation_base<> {
+    using future_t = errorator<AllowedErrors...>::future<>;
+    std::vector<future_t> _incomplete;
+    seastar::promise<> _result;
+    std::exception_ptr _ex;
+  private:
+    void wait_for_one() noexcept {
+      while (!_incomplete.empty() && _incomplete.back().available()) {
+        if (_incomplete.back().failed()) {
+          _ex = _incomplete.back().get_exception();
+        }
+        _incomplete.pop_back();
+      }
+      if (!_incomplete.empty()) {
+        seastar::internal::set_callback(_incomplete.back(), static_cast<continuation_base<>*>(this));
+        _incomplete.pop_back();
+        return;
+      }
+      if (__builtin_expect(bool(_ex), false)) {
+        _result.set_exception(std::move(_ex));
+      } else {
+        _result.set_value();
+      }
+      delete this;
+    }
+    virtual void run_and_dispose() noexcept override {
+      if (_state.failed()) {
+        _ex = std::move(_state).get_exception();
+      }
+      _state = {};
+      wait_for_one();
+    }
+    task* waiting_task() noexcept override { return _result.waiting_task(); }
+  public:
+    parallel_for_each_state(size_t n) {
+      _incomplete.reserve(n);
+    }
+    void add_future(future_t&& f) {
+      _incomplete.push_back(std::move(f));
+    }
+    future_t get_future() {
+      auto ret = _result.get_future();
+      wait_for_one();
+      return ret;
+    }
+  };
+
   // needed because of:
   //  * return_errorator_t::template futurize<...> in `safe_then()`,
   //  * conversion to `std::exception_ptr` in `future::future(ErrorT&&)`.