From: Xuehan Xu Date: Sun, 4 Oct 2020 09:50:48 +0000 (+0800) Subject: crimson: add interruptible future facilities X-Git-Tag: v17.1.0~2698^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3b88d76f360fcb1a3611a40c3a51d9796213f0a4;p=ceph.git crimson: add interruptible future facilities Signed-off-by: Xuehan Xu Signed-off-by: Kefu Chai --- diff --git a/src/crimson/common/interruptible_future.h b/src/crimson/common/interruptible_future.h new file mode 100644 index 00000000000..ef60b449a32 --- /dev/null +++ b/src/crimson/common/interruptible_future.h @@ -0,0 +1,1272 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include + +#include "crimson/common/errorator.h" + +// The interrupt condition generally works this way: +// +// 1. It is created by "enable_interruption" method, and is recorded in the thread +// local global variable "::crimson::interruptible::interrupt_cond" (each "enable_ +// interruption" is paired with a "disable_interruption" to ensure that the global +// interrupt_cond won't hold unexpected interrupt conditions during the execution +// of unrelated continuations); +// 2. Any continuation that's created within the execution of the continuation +// that calls the "enable_interruption" method will capture the "interrupt_cond"; +// and when they starts to run, they will put that capture interruption condition +// into "::crimson::interruptible::interrupt_cond" so that further continuations +// created can also capture the interruption condition; +// 3. At the end of the continuation run, the global "interrupt_cond" will be cleared +// to prevent other continuations that are not supposed to be interrupted wrongly +// capture an interruption condition. +// With this approach, continuations capture the interrupt condition at their creation, +// restore the interrupt conditions at the beginning of their execution and clear those +// interrupt conditions at the end of their execution. So the global "interrupt_cond" +// only hold valid interrupt conditions when the corresponding continuations are actually +// running after which it gets cleared. Since continuations can't be executed simultaneously, +// different continuation chains won't be able to interfere with each other. +// +// The global "interrupt_cond" can work as a signal about whether the continuation +// is supposed to be interrupted, the reason that the global "interrupt_cond" and +// "enable_interruption" are added is that there may be this scenario: +// +// Say there's some method PG::func1(), in which the continuations created may +// or may not be supposed to be interrupted in different situations. If we don't +// have a global signal, we have to add an extra parameter to every method like +// PG::func1() to indicate whether the current run should create to-be-interrupted +// continuations or not. +// +// For users to insert an interruption condition, interruptor::with_interruption() in which +// enable_interruption and disable_interruption would be called should be invoked. And users +// can only handle interruption in the interruptible_future_detail::handle_interruption method. + +namespace crimson::interruptible { + +struct ready_future_marker {}; +struct exception_future_marker {}; + +template +class interruptible_future_builder; + +template +struct interruptor; + +template +using InterruptCondRef = seastar::lw_shared_ptr; + +template +thread_local InterruptCondRef interrupt_cond; + +template +class interruptible_future_detail {}; + +template +struct is_interruptible_future : public std::false_type {}; + +template +struct is_interruptible_future< + interruptible_future_detail< + InterruptCond, + FutureType>> + : public std::true_type {}; + +namespace internal { + +template +auto call_with_interruption_impl( + InterruptCondRef interrupt_condition, + Func&& func, Args&&... args) +{ + using futurator_t = seastar::futurize>; + // there might be a case like this: + // with_interruption([] { + // interruptor::do_for_each([] { + // ... + // return interruptible_errorated_future(); + // }).safe_then_interruptible([] { + // ... + // }); + // }) + // In this case, as crimson::do_for_each would directly do futurize_invoke + // for "call_with_interruption", we have make sure this invocation would + // errorly release ::crimson::interruptible::interrupt_cond + bool set_int_cond = false; + + // If there exists an interrupt condition, which means "Func" may not be + // permitted to run as a result of the interruption, test it. If it does + // need to be interrupted, return an interruption; otherwise, restore the + // global "interrupt_cond" with the interruption condition, and go ahead + // executing the Func. + if (!interrupt_cond && interrupt_condition) { + auto [interrupt, fut] = interrupt_condition->template may_interrupt< + typename futurator_t::type>(); + if (interrupt) { + return std::move(*fut); + } + set_int_cond = true; + interrupt_cond = interrupt_condition; + } + + auto fut = seastar::futurize_invoke( + std::forward(func), + std::forward(args)...); + // Clear the global "interrupt_cond" to prevent it from interfering other + // continuation chains. + if (set_int_cond && interrupt_cond) + interrupt_cond.release(); + return fut; +} + +} + +template , + std::enable_if_t && + seastar::is_future::value, int> = 0> +auto call_with_interruption( + InterruptCondRef interrupt_condition, + Func&& func, Ret&& fut) +{ + // if "T" is already an interrupt exception, return it directly; + // otherwise, upper layer application may encounter errors executing + // the "Func" body. + if (fut.failed()) { + std::exception_ptr eptr = fut.get_exception(); + if (interrupt_condition->is_interruption(eptr)) { + return seastar::futurize::make_exception_future(std::move(eptr)); + } + return internal::call_with_interruption_impl( + interrupt_condition, + std::forward(func), + seastar::futurize::make_exception_future( + std::move(eptr))); + } + return internal::call_with_interruption_impl( + interrupt_condition, + std::forward(func), + std::move(fut)); +} + +template , + std::enable_if_t, int> = 0> +auto call_with_interruption( + InterruptCondRef interrupt_condition, + Func&& func, T&& arg) +{ + // if "T" is already an interrupt exception, return it directly; + // otherwise, upper layer application may encounter errors executing + // the "Func" body. + return seastar::futurize::make_exception_future( + std::get<0>(std::tuple(std::forward(arg)))); +} + +template , + std::enable_if_t< + !InterruptCond::template is_interruption_v && + !seastar::is_future::value, int> = 0> +auto call_with_interruption( + InterruptCondRef interrupt_condition, + Func&& func, T&& arg) +{ + return internal::call_with_interruption_impl( + interrupt_condition, + std::forward(func), + std::forward(arg)); +} + +template > +auto call_with_interruption( + InterruptCondRef interrupt_condition, + Func&& func) +{ + return internal::call_with_interruption_impl( + interrupt_condition, + std::forward(func)); +} + +template > +decltype(auto) non_futurized_call_with_interruption( + InterruptCondRef interrupt_condition, + Func&& func, T&&... args) +{ + bool set_int_cond = false; + if (!interrupt_cond && interrupt_condition) { + auto [interrupt, fut] = interrupt_condition->template may_interrupt>(); + if (interrupt) { + std::rethrow_exception(fut->get_exception()); + } + set_int_cond = true; + interrupt_cond = std::move(interrupt_condition); + } + try { + if constexpr (std::is_void_v) { + std::invoke(std::forward(func), std::forward(args)...); + + // Clear the global "interrupt_cond" to prevent it from interfering other + // continuation chains. + if (set_int_cond && interrupt_cond) + interrupt_cond.release(); + return; + } else { + auto&& err = std::invoke(std::forward(func), std::forward(args)...); + if (set_int_cond && interrupt_cond) + interrupt_cond.release(); + return std::forward(err); + } + } catch (std::exception& e) { + // Clear the global "interrupt_cond" to prevent it from interfering other + // continuation chains. + if (set_int_cond && interrupt_cond) + interrupt_cond.release(); + throw e; + } +} + +template +class interruptible_future_detail> + : private seastar::future { +public: + using core_type = seastar::future; + template + using interrupt_futurize_t = + typename interruptor::template futurize_t; + using core_type::get0; + using core_type::core_type; + + [[gnu::always_inline]] + interruptible_future_detail(seastar::future&& base) + : core_type(std::move(base)) + {} + + using value_type = typename seastar::future::value_type; + using tuple_type = typename seastar::future::tuple_type; + + [[gnu::always_inline]] + value_type&& get() { + return std::move(core_type::get()); + } + + using core_type::available; + using core_type::failed; + + template + [[gnu::always_inline]] + auto handle_interruption(Func&& func) { + return core_type::then_wrapped( + [this, func=std::move(func), + interrupt_condition=interrupt_cond](auto&& fut) mutable { + if (fut.failed()) { + std::exception_ptr ex = fut.get_exception(); + if (interrupt_condition->is_interruption(ex)) { + return seastar::futurize_invoke(std::move(func), std::move(ex)); + } else { + return seastar::make_exception_future(std::move(ex)); + } + } else { + return seastar::make_ready_future(fut.get()); + } + }); + } + + template >>> + [[gnu::always_inline]] + Result then_wrapped_interruptible(Func&& func) { + assert(interrupt_cond); + return core_type::then_wrapped( + [func=std::move(func), interrupt_condition=interrupt_cond] + (auto&& fut) mutable { + return call_with_interruption( + std::move(interrupt_condition), + std::forward(func), + std::move(fut)); + }); + } + + template + [[gnu::always_inline]] + auto then_interruptible(Func&& func) { + assert(interrupt_cond); + if constexpr (std::is_void_v) { + auto fut = core_type::then( + [func=std::move(func), interrupt_condition=interrupt_cond] + () mutable { + return call_with_interruption( + interrupt_condition, + std::move(func)); + }); + return (interrupt_futurize_t)(std::move(fut)); + } else { + auto fut = core_type::then( + [func=std::move(func), interrupt_condition=interrupt_cond] + (T&& arg) mutable { + return call_with_interruption( + interrupt_condition, + std::move(func), + std::forward(arg)); + }); + return (interrupt_futurize_t)(std::move(fut)); + } + } + + template >> + [[gnu::always_inline]] + Result handle_exception_interruptible(Func&& func) { + assert(interrupt_cond); + return core_type::then_wrapped( + [func=std::forward(func), + interrupt_condition=interrupt_cond](auto&& fut) mutable { + if (!fut.failed()) { + return seastar::make_ready_future(fut.get()); + } else { + return call_with_interruption( + interrupt_condition, + std::move(func), + fut.get_exception()); + } + }); + } + + template >> + [[gnu::always_inline]] + Result finally_interruptible(Func&& func) { + if constexpr (may_interrupt) { + assert(interrupt_cond); + return core_type::then_wrapped( + [func=std::forward(func), + interrupt_condition=interrupt_cond](auto&& fut) mutable { + return call_with_interruption( + interrupt_condition, + std::move(func)); + }); + } else { + return core_type::finally(std::forward(func)); + } + } + + template ::template arg<0>::type)>>> + [[gnu::always_inline]] + Result handle_exception_type_interruptible(Func&& func) { + assert(interrupt_cond); + using trait = seastar::function_traits; + static_assert(trait::arity == 1, "func can take only one parameter"); + using ex_type = typename trait::template arg<0>::type; + return core_type::then_wrapped( + [func=std::forward(func), + interrupt_condition=interrupt_cond](auto&& fut) mutable + -> Result { + if (!fut.failed()) { + return seastar::make_ready_future(fut.get()); + } else { + try { + std::rethrow_exception(fut.get_exception()); + } catch (ex_type& ex) { + return call_with_interruption( + interrupt_condition, + std::move(func), ex); + } + } + }); + } + +private: + seastar::future to_future() { + return static_cast(std::move(*this)); + } + // this is only supposed to be invoked by seastar functions + template )>>> + [[gnu::always_inline]] + Result then_wrapped(Func&& func) { + return core_type::then_wrapped( + [func=std::move(func), interrupt_condition=interrupt_cond] + (auto&& fut) mutable { + return call_with_interruption( + interrupt_condition, + std::forward(func), + std::move(fut)); + }); + } + // this is only supposed to be invoked by seastar functions + template >> + [[gnu::always_inline]] + Result finally(Func&& func) { + return core_type::finally(std::forward(func)); + } + template ::value, int>> + friend auto interruptor::do_for_each( + Iterator, Iterator, AsyncAction&&); + template ::value, int>> + friend auto interruptor::do_for_each( + Iterator, Iterator, AsyncAction&&); + template ::value, int>> + friend auto interruptor::repeat(AsyncAction&&); + template ::value, int>> + friend auto interruptor::repeat(AsyncAction&&); + friend class interruptible_future_builder; + template + friend struct ::seastar::futurize; + template + friend class ::seastar::future; + template + friend class seastar::internal::do_with_state; + template + friend inline auto ::seastar::internal::do_with_impl(TX&& rvalue, F&& f); + template + friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more); + template + friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more); + template + friend class ::crimson::maybe_handle_error_t; + template + friend class ::seastar::internal::extract_values_from_futures_vector; + friend class interruptor::parallel_for_each_state; + template + friend class interruptible_future_detail; + template + friend inline typename ResolvedVectorTransform::future_type + seastar::internal::complete_when_all( + std::vector&& futures, + typename std::vector::iterator pos) noexcept; + template + friend class ::seastar::internal::when_all_state_component; + template + friend inline auto seastar::with_lock(Lock& lock, Func&& f); +}; + +template +struct interruptible_errorator { + template + using future = interruptible_future_detail>; + + template + static interruptible_future_detail< + InterruptCond, + typename Errorator::template future> + make_ready_future(A&& value) { + return interruptible_future_detail< + InterruptCond, typename Errorator::template future>( + Errorator::template make_ready_future( + std::forward(value))); + } + template + static interruptible_future_detail< + InterruptCond, + typename Errorator::template future> + make_ready_future() { + return interruptible_future_detail< + InterruptCond, typename Errorator::template future>( + Errorator::template make_ready_future()); + } + static interruptible_future_detail< + InterruptCond, + typename Errorator::template future<>> now() { + return interruptible_future_detail< + InterruptCond, typename Errorator::template future<>>( + Errorator::now()); + } +}; + +template typename ErroratedFuture, + typename T> +class interruptible_future_detail< + InterruptCond, + ErroratedFuture<::crimson::errorated_future_marker>> + : private ErroratedFuture<::crimson::errorated_future_marker> +{ +public: + using core_type = ErroratedFuture<::crimson::errorated_future_marker>; + template + using interrupt_futurize_t = + typename interruptor::template futurize_t; + + using core_type::available; + using core_type::failed; + using core_type::core_type; + + interruptible_future_detail(seastar::future&& fut) + : core_type(std::move(fut)) + {} + + template