From b23b1a4f128529b446fb0366241b72540afbaaae Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 12 Sep 2022 19:17:16 +0000 Subject: [PATCH] crimson/osd/client_request: use fresh tracking_events/handle instances on requeue See instance_handle_t explanation in client_request.h Fixes: https://tracker.ceph.com/issues/57494 Fixes: https://tracker.ceph.com/issues/57495 Signed-off-by: Samuel Just --- src/crimson/common/utility.h | 18 +++ .../osd/osd_operations/client_request.cc | 128 ++++++++++-------- .../osd/osd_operations/client_request.h | 122 ++++++++++++++--- src/crimson/osd/osdmap_gate.h | 1 + src/crimson/osd/pg_activation_blocker.h | 1 + 5 files changed, 190 insertions(+), 80 deletions(-) diff --git a/src/crimson/common/utility.h b/src/crimson/common/utility.h index 6fc50bb1c1a1..86b30815585c 100644 --- a/src/crimson/common/utility.h +++ b/src/crimson/common/utility.h @@ -18,3 +18,21 @@ void assert_moveable(const T& t) { static_assert(_impl::always_false::value, "unable to move-out from T"); } +namespace internal { + +template +static auto _apply_method_to_tuple( + Obj &obj, Method method, ArgTuple &&tuple, + std::index_sequence) { + return (obj.*method)(std::get(std::forward(tuple))...); +} + +} + +template +auto apply_method_to_tuple(Obj &obj, Method method, ArgTuple &&tuple) { + constexpr auto tuple_size = std::tuple_size_v; + return internal::_apply_method_to_tuple( + obj, method, std::forward(tuple), + std::make_index_sequence()); +} diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index db531f7bcb24..fca707f98512 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -24,11 +24,9 @@ namespace crimson::osd { void ClientRequest::Orderer::requeue( ShardServices &shard_services, Ref pg) { - for (auto &req: list) { - req.handle.exit(); - } for (auto &req: list) { logger().debug("{}: {} requeueing {}", __func__, *pg, req); + req.reset_instance_handle(); std::ignore = req.with_pg_int(shard_services, pg); } } @@ -47,7 +45,6 @@ void ClientRequest::Orderer::clear_and_cancel() void ClientRequest::complete_request() { track_event(); - handle.exit(); on_complete.set_value(); } @@ -55,7 +52,8 @@ ClientRequest::ClientRequest( OSD &osd, crimson::net::ConnectionRef conn, Ref &&m) : osd(osd), conn(std::move(conn)), - m(std::move(m)) + m(std::move(m)), + instance_handle(seastar::make_lw_shared()) {} ClientRequest::~ClientRequest() @@ -114,8 +112,10 @@ seastar::future<> ClientRequest::with_pg_int( } const auto this_instance_id = instance_id++; OperationRef opref{this}; + auto instance_handle = get_instance_handle(); + auto &ihref = *instance_handle; return interruptor::with_interruption( - [this, pgref, this_instance_id]() mutable { + [this, pgref, this_instance_id, &ihref]() mutable { PG &pg = *pgref; if (pg.can_discard_op(*m)) { return osd.send_incremental_map( @@ -127,28 +127,23 @@ seastar::future<> ClientRequest::with_pg_int( return interruptor::now(); }); } - return enter_stage(pp(pg).await_map - ).then_interruptible([this, this_instance_id, &pg] { + return ihref.enter_stage(pp(pg).await_map, *this + ).then_interruptible([this, this_instance_id, &pg, &ihref] { logger().debug("{}.{}: after await_map stage", *this, this_instance_id); - return with_blocking_event< - PG_OSDMapGate::OSDMapBlocker::BlockingEvent - >([this, &pg] (auto&& trigger) { - return pg.osdmap_gate.wait_for_map( - std::move(trigger), - m->get_min_epoch()); - }); - }).then_interruptible([this, this_instance_id, &pg](auto map) { + return ihref.enter_blocker( + *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map, + m->get_min_epoch(), nullptr); + }).then_interruptible([this, this_instance_id, &pg, &ihref](auto map) { logger().debug("{}.{}: after wait_for_map", *this, this_instance_id); - return enter_stage(pp(pg).wait_for_active); - }).then_interruptible([this, this_instance_id, &pg]() { + return ihref.enter_stage(pp(pg).wait_for_active, *this); + }).then_interruptible([this, this_instance_id, &pg, &ihref]() { logger().debug( "{}.{}: after wait_for_active stage", *this, this_instance_id); - return with_blocking_event< - PGActivationBlocker::BlockingEvent - >([&pg] (auto&& trigger) { - return pg.wait_for_active_blocker.wait(std::move(trigger)); - }); - }).then_interruptible([this, pgref, this_instance_id]() mutable + return ihref.enter_blocker( + *this, + pg.wait_for_active_blocker, + &decltype(pg.wait_for_active_blocker)::wait); + }).then_interruptible([this, pgref, this_instance_id, &ihref]() mutable -> interruptible_future<> { logger().debug( "{}.{}: after wait_for_active", *this, this_instance_id); @@ -156,6 +151,7 @@ seastar::future<> ClientRequest::with_pg_int( return process_pg_op(pgref); } else { return process_op( + ihref, pgref ).then_interruptible([](auto){}); } @@ -167,7 +163,11 @@ seastar::future<> ClientRequest::with_pg_int( }, [this, this_instance_id, pgref](std::exception_ptr eptr) { // TODO: better debug output logger().debug("{}.{}: interrupted {}", *this, this_instance_id, eptr); - }, pgref).finally([opref=std::move(opref), pgref=std::move(pgref)] {}); + }, pgref).finally( + [opref=std::move(opref), pgref=std::move(pgref), + instance_handle=std::move(instance_handle), &ihref] { + ihref.handle.exit(); + }); } seastar::future<> ClientRequest::with_pg( @@ -193,16 +193,17 @@ ClientRequest::process_pg_op( } ClientRequest::interruptible_future -ClientRequest::process_op(Ref &pg) +ClientRequest::process_op(instance_handle_t &ihref, Ref &pg) { - return enter_stage( - pp(*pg).recover_missing + return ihref.enter_stage( + pp(*pg).recover_missing, + *this ).then_interruptible( [this, pg]() mutable { return do_recover_missing(pg, m->get_hobj()); - }).then_interruptible([this, pg]() mutable { + }).then_interruptible([this, pg, &ihref]() mutable { return pg->already_complete(m->get_reqid()).then_unpack_interruptible( - [this, pg](bool completed, int ret) mutable + [this, pg, &ihref](bool completed, int ret) mutable -> PG::load_obc_iertr::future { if (completed) { auto reply = crimson::make_message( @@ -212,29 +213,35 @@ ClientRequest::process_op(Ref &pg) return seastar::make_ready_future(seq_mode_t::OUT_OF_ORDER); }); } else { - return enter_stage(pp(*pg).get_obc).then_interruptible( - [this, pg]() mutable -> PG::load_obc_iertr::future { + return ihref.enter_stage(pp(*pg).get_obc, *this + ).then_interruptible( + [this, pg, &ihref]() mutable -> PG::load_obc_iertr::future { logger().debug("{}: got obc lock", *this); op_info.set_from_op(&*m, *pg->get_osdmap()); // XXX: `do_with()` is just a workaround for `with_obc_func_t` imposing // `future`. - return seastar::do_with(seq_mode_t{}, [this, &pg] (seq_mode_t& mode) { - return pg->with_locked_obc(m->get_hobj(), op_info, - [this, pg, &mode](auto obc) mutable { - return enter_stage(pp(*pg).process).then_interruptible( - [this, pg, obc, &mode]() mutable { - return do_process(pg, obc).then_interruptible([&mode] (seq_mode_t _mode) { - mode = _mode; - return seastar::now(); - }); - }); - }).safe_then_interruptible([&mode] { - return PG::load_obc_iertr::make_ready_future(mode); - }); - }); - }); + return seastar::do_with( + seq_mode_t{}, + [this, &pg, &ihref](seq_mode_t& mode) { + return pg->with_locked_obc( + m->get_hobj(), op_info, + [this, pg, &mode, &ihref](auto obc) mutable { + return ihref.enter_stage(pp(*pg).process, *this + ).then_interruptible( + [this, pg, obc, &mode, &ihref]() mutable { + return do_process(ihref, pg, obc + ).then_interruptible([&mode] (seq_mode_t _mode) { + mode = _mode; + return seastar::now(); + }); + }); + }).safe_then_interruptible([&mode] { + return PG::load_obc_iertr::make_ready_future(mode); + }); + }); + }); } - }); + }); }).safe_then_interruptible([pg=std::move(pg)] (const seq_mode_t mode) { return seastar::make_ready_future(mode); }, PG::load_obc_ertr::all_same_way([](auto &code) { @@ -258,7 +265,9 @@ auto ClientRequest::reply_op_error(Ref& pg, int err) } ClientRequest::interruptible_future -ClientRequest::do_process(Ref& pg, crimson::osd::ObjectContextRef obc) +ClientRequest::do_process( + instance_handle_t &ihref, + Ref& pg, crimson::osd::ObjectContextRef obc) { if (!pg->is_primary()) { // primary can handle both normal ops and balanced reads @@ -291,25 +300,26 @@ ClientRequest::do_process(Ref& pg, crimson::osd::ObjectContextRef obc) } return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible( - [this, pg](auto submitted, auto all_completed) mutable { - return submitted.then_interruptible([this, pg] { - return enter_stage(pp(*pg).wait_repop); + [this, pg, &ihref](auto submitted, auto all_completed) mutable { + return submitted.then_interruptible([this, pg, &ihref] { + return ihref.enter_stage(pp(*pg).wait_repop, *this); }).then_interruptible( - [this, pg, all_completed=std::move(all_completed)]() mutable { + [this, pg, all_completed=std::move(all_completed), &ihref]() mutable { return all_completed.safe_then_interruptible( - [this, pg](MURef reply) { - return enter_stage(pp(*pg).send_reply).then_interruptible( + [this, pg, &ihref](MURef reply) { + return ihref.enter_stage(pp(*pg).send_reply, *this + ).then_interruptible( [this, reply=std::move(reply)]() mutable { return conn->send(std::move(reply)).then([] { return seastar::make_ready_future(seq_mode_t::IN_ORDER); }); }); - }, crimson::ct_error::eagain::handle([this, pg]() mutable { - return process_op(pg); + }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable { + return process_op(ihref, pg); })); }); - }, crimson::ct_error::eagain::handle([this, pg]() mutable { - return process_op(pg); + }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable { + return process_op(ihref, pg); })); } diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index 1d8b521afbe9..47143ac7ec00 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -18,6 +18,7 @@ #include "crimson/osd/pg_activation_blocker.h" #include "crimson/osd/pg_map.h" #include "crimson/common/type_helpers.h" +#include "crimson/common/utility.h" #include "messages/MOSDOp.h" namespace crimson::osd { @@ -30,7 +31,6 @@ class ClientRequest final : public PhasedOperationT, OSD &osd; const crimson::net::ConnectionRef conn; // must be after conn due to ConnectionPipeline's life-time - PipelineHandle handle; Ref m; OpInfo op_info; seastar::promise<> on_complete; @@ -52,6 +52,102 @@ public: friend class HistoricBackend; }; + /** + * instance_handle_t + * + * Client request is, at present, the only Operation which can be requeued. + * This is, mostly, fine. However, reusing the PipelineHandle or + * BlockingEvent structures before proving that the prior instance has stopped + * can create hangs or crashes due to violations of the BlockerT and + * PipelineHandle invariants. + * + * To solve this, we create an instance_handle_t which contains the events + * for the portion of execution that can be rerun as well as the + * PipelineHandle. ClientRequest::with_pg_int grabs a reference to the current + * instance_handle_t and releases its PipelineHandle in the finally block. + * On requeue, we create a new instance_handle_t with a fresh PipelineHandle + * and events tuple and use it and use it for the next invocation of + * with_pg_int. + */ + std::tuple< + StartEvent, + ConnectionPipeline::AwaitActive::BlockingEvent, + ConnectionPipeline::AwaitMap::BlockingEvent, + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent, + ConnectionPipeline::GetPG::BlockingEvent, + PGMap::PGCreationBlockingEvent, + CompletionEvent + > tracking_events; + + class instance_handle_t + : public seastar::enable_lw_shared_from_this { + public: + using ref_t = seastar::lw_shared_ptr; + PipelineHandle handle; + + std::tuple< + PGPipeline::AwaitMap::BlockingEvent, + PG_OSDMapGate::OSDMapBlocker::BlockingEvent, + PGPipeline::WaitForActive::BlockingEvent, + PGActivationBlocker::BlockingEvent, + PGPipeline::RecoverMissing::BlockingEvent, + PGPipeline::GetOBC::BlockingEvent, + PGPipeline::Process::BlockingEvent, + PGPipeline::WaitRepop::BlockingEvent, + PGPipeline::SendReply::BlockingEvent, + CompletionEvent + > pg_tracking_events; + + template + auto with_blocking_event(F &&f, ClientRequest &op) { + auto ret = std::forward(f)( + typename BlockingEventT::template Trigger{ + std::get(pg_tracking_events), op + }); + if constexpr (std::is_same_v) { + return ret; + } else { + using ret_t = decltype(ret); + return typename InterruptorT::template futurize_t{std::move(ret)}; + } + } + + template + auto enter_stage(StageT &stage, ClientRequest &op) { + return this->template with_blocking_event< + typename StageT::BlockingEvent, + InterruptorT>( + [&stage, this](auto &&trigger) { + return handle.template enter( + stage, std::move(trigger)); + }, op); + } + + template < + typename InterruptorT=void, typename BlockingObj, typename Method, + typename... Args> + auto enter_blocker( + ClientRequest &op, BlockingObj &obj, Method method, Args&&... args) { + return this->template with_blocking_event< + typename BlockingObj::Blocker::BlockingEvent, + InterruptorT>( + [&obj, method, + args=std::forward_as_tuple(std::move(args)...)](auto &&trigger) mutable { + return apply_method_to_tuple( + obj, method, + std::tuple_cat( + std::forward_as_tuple(std::move(trigger)), + std::move(args)) + ); + }, op); + } + }; + instance_handle_t::ref_t instance_handle; + void reset_instance_handle() { + instance_handle = seastar::make_lw_shared(); + } + auto get_instance_handle() { return instance_handle; } + using ordering_hook_t = boost::intrusive::list_member_hook<>; ordering_hook_t ordering_hook; class Orderer { @@ -93,7 +189,7 @@ public: return m->get_spg(); } ConnectionPipeline &get_connection_pipeline(); - PipelineHandle &get_handle() { return handle; } + PipelineHandle &get_handle() { return instance_handle->handle; } epoch_t get_epoch() const { return m->get_min_epoch(); } seastar::future<> with_pg_int( @@ -114,6 +210,7 @@ private: }; interruptible_future do_process( + instance_handle_t &ihref, Ref& pg, crimson::osd::ObjectContextRef obc); ::crimson::interruptible::interruptible_future< @@ -121,7 +218,8 @@ private: Ref &pg); ::crimson::interruptible::interruptible_future< ::crimson::osd::IOInterruptCondition, seq_mode_t> process_op( - Ref &pg); + instance_handle_t &ihref, + Ref &pg); bool is_pg_op() const; ConnectionPipeline &cp(); @@ -136,24 +234,6 @@ private: bool is_misdirected(const PG& pg) const; public: - std::tuple< - StartEvent, - ConnectionPipeline::AwaitActive::BlockingEvent, - ConnectionPipeline::AwaitMap::BlockingEvent, - OSD_OSDMapGate::OSDMapBlocker::BlockingEvent, - ConnectionPipeline::GetPG::BlockingEvent, - PGMap::PGCreationBlockingEvent, - PGPipeline::AwaitMap::BlockingEvent, - PG_OSDMapGate::OSDMapBlocker::BlockingEvent, - PGPipeline::WaitForActive::BlockingEvent, - PGActivationBlocker::BlockingEvent, - PGPipeline::RecoverMissing::BlockingEvent, - PGPipeline::GetOBC::BlockingEvent, - PGPipeline::Process::BlockingEvent, - PGPipeline::WaitRepop::BlockingEvent, - PGPipeline::SendReply::BlockingEvent, - CompletionEvent - > tracking_events; friend class LttngBackend; friend class HistoricBackend; diff --git a/src/crimson/osd/osdmap_gate.h b/src/crimson/osd/osdmap_gate.h index 9a7d542db3e3..d76c4b82f37c 100644 --- a/src/crimson/osd/osdmap_gate.h +++ b/src/crimson/osd/osdmap_gate.h @@ -45,6 +45,7 @@ public: void dump_detail(Formatter *f) const final; }; + using Blocker = OSDMapBlocker; private: // order the promises in ascending order of the waited osdmap epoch, diff --git a/src/crimson/osd/pg_activation_blocker.h b/src/crimson/osd/pg_activation_blocker.h index fd07d9b41410..fff8219d1135 100644 --- a/src/crimson/osd/pg_activation_blocker.h +++ b/src/crimson/osd/pg_activation_blocker.h @@ -24,6 +24,7 @@ protected: public: static constexpr const char *type_name = "PGActivationBlocker"; + using Blocker = PGActivationBlocker; PGActivationBlocker(PG *pg) : pg(pg) {} void unblock(); -- 2.47.3