]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/common: allow interruptible parallel_for_each to handle errorated future 42147/head
authorXuehan Xu <xxhdx1985126@gmail.com>
Fri, 2 Jul 2021 04:05:02 +0000 (12:05 +0800)
committerXuehan Xu <xxhdx1985126@gmail.com>
Sun, 4 Jul 2021 03:41:07 +0000 (11:41 +0800)
Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
src/crimson/common/errorator.h
src/crimson/common/interruptible_future.h

index aa9f196cec9b218b72f7c433f96b49a813f2dbdc..f9ec7bf4a3f14685025514016b3c4e9abb9c07e7 100644 (file)
@@ -12,6 +12,9 @@
 
 namespace crimson::interruptible {
 
+template <typename, typename>
+class parallel_for_each_state;
+
 template <typename, typename>
 class interruptible_future_detail;
 
@@ -439,6 +442,8 @@ private:
       return std::move(maybe_handle_error).get_result();
     }
 
+  protected:
+    using base_t::get_exception;
   public:
     using errorator_type = ::crimson::errorator<AllowedErrors...>;
     using promise_type = seastar::promise<ValueT>;
@@ -717,6 +722,8 @@ private:
     template<typename, typename>
     friend class ::crimson::interruptible::interruptible_future_detail;
     friend class ::crimson::parallel_for_each_state<AllowedErrors...>;
+    template <typename IC, typename FT>
+    friend class ::crimson::interruptible::parallel_for_each_state;
   };
 
   class Enabler {};
index 748154867abd838438d0eec4ba81dba94596dc8f..51b50f04ebb6029c7fc0c2e4d3bfd3d907df75cf 100644 (file)
@@ -228,6 +228,79 @@ Result non_futurized_call_with_interruption(
   }
 }
 
+template <typename InterruptCond, typename Errorator>
+struct interruptible_errorator;
+
+template <typename T>
+struct parallel_for_each_ret {
+  static_assert(seastar::is_future<T>::value);
+  using type = seastar::future<>;
+};
+
+template <template <typename...> typename ErroratedFuture, typename T>
+struct parallel_for_each_ret<
+  ErroratedFuture<
+    ::crimson::errorated_future_marker<T>>> {
+  using type = ErroratedFuture<::crimson::errorated_future_marker<void>>;
+};
+
+template <typename InterruptCond, typename FutureType>
+class parallel_for_each_state final : private seastar::continuation_base<> {
+  using elem_ret_t = std::conditional_t<
+    is_interruptible_future<FutureType>::value,
+    typename FutureType::core_type,
+    FutureType>;
+  using future_t = interruptible_future_detail<
+    InterruptCond,
+    typename parallel_for_each_ret<elem_ret_t>::type>;
+  std::vector<future_t> _incomplete;
+  seastar::promise<> _result;
+  std::exception_ptr _ex;
+private:
+  void wait_for_one() noexcept {
+    while (!_incomplete.empty() && _incomplete.back().available()) {
+      if (_incomplete.back().failed()) {
+       _ex = _incomplete.back().get_exception();
+      }
+      _incomplete.pop_back();
+    }
+    if (!_incomplete.empty()) {
+      seastar::internal::set_callback(_incomplete.back(), static_cast<continuation_base<>*>(this));
+      _incomplete.pop_back();
+      return;
+    }
+    if (__builtin_expect(bool(_ex), false)) {
+      _result.set_exception(std::move(_ex));
+    } else {
+      _result.set_value();
+    }
+    delete this;
+  }
+  virtual void run_and_dispose() noexcept override {
+    if (_state.failed()) {
+      _ex = std::move(_state).get_exception();
+    }
+    _state = {};
+    wait_for_one();
+  }
+  task* waiting_task() noexcept override { return _result.waiting_task(); }
+public:
+  parallel_for_each_state(size_t n) {
+    _incomplete.reserve(n);
+  }
+  void add_future(future_t&& f) {
+    _incomplete.push_back(std::move(f));
+  }
+  future_t get_future() {
+    auto ret = _result.get_future();
+    wait_for_one();
+    return ret;
+  }
+  static future_t now() {
+    return seastar::now();
+  }
+};
+
 template <typename InterruptCond, typename T>
 class [[nodiscard]] interruptible_future_detail<InterruptCond, seastar::future<T>>
   : private seastar::future<T> {
@@ -456,6 +529,8 @@ private:
   friend class ::seastar::internal::when_all_state_component;
   template <typename Lock, typename Func>
   friend inline auto seastar::with_lock(Lock& lock, Func&& f);
+  template <typename IC, typename FT>
+  friend class parallel_for_each_state;
 };
 
 template <typename InterruptCond, typename Errorator>
@@ -516,6 +591,7 @@ public:
   using core_type::available;
   using core_type::failed;
   using core_type::core_type;
+  using core_type::get_exception;
 
   using value_type = typename core_type::value_type;
 
@@ -803,6 +879,7 @@ public:
   }
 
 private:
+  using core_type::_then;
   template <typename Func>
   [[gnu::always_inline]]
   auto handle_interruption(Func&& func) {
@@ -856,6 +933,8 @@ private:
   friend class interruptible_future_detail;
   template <typename Lock, typename Func>
   friend inline auto seastar::with_lock(Lock& lock, Func&& f);
+  template <typename IC, typename FT>
+  friend class parallel_for_each_state;
 };
 
 template <typename InterruptCond, typename T = void>
