From fdf6d06fc3f9c6e0a419dd5dc0d8969fe224cd4c Mon Sep 17 00:00:00 2001 From: Xuehan Xu Date: Sun, 4 Oct 2020 17:51:59 +0800 Subject: [PATCH] crimson/osd: use interruptible future in the PG layer Signed-off-by: Xuehan Xu --- src/crimson/common/errorator.h | 70 +++++- src/crimson/common/exception.h | 27 ++- src/crimson/common/interruptible_future.h | 104 ++++++--- src/crimson/common/operation.h | 108 ++++++++- src/crimson/osd/ec_backend.cc | 4 +- src/crimson/osd/ec_backend.h | 8 +- src/crimson/osd/objclass.cc | 3 +- src/crimson/osd/object_context.h | 80 +++++-- src/crimson/osd/ops_executer.cc | 68 +++--- src/crimson/osd/ops_executer.h | 68 ++++-- src/crimson/osd/osd_operation.h | 8 + .../osd/osd_operations/background_recovery.cc | 29 ++- .../osd/osd_operations/background_recovery.h | 8 +- .../osd/osd_operations/client_request.cc | 154 +++++++------ .../osd/osd_operations/client_request.h | 18 +- .../osd/osd_operations/recovery_subrequest.cc | 13 +- .../osd/osd_operations/replicated_request.cc | 21 +- src/crimson/osd/pg.cc | 83 +++---- src/crimson/osd/pg.h | 57 +++-- src/crimson/osd/pg_backend.cc | 134 ++++++----- src/crimson/osd/pg_backend.h | 120 +++++++--- .../osd/pg_interval_interrupt_condition.cc | 15 +- .../osd/pg_interval_interrupt_condition.h | 13 +- src/crimson/osd/pg_recovery.cc | 40 ++-- src/crimson/osd/pg_recovery.h | 17 +- src/crimson/osd/recovery_backend.cc | 40 ++-- src/crimson/osd/recovery_backend.h | 55 +++-- src/crimson/osd/replicated_backend.cc | 10 +- src/crimson/osd/replicated_backend.h | 9 +- .../osd/replicated_recovery_backend.cc | 212 ++++++++++-------- src/crimson/osd/replicated_recovery_backend.h | 59 ++--- 31 files changed, 1095 insertions(+), 560 deletions(-) diff --git a/src/crimson/common/errorator.h b/src/crimson/common/errorator.h index 746bd9bb51de5..7fd053740841b 100644 --- a/src/crimson/common/errorator.h +++ b/src/crimson/common/errorator.h @@ -10,6 +10,13 @@ #include "include/ceph_assert.h" +namespace crimson::interruptible { + 
+template +class interruptible_future_detail; + +} + namespace crimson { template @@ -311,10 +318,11 @@ static constexpr auto composer(FuncHead&& head, FuncTail&&... tail) { template struct errorated_future_marker{}; +template +static inline constexpr bool is_error_v = std::is_base_of_v, T>; + template struct errorator { - template - static inline constexpr bool is_error_v = std::is_base_of_v, T>; static_assert((... && is_error_v), "errorator expects presence of ::is_error in all error types"); @@ -664,6 +672,7 @@ private: } }); } + template auto handle_error(ErrorFuncHead&& error_func_head, @@ -696,6 +705,8 @@ private: friend inline auto ::seastar::internal::do_with_impl(T&& rvalue, F&& f); template friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more); + template + friend class ::crimson::interruptible::interruptible_future_detail; }; class Enabler {}; @@ -931,6 +942,59 @@ private: } }; + template + class futurize< + ::crimson::interruptible::interruptible_future_detail< + InterruptCond, FutureType>> { + public: + using type = ::crimson::interruptible::interruptible_future_detail< + InterruptCond, typename futurize::type>; + + template + static type apply(Func&& func, std::tuple&& args) { + try { + return ::seastar::futurize_apply(std::forward(func), + std::forward>(args)); + } catch (...) { + return seastar::futurize< + ::crimson::interruptible::interruptible_future_detail< + InterruptCond, FutureType>>::make_exception_future( + std::current_exception()); + } + } + + template + static type invoke(Func&& func, Args&&... args) { + try { + return ::seastar::futurize_invoke(std::forward(func), + std::forward(args)...); + } catch(...) 
{ + return seastar::futurize< + ::crimson::interruptible::interruptible_future_detail< + InterruptCond, FutureType>>::make_exception_future( + std::current_exception()); + } + } + template + static type invoke(Func&& func, seastar::internal::monostate) { + try { + return ::seastar::futurize_invoke(std::forward(func)); + } catch(...) { + return seastar::futurize< + ::crimson::interruptible::interruptible_future_detail< + InterruptCond, FutureType>>::make_exception_future( + std::current_exception()); + } + } + template + static type make_exception_future(Arg&& arg) { + return ::seastar::futurize< + ::crimson::interruptible::interruptible_future_detail< + InterruptCond, FutureType>>::make_exception_future( + std::forward(arg)); + } + }; + template static std::exception_ptr make_exception_ptr(ErrorT&& e) { // calling via interface class due to encapsulation and friend relations. @@ -945,6 +1009,8 @@ private: // we were exploiting before. template friend class errorator; + template + friend class ::crimson::interruptible::interruptible_future_detail; }; // class errorator, generic template // no errors? errorator<>::future is plain seastar::future then! 
diff --git a/src/crimson/common/exception.h b/src/crimson/common/exception.h index 05caf5ebd0c2c..dafb21a941976 100644 --- a/src/crimson/common/exception.h +++ b/src/crimson/common/exception.h @@ -8,17 +8,21 @@ #include #include "crimson/common/log.h" +#include "crimson/common/interruptible_future.h" namespace crimson::common { -class system_shutdown_exception final : public std::exception{ +class interruption : public std::exception +{}; + +class system_shutdown_exception final : public interruption{ public: const char* what() const noexcept final { return "system shutting down"; } }; -class actingset_changed final : public std::exception { +class actingset_changed final : public interruption { public: actingset_changed(bool sp) : still_primary(sp) {} const char* what() const noexcept final { @@ -31,6 +35,25 @@ private: const bool still_primary; }; +template +inline seastar::future<> with_interruption( + OpFunc&& opfunc, OnInterrupt&& efunc) { + if constexpr (may_loop) { + return ::seastar::repeat( + [opfunc=std::move(opfunc), efunc=std::move(efunc)]() mutable { + return ::crimson::interruptible::interruptor::template futurize< + std::result_of_t>::apply(std::move(opfunc), std::make_tuple()) + .template handle_interruption(std::move(efunc)); + }); + } else { + return ::crimson::interruptible::interruptor::template futurize< + std::result_of_t>::apply(std::move(opfunc), std::make_tuple()) + .template handle_interruption(std::move(efunc)); + } +} + template inline seastar::future<> handle_system_shutdown(Func&& func, Args&&... 
args) { diff --git a/src/crimson/common/interruptible_future.h b/src/crimson/common/interruptible_future.h index ef60b449a32f2..65fdb7d427124 100644 --- a/src/crimson/common/interruptible_future.h +++ b/src/crimson/common/interruptible_future.h @@ -241,6 +241,8 @@ public: typename interruptor::template futurize_t; using core_type::get0; using core_type::core_type; + using core_type::get_exception; + using core_type::ignore_ready_future; [[gnu::always_inline]] interruptible_future_detail(seastar::future&& base) @@ -319,6 +321,14 @@ public: } } + template + [[gnu::always_inline]] + auto then_unpack_interruptible(Func&& func) { + return then_interruptible([func=std::forward(func)](T&& tuple) mutable { + return std::apply(std::forward(func), std::move(tuple)); + }); + } + template >> @@ -386,6 +396,14 @@ public: }); } + + using my_type = interruptible_future_detail>; + + template + [[gnu::always_inline]] + my_type finally(Func&& func) { + return core_type::finally(std::forward(func)); + } private: seastar::future to_future() { return static_cast(std::move(*this)); @@ -405,14 +423,6 @@ private: std::move(fut)); }); } - // this is only supposed to be invoked by seastar functions - template >> - [[gnu::always_inline]] - Result finally(Func&& func) { - return core_type::finally(std::forward(func)); - } template ::value, int>> @@ -688,32 +698,36 @@ public: std::forward(err_func_tail)...)); } - template + template auto handle_error_interruptible(ErrorFunc&& errfunc) { - assert(interrupt_cond); - auto fut = core_type::handle_error( - [errfunc=std::move(errfunc), - interrupt_condition=interrupt_cond] - (auto&& err) mutable -> decltype(auto) { - constexpr bool return_void = std::is_void_v< - std::invoke_result_t>>; - constexpr bool return_err = ::crimson::is_error_v< - std::decay_t>>>; - if constexpr (return_err || return_void) { - return non_futurized_call_with_interruption( - interrupt_condition, - std::move(errfunc), - std::move(err)); - } else { - return 
call_with_interruption( - interrupt_condition, - std::move(errfunc), - std::move(err)); - } - }); - return (interrupt_futurize_t)(std::move(fut)); + if constexpr (interruptible) { + assert(interrupt_cond); + auto fut = core_type::handle_error( + [errfunc=std::move(errfunc), + interrupt_condition=interrupt_cond] + (auto&& err) mutable -> decltype(auto) { + constexpr bool return_void = std::is_void_v< + std::invoke_result_t>>; + constexpr bool return_err = ::crimson::is_error_v< + std::decay_t>>>; + if constexpr (return_err || return_void) { + return non_futurized_call_with_interruption( + interrupt_condition, + std::move(errfunc), + std::move(err)); + } else { + return call_with_interruption( + interrupt_condition, + std::move(errfunc), + std::move(err)); + } + }); + return (interrupt_futurize_t)(std::move(fut)); + } else { + return core_type::handle_error(std::forward(errfunc)); + } } template (error_func_head), std::forward(error_func_tail)...)); } + + template + [[gnu::always_inline]] + auto finally(Func&& func) { + auto fut = core_type::finally(std::forward(func)); + return (interrupt_futurize_t)(std::move(fut)); + } private: ErroratedFuture<::crimson::errorated_future_marker> to_future() { @@ -867,6 +888,17 @@ public: return fut; } + template + [[gnu::always_inline]] + static auto wrap_function(Func&& func) { + return [func=std::forward(func), + interrupt_condition=interrupt_cond]() mutable { + return call_with_interruption( + interrupt_condition, + std::forward(func)); + }; + } + template , std::enable_if_t::value, int> = 0> @@ -1135,6 +1167,12 @@ public: return ::seastar::internal::when_all_impl( futurize_invoke_if_func(std::forward(fut_or_funcs))...); } + + template + static inline auto when_all_succeed(FutOrFuncs&&... 
fut_or_funcs) noexcept { + return ::seastar::internal::when_all_succeed_impl( + futurize_invoke_if_func(std::forward(fut_or_funcs))...); + } private: // return true if an new interrupt condition is created and false otherwise template diff --git a/src/crimson/common/operation.h b/src/crimson/common/operation.h index e086bf95a374e..66eb468d91dfb 100644 --- a/src/crimson/common/operation.h +++ b/src/crimson/common/operation.h @@ -1,5 +1,5 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab #pragma once @@ -17,6 +17,7 @@ #include #include "include/ceph_assert.h" +#include "crimson/common/interruptible_future.h" namespace ceph { class Formatter; @@ -54,6 +55,11 @@ class blocking_future_detail { template friend blocking_future_detail> join_blocking_futures(U &&u); + template + friend blocking_future_detail< + ::crimson::interruptible::interruptible_future> + join_blocking_interruptible_futures(T&& t); + template friend class blocking_future_detail; @@ -65,11 +71,40 @@ public: blocker, std::move(fut).then(std::forward(f))); } + template + auto then_interruptible(F &&f) && { + using result = decltype(std::declval().then_interruptible(f)); + return blocking_future_detail< + typename ::crimson::interruptible::interruptor< + InterruptCond>::template futurize::type>( + blocker, + std::move(fut).then_interruptible(std::forward(f))); + } }; template using blocking_future = blocking_future_detail>; +template +using blocking_interruptible_future = blocking_future_detail< + ::crimson::interruptible::interruptible_future>; + +template +blocking_interruptible_future +make_ready_blocking_interruptible_future(U&& args) { + return blocking_interruptible_future( + nullptr, + seastar::make_ready_future(std::forward(args))); +} + +template +blocking_interruptible_future +make_exception_blocking_interruptible_future(Exception&& 
e) { + return blocking_interruptible_future( + nullptr, + seastar::make_exception_future(e)); +} + template blocking_future_detail> make_ready_blocking_future(U&&... args) { return blocking_future( @@ -95,6 +130,23 @@ public: blocking_future make_blocking_future(seastar::future &&f) { return blocking_future(this, std::move(f)); } + + template + blocking_interruptible_future + make_blocking_future( + crimson::interruptible::interruptible_future &&f) { + return blocking_interruptible_future( + this, std::move(f)); + } + template + blocking_interruptible_future + make_blocking_interruptible_future(seastar::future &&f) { + return blocking_interruptible_future( + this, + ::crimson::interruptible::interruptor::make_interruptible( + std::move(f))); + } + void dump(ceph::Formatter *f) const; virtual ~Blocker() = default; @@ -142,6 +194,26 @@ blocking_future<> join_blocking_futures(T &&t) { })); } +template +blocking_interruptible_future +join_blocking_interruptible_futures(T&& t) { + std::vector blockers; + blockers.reserve(t.size()); + for (auto &&bf: t) { + blockers.push_back(bf.blocker); + bf.blocker = nullptr; + } + auto agg = std::make_unique(std::move(blockers)); + return agg->make_blocking_future( + ::crimson::interruptible::interruptor::parallel_for_each( + std::forward(t), + [](auto &&bf) { + return std::move(bf.fut); + }).then_interruptible([agg=std::move(agg)] { + return seastar::make_ready_future<>(); + })); +} + /** * Common base for all crimson-osd operations. 
Mainly provides @@ -172,6 +244,38 @@ class Operation : public boost::intrusive_ref_counter< }); } + template + ::crimson::interruptible::interruptible_future + with_blocking_future_interruptible(blocking_future &&f) { + if (f.fut.available()) { + return std::move(f.fut); + } + assert(f.blocker); + add_blocker(f.blocker); + auto fut = std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) { + clear_blocker(blocker); + return std::move(arg); + }); + return ::crimson::interruptible::interruptible_future< + InterruptCond, T>(std::move(fut)); + } + + template + ::crimson::interruptible::interruptible_future + with_blocking_future_interruptible( + blocking_interruptible_future &&f) { + if (f.fut.available()) { + return std::move(f.fut); + } + assert(f.blocker); + add_blocker(f.blocker); + return std::move(f.fut).template then_wrapped_interruptible( + [this, blocker=f.blocker](auto &&arg) { + clear_blocker(blocker); + return std::move(arg); + }); + } + void dump(ceph::Formatter *f); void dump_brief(ceph::Formatter *f); virtual ~Operation() = default; diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc index 96309c36622f3..5cc2ffaf5a8a5 100644 --- a/src/crimson/osd/ec_backend.cc +++ b/src/crimson/osd/ec_backend.cc @@ -12,7 +12,7 @@ ECBackend::ECBackend(shard_id_t shard, // todo } -ECBackend::ll_read_errorator::future +ECBackend::ll_read_ierrorator::future ECBackend::_read(const hobject_t& hoid, const uint64_t off, const uint64_t len, @@ -22,7 +22,7 @@ ECBackend::_read(const hobject_t& hoid, return seastar::make_ready_future(); } -seastar::future +ECBackend::interruptible_future ECBackend::_submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h index 2161e061dac41..44da609cb8e60 100644 --- a/src/crimson/osd/ec_backend.h +++ b/src/crimson/osd/ec_backend.h @@ -22,11 +22,9 @@ public: } void on_actingset_changed(peering_info_t pi) 
final {} private: - ll_read_errorator::future _read(const hobject_t& hoid, - uint64_t off, - uint64_t len, - uint32_t flags) override; - seastar::future + ll_read_ierrorator::future + _read(const hobject_t& hoid, uint64_t off, uint64_t len, uint32_t flags) override; + interruptible_future _submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, diff --git a/src/crimson/osd/objclass.cc b/src/crimson/osd/objclass.cc index bc3284e266126..66e4079ad5238 100644 --- a/src/crimson/osd/objclass.cc +++ b/src/crimson/osd/objclass.cc @@ -25,7 +25,8 @@ static inline int execute_osd_op(cls_method_context_t hctx, OSDOp& op) // created for us by `seastar::async` in `::do_op_call()`. int ret = 0; using osd_op_errorator = crimson::osd::OpsExecuter::osd_op_errorator; - reinterpret_cast(hctx)->execute_op(op).handle_error( + reinterpret_cast(hctx)->execute_op(op) + .handle_error_interruptible( osd_op_errorator::all_same_way([&ret] (const std::error_code& err) { assert(err.value() > 0); ret = -err.value(); diff --git a/src/crimson/osd/object_context.h b/src/crimson/osd/object_context.h index ac80b6d5beeb1..251c8796b1dc2 100644 --- a/src/crimson/osd/object_context.h +++ b/src/crimson/osd/object_context.h @@ -132,34 +132,66 @@ public: boost::intrusive::list_member_hook<>, &ObjectContext::list_hook>; - template + template auto with_lock(Func&& func) { - switch (Type) { - case RWState::RWWRITE: - return _with_lock(lock.for_write(), std::forward(func)); - case RWState::RWREAD: - return _with_lock(lock.for_read(), std::forward(func)); - case RWState::RWEXCL: - return _with_lock(lock.for_excl(), std::forward(func)); - case RWState::RWNONE: - return seastar::futurize_invoke(std::forward(func)); - default: - assert(0 == "noop"); + if constexpr (!std::is_void_v) { + auto wrapper = ::crimson::interruptible::interruptor::wrap_function(std::forward(func)); + switch (Type) { + case RWState::RWWRITE: + return _with_lock(lock.for_write(), std::move(wrapper)); + case 
RWState::RWREAD: + return _with_lock(lock.for_read(), std::move(wrapper)); + case RWState::RWEXCL: + return _with_lock(lock.for_excl(), std::move(wrapper)); + case RWState::RWNONE: + return seastar::futurize_invoke(std::move(wrapper)); + default: + assert(0 == "noop"); + } + } else { + switch (Type) { + case RWState::RWWRITE: + return _with_lock(lock.for_write(), std::forward(func)); + case RWState::RWREAD: + return _with_lock(lock.for_read(), std::forward(func)); + case RWState::RWEXCL: + return _with_lock(lock.for_excl(), std::forward(func)); + case RWState::RWNONE: + return seastar::futurize_invoke(std::forward(func)); + default: + assert(0 == "noop"); + } } } - template + template auto with_promoted_lock(Func&& func) { - switch (Type) { - case RWState::RWWRITE: - return _with_lock(lock.excl_from_write(), std::forward(func)); - case RWState::RWREAD: - return _with_lock(lock.excl_from_read(), std::forward(func)); - case RWState::RWEXCL: - return _with_lock(lock.excl_from_excl(), std::forward(func)); - case RWState::RWNONE: - return _with_lock(lock.for_excl(), std::forward(func)); - default: - assert(0 == "noop"); + if constexpr (!std::is_void_v) { + auto wrapper = ::crimson::interruptible::interruptor::wrap_function(std::forward(func)); + switch (Type) { + case RWState::RWWRITE: + return _with_lock(lock.excl_from_write(), std::move(wrapper)); + case RWState::RWREAD: + return _with_lock(lock.excl_from_read(), std::move(wrapper)); + case RWState::RWEXCL: + return _with_lock(lock.excl_from_excl(), std::move(wrapper)); + case RWState::RWNONE: + return _with_lock(lock.for_excl(), std::move(wrapper)); + default: + assert(0 == "noop"); + } + } else { + switch (Type) { + case RWState::RWWRITE: + return _with_lock(lock.excl_from_write(), std::forward(func)); + case RWState::RWREAD: + return _with_lock(lock.excl_from_read(), std::forward(func)); + case RWState::RWEXCL: + return _with_lock(lock.excl_from_excl(), std::forward(func)); + case RWState::RWNONE: + return 
_with_lock(lock.for_excl(), std::forward(func)); + default: + assert(0 == "noop"); + } } } diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 6b6614e93d884..63ca81af9ba65 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -28,7 +28,7 @@ namespace { namespace crimson::osd { -OpsExecuter::call_errorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op) +OpsExecuter::call_ierrorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op) { std::string cname, mname; ceph::bufferlist indata; @@ -150,7 +150,7 @@ static watch_info_t create_watch_info(const OSDOp& osd_op, }; } -OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_watch( +OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch( OSDOp& osd_op, ObjectState& os, ceph::os::Transaction& txn) @@ -192,7 +192,7 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_watch( }); } -OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_reconnect( +OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_reconnect( OSDOp& osd_op, ObjectState& os, ceph::os::Transaction& txn) @@ -207,7 +207,7 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_reconnect( } } -OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_unwatch( +OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_unwatch( OSDOp& osd_op, ObjectState& os, ceph::os::Transaction& txn) @@ -247,7 +247,7 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_unwatch( }); } -OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_ping( +OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_ping( OSDOp& osd_op, ObjectState& os, ceph::os::Transaction& txn) @@ -271,7 +271,7 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_ping( return seastar::now(); } -OpsExecuter::watch_errorator::future<> 
OpsExecuter::do_op_watch( +OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch( OSDOp& osd_op, ObjectState& os, ceph::os::Transaction& txn) @@ -304,7 +304,7 @@ static uint64_t get_next_notify_id(epoch_t e) return (((uint64_t)e) << 32) | ((uint64_t)(next_notify_id++)); } -OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify( +OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify( OSDOp& osd_op, const ObjectState& os) { @@ -363,7 +363,7 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify( }); } -OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify_ack( +OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify_ack( OSDOp& osd_op, const ObjectState& os) { @@ -421,7 +421,7 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify_ack( }); } -OpsExecuter::osd_op_errorator::future<> +OpsExecuter::interruptible_errorated_future OpsExecuter::execute_op(OSDOp& osd_op) { // TODO: dispatch via call table? @@ -459,7 +459,8 @@ OpsExecuter::execute_op(OSDOp& osd_op) return backend.get_xattrs(os, osd_op); }); case CEPH_OSD_OP_RMXATTR: - return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) { + return do_write_op( + [&osd_op] (auto& backend, auto& os, auto& txn) { return backend.rm_xattr(os, osd_op, txn); }, true); case CEPH_OSD_OP_CREATE: @@ -642,7 +643,7 @@ static inline std::unique_ptr get_pgls_filter( return filter; } -static seastar::future pgls_filter( +static PG::interruptible_future pgls_filter( const PGLSFilter& filter, const PGBackend& backend, const hobject_t& sobj) @@ -650,7 +651,7 @@ static seastar::future pgls_filter( if (const auto xattr = filter.get_xattr(); !xattr.empty()) { logger().debug("pgls_filter: filter is interested in xattr={} for obj={}", xattr, sobj); - return backend.getxattr(sobj, xattr).safe_then( + return backend.getxattr(sobj, xattr).safe_then_interruptible( [&filter, sobj] (ceph::bufferptr bp) { logger().debug("pgls_filter: got xvalue for obj={}", sobj); 
@@ -675,7 +676,7 @@ static seastar::future pgls_filter( } } -static seastar::future do_pgnls_common( +static PG::interruptible_future do_pgnls_common( const hobject_t& pg_start, const hobject_t& pg_end, const PGBackend& backend, @@ -691,8 +692,9 @@ static seastar::future do_pgnls_common( throw std::invalid_argument("outside of PG bounds"); } - return backend.list_objects(lower_bound, limit).then( - [&backend, filter, nspace](auto&& ret) { + return backend.list_objects(lower_bound, limit).then_interruptible( + [&backend, filter, nspace](auto&& ret) + -> PG::interruptible_future, hobject_t>> { auto& [objects, next] = ret; auto in_my_namespace = [&nspace](const hobject_t& obj) { using crimson::common::local_conf; @@ -704,7 +706,8 @@ static seastar::future do_pgnls_common( return obj.get_namespace() == nspace; } }; - auto to_pglsed = [&backend, filter] (const hobject_t& obj) { + auto to_pglsed = [&backend, filter] (const hobject_t& obj) + -> PG::interruptible_future { // this transformation looks costly. However, I don't have any // reason to think PGLS* operations are critical for, let's say, // general performance. 
@@ -733,7 +736,7 @@ static seastar::future do_pgnls_common( std::tuple, hobject_t>>( std::make_tuple(std::move(items), std::move(next))); }); - }).then( + }).then_interruptible( [pg_end] (auto&& ret) { auto& [items, next] = ret; auto is_matched = [] (const auto& obj) { @@ -757,7 +760,7 @@ static seastar::future do_pgnls_common( }); } -static seastar::future<> do_pgnls( +static PG::interruptible_future<> do_pgnls( const PG& pg, const std::string& nspace, OSDOp& osd_op) @@ -778,13 +781,13 @@ static seastar::future<> do_pgnls( nspace, osd_op.op.pgls.count, nullptr /* no filter */) - .then([&osd_op](bufferlist bl) { + .then_interruptible([&osd_op](bufferlist bl) { osd_op.outdata = std::move(bl); return seastar::now(); }); } -static seastar::future<> do_pgnls_filtered( +static PG::interruptible_future<> do_pgnls_filtered( const PG& pg, const std::string& nspace, OSDOp& osd_op) @@ -822,14 +825,14 @@ static seastar::future<> do_pgnls_filtered( nspace, osd_op.op.pgls.count, filter.get()) - .then([&osd_op](bufferlist bl) { + .then_interruptible([&osd_op](bufferlist bl) { osd_op.outdata = std::move(bl); return seastar::now(); }); }); } -static seastar::future do_pgls_common( +static PG::interruptible_future do_pgls_common( const hobject_t& pg_start, const hobject_t& pg_end, const PGBackend& backend, @@ -846,12 +849,13 @@ static seastar::future do_pgls_common( } using entries_t = decltype(pg_ls_response_t::entries); - return backend.list_objects(lower_bound, limit).then( + return backend.list_objects(lower_bound, limit).then_interruptible( [&backend, filter, nspace](auto&& ret) { auto& [objects, next] = ret; - return seastar::when_all( - seastar::map_reduce(std::move(objects), - [&backend, filter, nspace](const hobject_t& obj) { + return PG::interruptor::when_all( + PG::interruptor::map_reduce(std::move(objects), + [&backend, filter, nspace](const hobject_t& obj) + -> PG::interruptible_future{ if (obj.get_namespace() == nspace) { if (filter) { return pgls_filter(*filter, 
backend, obj); @@ -870,7 +874,7 @@ static seastar::future do_pgls_common( return entries; }), seastar::make_ready_future(next)); - }).then([pg_end](auto&& ret) { + }).then_interruptible([pg_end](auto&& ret) { auto entries = std::move(std::get<0>(ret).get0()); auto next = std::move(std::get<1>(ret).get0()); pg_ls_response_t response; @@ -884,7 +888,7 @@ static seastar::future do_pgls_common( }); } -static seastar::future<> do_pgls( +static PG::interruptible_future<> do_pgls( const PG& pg, const std::string& nspace, OSDOp& osd_op) @@ -906,13 +910,13 @@ static seastar::future<> do_pgls( nspace, osd_op.op.pgls.count, nullptr /* no filter */) - .then([&osd_op](bufferlist bl) { + .then_interruptible([&osd_op](bufferlist bl) { osd_op.outdata = std::move(bl); return seastar::now(); }); } -static seastar::future<> do_pgls_filtered( +static PG::interruptible_future<> do_pgls_filtered( const PG& pg, const std::string& nspace, OSDOp& osd_op) @@ -950,14 +954,14 @@ static seastar::future<> do_pgls_filtered( nspace, osd_op.op.pgls.count, filter.get()) - .then([&osd_op](bufferlist bl) { + .then_interruptible([&osd_op](bufferlist bl) { osd_op.outdata = std::move(bl); return seastar::now(); }); }); } -seastar::future<> +PgOpsExecuter::interruptible_future<> PgOpsExecuter::execute_op(OSDOp& osd_op) { logger().warn("handling op {}", ceph_osd_op_name(osd_op.op.op)); diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 54a715b0060b4..854051ede9549 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -19,7 +19,9 @@ #include "crimson/osd/object_context.h" #include "crimson/common/errorator.h" +#include "crimson/common/interruptible_future.h" #include "crimson/common/type_helpers.h" +#include "crimson/osd/pg_interval_interrupt_condition.h" #include "crimson/osd/osd_operations/client_request.h" #include "crimson/osd/osd_operations/peering_event.h" #include "crimson/osd/shard_services.h" @@ -55,6 +57,33 @@ class OpsExecuter { 
crimson::ct_error::not_connected, crimson::ct_error::timed_out>; + using call_ierrorator = + ::crimson::interruptible::interruptible_errorator< + IOInterruptCondition, call_errorator>; + using read_ierrorator = + ::crimson::interruptible::interruptible_errorator< + IOInterruptCondition, read_errorator>; + using write_iertr = + ::crimson::interruptible::interruptible_errorator< + IOInterruptCondition, write_ertr>; + using get_attr_ierrorator = + ::crimson::interruptible::interruptible_errorator< + IOInterruptCondition, get_attr_errorator>; + using watch_ierrorator = + ::crimson::interruptible::interruptible_errorator< + IOInterruptCondition, watch_errorator>; + + template + using interruptible_errorated_future = + ::crimson::interruptible::interruptible_errorated_future< + IOInterruptCondition, Errorator, T>; + using interruptor = + ::crimson::interruptible::interruptor; + template + using interruptible_future = + ::crimson::interruptible::interruptible_future< + IOInterruptCondition, T>; + public: // because OpsExecuter is pretty heavy-weight object we want to ensure // it's not copied nor even moved by accident. 
Performance is the sole @@ -69,6 +98,9 @@ public: get_attr_errorator, watch_errorator, PGBackend::stat_errorator>; + using osd_op_ierrorator = + ::crimson::interruptible::interruptible_errorator< + IOInterruptCondition, osd_op_errorator>; private: // an operation can be divided into two stages: main and effect-exposing @@ -106,31 +138,31 @@ private: MainFunc&& main_func, EffectFunc&& effect_func); - call_errorator::future<> do_op_call(class OSDOp& osd_op); - watch_errorator::future<> do_op_watch( + call_ierrorator::future<> do_op_call(class OSDOp& osd_op); + watch_ierrorator::future<> do_op_watch( class OSDOp& osd_op, class ObjectState& os, ceph::os::Transaction& txn); - watch_errorator::future<> do_op_watch_subop_watch( + watch_ierrorator::future<> do_op_watch_subop_watch( class OSDOp& osd_op, class ObjectState& os, ceph::os::Transaction& txn); - watch_errorator::future<> do_op_watch_subop_reconnect( + watch_ierrorator::future<> do_op_watch_subop_reconnect( class OSDOp& osd_op, class ObjectState& os, ceph::os::Transaction& txn); - watch_errorator::future<> do_op_watch_subop_unwatch( + watch_ierrorator::future<> do_op_watch_subop_unwatch( class OSDOp& osd_op, class ObjectState& os, ceph::os::Transaction& txn); - watch_errorator::future<> do_op_watch_subop_ping( + watch_ierrorator::future<> do_op_watch_subop_ping( class OSDOp& osd_op, class ObjectState& os, ceph::os::Transaction& txn); - watch_errorator::future<> do_op_notify( + watch_ierrorator::future<> do_op_notify( class OSDOp& osd_op, const class ObjectState& os); - watch_errorator::future<> do_op_notify_ack( + watch_ierrorator::future<> do_op_notify_ack( class OSDOp& osd_op, const class ObjectState& os); @@ -178,10 +210,11 @@ public: msg(msg) { } - osd_op_errorator::future<> execute_op(class OSDOp& osd_op); + interruptible_errorated_future + execute_op(class OSDOp& osd_op); template - osd_op_errorator::future<> flush_changes(MutFunc&& mut_func) &&; + osd_op_ierrorator::future<> flush_changes(MutFunc&& 
mut_func) &&; const auto& get_message() const { return msg; @@ -236,14 +269,14 @@ auto OpsExecuter::with_effect_on_obc( } template -OpsExecuter::osd_op_errorator::future<> OpsExecuter::flush_changes( +OpsExecuter::osd_op_ierrorator::future<> OpsExecuter::flush_changes( MutFunc&& mut_func) && { const bool want_mutate = !txn.empty(); // osd_op_params are instantiated by every wr-like operation. assert(osd_op_params || !want_mutate); assert(obc); - auto maybe_mutated = osd_op_errorator::now(); + auto maybe_mutated = interruptor::make_interruptible(osd_op_errorator::now()); if (want_mutate) { maybe_mutated = std::forward(mut_func)(std::move(txn), std::move(obc), @@ -253,9 +286,9 @@ OpsExecuter::osd_op_errorator::future<> OpsExecuter::flush_changes( if (__builtin_expect(op_effects.empty(), true)) { return maybe_mutated; } else { - return maybe_mutated.safe_then([this] { + return maybe_mutated.safe_then_interruptible([this] { // let's do the cleaning of `op_effects` in destructor - return crimson::do_for_each(op_effects, [] (auto& op_effect) { + return interruptor::do_for_each(op_effects, [] (auto& op_effect) { return op_effect->execute(); }); }); @@ -264,12 +297,17 @@ OpsExecuter::osd_op_errorator::future<> OpsExecuter::flush_changes( // PgOpsExecuter -- a class for executing ops targeting a certain PG. 
class PgOpsExecuter { + template + using interruptible_future = + ::crimson::interruptible::interruptible_future< + IOInterruptCondition, T>; + public: PgOpsExecuter(const PG& pg, const MOSDOp& msg) : pg(pg), nspace(msg.get_hobj().nspace) { } - seastar::future<> execute_op(class OSDOp& osd_op); + interruptible_future<> execute_op(class OSDOp& osd_op); private: const PG& pg; diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index 1e438f90dbaa4..a7700039b5ae3 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -4,6 +4,7 @@ #pragma once #include "crimson/osd/osd_operation_sequencer.h" +#include "crimson/osd/pg_interval_interrupt_condition.h" #include "crimson/osd/scheduler/scheduler.h" namespace crimson::osd { @@ -39,6 +40,13 @@ static_assert( template class OperationT : public Operation { public: + template + using interruptible_future = + ::crimson::interruptible::interruptible_future< + ::crimson::osd::IOInterruptCondition, ValuesT>; + using interruptor = + ::crimson::interruptible::interruptor< + ::crimson::osd::IOInterruptCondition>; static constexpr const char *type_name = OP_NAMES[static_cast(T::type)]; using IRef = boost::intrusive_ptr; diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc index 126e0e9029bff..7614cc9102a6c 100644 --- a/src/crimson/osd/osd_operations/background_recovery.cc +++ b/src/crimson/osd/osd_operations/background_recovery.cc @@ -51,7 +51,11 @@ seastar::future<> BackgroundRecovery::start() IRef ref = this; return ss.throttler.with_throttle_while( this, get_scheduler_params(), [this] { - return do_recovery(); + return interruptor::with_interruption([this] { + return do_recovery(); + }, [](std::exception_ptr) { + return seastar::make_ready_future(false); + }, pg); }).handle_exception_type([ref, this](const std::system_error& err) { if (err.code() == std::make_error_code(std::errc::interrupted)) { 
logger().debug("{} recovery interruped: {}", *pg, err.what()); @@ -61,12 +65,14 @@ seastar::future<> BackgroundRecovery::start() }); } -seastar::future UrgentRecovery::do_recovery() +UrgentRecovery::interruptible_future +UrgentRecovery::do_recovery() { + logger().debug("{}: {}", __func__, *this); if (!pg->has_reset_since(epoch_started)) { - return with_blocking_future( + return with_blocking_future_interruptible( pg->get_recovery_handler()->recover_missing(soid, need) - ).then([] { + ).then_interruptible([] { return seastar::make_ready_future(false); }); } @@ -76,7 +82,8 @@ seastar::future UrgentRecovery::do_recovery() void UrgentRecovery::print(std::ostream &lhs) const { lhs << "UrgentRecovery(" << pg->get_pgid() << ", " - << soid << ", v" << need << ")"; + << soid << ", v" << need << ", epoch_started: " + << epoch_started << ")"; } void UrgentRecovery::dump_detail(Formatter *f) const @@ -101,11 +108,12 @@ PglogBasedRecovery::PglogBasedRecovery( crimson::osd::scheduler::scheduler_class_t::background_recovery) {} -seastar::future PglogBasedRecovery::do_recovery() +PglogBasedRecovery::interruptible_future +PglogBasedRecovery::do_recovery() { if (pg->has_reset_since(epoch_started)) return seastar::make_ready_future(false); - return with_blocking_future( + return with_blocking_future_interruptible( pg->get_recovery_handler()->start_recovery_ops( crimson::common::local_conf()->osd_recovery_max_single_start)); } @@ -115,7 +123,8 @@ BackfillRecovery::BackfillRecoveryPipeline &BackfillRecovery::bp(PG &pg) return pg.backfill_pipeline; } -seastar::future BackfillRecovery::do_recovery() +BackfillRecovery::interruptible_future +BackfillRecovery::do_recovery() { logger().debug("{}", __func__); @@ -125,13 +134,13 @@ seastar::future BackfillRecovery::do_recovery() return seastar::make_ready_future(false); } // TODO: limits - return with_blocking_future( + return with_blocking_future_interruptible( // process_event() of our boost::statechart machine is non-reentrant. 
// with the backfill_pipeline we protect it from a second entry from // the implementation of BackfillListener. // additionally, this stage serves to synchronize with PeeringEvent. handle.enter(bp(*pg).process) - ).then([this] { + ).then_interruptible([this] { pg->get_recovery_handler()->dispatch_backfill_event(std::move(evt)); return seastar::make_ready_future(false); }); diff --git a/src/crimson/osd/osd_operations/background_recovery.h b/src/crimson/osd/osd_operations/background_recovery.h index b9adb87d9c6d9..d7fbc6127df0a 100644 --- a/src/crimson/osd/osd_operations/background_recovery.h +++ b/src/crimson/osd/osd_operations/background_recovery.h @@ -41,7 +41,7 @@ private: scheduler_class }; } - virtual seastar::future do_recovery() = 0; + virtual interruptible_future do_recovery() = 0; ShardServices &ss; const crimson::osd::scheduler::scheduler_class_t scheduler_class; }; @@ -67,7 +67,7 @@ public: private: void dump_detail(Formatter* f) const final; - seastar::future do_recovery() override; + interruptible_future do_recovery() override; const hobject_t soid; const eversion_t need; }; @@ -80,7 +80,7 @@ public: epoch_t epoch_started); private: - seastar::future do_recovery() override; + interruptible_future do_recovery() override; }; class BackfillRecovery final : public BackgroundRecovery { @@ -105,7 +105,7 @@ public: private: boost::intrusive_ptr evt; PipelineHandle handle; - seastar::future do_recovery() override; + interruptible_future do_recovery() override; }; template diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 7b7493e4f1c15..8595fc500cf22 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -65,102 +65,118 @@ seastar::future<> ClientRequest::start() { logger().debug("{}: start", *this); - return crimson::common::handle_system_shutdown( - [this, opref=IRef{this}]() mutable { - return seastar::repeat([this, 
opref=std::move(opref)]() mutable { + return seastar::repeat([this, opref=IRef{this}]() mutable { logger().debug("{}: in repeat", *this); return with_blocking_future(handle.enter(cp().await_map)) - .then([this] { - return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch())); + .then([this]() { + return with_blocking_future( + osd.osdmap_gate.wait_for_map( + m->get_min_epoch())); }).then([this](epoch_t epoch) { return with_blocking_future(handle.enter(cp().get_pg)); }).then([this] { return with_blocking_future(osd.wait_for_pg(m->get_spg())); }).then([this](Ref pgref) mutable { - epoch_t same_interval_since = pgref->get_interval_start_epoch(); - logger().debug("{} same_interval_since: {}", *this, same_interval_since); - return sequencer.start_op( - handle, prev_op_id, get_id(), - [this, pgref] { - PG &pg = *pgref; - if (pg.can_discard_op(*m)) { - logger().debug("{} op discarded, {}, same_primary_since: {}", - *this, pg, pg.get_info().history.same_primary_since); - return osd.send_incremental_map(conn, m->get_map_epoch()); - } - return with_blocking_future( - handle.enter(pp(pg).await_map) - ).then([this, &pg]() mutable { - return with_blocking_future( - pg.osdmap_gate.wait_for_map(m->get_min_epoch())); - }).then([this, &pg](auto map) mutable { - return with_blocking_future( - handle.enter(pp(pg).wait_for_active)); - }).then([this, &pg]() mutable { - return with_blocking_future(pg.wait_for_active_blocker.wait()); - }).then([this, pgref=std::move(pgref)]() mutable { - if (m->finish_decode()) { - m->clear_payload(); - } - if (is_pg_op()) { - return process_pg_op(pgref); - } else { - return process_op(pgref); - } - }); - }).then([this] { - sequencer.finish_op(get_id()); - return seastar::stop_iteration::yes; - }).handle_exception_type( - [this](crimson::common::actingset_changed& e) mutable { - if (e.is_primary()) { - logger().debug("operation restart, acting set changed"); - sequencer.maybe_reset(get_id()); - return seastar::stop_iteration::no; - } 
else { - sequencer.abort(); - logger().debug("operation abort, up primary changed"); + return interruptor::with_interruption([this, pgref]() mutable { + epoch_t same_interval_since = pgref->get_interval_start_epoch(); + logger().debug("{} same_interval_since: {}", *this, same_interval_since); + return sequencer.start_op( + handle, prev_op_id, get_id(), + interruptor::wrap_function( + [this, pgref]() mutable -> interruptible_future<> { + PG &pg = *pgref; + if (pg.can_discard_op(*m)) { + return osd.send_incremental_map(conn, m->get_map_epoch()); + } + return with_blocking_future_interruptible( + handle.enter(pp(pg).await_map) + ).then_interruptible([this, &pg]() mutable { + return with_blocking_future_interruptible( + pg.osdmap_gate.wait_for_map(m->get_min_epoch())); + }).then_interruptible([this, &pg](auto map) mutable { + return with_blocking_future_interruptible( + handle.enter(pp(pg).wait_for_active)); + }).then_interruptible([this, &pg]() mutable { + return with_blocking_future_interruptible( + pg.wait_for_active_blocker.wait()); + }).then_interruptible([this, pgref=std::move(pgref)]() mutable { + if (m->finish_decode()) { + m->clear_payload(); + } + if (is_pg_op()) { + return process_pg_op(pgref); + } else { + return process_op(pgref); + } + }); + }) + ).then_interruptible([this, pgref]() mutable { + sequencer.finish_op(get_id()); return seastar::stop_iteration::yes; - } - }).handle_exception_type( - [](seastar::broken_condition_variable&) { - return seastar::stop_iteration::yes; - }); + }); + }, [this, pgref](std::exception_ptr eptr) mutable { + if (*eptr.__cxa_exception_type() == + typeid(::crimson::common::actingset_changed)) { + try { + std::rethrow_exception(eptr); + } catch(::crimson::common::actingset_changed& e) { + if (e.is_primary()) { + logger().debug("{} operation restart, acting set changed", *this); + sequencer.maybe_reset(get_id()); + return seastar::stop_iteration::no; + } else { + logger().debug("{} operation abort, up primary changed", *this); 
+ sequencer.abort(); + return seastar::stop_iteration::yes; + } + } + } + assert(*eptr.__cxa_exception_type() == + typeid(crimson::common::system_shutdown_exception)); + crimson::get_logger(ceph_subsys_osd).debug( + "{} operation skipped, system shutdown", *this); + return seastar::stop_iteration::yes; + }, pgref); }); }); - }); } -seastar::future<> ClientRequest::process_pg_op( +ClientRequest::interruptible_future<> +ClientRequest::process_pg_op( Ref &pg) { return pg->do_pg_ops(m) - .then([this, pg=std::move(pg)](Ref reply) { + .then_interruptible([this, pg=std::move(pg)](Ref reply) { return conn->send(reply); }); } -seastar::future<> ClientRequest::process_op(Ref &pg) +ClientRequest::interruptible_future<> +ClientRequest::process_op(Ref &pg) { - return with_blocking_future(handle.enter(pp(*pg).recover_missing)).then( + return with_blocking_future_interruptible( + handle.enter(pp(*pg).recover_missing)) + .then_interruptible( [this, pg]() mutable { return do_recover_missing(pg); - }).then([this, pg]() mutable { - return pg->already_complete(m->get_reqid()).then_unpack( + }).then_interruptible([this, pg]() mutable { + return pg->already_complete(m->get_reqid()).then_unpack_interruptible( [this, pg](bool completed, int ret) mutable - -> PG::load_obc_ertr::future<> { + -> PG::load_obc_iertr::future<> { if (completed) { auto reply = make_message( m.get(), ret, pg->get_osdmap_epoch(), CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false); return conn->send(std::move(reply)); } else { - return with_blocking_future(handle.enter(pp(*pg).get_obc)).then( - [this, pg]() mutable -> PG::load_obc_ertr::future<> { + return with_blocking_future_interruptible( + handle.enter(pp(*pg).get_obc)).then_interruptible( + [this, pg]() mutable -> PG::load_obc_iertr::future<> { logger().debug("{}: got obc lock", *this); op_info.set_from_op(&*m, *pg->get_osdmap()); return pg->with_locked_obc(m, op_info, this, [this, pg](auto obc) mutable { - return 
with_blocking_future(handle.enter(pp(*pg).process)).then( + return with_blocking_future_interruptible( + handle.enter(pp(*pg).process)).then_interruptible( [this, pg, obc]() mutable { return do_process(pg, obc); }); @@ -168,7 +184,7 @@ seastar::future<> ClientRequest::process_op(Ref &pg) }); } }); - }).safe_then([pg=std::move(pg)] { + }).safe_then_interruptible([pg=std::move(pg)] { return seastar::now(); }, PG::load_obc_ertr::all_same_way([](auto &code) { logger().error("ClientRequest saw error code {}", code); @@ -176,7 +192,8 @@ seastar::future<> ClientRequest::process_op(Ref &pg) })); } -seastar::future<> ClientRequest::do_recover_missing(Ref& pg) +ClientRequest::interruptible_future<> +ClientRequest::do_recover_missing(Ref& pg) { eversion_t ver; const hobject_t& soid = m->get_hobj(); @@ -196,7 +213,7 @@ seastar::future<> ClientRequest::do_recover_missing(Ref& pg) } } -seastar::future<> +ClientRequest::interruptible_future<> ClientRequest::do_process(Ref& pg, crimson::osd::ObjectContextRef obc) { if (!pg->is_primary()) { @@ -213,7 +230,8 @@ ClientRequest::do_process(Ref& pg, crimson::osd::ObjectContextRef obc) return conn->send(std::move(reply)); } } - return pg->do_osd_ops(m, obc, op_info).safe_then([this](Ref reply) { + return pg->do_osd_ops(m, obc, op_info).safe_then_interruptible( + [this](Ref reply) -> interruptible_future<> { return conn->send(std::move(reply)); }, crimson::ct_error::eagain::handle([this, pg]() mutable { return process_op(pg); diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index a88c3b331c2d0..3b1c5670a6c73 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -62,13 +62,16 @@ public: seastar::future<> start(); private: - seastar::future<> process_pg_op(Ref& pg); - seastar::future<> process_op(Ref& pg); - seastar::future<> do_recover_missing(Ref& pgref); - seastar::future<> do_process( + interruptible_future<> 
do_recover_missing(Ref& pgref); + interruptible_future<> do_process( Ref& pg, crimson::osd::ObjectContextRef obc); - + ::crimson::interruptible::interruptible_future< + ::crimson::osd::IOInterruptCondition> process_pg_op( + Ref &pg); + ::crimson::interruptible::interruptible_future< + ::crimson::osd::IOInterruptCondition> process_op( + Ref &pg); bool is_pg_op() const; ConnectionPipeline &cp(); @@ -77,6 +80,11 @@ private: OpSequencer& sequencer; const uint64_t prev_op_id; + template + using interruptible_errorator = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + Errorator>; private: bool is_misdirected(const PG& pg) const; }; diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.cc b/src/crimson/osd/osd_operations/recovery_subrequest.cc index 820c7beabc23a..df003f7e6b63e 100644 --- a/src/crimson/osd/osd_operations/recovery_subrequest.cc +++ b/src/crimson/osd/osd_operations/recovery_subrequest.cc @@ -15,14 +15,17 @@ seastar::future<> RecoverySubRequest::start() { logger().debug("{}: start", *this); IRef opref = this; - return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch())) + return with_blocking_future( + osd.osdmap_gate.wait_for_map(m->get_min_epoch())) .then([this] (epoch_t epoch) { return with_blocking_future(osd.wait_for_pg(m->get_spg())); }).then([this, opref=std::move(opref)] (Ref pgref) { - return seastar::do_with(std::move(pgref), std::move(opref), - [this](auto& pgref, auto& opref) { - return pgref->get_recovery_backend()->handle_recovery_op(m); - }); + return interruptor::with_interruption([this, opref, pgref] { + return seastar::do_with(std::move(pgref), std::move(opref), + [this](auto& pgref, auto& opref) { + return pgref->get_recovery_backend()->handle_recovery_op(m); + }); + }, [](std::exception_ptr) { return seastar::now(); }, pgref); }); } diff --git a/src/crimson/osd/osd_operations/replicated_request.cc b/src/crimson/osd/osd_operations/replicated_request.cc index 
34487f9e458ed..254f76103b83e 100644 --- a/src/crimson/osd/osd_operations/replicated_request.cc +++ b/src/crimson/osd/osd_operations/replicated_request.cc @@ -60,15 +60,18 @@ seastar::future<> RepRequest::start() { logger().debug("{} start", *this); IRef ref = this; + return with_blocking_future(handle.enter(cp().await_map)) - .then([this]() { - return with_blocking_future(osd.osdmap_gate.wait_for_map(req->get_min_epoch())); - }).then([this](epoch_t epoch) { - return with_blocking_future(handle.enter(cp().get_pg)); - }).then([this] { - return with_blocking_future(osd.wait_for_pg(req->get_spg())); - }).then([this, ref=std::move(ref)](Ref pg) { - return pg->handle_rep_op(std::move(req)); - }); + .then([this]() { + return with_blocking_future(osd.osdmap_gate.wait_for_map(req->get_min_epoch())); + }).then([this](epoch_t epoch) { + return with_blocking_future(handle.enter(cp().get_pg)); + }).then([this] { + return with_blocking_future(osd.wait_for_pg(req->get_spg())); + }).then([this, ref=std::move(ref)](Ref pg) { + return interruptor::with_interruption([this, ref, pg] { + return pg->handle_rep_op(std::move(req)); + }, [](std::exception_ptr) { return seastar::now(); }, pg); + }); } } diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 6ad2cad435849..0e335a2180f4e 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -566,7 +566,7 @@ seastar::future<> PG::WaitForActiveBlocker::stop() return seastar::now(); } -seastar::future<> PG::submit_transaction(const OpInfo& op_info, +PG::interruptible_future<> PG::submit_transaction(const OpInfo& op_info, ObjectContextRef&& obc, ceph::os::Transaction&& txn, osd_op_params_t&& osd_op_p) @@ -608,7 +608,7 @@ seastar::future<> PG::submit_transaction(const OpInfo& op_info, std::move(osd_op_p), peering_state.get_last_peering_reset(), map_epoch, - std::move(log_entries)).then( + std::move(log_entries)).then_interruptible( [this, last_complete=peering_state.get_info().last_complete, 
at_version=osd_op_p.at_version](auto acked) { for (const auto& peer : acked) { @@ -635,7 +635,7 @@ void PG::fill_op_params_bump_pg_version( } } -seastar::future> PG::handle_failed_op( +PG::interruptible_future> PG::handle_failed_op( const std::error_code& e, ObjectContextRef obc, const OpsExecuter& ox, @@ -671,8 +671,8 @@ seastar::future> PG::handle_failed_op( e.message(), need_reload_obc); return (need_reload_obc ? reload_obc(*obc) - : load_obc_ertr::now() - ).safe_then([e, &m, obc = std::move(obc), this] { + : interruptor::make_interruptible(load_obc_ertr::now()) + ).safe_then_interruptible([e, &m, obc = std::move(obc), this] { auto reply = make_message( &m, -e.value(), get_osdmap_epoch(), 0, false); reply->set_enoent_reply_versions( @@ -682,7 +682,7 @@ seastar::future> PG::handle_failed_op( }, load_obc_ertr::assert_all{ "can't live with object state messed up" }); } -seastar::future<> PG::repair_object( +PG::interruptible_future<> PG::repair_object( Ref m, const hobject_t& oid, eversion_t& v) @@ -698,7 +698,7 @@ seastar::future<> PG::repair_object( return std::move(fut); } -PG::do_osd_ops_ertr::future> +PG::do_osd_ops_iertr::future> PG::do_osd_ops( Ref m, ObjectContextRef obc, @@ -708,20 +708,21 @@ PG::do_osd_ops( throw crimson::common::system_shutdown_exception(); } - using osd_op_errorator = OpsExecuter::osd_op_errorator; + using osd_op_ierrorator = + OpsExecuter::osd_op_ierrorator; const auto oid = m->get_snapid() == CEPH_SNAPDIR ? 
m->get_hobj().get_head() : m->get_hobj(); auto ox = std::make_unique( obc, op_info, get_pool().info, get_backend(), *m); - return crimson::do_for_each( - m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) { + return interruptor::do_for_each( + m->ops.begin(), m->ops.end(), [obc, m, ox = ox.get()](OSDOp& osd_op) { logger().debug( "do_osd_ops: {} - object {} - handling op {}", *m, obc->obs.oi.soid, ceph_osd_op_name(osd_op.op.op)); return ox->execute_op(osd_op); - }).safe_then([this, obc, m, ox = ox.get(), &op_info] { + }).safe_then_interruptible([this, obc, m, ox = ox.get(), &op_info] { logger().debug( "do_osd_ops: {} - object {} all operations successful", *m, @@ -730,7 +731,7 @@ PG::do_osd_ops( [this, m, &op_info] (auto&& txn, auto&& obc, auto&& osd_op_p, - bool user_modify) -> osd_op_errorator::future<> { + bool user_modify) -> osd_op_ierrorator::future<> { logger().debug( "do_osd_ops: {} - object {} submitting txn", *m, @@ -742,7 +743,8 @@ PG::do_osd_ops( std::move(txn), std::move(osd_op_p)); }); - }).safe_then([this, m, obc, rvec = op_info.allows_returnvec()] { + }).safe_then_interruptible_tuple([this, m, obc, rvec = op_info.allows_returnvec()]() + -> PG::do_osd_ops_iertr::future> { // TODO: should stop at the first op which returns a negative retval, // cmpext uses it for returning the index of first unmatched byte int result = m->ops.empty() ? 
0 : m->ops.back().rval.code; @@ -762,7 +764,8 @@ PG::do_osd_ops( return PG::do_osd_ops_ertr::make_ready_future>( std::move(reply)); }, crimson::ct_error::object_corrupted::handle([m, obc, this] { - return repair_object(m, obc->obs.oi.soid, obc->obs.oi.version).then([] { + return repair_object(m, obc->obs.oi.soid, obc->obs.oi.version) + .then_interruptible([] { return PG::do_osd_ops_ertr::future>( crimson::ct_error::eagain::make()); }); @@ -774,7 +777,7 @@ PG::do_osd_ops( })); } -seastar::future> PG::do_pg_ops(Ref m) +PG::interruptible_future> PG::do_pg_ops(Ref m) { if (__builtin_expect(stopping, false)) { throw crimson::common::system_shutdown_exception(); @@ -782,15 +785,15 @@ seastar::future> PG::do_pg_ops(Ref m) auto ox = std::make_unique(std::as_const(*this), std::as_const(*m)); - return seastar::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) { + return interruptor::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) { logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op)); return ox->execute_op(osd_op); - }).then([m, this, ox = std::move(ox)] { + }).then_interruptible([m, this, ox = std::move(ox)] { auto reply = make_message(m.get(), 0, get_osdmap_epoch(), CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false); return seastar::make_ready_future>(std::move(reply)); - }).handle_exception_type([=](const crimson::osd::error& e) { + }).handle_exception_type_interruptible([=](const crimson::osd::error& e) { auto reply = make_message( m.get(), -e.code().value(), get_osdmap_epoch(), 0, false); reply->set_enoent_reply_versions(peering_state.get_info().last_update, @@ -852,7 +855,7 @@ std::optional PG::resolve_oid( } template -PG::load_obc_ertr::future<> +PG::load_obc_iertr::future<> PG::with_head_obc(hobject_t oid, with_obc_func_t&& func) { logger().debug("{} {}", __func__, oid); @@ -860,19 +863,19 @@ PG::with_head_obc(hobject_t oid, with_obc_func_t&& func) assert(oid.is_head()); auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid); 
obc->append_to(obc_set_accessing); - return obc->with_lock( + return obc->with_lock( [oid=std::move(oid), existed=existed, obc=obc, func=std::move(func), this] { - auto loaded = load_obc_ertr::make_ready_future(obc); + auto loaded = load_obc_iertr::make_ready_future(obc); if (existed) { logger().debug("with_head_obc: found {} in cache", oid); } else { logger().debug("with_head_obc: cache miss on {}", oid); - loaded = obc->with_promoted_lock([this, obc] { + loaded = obc->with_promoted_lock([this, obc] { return load_head_obc(obc); }); } - return loaded.safe_then([func=std::move(func)](auto obc) { + return loaded.safe_then_interruptible([func = std::move(func)](auto obc) { return func(std::move(obc)); }); }).finally([this, pgref, obc=std::move(obc)] { @@ -882,12 +885,12 @@ PG::with_head_obc(hobject_t oid, with_obc_func_t&& func) } template -PG::load_obc_ertr::future<> +PG::load_obc_iertr::future<> PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func) { assert(!oid.is_head()); return with_head_obc(oid.get_head(), - [oid, func=std::move(func), this](auto head) -> load_obc_ertr::future<> { + [oid, func=std::move(func), this](auto head) -> load_obc_iertr::future<> { auto coid = resolve_oid(head->get_ro_ss(), oid); if (!coid) { // TODO: return crimson::ct_error::enoent::make(); @@ -898,22 +901,22 @@ PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func) return clone->template with_lock( [coid=*coid, existed=existed, head=std::move(head), clone=std::move(clone), - func=std::move(func), this]() -> load_obc_ertr::future<> { - auto loaded = load_obc_ertr::make_ready_future(clone); + func=std::move(func), this]() -> load_obc_iertr::future<> { + auto loaded = load_obc_iertr::make_ready_future(clone); if (existed) { logger().debug("with_clone_obc: found {} in cache", coid); } else { logger().debug("with_clone_obc: cache miss on {}", coid); loaded = clone->template with_promoted_lock( [coid, clone, head, this] { - return backend->load_metadata(coid).safe_then( + return 
backend->load_metadata(coid).safe_then_interruptible( [coid, clone=std::move(clone), head=std::move(head)](auto md) mutable { clone->set_clone_state(std::move(md->os), std::move(head)); return clone; }); }); } - return loaded.safe_then([func=std::move(func)](auto clone) { + return loaded.safe_then_interruptible([func = std::move(func)](auto clone) { return func(std::move(clone)); }); }); @@ -921,14 +924,15 @@ PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func) } // explicitly instantiate the used instantiations -template PG::load_obc_ertr::future<> +template PG::load_obc_iertr::future<> PG::with_head_obc(hobject_t, with_obc_func_t&&); -PG::load_obc_ertr::future +PG::load_obc_iertr::future PG::load_head_obc(ObjectContextRef obc) { hobject_t oid = obc->get_oid(); - return backend->load_metadata(oid).safe_then([obc=std::move(obc)](auto md) + return backend->load_metadata(oid).safe_then_interruptible( + [obc=std::move(obc)](auto md) -> load_obc_ertr::future { const hobject_t& oid = md->os.oi.soid; logger().debug( @@ -947,11 +951,11 @@ PG::load_head_obc(ObjectContextRef obc) }); } -PG::load_obc_ertr::future<> +PG::load_obc_iertr::future<> PG::reload_obc(crimson::osd::ObjectContext& obc) const { assert(obc.is_head()); - return backend->load_metadata(obc.get_oid()).safe_then([&obc](auto md) + return backend->load_metadata(obc.get_oid()).safe_then_interruptible([&obc](auto md) -> load_obc_ertr::future<> { logger().debug( "{}: reloaded obs {} for {}", @@ -970,7 +974,7 @@ PG::reload_obc(crimson::osd::ObjectContext& obc) const }); } -PG::load_obc_ertr::future<> +PG::load_obc_iertr::future<> PG::with_locked_obc(Ref &m, const OpInfo &op_info, Operation *op, PG::with_obc_func_t &&f) { @@ -1002,7 +1006,7 @@ PG::with_locked_obc(Ref &m, const OpInfo &op_info, }; } -seastar::future<> PG::handle_rep_op(Ref req) +PG::interruptible_future<> PG::handle_rep_op(Ref req) { if (__builtin_expect(stopping, false)) { return seastar::make_exception_future<>( @@ -1021,8 +1025,9 @@ 
seastar::future<> PG::handle_rep_op(Ref req) decode(log_entries, p); peering_state.append_log(std::move(log_entries), req->pg_trim_to, req->version, req->min_last_complete_ondisk, txn, !txn.empty(), false); - return shard_services.get_store().do_transaction(coll_ref, std::move(txn)) - .then([req, lcod=peering_state.get_info().last_complete, this] { + return interruptor::make_interruptible(shard_services.get_store().do_transaction( + coll_ref, std::move(txn))).then_interruptible( + [req, lcod=peering_state.get_info().last_complete, this] { peering_state.update_last_complete_ondisk(lcod); const auto map_epoch = get_osdmap_epoch(); auto reply = make_message( @@ -1132,7 +1137,7 @@ bool PG::is_degraded_or_backfilling_object(const hobject_t& soid) const { return false; } -seastar::future> +PG::interruptible_future> PG::already_complete(const osd_reqid_t& reqid) { eversion_t version; diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index ac220f841731f..346f60b9cdcaf 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -21,9 +21,11 @@ #include "crimson/osd/object_context.h" #include "osd/PeeringState.h" +#include "crimson/common/interruptible_future.h" #include "crimson/common/type_helpers.h" #include "crimson/os/futurized_collection.h" #include "crimson/osd/backfill_state.h" +#include "crimson/osd/pg_interval_interrupt_condition.h" #include "crimson/osd/osd_operations/client_request.h" #include "crimson/osd/osd_operations/peering_event.h" #include "crimson/osd/osd_operations/replicated_request.h" @@ -79,6 +81,11 @@ class PG : public boost::intrusive_ref_counter< seastar::timer renew_lease_timer; public: + template + using interruptible_future = + ::crimson::interruptible::interruptible_future< + ::crimson::osd::IOInterruptCondition, T>; + PG(spg_t pgid, pg_shard_t pg_shard, crimson::os::CollectionRef coll_ref, @@ -479,16 +486,30 @@ public: using load_obc_ertr = crimson::errorator< crimson::ct_error::object_corrupted>; - - load_obc_ertr::future + using 
load_obc_iertr = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + load_obc_ertr>; + using interruptor = ::crimson::interruptible::interruptor< + ::crimson::osd::IOInterruptCondition>; + load_obc_iertr::future< + std::pair> + get_or_load_clone_obc( + hobject_t oid, crimson::osd::ObjectContextRef head_obc); + + load_obc_iertr::future< + std::pair> + get_or_load_head_obc(hobject_t oid); + + load_obc_iertr::future load_head_obc(ObjectContextRef obc); - load_obc_ertr::future<> + load_obc_iertr::future<> reload_obc(crimson::osd::ObjectContext& obc) const; public: using with_obc_func_t = - std::function (ObjectContextRef)>; + std::function (ObjectContextRef)>; using obc_accessing_list_t = boost::intrusive::list< ObjectContext, @@ -496,15 +517,15 @@ public: obc_accessing_list_t obc_set_accessing; template - load_obc_ertr::future<> with_head_obc(hobject_t oid, with_obc_func_t&& func); + load_obc_iertr::future<> with_head_obc(hobject_t oid, with_obc_func_t&& func); - load_obc_ertr::future<> with_locked_obc( + load_obc_iertr::future<> with_locked_obc( Ref &m, const OpInfo &op_info, Operation *op, with_obc_func_t&& f); - seastar::future<> handle_rep_op(Ref m); + interruptible_future<> handle_rep_op(Ref m); void handle_rep_op_reply(crimson::net::ConnectionRef conn, const MOSDRepOpReply& m); @@ -513,9 +534,9 @@ public: private: template - load_obc_ertr::future<> with_clone_obc(hobject_t oid, with_obc_func_t&& func); + load_obc_iertr::future<> with_clone_obc(hobject_t oid, with_obc_func_t&& func); - load_obc_ertr::future get_locked_obc( + load_obc_iertr::future get_locked_obc( Operation *op, const hobject_t &oid, RWState::State type); @@ -527,23 +548,27 @@ private: osd_op_params_t& osd_op_p, Ref m, const bool user_modify); - seastar::future> handle_failed_op( + interruptible_future> handle_failed_op( const std::error_code& e, ObjectContextRef obc, const OpsExecuter& ox, const MOSDOp& m) const; using do_osd_ops_ertr = 
crimson::errorator< crimson::ct_error::eagain>; - do_osd_ops_ertr::future> do_osd_ops( + using do_osd_ops_iertr = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + ::crimson::errorator>; + do_osd_ops_iertr::future> do_osd_ops( Ref m, ObjectContextRef obc, const OpInfo &op_info); - seastar::future> do_pg_ops(Ref m); - seastar::future<> submit_transaction(const OpInfo& op_info, + interruptible_future> do_pg_ops(Ref m); + interruptible_future<> submit_transaction(const OpInfo& op_info, ObjectContextRef&& obc, ceph::os::Transaction&& txn, osd_op_params_t&& oop); - seastar::future<> repair_object(Ref m, + interruptible_future<> repair_object(Ref m, const hobject_t& oid, eversion_t& v); @@ -632,7 +657,7 @@ public: return &it->second; } } - seastar::future> already_complete(const osd_reqid_t& reqid); + interruptible_future> already_complete(const osd_reqid_t& reqid); int get_recovery_op_priority() const { int64_t pri = 0; get_pool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri); @@ -698,6 +723,8 @@ private: private: BackfillRecovery::BackfillRecoveryPipeline backfill_pipeline; + + friend class IOInterruptCondition; }; std::ostream& operator<<(std::ostream&, const PG& pg); diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index d7cba8a1bc64a..ca112a0d146a0 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -63,16 +63,17 @@ PGBackend::PGBackend(shard_id_t shard, store{store} {} -PGBackend::load_metadata_ertr::future +PGBackend::load_metadata_iertr::future + PGBackend::load_metadata(const hobject_t& oid) { if (__builtin_expect(stopping, false)) { throw crimson::common::system_shutdown_exception(); } - return store->get_attrs( + return interruptor::make_interruptible(store->get_attrs( coll, - ghobject_t{oid, ghobject_t::NO_GEN, shard}).safe_then( + ghobject_t{oid, ghobject_t::NO_GEN, shard})).safe_then_interruptible( [oid](auto &&attrs) -> load_metadata_ertr::future{ 
loaded_object_md_t::ref ret(new loaded_object_md_t()); if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) { @@ -120,7 +121,7 @@ PGBackend::load_metadata(const hobject_t& oid) })); } -seastar::future +PGBackend::interruptible_future PGBackend::mutate_object( std::set pg_shards, crimson::osd::ObjectContextRef &&obc, @@ -178,7 +179,7 @@ static inline bool _read_verify_data( return true; } -PGBackend::read_errorator::future<> +PGBackend::read_ierrorator::future<> PGBackend::read(const ObjectState& os, OSDOp& osd_op) { const auto& oi = os.oi; @@ -206,7 +207,7 @@ PGBackend::read(const ObjectState& os, OSDOp& osd_op) // read the whole object if length is 0 length = size; } - return _read(oi.soid, offset, length, op.flags).safe_then( + return _read(oi.soid, offset, length, op.flags).safe_then_interruptible_tuple( [&oi, &osd_op](auto&& bl) -> read_errorator::future<> { if (!_read_verify_data(oi, bl)) { // crc mismatches @@ -222,19 +223,20 @@ PGBackend::read(const ObjectState& os, OSDOp& osd_op) read_errorator::pass_further{}); } -PGBackend::read_errorator::future<> +PGBackend::read_ierrorator::future<> PGBackend::sparse_read(const ObjectState& os, OSDOp& osd_op) { const auto& op = osd_op.op; logger().trace("sparse_read: {} {}~{}", os.oi.soid, op.extent.offset, op.extent.length); - return store->fiemap(coll, ghobject_t{os.oi.soid}, + return interruptor::make_interruptible(store->fiemap(coll, ghobject_t{os.oi.soid}, op.extent.offset, - op.extent.length).then([&os, &osd_op, this](auto&& m) { + op.extent.length)).then_interruptible( + [&os, &osd_op, this](auto&& m) { return seastar::do_with(interval_set{std::move(m)}, [&os, &osd_op, this](auto&& extents) { - return store->readv(coll, ghobject_t{os.oi.soid}, - extents, osd_op.op.flags).safe_then( + return interruptor::make_interruptible(store->readv(coll, ghobject_t{os.oi.soid}, + extents, osd_op.op.flags)).safe_then_interruptible_tuple( [&os, &osd_op, &extents](auto&& bl) -> read_errorator::future<> { if 
(_read_verify_data(os.oi, bl)) { osd_op.op.extent.length = bl.length(); @@ -286,7 +288,7 @@ namespace { } } -PGBackend::checksum_errorator::future<> +PGBackend::checksum_ierrorator::future<> PGBackend::checksum(const ObjectState& os, OSDOp& osd_op) { // sanity tests and normalize the argments @@ -318,7 +320,8 @@ PGBackend::checksum(const ObjectState& os, OSDOp& osd_op) } // read the chunk to be checksum'ed - return _read(os.oi.soid, checksum.offset, checksum.length, osd_op.op.flags).safe_then( + return _read(os.oi.soid, checksum.offset, checksum.length, osd_op.op.flags) + .safe_then_interruptible( [&osd_op](auto&& read_bl) mutable -> checksum_errorator::future<> { auto& checksum = osd_op.op.checksum; if (read_bl.length() != checksum.length) { @@ -351,7 +354,7 @@ PGBackend::checksum(const ObjectState& os, OSDOp& osd_op) }); } -PGBackend::cmp_ext_errorator::future<> +PGBackend::cmp_ext_ierrorator::future<> PGBackend::cmp_ext(const ObjectState& os, OSDOp& osd_op) { const ceph_osd_op& op = osd_op.op; @@ -373,7 +376,7 @@ PGBackend::cmp_ext(const ObjectState& os, OSDOp& osd_op) } else { ext_len = op.extent.length; } - auto read_ext = ll_read_errorator::make_ready_future(); + auto read_ext = ll_read_ierrorator::make_ready_future(); if (ext_len == 0) { logger().debug("{}: zero length extent", __func__); } else if (!os.exists || os.oi.is_whiteout()) { @@ -381,7 +384,7 @@ PGBackend::cmp_ext(const ObjectState& os, OSDOp& osd_op) } else { read_ext = _read(os.oi.soid, op.extent.offset, ext_len, 0); } - return read_ext.safe_then([&osd_op](auto&& read_bl) { + return read_ext.safe_then_interruptible([&osd_op](auto&& read_bl) { int32_t retcode = 0; for (unsigned index = 0; index < osd_op.indata.length(); index++) { char byte_in_op = osd_op.indata[index]; @@ -397,7 +400,8 @@ PGBackend::cmp_ext(const ObjectState& os, OSDOp& osd_op) }); } -PGBackend::stat_errorator::future<> PGBackend::stat( +PGBackend::stat_ierrorator::future<> +PGBackend::stat( const ObjectState& os, OSDOp& osd_op) 
{ @@ -447,7 +451,7 @@ static bool is_offset_and_length_valid( } } -seastar::future<> PGBackend::write( +PGBackend::interruptible_future<> PGBackend::write( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& txn, @@ -510,7 +514,7 @@ seastar::future<> PGBackend::write( return seastar::now(); } -seastar::future<> PGBackend::write_same( +PGBackend::interruptible_future<> PGBackend::write_same( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& txn, @@ -539,7 +543,7 @@ seastar::future<> PGBackend::write_same( return seastar::now(); } -seastar::future<> PGBackend::writefull( +PGBackend::interruptible_future<> PGBackend::writefull( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& txn, @@ -566,7 +570,7 @@ seastar::future<> PGBackend::writefull( return seastar::now(); } -PGBackend::append_errorator::future<> PGBackend::append( +PGBackend::append_ierrorator::future<> PGBackend::append( ObjectState& os, OSDOp& osd_op, ceph::os::Transaction& txn, @@ -588,7 +592,7 @@ PGBackend::append_errorator::future<> PGBackend::append( return seastar::now(); } -PGBackend::write_ertr::future<> PGBackend::truncate( +PGBackend::write_iertr::future<> PGBackend::truncate( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& txn, @@ -640,7 +644,7 @@ PGBackend::write_ertr::future<> PGBackend::truncate( return write_ertr::now(); } -PGBackend::write_ertr::future<> PGBackend::zero( +PGBackend::write_iertr::future<> PGBackend::zero( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& txn, @@ -667,7 +671,7 @@ PGBackend::write_ertr::future<> PGBackend::zero( return write_ertr::now(); } -seastar::future<> PGBackend::create( +PGBackend::interruptible_future<> PGBackend::create( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& txn) @@ -693,8 +697,8 @@ seastar::future<> PGBackend::create( return seastar::now(); } -seastar::future<> PGBackend::remove(ObjectState& os, - ceph::os::Transaction& txn) +PGBackend::interruptible_future<> 
+PGBackend::remove(ObjectState& os, ceph::os::Transaction& txn) { // todo: snapset txn.remove(coll->get_cid(), @@ -709,7 +713,7 @@ seastar::future<> PGBackend::remove(ObjectState& os, return seastar::now(); } -seastar::future, hobject_t>> +PGBackend::interruptible_future, hobject_t>> PGBackend::list_objects(const hobject_t& start, uint64_t limit) const { if (__builtin_expect(stopping, false)) { @@ -717,11 +721,11 @@ PGBackend::list_objects(const hobject_t& start, uint64_t limit) const } auto gstart = start.is_min() ? ghobject_t{} : ghobject_t{start, 0, shard}; - return store->list_objects(coll, - gstart, - ghobject_t::get_max(), - limit) - .then([](auto ret) { + return interruptor::make_interruptible(store->list_objects(coll, + gstart, + ghobject_t::get_max(), + limit)) + .then_interruptible([](auto ret) { auto& [gobjects, next] = ret; std::vector objects; boost::copy(gobjects | @@ -743,7 +747,7 @@ PGBackend::list_objects(const hobject_t& start, uint64_t limit) const }); } -seastar::future<> PGBackend::setxattr( +PGBackend::interruptible_future<> PGBackend::setxattr( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& txn) @@ -775,7 +779,7 @@ seastar::future<> PGBackend::setxattr( //ctx->delta_stats.num_wr++; } -PGBackend::get_attr_errorator::future<> PGBackend::getxattr( +PGBackend::get_attr_ierrorator::future<> PGBackend::getxattr( const ObjectState& os, OSDOp& osd_op) const { @@ -788,7 +792,8 @@ PGBackend::get_attr_errorator::future<> PGBackend::getxattr( name = "_" + aname; } logger().debug("getxattr on obj={} for attr={}", os.oi.soid, name); - return getxattr(os.oi.soid, name).safe_then([&osd_op] (ceph::bufferptr val) { + return getxattr(os.oi.soid, name).safe_then_interruptible( + [&osd_op] (ceph::bufferptr val) { osd_op.outdata.clear(); osd_op.outdata.push_back(std::move(val)); osd_op.op.xattr.value_len = osd_op.outdata.length(); @@ -798,7 +803,8 @@ PGBackend::get_attr_errorator::future<> PGBackend::getxattr( //ctx->delta_stats.num_rd++; } 
-PGBackend::get_attr_errorator::future PGBackend::getxattr( +PGBackend::get_attr_ierrorator::future +PGBackend::getxattr( const hobject_t& soid, std::string_view key) const { @@ -809,7 +815,7 @@ PGBackend::get_attr_errorator::future PGBackend::getxattr( return store->get_attr(coll, ghobject_t{soid}, key); } -PGBackend::get_attr_errorator::future<> PGBackend::get_xattrs( +PGBackend::get_attr_ierrorator::future<> PGBackend::get_xattrs( const ObjectState& os, OSDOp& osd_op) const { @@ -831,7 +837,8 @@ PGBackend::get_attr_errorator::future<> PGBackend::get_xattrs( }); } -PGBackend::rm_xattr_ertr::future<> PGBackend::rm_xattr( +PGBackend::rm_xattr_iertr::future<> +PGBackend::rm_xattr( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& txn) @@ -847,14 +854,18 @@ PGBackend::rm_xattr_ertr::future<> PGBackend::rm_xattr( string attr_name{"_"}; bp.copy(osd_op.op.xattr.name_len, attr_name); txn.rmattr(coll->get_cid(), ghobject_t{os.oi.soid}, attr_name); - return rm_xattr_ertr::now(); + return rm_xattr_iertr::now(); } using get_omap_ertr = crimson::os::FuturizedStore::read_errorator::extend< crimson::ct_error::enodata>; +using get_omap_iertr = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + get_omap_ertr>; static -get_omap_ertr::future< +get_omap_iertr::future< crimson::os::FuturizedStore::omap_values_t> maybe_get_omap_vals_by_keys( crimson::os::FuturizedStore* store, @@ -870,7 +881,7 @@ maybe_get_omap_vals_by_keys( } static -get_omap_ertr::future< +get_omap_iertr::future< std::tuple> maybe_get_omap_vals( crimson::os::FuturizedStore* store, @@ -885,7 +896,7 @@ maybe_get_omap_vals( } } -PGBackend::ll_read_errorator::future +PGBackend::ll_read_ierrorator::future PGBackend::omap_get_header( const crimson::os::CollectionRef& c, const ghobject_t& oid) const @@ -893,19 +904,19 @@ PGBackend::omap_get_header( return store->omap_get_header(c, oid); } -PGBackend::ll_read_errorator::future<> +PGBackend::ll_read_ierrorator::future<> 
PGBackend::omap_get_header( const ObjectState& os, OSDOp& osd_op) const { - return omap_get_header(coll, ghobject_t{os.oi.soid}).safe_then( + return omap_get_header(coll, ghobject_t{os.oi.soid}).safe_then_interruptible( [&osd_op] (ceph::bufferlist&& header) { osd_op.outdata = std::move(header); return seastar::now(); }); } -PGBackend::ll_read_errorator::future<> +PGBackend::ll_read_ierrorator::future<> PGBackend::omap_get_keys( const ObjectState& os, OSDOp& osd_op) const @@ -930,7 +941,7 @@ PGBackend::omap_get_keys( std::min(max_return, local_conf()->osd_max_omap_entries_per_request); // TODO: truly chunk the reading - return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then( + return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then_interruptible( [=, &osd_op](auto ret) { ceph::bufferlist result; bool truncated = false; @@ -948,7 +959,7 @@ PGBackend::omap_get_keys( osd_op.outdata.claim_append(result); encode(truncated, osd_op.outdata); return seastar::now(); - }).handle_error( + }).handle_error_interruptible( crimson::ct_error::enodata::handle([&osd_op] { uint32_t num = 0; bool truncated = false; @@ -963,7 +974,7 @@ PGBackend::omap_get_keys( //ctx->delta_stats.num_rd++; } -PGBackend::ll_read_errorator::future<> +PGBackend::ll_read_ierrorator::future<> PGBackend::omap_get_vals( const ObjectState& os, OSDOp& osd_op) const @@ -988,7 +999,8 @@ PGBackend::omap_get_vals( std::min(max_return, local_conf()->osd_max_omap_entries_per_request); // TODO: truly chunk the reading - return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then( + return maybe_get_omap_vals(store, coll, os.oi, start_after) + .safe_then_interruptible( [=, &osd_op] (auto&& ret) { auto [done, vals] = std::move(ret); assert(done); @@ -1014,7 +1026,7 @@ PGBackend::omap_get_vals( osd_op.outdata.claim_append(result); encode(truncated, osd_op.outdata); return ll_read_errorator::now(); - }).handle_error( + }).handle_error_interruptible( 
crimson::ct_error::enodata::handle([&osd_op] { encode(uint32_t{0} /* num */, osd_op.outdata); encode(bool{false} /* truncated */, osd_op.outdata); @@ -1028,7 +1040,7 @@ PGBackend::omap_get_vals( //ctx->delta_stats.num_rd++; } -PGBackend::ll_read_errorator::future<> +PGBackend::ll_read_ierrorator::future<> PGBackend::omap_get_vals_by_keys( const ObjectState& os, OSDOp& osd_op) const @@ -1048,11 +1060,12 @@ PGBackend::omap_get_vals_by_keys( } catch (buffer::error&) { throw crimson::osd::invalid_argument(); } - return maybe_get_omap_vals_by_keys(store, coll, os.oi, keys_to_get).safe_then( + return maybe_get_omap_vals_by_keys(store, coll, os.oi, keys_to_get) + .safe_then_interruptible( [&osd_op] (crimson::os::FuturizedStore::omap_values_t&& vals) { encode(vals, osd_op.outdata); return ll_read_errorator::now(); - }).handle_error( + }).handle_error_interruptible( crimson::ct_error::enodata::handle([&osd_op] { uint32_t num = 0; encode(num, osd_op.outdata); @@ -1066,7 +1079,8 @@ PGBackend::omap_get_vals_by_keys( //ctx->delta_stats.num_rd++; } -seastar::future<> PGBackend::omap_set_vals( +PGBackend::interruptible_future<> +PGBackend::omap_set_vals( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& txn, @@ -1096,7 +1110,8 @@ seastar::future<> PGBackend::omap_set_vals( return seastar::now(); } -seastar::future<> PGBackend::omap_set_header( +PGBackend::interruptible_future<> +PGBackend::omap_set_header( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& txn) @@ -1111,7 +1126,7 @@ seastar::future<> PGBackend::omap_set_header( return seastar::now(); } -seastar::future<> PGBackend::omap_remove_range( +PGBackend::interruptible_future<> PGBackend::omap_remove_range( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& txn) @@ -1131,7 +1146,7 @@ seastar::future<> PGBackend::omap_remove_range( return seastar::now(); } -PGBackend::omap_clear_ertr::future<> +PGBackend::omap_clear_iertr::future<> PGBackend::omap_clear( ObjectState& os, OSDOp& osd_op, @@ 
-1155,14 +1170,15 @@ PGBackend::omap_clear( return omap_clear_ertr::now(); } -seastar::future PGBackend::stat( +PGBackend::interruptible_future +PGBackend::stat( CollectionRef c, const ghobject_t& oid) const { return store->stat(c, oid); } -seastar::future> +PGBackend::interruptible_future> PGBackend::fiemap( CollectionRef c, const ghobject_t& oid, diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index c02f35f3979e7..d820e9f57d5a6 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -40,10 +40,25 @@ protected: using ec_profile_t = std::map; // low-level read errorator using ll_read_errorator = crimson::os::FuturizedStore::read_errorator; + using ll_read_ierrorator = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + ll_read_errorator>; public: using load_metadata_ertr = crimson::errorator< crimson::ct_error::object_corrupted>; + using load_metadata_iertr = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + load_metadata_ertr>; + using interruptor = + ::crimson::interruptible::interruptor< + ::crimson::osd::IOInterruptCondition>; + template + using interruptible_future = + ::crimson::interruptible::interruptible_future< + ::crimson::osd::IOInterruptCondition, T>; PGBackend(shard_id_t shard, CollectionRef coll, crimson::os::FuturizedStore* store); virtual ~PGBackend() = default; static std::unique_ptr create(pg_t pgid, @@ -56,71 +71,95 @@ public: std::map>; using read_errorator = ll_read_errorator::extend< crimson::ct_error::object_corrupted>; - read_errorator::future<> read( + using read_ierrorator = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + read_errorator>; + read_ierrorator::future<> read( const ObjectState& os, OSDOp& osd_op); - read_errorator::future<> sparse_read( + read_ierrorator::future<> sparse_read( const ObjectState& os, OSDOp& osd_op); using checksum_errorator = 
ll_read_errorator::extend< crimson::ct_error::object_corrupted, crimson::ct_error::invarg>; - checksum_errorator::future<> checksum( + using checksum_ierrorator = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + checksum_errorator>; + checksum_ierrorator::future<> checksum( const ObjectState& os, OSDOp& osd_op); using cmp_ext_errorator = ll_read_errorator::extend< crimson::ct_error::invarg>; - cmp_ext_errorator::future<> cmp_ext( + using cmp_ext_ierrorator = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + cmp_ext_errorator>; + cmp_ext_ierrorator::future<> cmp_ext( const ObjectState& os, OSDOp& osd_op); using stat_errorator = crimson::errorator; - stat_errorator::future<> stat( + using stat_ierrorator = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + stat_errorator>; + stat_ierrorator::future<> stat( const ObjectState& os, OSDOp& osd_op); // TODO: switch the entire write family to errorator. 
using write_ertr = crimson::errorator< crimson::ct_error::file_too_large>; - seastar::future<> create( + using write_iertr = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + write_ertr>; + interruptible_future<> create( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& trans); - seastar::future<> remove( + interruptible_future<> remove( ObjectState& os, ceph::os::Transaction& txn); - seastar::future<> write( + interruptible_future<> write( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& trans, osd_op_params_t& osd_op_params); - seastar::future<> write_same( + interruptible_future<> write_same( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& trans, osd_op_params_t& osd_op_params); - seastar::future<> writefull( + interruptible_future<> writefull( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& trans, osd_op_params_t& osd_op_params); using append_errorator = crimson::errorator< crimson::ct_error::invarg>; - append_errorator::future<> append( + using append_ierrorator = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + append_errorator>; + append_ierrorator::future<> append( ObjectState& os, OSDOp& osd_op, ceph::os::Transaction& trans, osd_op_params_t& osd_op_params); - write_ertr::future<> truncate( + write_iertr::future<> truncate( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& trans, osd_op_params_t& osd_op_params); - write_ertr::future<> zero( + write_iertr::future<> zero( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& trans, osd_op_params_t& osd_op_params); - seastar::future mutate_object( + interruptible_future mutate_object( std::set pg_shards, crimson::osd::ObjectContextRef &&obc, ceph::os::Transaction&& txn, @@ -128,68 +167,80 @@ public: epoch_t min_epoch, epoch_t map_epoch, std::vector&& log_entries); - seastar::future, hobject_t>> list_objects( + interruptible_future, hobject_t>> 
list_objects( const hobject_t& start, uint64_t limit) const; - seastar::future<> setxattr( + interruptible_future<> setxattr( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& trans); using get_attr_errorator = crimson::os::FuturizedStore::get_attr_errorator; - get_attr_errorator::future<> getxattr( + using get_attr_ierrorator = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + get_attr_errorator>; + get_attr_ierrorator::future<> getxattr( const ObjectState& os, OSDOp& osd_op) const; - get_attr_errorator::future getxattr( + get_attr_ierrorator::future getxattr( const hobject_t& soid, std::string_view key) const; - get_attr_errorator::future<> get_xattrs( + get_attr_ierrorator::future<> get_xattrs( const ObjectState& os, OSDOp& osd_op) const; using rm_xattr_ertr = crimson::errorator; - rm_xattr_ertr::future<> rm_xattr( + using rm_xattr_iertr = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + rm_xattr_ertr>; + rm_xattr_iertr::future<> rm_xattr( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& trans); - seastar::future stat( + interruptible_future stat( CollectionRef c, const ghobject_t& oid) const; - seastar::future> fiemap( + interruptible_future> fiemap( CollectionRef c, const ghobject_t& oid, uint64_t off, uint64_t len); // OMAP - ll_read_errorator::future<> omap_get_keys( + ll_read_ierrorator::future<> omap_get_keys( const ObjectState& os, OSDOp& osd_op) const; - ll_read_errorator::future<> omap_get_vals( + ll_read_ierrorator::future<> omap_get_vals( const ObjectState& os, OSDOp& osd_op) const; - ll_read_errorator::future<> omap_get_vals_by_keys( + ll_read_ierrorator::future<> omap_get_vals_by_keys( const ObjectState& os, OSDOp& osd_op) const; - seastar::future<> omap_set_vals( + interruptible_future<> omap_set_vals( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& trans, osd_op_params_t& osd_op_params); - ll_read_errorator::future 
omap_get_header( + ll_read_ierrorator::future omap_get_header( const crimson::os::CollectionRef& c, const ghobject_t& oid) const; - ll_read_errorator::future<> omap_get_header( + ll_read_ierrorator::future<> omap_get_header( const ObjectState& os, OSDOp& osd_op) const; - seastar::future<> omap_set_header( + interruptible_future<> omap_set_header( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& trans); - seastar::future<> omap_remove_range( + interruptible_future<> omap_remove_range( ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& trans); using omap_clear_ertr = crimson::errorator; - omap_clear_ertr::future<> omap_clear( + using omap_clear_iertr = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + omap_clear_ertr>; + omap_clear_iertr::future<> omap_clear( ObjectState& os, OSDOp& osd_op, ceph::os::Transaction& trans, @@ -217,18 +268,19 @@ public: std::optional ss; using ref = std::unique_ptr; }; - load_metadata_ertr::future load_metadata( + load_metadata_iertr::future + load_metadata( const hobject_t &oid); private: - virtual ll_read_errorator::future _read( + virtual ll_read_ierrorator::future _read( const hobject_t& hoid, size_t offset, size_t length, uint32_t flags) = 0; bool maybe_create_new_object(ObjectState& os, ceph::os::Transaction& txn); - virtual seastar::future + virtual interruptible_future _submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, diff --git a/src/crimson/osd/pg_interval_interrupt_condition.cc b/src/crimson/osd/pg_interval_interrupt_condition.cc index 6be3193b85d68..8ee8e5d8dbf77 100644 --- a/src/crimson/osd/pg_interval_interrupt_condition.cc +++ b/src/crimson/osd/pg_interval_interrupt_condition.cc @@ -1,19 +1,28 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab -#include "io_interrupt_condition.h" +#include "pg_interval_interrupt_condition.h" #include "pg.h" +#include 
"crimson/common/log.h" + namespace crimson::osd { IOInterruptCondition::IOInterruptCondition(Ref& pg) : pg(pg), e(pg->get_osdmap_epoch()) {} -epoch_t IOInterruptCondition::get_current_osdmap_epoch() { - return pg->get_osdmap_epoch(); +bool IOInterruptCondition::new_interval_created() { + bool ret = e < pg->get_interval_start_epoch(); + if (ret) + ::crimson::get_logger(ceph_subsys_osd).debug( + "{} new interval, should interrupt, e{}", *pg, e); + return ret; } bool IOInterruptCondition::is_stopping() { + if (pg->stopping) + ::crimson::get_logger(ceph_subsys_osd).debug( + "{} shutting down, should interrupt", *pg); return pg->stopping; } diff --git a/src/crimson/osd/pg_interval_interrupt_condition.h b/src/crimson/osd/pg_interval_interrupt_condition.h index 4ad5041a72984..7f62ad108ff14 100644 --- a/src/crimson/osd/pg_interval_interrupt_condition.h +++ b/src/crimson/osd/pg_interval_interrupt_condition.h @@ -16,18 +16,17 @@ class IOInterruptCondition { public: IOInterruptCondition(Ref& pg); - epoch_t get_current_osdmap_epoch(); + bool new_interval_created(); bool is_stopping(); bool is_primary(); - template - std::pair> may_interrupt() { - if (e != get_current_osdmap_epoch()) { - return std::pair>( - true, seastar::futurize::make_exception_future( - ::crimson::common::actingset_changed(is_primary()))); + template + std::pair> may_interrupt() { + if (new_interval_created()) { + return {true, seastar::futurize::make_exception_future( + ::crimson::common::actingset_changed(is_primary()))}; } if (is_stopping()) { return {true, seastar::futurize::make_exception_future( diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc index 19c8962f6df4f..12783eb802374 100644 --- a/src/crimson/osd/pg_recovery.cc +++ b/src/crimson/osd/pg_recovery.cc @@ -36,7 +36,7 @@ void PGRecovery::start_pglogbased_recovery() pg->get_osdmap_epoch()); } -crimson::blocking_future +PGRecovery::blocking_interruptible_future PGRecovery::start_recovery_ops(size_t max_to_start) {
assert(pg->is_primary()); @@ -51,13 +51,15 @@ PGRecovery::start_recovery_ops(size_t max_to_start) assert(!pg->is_backfilling()); assert(!pg->get_peering_state().is_deleting()); - std::vector> started; + std::vector> started; started.reserve(max_to_start); max_to_start -= start_primary_recovery_ops(max_to_start, &started); if (max_to_start > 0) { max_to_start -= start_replica_recovery_ops(max_to_start, &started); } - return crimson::join_blocking_futures(std::move(started)).then( + return crimson::join_blocking_interruptible_futures< + ::crimson::osd::IOInterruptCondition>(std::move(started)).then_interruptible< + ::crimson::osd::IOInterruptCondition>( [this] { bool done = !pg->get_peering_state().needs_recovery(); if (done) { @@ -94,7 +96,7 @@ PGRecovery::start_recovery_ops(size_t max_to_start) size_t PGRecovery::start_primary_recovery_ops( size_t max_to_start, - std::vector> *out) + std::vector> *out) { if (!pg->is_recovering()) { return 0; @@ -149,7 +151,8 @@ size_t PGRecovery::start_primary_recovery_ops( // TODO: handle lost/unfound if (pg->get_recovery_backend()->is_recovering(soid)) { auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid); - out->push_back(recovery_waiter.wait_for_recovered_blocking()); + out->push_back(recovery_waiter.wait_for_recovered_blocking< + ::crimson::osd::IOInterruptCondition>()); ++started; } else if (pg->get_recovery_backend()->is_recovering(head)) { ++skipped; @@ -169,7 +172,7 @@ size_t PGRecovery::start_primary_recovery_ops( size_t PGRecovery::start_replica_recovery_ops( size_t max_to_start, - std::vector> *out) + std::vector> *out) { if (!pg->is_recovering()) { return 0; @@ -216,7 +219,8 @@ size_t PGRecovery::start_replica_recovery_ops( if (pg->get_recovery_backend()->is_recovering(soid)) { logger().debug("{}: already recovering object {}", __func__, soid); auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid); - out->push_back(recovery_waiter.wait_for_recovered_blocking()); + 
out->push_back(recovery_waiter.wait_for_recovered_blocking< + ::crimson::osd::IOInterruptCondition>()); started++; continue; } @@ -254,7 +258,8 @@ size_t PGRecovery::start_replica_recovery_ops( return started; } -crimson::blocking_future<> PGRecovery::recover_missing( +PGRecovery::blocking_interruptible_future<> +PGRecovery::recover_missing( const hobject_t &soid, eversion_t need) { if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) { @@ -262,7 +267,8 @@ crimson::blocking_future<> PGRecovery::recover_missing( pg->get_recovery_backend()->recover_delete(soid, need)); } else { return pg->get_recovery_backend()->add_recovering(soid).make_blocking_future( - pg->get_recovery_backend()->recover_object(soid, need).handle_exception( + pg->get_recovery_backend()->recover_object(soid, need) + .handle_exception_interruptible( [=, soid = std::move(soid)] (auto e) { on_failed_recover({ pg->get_pg_whoami() }, soid, need); return seastar::make_ready_future<>(); @@ -274,11 +280,12 @@ crimson::blocking_future<> PGRecovery::recover_missing( size_t PGRecovery::prep_object_replica_deletes( const hobject_t& soid, eversion_t need, - std::vector> *in_progress) + std::vector> *in_progress) { in_progress->push_back( pg->get_recovery_backend()->add_recovering(soid).make_blocking_future( - pg->get_recovery_backend()->push_delete(soid, need).then([=] { + pg->get_recovery_backend()->push_delete(soid, need).then_interruptible( + [=] { object_stat_sum_t stat_diff; stat_diff.num_objects_recovered = 1; on_global_recover(soid, stat_diff, true); @@ -292,11 +299,12 @@ size_t PGRecovery::prep_object_replica_deletes( size_t PGRecovery::prep_object_replica_pushes( const hobject_t& soid, eversion_t need, - std::vector> *in_progress) + std::vector> *in_progress) { in_progress->push_back( pg->get_recovery_backend()->add_recovering(soid).make_blocking_future( - pg->get_recovery_backend()->recover_object(soid, need).handle_exception( + pg->get_recovery_backend()->recover_object(soid, need) + 
.handle_exception_interruptible( [=, soid = std::move(soid)] (auto e) { on_failed_recover({ pg->get_pg_whoami() }, soid, need); return seastar::make_ready_future<>(); @@ -418,7 +426,7 @@ void PGRecovery::request_primary_scan( begin, local_conf()->osd_backfill_scan_min, local_conf()->osd_backfill_scan_max - ).then([this] (BackfillInterval bi) { + ).then_interruptible([this] (BackfillInterval bi) { logger().debug("request_primary_scan:{}", __func__); using BackfillState = crimson::osd::BackfillState; start_backfill_recovery(BackfillState::PrimaryScanned{ std::move(bi) }); @@ -433,10 +441,10 @@ void PGRecovery::enqueue_push( __func__, obj, v); pg->get_recovery_backend()->add_recovering(obj); std::ignore = pg->get_recovery_backend()->recover_object(obj, v).\ - handle_exception([] (auto) { + handle_exception_interruptible([] (auto) { ceph_abort_msg("got exception on backfill's push"); return seastar::make_ready_future<>(); - }).then([this, obj] { + }).then_interruptible([this, obj] { logger().debug("enqueue_push:{}", __func__); using BackfillState = crimson::osd::BackfillState; start_backfill_recovery(BackfillState::ObjectPushed(std::move(obj))); diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h index 19303e58d923f..931b6d7a9c6d3 100644 --- a/src/crimson/osd/pg_recovery.h +++ b/src/crimson/osd/pg_recovery.h @@ -6,6 +6,7 @@ #include #include "crimson/osd/backfill_state.h" +#include "crimson/osd/pg_interval_interrupt_condition.h" #include "crimson/osd/osd_operation.h" #include "crimson/osd/pg_recovery_listener.h" #include "crimson/osd/scheduler/scheduler.h" @@ -18,11 +19,15 @@ class PGBackend; class PGRecovery : public crimson::osd::BackfillState::BackfillListener { public: + template + using blocking_interruptible_future = + ::crimson::blocking_interruptible_future< + ::crimson::osd::IOInterruptCondition, T>; PGRecovery(PGRecoveryListener* pg) : pg(pg) {} virtual ~PGRecovery() {} void start_pglogbased_recovery(); - crimson::blocking_future 
start_recovery_ops(size_t max_to_start); + blocking_interruptible_future start_recovery_ops(size_t max_to_start); void on_backfill_reserved(); void dispatch_backfill_event( boost::intrusive_ptr evt); @@ -32,24 +37,24 @@ private: PGRecoveryListener* pg; size_t start_primary_recovery_ops( size_t max_to_start, - std::vector> *out); + std::vector> *out); size_t start_replica_recovery_ops( size_t max_to_start, - std::vector> *out); + std::vector> *out); std::vector get_replica_recovery_order() const { return pg->get_replica_recovery_order(); } - crimson::blocking_future<> recover_missing( + blocking_interruptible_future<> recover_missing( const hobject_t &soid, eversion_t need); size_t prep_object_replica_deletes( const hobject_t& soid, eversion_t need, - std::vector> *in_progress); + std::vector> *in_progress); size_t prep_object_replica_pushes( const hobject_t& soid, eversion_t need, - std::vector> *in_progress); + std::vector> *in_progress); void on_local_recover( const hobject_t& soid, diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc index aeec0d14be938..b19c9fa46ae4e 100644 --- a/src/crimson/osd/recovery_backend.cc +++ b/src/crimson/osd/recovery_backend.cc @@ -89,7 +89,8 @@ void RecoveryBackend::handle_backfill_finish( RecoveryDone{}); } -seastar::future<> RecoveryBackend::handle_backfill_progress( +RecoveryBackend::interruptible_future<> +RecoveryBackend::handle_backfill_progress( MOSDPGBackfill& m) { logger().debug("{}", __func__); @@ -107,7 +108,8 @@ seastar::future<> RecoveryBackend::handle_backfill_progress( ).or_terminate(); } -seastar::future<> RecoveryBackend::handle_backfill_finish_ack( +RecoveryBackend::interruptible_future<> +RecoveryBackend::handle_backfill_finish_ack( MOSDPGBackfill& m) { logger().debug("{}", __func__); @@ -118,7 +120,8 @@ seastar::future<> RecoveryBackend::handle_backfill_finish_ack( return seastar::now(); } -seastar::future<> RecoveryBackend::handle_backfill( 
+RecoveryBackend::interruptible_future<> +RecoveryBackend::handle_backfill( MOSDPGBackfill& m) { logger().debug("{}", __func__); @@ -136,7 +139,8 @@ seastar::future<> RecoveryBackend::handle_backfill( } } -seastar::future<> RecoveryBackend::handle_backfill_remove( +RecoveryBackend::interruptible_future<> +RecoveryBackend::handle_backfill_remove( MOSDPGBackfillRemove& m) { logger().debug("{} m.ls={}", __func__, m.ls); @@ -153,18 +157,20 @@ seastar::future<> RecoveryBackend::handle_backfill_remove( ).or_terminate(); } -seastar::future RecoveryBackend::scan_for_backfill( +RecoveryBackend::interruptible_future +RecoveryBackend::scan_for_backfill( const hobject_t& start, [[maybe_unused]] const std::int64_t min, const std::int64_t max) { logger().debug("{} starting from {}", __func__, start); auto version_map = seastar::make_lw_shared>(); - return backend->list_objects(start, max).then( + return backend->list_objects(start, max).then_interruptible( [this, start, version_map] (auto&& ret) { auto&& [objects, next] = std::move(ret); - return seastar::parallel_for_each(std::move(objects), - [this, version_map] (const hobject_t& object) { + return interruptor::parallel_for_each(std::move(objects), + [this, version_map] (const hobject_t& object) + -> interruptible_future<> { crimson::osd::ObjectContextRef obc; if (pg.is_primary()) { obc = shard_services.obc_registry.maybe_get_cached_obc(object); @@ -181,7 +187,7 @@ seastar::future RecoveryBackend::scan_for_backfill( } return seastar::now(); } else { - return backend->load_metadata(object).safe_then( + return backend->load_metadata(object).safe_then_interruptible( [version_map, object] (auto md) { if (md->os.exists) { logger().debug("scan_for_backfill found: {} {}", @@ -191,7 +197,7 @@ seastar::future RecoveryBackend::scan_for_backfill( return seastar::now(); }, PGBackend::load_metadata_ertr::assert_all{}); } - }).then([version_map, start=std::move(start), next=std::move(next), this] { + }).then_interruptible([version_map, 
start=std::move(start), next=std::move(next), this] { BackfillInterval bi; bi.begin = std::move(start); bi.end = std::move(next); @@ -204,7 +210,8 @@ seastar::future RecoveryBackend::scan_for_backfill( }); } -seastar::future<> RecoveryBackend::handle_scan_get_digest( +RecoveryBackend::interruptible_future<> +RecoveryBackend::handle_scan_get_digest( MOSDPGScan& m) { logger().debug("{}", __func__); @@ -224,7 +231,7 @@ seastar::future<> RecoveryBackend::handle_scan_get_digest( std::move(m.begin), crimson::common::local_conf().get_val("osd_backfill_scan_min"), crimson::common::local_conf().get_val("osd_backfill_scan_max") - ).then([this, + ).then_interruptible([this, query_epoch=m.query_epoch, conn=m.get_connection()] (auto backfill_interval) { auto reply = make_message( @@ -240,7 +247,8 @@ seastar::future<> RecoveryBackend::handle_scan_get_digest( }); } -seastar::future<> RecoveryBackend::handle_scan_digest( +RecoveryBackend::interruptible_future<> +RecoveryBackend::handle_scan_digest( MOSDPGScan& m) { logger().debug("{}", __func__); @@ -264,7 +272,8 @@ seastar::future<> RecoveryBackend::handle_scan_digest( return seastar::now(); } -seastar::future<> RecoveryBackend::handle_scan( +RecoveryBackend::interruptible_future<> +RecoveryBackend::handle_scan( MOSDPGScan& m) { logger().debug("{}", __func__); @@ -280,7 +289,8 @@ seastar::future<> RecoveryBackend::handle_scan( } } -seastar::future<> RecoveryBackend::handle_recovery_op( +RecoveryBackend::interruptible_future<> +RecoveryBackend::handle_recovery_op( Ref m) { switch (m->get_header().type) { diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h index 2216fdeed6840..eb621487a18a6 100644 --- a/src/crimson/osd/recovery_backend.h +++ b/src/crimson/osd/recovery_backend.h @@ -8,6 +8,7 @@ #include "crimson/common/type_helpers.h" #include "crimson/os/futurized_store.h" #include "crimson/os/futurized_collection.h" +#include "crimson/osd/pg_interval_interrupt_condition.h" #include 
"crimson/osd/object_context.h" #include "crimson/osd/shard_services.h" @@ -24,25 +25,16 @@ namespace crimson::osd{ class PGBackend; class RecoveryBackend { - void handle_backfill_finish( - MOSDPGBackfill& m); - seastar::future<> handle_backfill_progress( - MOSDPGBackfill& m); - seastar::future<> handle_backfill_finish_ack( - MOSDPGBackfill& m); - seastar::future<> handle_backfill(MOSDPGBackfill& m); - - seastar::future<> handle_backfill_remove(MOSDPGBackfillRemove& m); - - seastar::future<> handle_scan_get_digest( - MOSDPGScan& m); - seastar::future<> handle_scan_digest( - MOSDPGScan& m); - seastar::future<> handle_scan( - MOSDPGScan& m); protected: class WaitForObjectRecovery; public: + template + using interruptible_future = + ::crimson::interruptible::interruptible_future< + ::crimson::osd::IOInterruptCondition, T>; + using interruptor = + ::crimson::interruptible::interruptor< + ::crimson::osd::IOInterruptCondition>; RecoveryBackend(crimson::osd::PG& pg, crimson::osd::ShardServices& shard_services, crimson::os::CollectionRef coll, @@ -72,20 +64,20 @@ public: return recovering.size(); } - virtual seastar::future<> handle_recovery_op( + virtual interruptible_future<> handle_recovery_op( Ref m); - virtual seastar::future<> recover_object( + virtual interruptible_future<> recover_object( const hobject_t& soid, eversion_t need) = 0; - virtual seastar::future<> recover_delete( + virtual interruptible_future<> recover_delete( const hobject_t& soid, eversion_t need) = 0; - virtual seastar::future<> push_delete( + virtual interruptible_future<> push_delete( const hobject_t& soid, eversion_t need) = 0; - seastar::future scan_for_backfill( + interruptible_future scan_for_backfill( const hobject_t& from, std::int64_t min, std::int64_t max); @@ -146,9 +138,10 @@ protected: seastar::future<> wait_for_recovered() { return recovered.get_shared_future(); } - crimson::blocking_future<> + template + crimson::blocking_interruptible_future wait_for_recovered_blocking() { - return 
make_blocking_future( + return make_blocking_interruptible_future( recovered.get_shared_future()); } seastar::future<> wait_for_pull() { @@ -200,4 +193,20 @@ protected: } void clean_up(ceph::os::Transaction& t, std::string_view why); virtual seastar::future<> on_stop() = 0; +private: + void handle_backfill_finish( + MOSDPGBackfill& m); + interruptible_future<> handle_backfill_progress( + MOSDPGBackfill& m); + interruptible_future<> handle_backfill_finish_ack( + MOSDPGBackfill& m); + interruptible_future<> handle_backfill(MOSDPGBackfill& m); + + interruptible_future<> handle_scan_get_digest( + MOSDPGScan& m); + interruptible_future<> handle_scan_digest( + MOSDPGScan& m); + interruptible_future<> handle_scan( + MOSDPGScan& m); + interruptible_future<> handle_backfill_remove(MOSDPGBackfillRemove& m); }; diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index 63ee96bfae21f..cc1870769c11d 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -27,7 +27,7 @@ ReplicatedBackend::ReplicatedBackend(pg_t pgid, shard_services{shard_services} {} -ReplicatedBackend::ll_read_errorator::future +ReplicatedBackend::ll_read_ierrorator::future ReplicatedBackend::_read(const hobject_t& hoid, const uint64_t off, const uint64_t len, @@ -39,7 +39,7 @@ ReplicatedBackend::_read(const hobject_t& hoid, return store->read(coll, ghobject_t{hoid}, off, len, flags); } -seastar::future +ReplicatedBackend::interruptible_future ReplicatedBackend::_submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, @@ -61,7 +61,7 @@ ReplicatedBackend::_submit_transaction(std::set&& pg_shards, bufferlist encoded_txn; encode(txn, encoded_txn); - return seastar::parallel_for_each(std::move(pg_shards), + return interruptor::parallel_for_each(std::move(pg_shards), [=, encoded_txn=std::move(encoded_txn), txn=std::move(txn)] (auto pg_shard) mutable { if (pg_shard == whoami) { @@ -81,7 +81,7 @@ 
ReplicatedBackend::_submit_transaction(std::set&& pg_shards, // TODO: set more stuff. e.g., pg_states return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch); } - }).then([this, peers=pending_txn->second.weak_from_this()] { + }).then_interruptible([this, peers=pending_txn->second.weak_from_this()] { if (!peers) { // for now, only actingset_changed can cause peers // to be nullptr @@ -94,7 +94,7 @@ ReplicatedBackend::_submit_transaction(std::set&& pg_shards, return seastar::now(); } return peers->all_committed.get_shared_future(); - }).then([pending_txn, this] { + }).then_interruptible([pending_txn, this] { auto acked_peers = std::move(pending_txn->second.acked_peers); pending_trans.erase(pending_txn); return seastar::make_ready_future(std::move(acked_peers)); diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h index 6b1b57e528505..1e2ae752a594a 100644 --- a/src/crimson/osd/replicated_backend.h +++ b/src/crimson/osd/replicated_backend.h @@ -26,11 +26,10 @@ public: seastar::future<> stop() final; void on_actingset_changed(peering_info_t pi) final; private: - ll_read_errorator::future _read(const hobject_t& hoid, - uint64_t off, - uint64_t len, - uint32_t flags) override; - seastar::future + ll_read_ierrorator::future + _read(const hobject_t& hoid, uint64_t off, + uint64_t len, uint32_t flags) override; + interruptible_future _submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index 3b9810736ec3a..e10c142296f37 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -18,7 +18,8 @@ namespace { } } -seastar::future<> ReplicatedRecoveryBackend::recover_object( +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::recover_object( const hobject_t& soid, eversion_t need) { @@ -26,7 +27,7 @@ 
seastar::future<> ReplicatedRecoveryBackend::recover_object( // always add_recovering(soid) before recover_object(soid) assert(is_recovering(soid)); // start tracking the recovery of soid - return maybe_pull_missing_obj(soid, need).then([this, soid, need] { + return maybe_pull_missing_obj(soid, need).then_interruptible([this, soid, need] { logger().debug("recover_object: loading obc: {}", soid); return pg.with_head_obc(soid, [this, soid, need](auto obc) { @@ -35,7 +36,7 @@ seastar::future<> ReplicatedRecoveryBackend::recover_object( recovery_waiter.obc = obc; recovery_waiter.obc->wait_recovery_read(); return maybe_push_shards(soid, need); - }).handle_error( + }).handle_error_interruptible( crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) { // TODO: may need eio handling? logger().error("recover_object saw error code {}, ignoring object {}", @@ -44,14 +45,14 @@ seastar::future<> ReplicatedRecoveryBackend::recover_object( }); } -seastar::future<> +RecoveryBackend::interruptible_future<> ReplicatedRecoveryBackend::maybe_push_shards( const hobject_t& soid, eversion_t need) { - return seastar::parallel_for_each(get_shards_to_push(soid), + return interruptor::parallel_for_each(get_shards_to_push(soid), [this, need, soid](auto shard) { - return prep_push(soid, need, shard).then([this, soid, shard](auto push) { + return prep_push(soid, need, shard).then_interruptible([this, soid, shard](auto push) { auto msg = make_message(); msg->from = pg.get_pg_whoami(); msg->pgid = pg.get_pgid(); @@ -59,14 +60,16 @@ ReplicatedRecoveryBackend::maybe_push_shards( msg->min_epoch = pg.get_last_peering_reset(); msg->pushes.push_back(std::move(push)); msg->set_priority(pg.get_recovery_op_priority()); - return shard_services.send_to_osd(shard.osd, - std::move(msg), - pg.get_osdmap_epoch()).then( + return interruptor::make_interruptible( + shard_services.send_to_osd(shard.osd, + std::move(msg), + pg.get_osdmap_epoch())) + .then_interruptible( [this, soid, shard] { return 
recovering.at(soid).wait_for_pushes(shard); }); }); - }).then([this, soid] { + }).then_interruptible([this, soid] { auto &recovery = recovering.at(soid); auto push_info = recovery.pushing.begin(); object_stat_sum_t stat = {}; @@ -79,7 +82,7 @@ ReplicatedRecoveryBackend::maybe_push_shards( } pg.get_recovery_handler()->on_global_recover(soid, stat, false); return seastar::make_ready_future<>(); - }).handle_exception([this, soid](auto e) { + }).handle_exception_interruptible([this, soid](auto e) { auto &recovery = recovering.at(soid); if (recovery.obc) { recovery.obc->drop_recovery_read(); @@ -89,7 +92,7 @@ ReplicatedRecoveryBackend::maybe_push_shards( }); } -seastar::future<> +RecoveryBackend::interruptible_future<> ReplicatedRecoveryBackend::maybe_pull_missing_obj( const hobject_t& soid, eversion_t need) @@ -110,26 +113,28 @@ ReplicatedRecoveryBackend::maybe_pull_missing_obj( msg->map_epoch = pg.get_osdmap_epoch(); msg->min_epoch = pg.get_last_peering_reset(); msg->set_pulls({std::move(po)}); - return shard_services.send_to_osd( - pi.from.osd, - std::move(msg), - pg.get_osdmap_epoch() - ).then([&recovery_waiter] { + return interruptor::make_interruptible( + shard_services.send_to_osd( + pi.from.osd, + std::move(msg), + pg.get_osdmap_epoch() + )).then_interruptible([&recovery_waiter] { return recovery_waiter.wait_for_pull(); }); } -seastar::future<> ReplicatedRecoveryBackend::push_delete( +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::push_delete( const hobject_t& soid, eversion_t need) { logger().debug("{}: {}, {}", __func__, soid, need); - recovering[soid]; epoch_t min_epoch = pg.get_last_peering_reset(); assert(pg.get_acting_recovery_backfill().size() > 0); - return seastar::parallel_for_each(pg.get_acting_recovery_backfill(), - [this, soid, need, min_epoch](pg_shard_t shard) { + return interruptor::parallel_for_each(pg.get_acting_recovery_backfill(), + [this, soid, need, min_epoch](pg_shard_t shard) + -> interruptible_future<> { if (shard == 
pg.get_pg_whoami()) return seastar::make_ready_future<>(); auto iter = pg.get_shard_missing().find(shard); @@ -143,8 +148,9 @@ seastar::future<> ReplicatedRecoveryBackend::push_delete( pg.get_pg_whoami(), target_pg, pg.get_osdmap_epoch(), min_epoch); msg->set_priority(pg.get_recovery_op_priority()); msg->objects.push_back(std::make_pair(soid, need)); - return shard_services.send_to_osd(shard.osd, std::move(msg), - pg.get_osdmap_epoch()).then( + return interruptor::make_interruptible( + shard_services.send_to_osd(shard.osd, std::move(msg), + pg.get_osdmap_epoch())).then_interruptible( [this, soid, shard] { return recovering.at(soid).wait_for_pushes(shard); }); @@ -153,13 +159,15 @@ seastar::future<> ReplicatedRecoveryBackend::push_delete( }); } -seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete( +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::handle_recovery_delete( Ref m) { logger().debug("{}: {}", __func__, *m); auto& p = m->objects.front(); //TODO: only one delete per message for now. 
- return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch()).then( + return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch()) + .then_interruptible( [this, m] { auto reply = make_message(); reply->from = pg.get_pg_whoami(); @@ -172,7 +180,8 @@ seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete( }); } -seastar::future<> ReplicatedRecoveryBackend::on_local_recover_persist( +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::on_local_recover_persist( const hobject_t& soid, const ObjectRecoveryInfo& _recovery_info, bool is_delete, @@ -181,32 +190,36 @@ seastar::future<> ReplicatedRecoveryBackend::on_local_recover_persist( logger().debug("{}", __func__); ceph::os::Transaction t; pg.get_recovery_handler()->on_local_recover(soid, _recovery_info, is_delete, t); - return shard_services.get_store().do_transaction(coll, std::move(t)).then( + return interruptor::make_interruptible( + shard_services.get_store().do_transaction(coll, std::move(t))) + .then_interruptible( [this, epoch_frozen, last_complete = pg.get_info().last_complete] { pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete); return seastar::make_ready_future<>(); }); } -seastar::future<> ReplicatedRecoveryBackend::local_recover_delete( +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::local_recover_delete( const hobject_t& soid, eversion_t need, epoch_t epoch_to_freeze) { logger().debug("{}: {}, {}", __func__, soid, need); - return backend->load_metadata(soid).safe_then([this] - (auto lomt) { + return backend->load_metadata(soid).safe_then_interruptible([this] + (auto lomt) -> interruptible_future<> { if (lomt->os.exists) { return seastar::do_with(ceph::os::Transaction(), [this, lomt = std::move(lomt)](auto& txn) { - return backend->remove(lomt->os, txn).then([this, &txn]() mutable { + return backend->remove(lomt->os, txn).then_interruptible( + [this, &txn]() mutable { return 
shard_services.get_store().do_transaction(coll, std::move(txn)); }); }); } return seastar::make_ready_future<>(); - }).safe_then([this, soid, epoch_to_freeze, need] { + }).safe_then_interruptible([this, soid, epoch_to_freeze, need] { ObjectRecoveryInfo recovery_info; recovery_info.soid = soid; recovery_info.version = need; @@ -223,7 +236,8 @@ seastar::future<> ReplicatedRecoveryBackend::local_recover_delete( ); } -seastar::future<> ReplicatedRecoveryBackend::recover_delete( +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::recover_delete( const hobject_t &soid, eversion_t need) { logger().debug("{}: {}, {}", __func__, soid, need); @@ -231,8 +245,9 @@ seastar::future<> ReplicatedRecoveryBackend::recover_delete( epoch_t cur_epoch = pg.get_osdmap_epoch(); return seastar::do_with(object_stat_sum_t(), [this, soid, need, cur_epoch](auto& stat_diff) { - return local_recover_delete(soid, need, cur_epoch).then( - [this, &stat_diff, cur_epoch, soid, need] { + return local_recover_delete(soid, need, cur_epoch).then_interruptible( + [this, &stat_diff, cur_epoch, soid, need]() + -> interruptible_future<> { if (!pg.has_reset_since(cur_epoch)) { bool object_missing = false; for (const auto& shard : pg.get_acting_recovery_backfill()) { @@ -254,14 +269,14 @@ seastar::future<> ReplicatedRecoveryBackend::recover_delete( } } return seastar::make_ready_future<>(); - }).then([this, soid, &stat_diff] { + }).then_interruptible([this, soid, &stat_diff] { pg.get_recovery_handler()->on_global_recover(soid, stat_diff, true); return seastar::make_ready_future<>(); }); }); } -seastar::future +RecoveryBackend::interruptible_future ReplicatedRecoveryBackend::prep_push( const hobject_t& soid, eversion_t need, @@ -299,7 +314,7 @@ ReplicatedRecoveryBackend::prep_push( pi.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty(); - return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then( + return build_push_op(pi.recovery_info, 
pi.recovery_progress, &pi.stat).then_interruptible( [this, soid, pg_shard](auto pop) { auto& recovery_waiter = recovering.at(soid); auto& pi = recovery_waiter.pushing[pg_shard]; @@ -339,7 +354,8 @@ void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, PullInfo& pi, pi.recovery_progress = po.recovery_progress; } -seastar::future ReplicatedRecoveryBackend::build_push_op( +RecoveryBackend::interruptible_future +ReplicatedRecoveryBackend::build_push_op( const ObjectRecoveryInfo& recovery_info, const ObjectRecoveryProgress& progress, object_stat_sum_t* stat) @@ -355,7 +371,7 @@ seastar::future ReplicatedRecoveryBackend::build_push_op( (auto& new_progress, auto& available, auto& v, auto& pop) { return read_metadata_for_push_op(recovery_info.soid, progress, new_progress, - v, &pop).then([&](eversion_t local_ver) mutable { + v, &pop).then_interruptible([&](eversion_t local_ver) mutable { // If requestor didn't know the version, use ours if (v == eversion_t()) { v = local_ver; @@ -368,14 +384,14 @@ seastar::future ReplicatedRecoveryBackend::build_push_op( progress, new_progress, &available, &pop); - }).then([this, &recovery_info, &progress, &available, &pop]() mutable { + }).then_interruptible([this, &recovery_info, &progress, &available, &pop]() mutable { logger().debug("build_push_op: available: {}, copy_subset: {}", available, recovery_info.copy_subset); return read_object_for_push_op(recovery_info.soid, recovery_info.copy_subset, progress.data_recovered_to, available, &pop); - }).then([&recovery_info, &v, &progress, &new_progress, stat, &pop] + }).then_interruptible([&recovery_info, &v, &progress, &new_progress, stat, &pop] (uint64_t recovered_to) mutable { new_progress.data_recovered_to = recovered_to; if (new_progress.is_complete(recovery_info)) { @@ -403,7 +419,7 @@ seastar::future ReplicatedRecoveryBackend::build_push_op( }); } -seastar::future +RecoveryBackend::interruptible_future ReplicatedRecoveryBackend::read_metadata_for_push_op( const hobject_t& oid, const 
ObjectRecoveryProgress& progress, @@ -411,21 +427,25 @@ ReplicatedRecoveryBackend::read_metadata_for_push_op( eversion_t ver, PushOp* push_op) { + logger().debug("{}, {}", __func__, oid); if (!progress.first) { return seastar::make_ready_future(ver); } - return seastar::when_all_succeed( - backend->omap_get_header(coll, ghobject_t(oid)).handle_error( - crimson::os::FuturizedStore::read_errorator::all_same_way( - [] (const std::error_code& e) { - return seastar::make_ready_future(); - })), - store->get_attrs(coll, ghobject_t(oid)).handle_error( - crimson::os::FuturizedStore::get_attrs_ertr::all_same_way( - [] (const std::error_code& e) { - return seastar::make_ready_future(); - })) - ).then_unpack([&new_progress, push_op](auto bl, auto attrs) { + return interruptor::make_interruptible(interruptor::when_all_succeed( + backend->omap_get_header(coll, ghobject_t(oid)).handle_error_interruptible( + crimson::os::FuturizedStore::read_errorator::all_same_way( + [oid] (const std::error_code& e) { + logger().debug("read_metadata_for_push_op, error {} when getting omap header: {}", e, oid); + return seastar::make_ready_future(); + })), + interruptor::make_interruptible(store->get_attrs(coll, ghobject_t(oid))) + .handle_error_interruptible( + crimson::os::FuturizedStore::get_attrs_ertr::all_same_way( + [oid] (const std::error_code& e) { + logger().debug("read_metadata_for_push_op, error {} when getting attrs: {}", e, oid); + return seastar::make_ready_future(); + })) + )).then_unpack_interruptible([&new_progress, push_op](auto bl, auto attrs) { if (bl.length() == 0) { logger().warn("read_metadata_for_push_op: fail to read omap header"); } else if (attrs.empty()) { @@ -444,7 +464,7 @@ ReplicatedRecoveryBackend::read_metadata_for_push_op( }); } -seastar::future +RecoveryBackend::interruptible_future ReplicatedRecoveryBackend::read_object_for_push_op( const hobject_t& oid, const interval_set& copy_subset, @@ -458,7 +478,7 @@ ReplicatedRecoveryBackend::read_object_for_push_op( } // 
1. get the extents in the interested range return backend->fiemap(coll, ghobject_t{oid}, - 0, copy_subset.range_end()).then_wrapped( + 0, copy_subset.range_end()).then_wrapped_interruptible( [=](auto&& fiemap_included) mutable { interval_set extents; try { @@ -475,7 +495,7 @@ ReplicatedRecoveryBackend::read_object_for_push_op( // 3. read the truncated extents // TODO: check if the returned extents are pruned return store->readv(coll, ghobject_t{oid}, push_op->data_included, 0); - }).safe_then([push_op, range_end=copy_subset.range_end()](auto &&bl) { + }).safe_then_interruptible([push_op, range_end=copy_subset.range_end()](auto &&bl) { push_op->data.claim_append(std::move(bl)); uint64_t recovered_to = 0; if (push_op->data_included.empty()) { @@ -492,7 +512,7 @@ ReplicatedRecoveryBackend::read_object_for_push_op( })); } -seastar::future<> +RecoveryBackend::interruptible_future<> ReplicatedRecoveryBackend::read_omap_for_push_op( const hobject_t& oid, const ObjectRecoveryProgress& progress, @@ -561,15 +581,16 @@ ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t& soid) const return shards; } -seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref m) +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::handle_pull(Ref m) { logger().debug("{}: {}", __func__, *m); return seastar::do_with(m->take_pulls(), [this, from=m->from](auto& pulls) { - return seastar::parallel_for_each(pulls, + return interruptor::parallel_for_each(pulls, [this, from](auto& pull_op) { const hobject_t& soid = pull_op.soid; logger().debug("handle_pull: {}", soid); - return backend->stat(coll, ghobject_t(soid)).then( + return backend->stat(coll, ghobject_t(soid)).then_interruptible( [this, &pull_op](auto st) { ObjectRecoveryInfo &recovery_info = pull_op.recovery_info; ObjectRecoveryProgress &progress = pull_op.recovery_progress; @@ -586,7 +607,7 @@ seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref m) assert(recovery_info.clone_subset.empty()); } return 
build_push_op(recovery_info, progress, 0); - }).then([this, from](auto pop) { + }).then_interruptible([this, from](auto pop) { auto msg = make_message(); msg->from = pg.get_pg_whoami(); msg->pgid = pg.get_pgid(); @@ -601,7 +622,8 @@ seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref m) }); } -seastar::future ReplicatedRecoveryBackend::_handle_pull_response( +RecoveryBackend::interruptible_future +ReplicatedRecoveryBackend::_handle_pull_response( pg_shard_t from, PushOp& pop, PullOp* response, @@ -623,7 +645,8 @@ seastar::future ReplicatedRecoveryBackend::_handle_pull_response( if (pi.recovery_info.version == eversion_t()) pi.recovery_info.version = pop.version; - auto prepare_waiter = seastar::make_ready_future<>(); + auto prepare_waiter = interruptor::make_interruptible( + seastar::make_ready_future<>()); if (pi.recovery_progress.first) { prepare_waiter = pg.with_head_obc( pi.recovery_info.soid, [&pi, &recovery_waiter, &pop](auto obc) { @@ -632,9 +655,9 @@ seastar::future ReplicatedRecoveryBackend::_handle_pull_response( obc->obs.oi.decode(pop.attrset.at(OI_ATTR)); pi.recovery_info.oi = obc->obs.oi; return crimson::osd::PG::load_obc_ertr::now(); - }).handle_error(crimson::ct_error::assert_all{}); + }).handle_error_interruptible(crimson::ct_error::assert_all{}); }; - return prepare_waiter.then([this, &pi, &pop, t, response]() mutable { + return prepare_waiter.then_interruptible([this, &pi, &pop, t, response]() mutable { const bool first = pi.recovery_progress.first; pi.recovery_progress = pop.after_progress; logger().debug("new recovery_info {}, new progress {}", @@ -656,7 +679,8 @@ seastar::future ReplicatedRecoveryBackend::_handle_pull_response( return submit_push_data(pi.recovery_info, first, complete, clear_omap, std::move(data_zeros), std::move(usable_intervals), std::move(data), std::move(pop.omap_header), - pop.attrset, std::move(pop.omap_entries), t).then( + pop.attrset, std::move(pop.omap_entries), t) + .then_interruptible( [this, response, &pi, 
&pop, complete, t, bytes_recovered=data.length()] { pi.stat.num_keys_recovered += pop.omap_entries.size(); pi.stat.num_bytes_recovered += bytes_recovered; @@ -677,7 +701,8 @@ seastar::future ReplicatedRecoveryBackend::_handle_pull_response( }); } -seastar::future<> ReplicatedRecoveryBackend::handle_pull_response( +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::handle_pull_response( Ref m) { const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now. @@ -697,7 +722,7 @@ seastar::future<> ReplicatedRecoveryBackend::handle_pull_response( [this, &response](auto& t, auto& m) { pg_shard_t from = m->from; PushOp& pop = m->pushes[0]; // only one push per message for now - return _handle_pull_response(from, pop, &response, &t).then( + return _handle_pull_response(from, pop, &response, &t).then_interruptible( [this, &t](bool complete) { epoch_t epoch_frozen = pg.get_osdmap_epoch(); return shard_services.get_store().do_transaction(coll, std::move(t)) @@ -707,7 +732,7 @@ seastar::future<> ReplicatedRecoveryBackend::handle_pull_response( return seastar::make_ready_future(complete); }); }); - }).then([this, m, &response](bool complete) { + }).then_interruptible([this, m, &response](bool complete) { if (complete) { auto& pop = m->pushes[0]; recovering.at(pop.soid).set_pulled(); @@ -726,7 +751,8 @@ seastar::future<> ReplicatedRecoveryBackend::handle_pull_response( }); } -seastar::future<> ReplicatedRecoveryBackend::_handle_push( +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::_handle_push( pg_shard_t from, PushOp &pop, PushReplyOp *response, @@ -752,7 +778,8 @@ seastar::future<> ReplicatedRecoveryBackend::_handle_push( return submit_push_data(pop.recovery_info, first, complete, clear_omap, std::move(data_zeros), std::move(pop.data_included), std::move(pop.data), std::move(pop.omap_header), - pop.attrset, std::move(pop.omap_entries), t).then( + pop.attrset, std::move(pop.omap_entries), t) + .then_interruptible( [this, 
complete, &pop, t] { if (complete) { pg.get_recovery_handler()->on_local_recover( @@ -762,7 +789,8 @@ seastar::future<> ReplicatedRecoveryBackend::_handle_push( }); } -seastar::future<> ReplicatedRecoveryBackend::handle_push( +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::handle_push( Ref m) { if (pg.is_primary()) { @@ -774,16 +802,17 @@ seastar::future<> ReplicatedRecoveryBackend::handle_push( PushOp& pop = m->pushes[0]; // TODO: only one push per message for now return seastar::do_with(ceph::os::Transaction(), [this, m, &pop, &response](auto& t) { - return _handle_push(m->from, pop, &response, &t).then( + return _handle_push(m->from, pop, &response, &t).then_interruptible( [this, &t] { epoch_t epoch_frozen = pg.get_osdmap_epoch(); - return shard_services.get_store().do_transaction(coll, std::move(t)).then( + return interruptor::make_interruptible( + shard_services.get_store().do_transaction(coll, std::move(t))).then_interruptible( [this, epoch_frozen, last_complete = pg.get_info().last_complete] { //TODO: this should be grouped with pg.on_local_recover somehow. 
pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete); }); }); - }).then([this, m, &response]() mutable { + }).then_interruptible([this, m, &response]() mutable { auto reply = make_message(); reply->from = pg.get_pg_whoami(); reply->set_priority(m->get_priority()); @@ -798,7 +827,7 @@ seastar::future<> ReplicatedRecoveryBackend::handle_push( }); } -seastar::future> +RecoveryBackend::interruptible_future> ReplicatedRecoveryBackend::_handle_push_reply( pg_shard_t peer, const PushReplyOp &op) @@ -815,10 +844,10 @@ ReplicatedRecoveryBackend::_handle_push_reply( bool error = pi.recovery_progress.error; if (!pi.recovery_progress.data_complete && !error) { return build_push_op(pi.recovery_info, pi.recovery_progress, - &pi.stat).then([&pi] (auto pop) { + &pi.stat).then_interruptible([&pi] (auto pop) { pi.recovery_progress = pop.after_progress; return seastar::make_ready_future>(std::move(pop)); - }).handle_exception([recovering_iter, &pi, peer] (auto e) { + }).handle_exception_interruptible([recovering_iter, &pi, peer] (auto e) { pi.recovery_progress.error = true; recovering_iter->second.set_push_failed(peer, e); return seastar::make_ready_future>(); @@ -832,14 +861,15 @@ ReplicatedRecoveryBackend::_handle_push_reply( } } -seastar::future<> ReplicatedRecoveryBackend::handle_push_reply( +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::handle_push_reply( Ref m) { logger().debug("{}: {}", __func__, *m); auto from = m->from; auto& push_reply = m->replies[0]; //TODO: only one reply per message - return _handle_push_reply(from, push_reply).then( + return _handle_push_reply(from, push_reply).then_interruptible( [this, from](std::optional push_op) { if (push_op) { auto msg = make_message(); @@ -890,7 +920,7 @@ ReplicatedRecoveryBackend::trim_pushed_data( return {intervals_usable, data_usable}; } -seastar::future +RecoveryBackend::interruptible_future ReplicatedRecoveryBackend::prep_push_target( const ObjectRecoveryInfo& recovery_info, 
bool first, @@ -943,7 +973,8 @@ ReplicatedRecoveryBackend::prep_push_target( return seastar::make_ready_future(target_oid.hobj); } // clone overlap content in local object if using a new object - return store->stat(coll, ghobject_t(recovery_info.soid)).then( + return interruptor::make_interruptible(store->stat(coll, ghobject_t(recovery_info.soid))) + .then_interruptible( [this, &recovery_info, t, target_oid] (auto st) { // TODO: pg num bytes counting uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size); @@ -962,8 +993,8 @@ ReplicatedRecoveryBackend::prep_push_target( return seastar::make_ready_future(target_oid.hobj); }); } - -seastar::future<> ReplicatedRecoveryBackend::submit_push_data( +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::submit_push_data( const ObjectRecoveryInfo &recovery_info, bool first, bool complete, @@ -979,7 +1010,7 @@ seastar::future<> ReplicatedRecoveryBackend::submit_push_data( logger().debug("{}", __func__); return prep_push_target(recovery_info, first, complete, clear_omap, t, attrs, - std::move(omap_header)).then( + std::move(omap_header)).then_interruptible( [this, &recovery_info, t, first, complete, @@ -988,6 +1019,7 @@ seastar::future<> ReplicatedRecoveryBackend::submit_push_data( data_included=std::move(data_included), omap_entries=std::move(omap_entries), &attrs](auto target_oid) mutable { + uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL; // Punch zeros for data, if fiemap indicates nothing but it is marked dirty if (!data_zeros.empty()) { @@ -1047,7 +1079,8 @@ void ReplicatedRecoveryBackend::submit_push_complete( } } -seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete_reply( +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::handle_recovery_delete_reply( Ref m) { auto& p = m->objects.front(); @@ -1059,7 +1092,8 @@ seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete_reply( return seastar::now(); } -seastar::future<> 
ReplicatedRecoveryBackend::handle_recovery_op(Ref m) +RecoveryBackend::interruptible_future<> +ReplicatedRecoveryBackend::handle_recovery_op(Ref m) { switch (m->get_header().type) { case MSG_OSD_PG_PULL: diff --git a/src/crimson/osd/replicated_recovery_backend.h b/src/crimson/osd/replicated_recovery_backend.h index 2f1449b607254..55cd0f17861bd 100644 --- a/src/crimson/osd/replicated_recovery_backend.h +++ b/src/crimson/osd/replicated_recovery_backend.h @@ -3,6 +3,8 @@ #pragma once +#include "crimson/common/interruptible_future.h" +#include "crimson/osd/pg_interval_interrupt_condition.h" #include "crimson/osd/recovery_backend.h" #include "messages/MOSDPGPull.h" @@ -18,33 +20,34 @@ public: crimson::osd::ShardServices& shard_services, crimson::os::CollectionRef coll, PGBackend* backend) - : RecoveryBackend(pg, shard_services, coll, backend) {} - seastar::future<> handle_recovery_op( + : RecoveryBackend(pg, shard_services, coll, backend) + {} + interruptible_future<> handle_recovery_op( Ref m) final; - seastar::future<> recover_object( + interruptible_future<> recover_object( const hobject_t& soid, eversion_t need) final; - seastar::future<> recover_delete( + interruptible_future<> recover_delete( const hobject_t& soid, eversion_t need) final; - seastar::future<> push_delete( + interruptible_future<> push_delete( const hobject_t& soid, eversion_t need) final; protected: - seastar::future<> handle_pull( + interruptible_future<> handle_pull( Ref m); - seastar::future<> handle_pull_response( + interruptible_future<> handle_pull_response( Ref m); - seastar::future<> handle_push( + interruptible_future<> handle_push( Ref m); - seastar::future<> handle_push_reply( + interruptible_future<> handle_push_reply( Ref m); - seastar::future<> handle_recovery_delete( + interruptible_future<> handle_recovery_delete( Ref m); - seastar::future<> handle_recovery_delete_reply( + interruptible_future<> handle_recovery_delete_reply( Ref m); - seastar::future prep_push( + 
interruptible_future prep_push( const hobject_t& soid, eversion_t need, pg_shard_t pg_shard); @@ -55,13 +58,13 @@ protected: eversion_t need); std::vector get_shards_to_push( const hobject_t& soid) const; - seastar::future build_push_op( + interruptible_future build_push_op( const ObjectRecoveryInfo& recovery_info, const ObjectRecoveryProgress& progress, object_stat_sum_t* stat); /// @returns true if this push op is the last push op for /// recovery @c pop.soid - seastar::future _handle_pull_response( + interruptible_future _handle_pull_response( pg_shard_t from, PushOp& pop, PullOp* response, @@ -70,7 +73,7 @@ protected: const interval_set &copy_subset, const interval_set &intervals_received, ceph::bufferlist data_received); - seastar::future<> submit_push_data( + interruptible_future<> submit_push_data( const ObjectRecoveryInfo &recovery_info, bool first, bool complete, @@ -85,20 +88,20 @@ protected: void submit_push_complete( const ObjectRecoveryInfo &recovery_info, ObjectStore::Transaction *t); - seastar::future<> _handle_push( + interruptible_future<> _handle_push( pg_shard_t from, PushOp& pop, PushReplyOp *response, ceph::os::Transaction *t); - seastar::future> _handle_push_reply( + interruptible_future> _handle_push_reply( pg_shard_t peer, const PushReplyOp &op); - seastar::future<> on_local_recover_persist( + interruptible_future<> on_local_recover_persist( const hobject_t& soid, const ObjectRecoveryInfo& _recovery_info, bool is_delete, epoch_t epoch_to_freeze); - seastar::future<> local_recover_delete( + interruptible_future<> local_recover_delete( const hobject_t& soid, eversion_t need, epoch_t epoch_frozen); @@ -107,15 +110,19 @@ protected: } private: /// pull missing object from peer - seastar::future<> maybe_pull_missing_obj( + interruptible_future<> maybe_pull_missing_obj( const hobject_t& soid, eversion_t need); /// load object context for recovery if it is not ready yet using load_obc_ertr = crimson::errorator< crimson::ct_error::object_corrupted>; + 
using load_obc_iertr = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + load_obc_ertr>; - seastar::future<> maybe_push_shards( + interruptible_future<> maybe_push_shards( const hobject_t& soid, eversion_t need); @@ -123,7 +130,7 @@ private: /// be relatively small. /// /// @return @c oi.version - seastar::future read_metadata_for_push_op( + interruptible_future read_metadata_for_push_op( const hobject_t& oid, const ObjectRecoveryProgress& progress, ObjectRecoveryProgress& new_progress, @@ -136,19 +143,19 @@ private: /// @param copy_subset extents we want /// @param offset the offset in object from where we should read /// @return the new offset - seastar::future read_object_for_push_op( + interruptible_future read_object_for_push_op( const hobject_t& oid, const interval_set& copy_subset, uint64_t offset, uint64_t max_len, PushOp* push_op); - seastar::future<> read_omap_for_push_op( + interruptible_future<> read_omap_for_push_op( const hobject_t& oid, const ObjectRecoveryProgress& progress, ObjectRecoveryProgress& new_progress, uint64_t* max_len, PushOp* push_op); - seastar::future prep_push_target( + interruptible_future prep_push_target( const ObjectRecoveryInfo &recovery_info, bool first, bool complete, @@ -156,4 +163,6 @@ private: ObjectStore::Transaction* t, const map &attrs, bufferlist&& omap_header); + using interruptor = crimson::interruptible::interruptor< + crimson::osd::IOInterruptCondition>; }; -- 2.39.5