From: Xinyu Huang Date: Fri, 18 Nov 2022 02:43:42 +0000 (+0000) Subject: crimson/common: re-implement do_for_each X-Git-Tag: v18.1.0~830^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=298af6b3f0bfc77e7a07b4a5a5772a5dc18910d7;p=ceph-ci.git crimson/common: re-implement do_for_each The current implementation of crimson::do_for_each might meet stack overflow when future is available but seastar::need_preempt is true. This new implementation mirror to the seastar::do_for_each with crimson errorator mechanism will solve this problem. Fixes: https://tracker.ceph.com/issues/58005. Signed-off-by: Xinyu Huang --- diff --git a/src/crimson/common/errorator.h b/src/crimson/common/errorator.h index aee05f66915..f5668168d1a 100644 --- a/src/crimson/common/errorator.h +++ b/src/crimson/common/errorator.h @@ -23,36 +23,74 @@ class interruptible_future_detail; namespace crimson { -template -inline auto do_for_each(Iterator begin, Iterator end, AsyncAction action) { - using futurator = \ - ::seastar::futurize>; +// crimson::do_for_each_state is the mirror of seastar::do_for_each_state with FutureT +template +class do_for_each_state final : public seastar::continuation_base<> { + Iterator _begin; + Iterator _end; + AsyncAction _action; + seastar::promise<> _pr; - if (begin == end) { - return futurator::type::errorator_type::template make_ready_future<>(); +public: + do_for_each_state(Iterator begin, Iterator end, AsyncAction action, + FutureT&& first_unavailable) + : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) { + seastar::internal::set_callback(std::move(first_unavailable), this); } - while (true) { - auto f = futurator::invoke(action, *begin); - ++begin; - if (begin == end) { - return f; + virtual void run_and_dispose() noexcept override { + std::unique_ptr zis(this); + if (_state.failed()) { + _pr.set_urgent_state(std::move(_state)); + return; } - if (!f.available() || seastar::need_preempt()) { - return std::move(f)._then( - [ action = std::move(action), - begin = std::move(begin), - end = std::move(end) - ] () mutable { - return ::crimson::do_for_each(std::move(begin), - std::move(end), - std::move(action)); - }); + while (_begin != _end) { + auto f = seastar::futurize_invoke(_action, *_begin); + ++_begin; + if (f.failed()) { + f._forward_to(std::move(_pr)); + return; + } + if (!f.available() || seastar::need_preempt()) { + _state = {}; + seastar::internal::set_callback(std::move(f), this); + zis.release(); + return; + } } + _pr.set_value(); + } + task* waiting_task() noexcept override { + return _pr.waiting_task(); + } + FutureT get_future() { + return _pr.get_future(); + } +}; + +template> +inline FutureT do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) { + while (begin != end) { + auto f = seastar::futurize_invoke(action, *begin); + ++begin; if (f.failed()) { return f; } + if (!f.available() || seastar::need_preempt()) { + // s will be freed by run_and_dispose() + auto* s = new crimson::do_for_each_state{ + std::move(begin), std::move(end), std::move(action), std::move(f)}; + return s->get_future(); + } } + return seastar::make_ready_future<>(); } + +template +inline auto do_for_each(Iterator begin, Iterator end, AsyncAction action) { + return ::crimson::do_for_each_impl(begin, end, std::move(action)); +} + template inline auto do_for_each(Container& c, AsyncAction action) { return ::crimson::do_for_each(std::begin(c), std::end(c), std::move(action)); @@ -742,11 +780,18 @@ private: auto _then(Func&& func) { return base_t::then(std::forward(func)); } + template + auto _forward_to(T&& pr) { + return base_t::forward_to(std::forward(pr)); + } template friend inline auto ::crimson::do_for_each(Iterator begin, Iterator end, AsyncAction action); + template + friend class ::crimson::do_for_each_state; + template friend inline auto ::crimson::repeat(AsyncAction action);