]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/common: re-implement do_for_each
authorXinyu Huang <xinyu.huang@intel.com>
Fri, 18 Nov 2022 02:43:42 +0000 (02:43 +0000)
committerXinyu Huang <xinyu.huang@intel.com>
Wed, 23 Nov 2022 02:17:02 +0000 (02:17 +0000)
The current implementation of crimson::do_for_each might meet
stack overflow when future is available but seastar::need_preempt
is true. This new implementation mirror to the seastar::do_for_each
with crimson errorator mechanism will solve this problem.

Fixes: https://tracker.ceph.com/issues/58005.
Signed-off-by: Xinyu Huang <xinyu.huang@intel.com>
src/crimson/common/errorator.h

index aee05f669157528411cb645094b747224483e51e..f5668168d1a5750af2f3d8713995aaacf4e824ec 100644 (file)
@@ -23,36 +23,74 @@ class interruptible_future_detail;
 
 namespace crimson {
 
-template<typename Iterator, typename AsyncAction>
-inline auto do_for_each(Iterator begin, Iterator end, AsyncAction action) {
-  using futurator = \
-    ::seastar::futurize<std::invoke_result_t<AsyncAction, decltype(*begin)>>;
+// crimson::do_for_each_state is the mirror of seastar::do_for_each_state with FutureT
+template <typename Iterator, typename AsyncAction, typename FutureT>
+class do_for_each_state final : public seastar::continuation_base<> {
+  Iterator _begin;
+  Iterator _end;
+  AsyncAction _action;
+  seastar::promise<> _pr;
 
-  if (begin == end) {
-    return futurator::type::errorator_type::template make_ready_future<>();
+public:
+  do_for_each_state(Iterator begin, Iterator end, AsyncAction action,
+      FutureT&& first_unavailable)
+    : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
+      seastar::internal::set_callback(std::move(first_unavailable), this);
   }
-  while (true) {
-    auto f = futurator::invoke(action, *begin);
-    ++begin;
-    if (begin == end) {
-      return f;
+  virtual void run_and_dispose() noexcept override {
+    std::unique_ptr<do_for_each_state> zis(this);
+    if (_state.failed()) {
+      _pr.set_urgent_state(std::move(_state));
+      return;
     }
-    if (!f.available() || seastar::need_preempt()) {
-      return std::move(f)._then(
-        [ action = std::move(action),
-          begin = std::move(begin),
-          end = std::move(end)
-        ] () mutable {
-          return ::crimson::do_for_each(std::move(begin),
-                                        std::move(end),
-                                        std::move(action));
-      });
+    while (_begin != _end) {
+      auto f = seastar::futurize_invoke(_action, *_begin);
+      ++_begin;
+      if (f.failed()) {
+        f._forward_to(std::move(_pr));
+        return;
+      }
+      if (!f.available() || seastar::need_preempt()) {
+        _state = {};
+       seastar::internal::set_callback(std::move(f), this);
+        zis.release();
+        return;
+      }
     }
+    _pr.set_value();
+  }
+  task* waiting_task() noexcept override {
+    return _pr.waiting_task();
+  }
+  FutureT get_future() {
+    return _pr.get_future();
+  }
+};
+
+template<typename Iterator, typename AsyncAction,
+  typename FutureT = std::invoke_result_t<AsyncAction, typename Iterator::reference>>
+inline FutureT do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
+  while (begin != end) {
+    auto f = seastar::futurize_invoke(action, *begin);
+    ++begin;
     if (f.failed()) {
       return f;
     }
+    if (!f.available() || seastar::need_preempt()) {
+      // s will be freed by run_and_dispose()
+      auto* s = new crimson::do_for_each_state<Iterator, AsyncAction, FutureT>{
+        std::move(begin), std::move(end), std::move(action), std::move(f)};
+        return s->get_future();
+    }
   }
+  return seastar::make_ready_future<>();
 }
+
+template<typename Iterator, typename AsyncAction>
+inline auto do_for_each(Iterator begin, Iterator end, AsyncAction action) {
+  return ::crimson::do_for_each_impl(begin, end, std::move(action));
+}
+
 template<typename Container, typename AsyncAction>
 inline auto do_for_each(Container& c, AsyncAction action) {
   return ::crimson::do_for_each(std::begin(c), std::end(c), std::move(action));
@@ -742,11 +780,18 @@ private:
     auto _then(Func&& func) {
       return base_t::then(std::forward<Func>(func));
     }
+    template <class T>
+    auto _forward_to(T&& pr) {
+      return base_t::forward_to(std::forward<T>(pr));
+    }
     template<typename Iterator, typename AsyncAction>
     friend inline auto ::crimson::do_for_each(Iterator begin,
                                               Iterator end,
                                               AsyncAction action);
 
+    template <typename Iterator, typename AsyncAction, typename FutureT>
+    friend class ::crimson::do_for_each_state;
+
     template<typename AsyncAction>
     friend inline auto ::crimson::repeat(AsyncAction action);