@@ -979,7 +1058,8 @@ public:
     Func &&f, InterruptCond &&cond, Params&&... params) {
     using func_result_t = std::invoke_result_t<Func, Params...>;
     using func_ertr_t =
-      typename seastar::template futurize<func_result_t>::errorator_type;
+      typename seastar::template futurize<
+       func_result_t>::core_type::errorator_type;
     using with_trans_ertr =
       typename func_ertr_t::template extend_ertr<errorator<Error>>;
 
@@ -1019,7 +1099,7 @@ public:
          ::seastar::do_for_each(begin, end,
            [action=std::move(action),
            interrupt_condition=interrupt_cond<InterruptCond>]
-           (decltype(*begin) x) {
+           (typename Iterator::reference x) mutable {
            return call_with_interruption(
                      interrupt_condition,
                      std::move(action),
@@ -1031,7 +1111,7 @@ public:
          ::crimson::do_for_each(begin, end,
            [action=std::move(action),
            interrupt_condition=interrupt_cond<InterruptCond>]
-           (decltype(*begin) x) {
+           (typename Iterator::reference x) mutable {
            return call_with_interruption(
                      interrupt_condition,
                      std::move(action),
@@ -1051,7 +1131,7 @@ public:
          ::seastar::do_for_each(begin, end,
            [action=std::move(action),
            interrupt_condition=interrupt_cond<InterruptCond>]
-           (decltype(*begin) x) {
+           (typename Iterator::reference x) mutable {
            return call_with_interruption(
                      interrupt_condition,
                      std::move(action),
@@ -1063,7 +1143,7 @@ public:
          ::crimson::do_for_each(begin, end,
            [action=std::move(action),
            interrupt_condition=interrupt_cond<InterruptCond>]
-           (decltype(*begin) x) {
+           (typename Iterator::reference x) mutable {
            return call_with_interruption(
                      interrupt_condition,
                      std::move(action),
@@ -1129,56 +1209,14 @@ public:
     }
   }
 
-  class parallel_for_each_state final : private seastar::continuation_base<> {
-    using future_t = interruptible_future_detail<InterruptCond, seastar::future<>>;
-    std::vector<future_t> _incomplete;
-    seastar::promise<> _result;
-    std::exception_ptr _ex;
-  private:
-    void wait_for_one() noexcept {
-      while (!_incomplete.empty() && _incomplete.back().available()) {
-       if (_incomplete.back().failed()) {
-         _ex = _incomplete.back().get_exception();
-       }
-       _incomplete.pop_back();
-      }
-      if (!_incomplete.empty()) {
-       seastar::internal::set_callback(_incomplete.back(), static_cast<continuation_base<>*>(this));
-       _incomplete.pop_back();
-       return;
-      }
-      if (__builtin_expect(bool(_ex), false)) {
-       _result.set_exception(std::move(_ex));
-      } else {
-       _result.set_value();
-      }
-      delete this;
-    }
-    virtual void run_and_dispose() noexcept override {
-      if (_state.failed()) {
-       _ex = std::move(_state).get_exception();
-      }
-      _state = {};
-      wait_for_one();
-    }
-    task* waiting_task() noexcept override { return _result.waiting_task(); }
-  public:
-    parallel_for_each_state(size_t n) {
-      _incomplete.reserve(n);
-    }
-    void add_future(future_t&& f) {
-      _incomplete.push_back(std::move(f));
-    }
-    future_t get_future() {
-      auto ret = _result.get_future();
-      wait_for_one();
-      return ret;
-    }
-  };
   template <typename Iterator, typename Func>
-  static inline interruptible_future_detail<InterruptCond, seastar::future<>>
-  parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept {
-    parallel_for_each_state* s = nullptr;
+  static inline auto parallel_for_each(
+    Iterator begin,
+    Iterator end,
+    Func&& func
+  ) noexcept {
+    using ResultType = std::invoke_result_t<Func, typename Iterator::reference>;
+    parallel_for_each_state<InterruptCond, ResultType>* s = nullptr;
     auto decorated_func =
       [func=std::forward<Func>(func),
       interrupt_condition=interrupt_cond<InterruptCond>]
@@ -1199,7 +1237,7 @@ public:
          using itraits = std::iterator_traits<Iterator>;
          auto n = (seastar::internal::iterator_range_estimate_vector_capacity(
                begin, end, typename itraits::iterator_category()) + 1);
-         s = new parallel_for_each_state(n);
+         s = new parallel_for_each_state<InterruptCond, ResultType>(n);
        }
        s->add_future(std::move(f));
       }
@@ -1211,7 +1249,7 @@ public:
       // so this isn't a leak
       return s->get_future();
     }
-    return seastar::make_ready_future<>();
+    return parallel_for_each_state<InterruptCond, ResultType>::now();
   }
 
   template <typename Container, typename Func>
@@ -1386,9 +1424,13 @@ struct futurize<
   using type = ::crimson::interruptible::interruptible_future_detail<
     InterruptCond,
     ErroratedFuture<::crimson::errorated_future_marker<T...>>>;
+  using core_type = ErroratedFuture<
+      ::crimson::errorated_future_marker<T...>>;
   using errorator_type =
-    typename ErroratedFuture<
-      ::crimson::errorated_future_marker<T...>>::errorator_type;
+    ::crimson::interruptible::interruptible_errorator<
+      InterruptCond,
+      typename ErroratedFuture<
+       ::crimson::errorated_future_marker<T...>>::errorator_type>;
 
   template<typename Func, typename... FuncArgs>
   static inline type invoke(Func&& func, FuncArgs&&... args) noexcept {
@@ -1411,7 +1453,8 @@ struct futurize<
 
   template <typename Arg>
   static inline type make_exception_future(Arg&& arg) noexcept {
-    return errorator_type::template make_exception_future2<T...>(std::forward<Arg>(arg));
+    return core_type::errorator_type::template make_exception_future2<T...>(
+       std::forward<Arg>(arg));
   }
 
   template<typename PromiseT, typename Func>