From 633f45ac3bf3339629abd4c935ff5e4631e9fd83 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Rados=C5=82aw=20Zarzy=C5=84ski?= Date: Tue, 12 Apr 2022 10:37:16 +0200 Subject: [PATCH] crimson/osd: convert entire ClientRequest to the new infra MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Radosław Zarzyński --- .../osd/osd_operation_external_tracking.h | 68 +++++++++++++++++++ .../osd/osd_operations/client_request.cc | 64 ++++++++--------- .../osd/osd_operations/client_request.h | 13 ++++ 3 files changed, 114 insertions(+), 31 deletions(-) diff --git a/src/crimson/osd/osd_operation_external_tracking.h b/src/crimson/osd/osd_operation_external_tracking.h index eaccd501a09..3eedead0468 100644 --- a/src/crimson/osd/osd_operation_external_tracking.h +++ b/src/crimson/osd/osd_operation_external_tracking.h @@ -9,6 +9,8 @@ #include "crimson/osd/osd_operations/peering_event.h" #include "crimson/osd/osd_operations/recovery_subrequest.h" #include "crimson/osd/osd_operations/replicated_request.h" +#include "crimson/osd/pg_activation_blocker.h" +#include "crimson/osd/pg_map.h" namespace crimson::osd { @@ -17,6 +19,17 @@ struct LttngBackend : ClientRequest::StartEvent::Backend, ClientRequest::ConnectionPipeline::AwaitMap::BlockingEvent::Backend, OSD_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend, + ClientRequest::ConnectionPipeline::GetPG::BlockingEvent::Backend, + PGMap::PGCreationBlockingEvent::Backend, + ClientRequest::PGPipeline::AwaitMap::BlockingEvent::Backend, + PG_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend, + ClientRequest::PGPipeline::WaitForActive::BlockingEvent::Backend, + PGActivationBlocker::BlockingEvent::Backend, + ClientRequest::PGPipeline::RecoverMissing::BlockingEvent::Backend, + ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend, + ClientRequest::PGPipeline::Process::BlockingEvent::Backend, + ClientRequest::PGPipeline::WaitRepop::BlockingEvent::Backend, + ClientRequest::PGPipeline::SendReply::BlockingEvent::Backend, ClientRequest::CompletionEvent::Backend { void handle(ClientRequest::StartEvent&, @@ -32,6 +45,61 @@ struct LttngBackend const OSD_OSDMapGate::OSDMapBlocker&) override { } + void handle(ClientRequest::ConnectionPipeline::GetPG::BlockingEvent& ev, + const Operation& op, + const ClientRequest::ConnectionPipeline::GetPG& blocker) override { + } + + void handle(PGMap::PGCreationBlockingEvent&, + const Operation&, + const PGMap::PGCreationBlocker&) override { + } + + void handle(ClientRequest::PGPipeline::AwaitMap::BlockingEvent& ev, + const Operation& op, + const ClientRequest::PGPipeline::AwaitMap& blocker) override { + } + + void handle(PG_OSDMapGate::OSDMapBlocker::BlockingEvent&, + const Operation&, + const PG_OSDMapGate::OSDMapBlocker&) override { + } + + void handle(ClientRequest::PGPipeline::WaitForActive::BlockingEvent& ev, + const Operation& op, + const ClientRequest::PGPipeline::WaitForActive& blocker) override { + } + + void handle(PGActivationBlocker::BlockingEvent& ev, + const Operation& op, + const PGActivationBlocker& blocker) override { + } + + void handle(ClientRequest::PGPipeline::RecoverMissing::BlockingEvent& ev, + const Operation& op, + const ClientRequest::PGPipeline::RecoverMissing& blocker) override { + } + + void handle(ClientRequest::PGPipeline::GetOBC::BlockingEvent& ev, + const Operation& op, + const ClientRequest::PGPipeline::GetOBC& blocker) override { + } + + void handle(ClientRequest::PGPipeline::Process::BlockingEvent& ev, + const Operation& op, + const ClientRequest::PGPipeline::Process& blocker) override { + } + + void handle(ClientRequest::PGPipeline::WaitRepop::BlockingEvent& ev, + const Operation& op, + const ClientRequest::PGPipeline::WaitRepop& blocker) override { + } + + void handle(ClientRequest::PGPipeline::SendReply::BlockingEvent& ev, + const Operation& op, + const ClientRequest::PGPipeline::SendReply& blocker) override { + } + void handle(ClientRequest::CompletionEvent&, const Operation&) override {} }; diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index bca3952c772..649293460f3 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -92,9 +92,12 @@ seastar::future<> ClientRequest::start() m->get_min_epoch()); }); }).then([this](epoch_t epoch) { - return with_blocking_future(handle.enter(cp().get_pg)); + return enter_stage<>(cp().get_pg); }).then([this] { - return with_blocking_future(osd.wait_for_pg(m->get_spg())); + return with_blocking_event( + [this] (auto&& trigger) { + return osd.wait_for_pg(std::move(trigger), m->get_spg()); + }); }).then([this](Ref pgref) mutable { return interruptor::with_interruption([this, pgref]() mutable { epoch_t same_interval_since = pgref->get_interval_start_epoch(); @@ -111,17 +114,21 @@ seastar::future<> ClientRequest::start() return interruptor::now(); }); } - return with_blocking_future_interruptible( - handle.enter(pp(pg).await_map) - ).then_interruptible([this, &pg] { - return with_blocking_future_interruptible( - pg.osdmap_gate.wait_for_map(m->get_min_epoch())); - }).then_interruptible([this, &pg](auto map) { - return with_blocking_future_interruptible( - handle.enter(pp(pg).wait_for_active)); + return enter_stage( + pp(pg).await_map + ).then_interruptible([this, &pg] { + return with_blocking_event( + [this, &pg] (auto&& trigger) { + return pg.osdmap_gate.wait_for_map(std::move(trigger), + m->get_min_epoch()); + }); + }).then_interruptible([this, &pg](auto&&) { + return enter_stage<>(pp(pg).wait_for_active); }).then_interruptible([this, &pg]() { - return with_blocking_future_interruptible( - pg.wait_for_active_blocker.wait()); + return with_blocking_event( + [&pg] (auto&& trigger) { + return pg.wait_for_active_blocker.wait(std::move(trigger)); + }); }).then_interruptible([this, pgref=std::move(pgref)]() mutable { if (is_pg_op()) { return process_pg_op(pgref).then_interruptible([this] { @@ -169,9 +176,9 @@ ClientRequest::process_pg_op( ClientRequest::interruptible_future ClientRequest::process_op(Ref &pg) { - return with_blocking_future_interruptible( - handle.enter(pp(*pg).recover_missing)) - .then_interruptible( + return enter_stage( + pp(*pg).recover_missing + ).then_interruptible( [this, pg]() mutable { return do_recover_missing(pg, m->get_hobj()); }).then_interruptible([this, pg]() mutable { @@ -186,8 +193,7 @@ ClientRequest::process_op(Ref &pg) return seastar::make_ready_future(seq_mode_t::OUT_OF_ORDER); }); } else { - return with_blocking_future_interruptible( - handle.enter(pp(*pg).get_obc)).then_interruptible( + return enter_stage(pp(*pg).get_obc).then_interruptible( [this, pg]() mutable -> PG::load_obc_iertr::future { logger().debug("{}: got obc lock", *this); op_info.set_from_op(&*m, *pg->get_osdmap()); @@ -196,9 +202,8 @@ ClientRequest::process_op(Ref &pg) return seastar::do_with(seq_mode_t{}, [this, &pg] (seq_mode_t& mode) { return pg->with_locked_obc(m->get_hobj(), op_info, [this, pg, &mode](auto obc) mutable { - return with_blocking_future_interruptible( - handle.enter(pp(*pg).process) - ).then_interruptible([this, pg, obc, &mode]() mutable { + return enter_stage(pp(*pg).process).then_interruptible( + [this, pg, obc, &mode]() mutable { return do_process(pg, obc).then_interruptible([&mode] (seq_mode_t _mode) { mode = _mode; return seastar::now(); @@ -264,21 +269,18 @@ ClientRequest::do_process(Ref& pg, crimson::osd::ObjectContextRef obc) return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible( [this, pg](auto submitted, auto all_completed) mutable { - return submitted.then_interruptible( - [this, pg] { - return with_blocking_future_interruptible( - handle.enter(pp(*pg).wait_repop)); + return submitted.then_interruptible([this, pg] { + return enter_stage(pp(*pg).wait_repop); }).then_interruptible( [this, pg, all_completed=std::move(all_completed)]() mutable { return all_completed.safe_then_interruptible( [this, pg](MURef reply) { - return with_blocking_future_interruptible( - handle.enter(pp(*pg).send_reply)).then_interruptible( - [this, reply=std::move(reply)]() mutable{ - return conn->send(std::move(reply)).then([] { - return seastar::make_ready_future(seq_mode_t::IN_ORDER); - }); - }); + return enter_stage(pp(*pg).send_reply).then_interruptible( + [this, reply=std::move(reply)]() mutable { + return conn->send(std::move(reply)).then([] { + return seastar::make_ready_future(seq_mode_t::IN_ORDER); + }); + }); }, crimson::ct_error::eagain::handle([this, pg]() mutable { return process_op(pg); })); diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index f0fbf4f88aa..024b514db65 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -10,6 +10,8 @@ #include "crimson/osd/osd_operation.h" #include "crimson/osd/osd_operations/client_request_common.h" #include "crimson/osd/osd_operations/common/pg_pipeline.h" +#include "crimson/osd/pg_activation_blocker.h" +#include "crimson/osd/pg_map.h" #include "crimson/common/type_helpers.h" #include "messages/MOSDOp.h" @@ -109,6 +111,17 @@ public: StartEvent, ConnectionPipeline::AwaitMap::BlockingEvent, OSD_OSDMapGate::OSDMapBlocker::BlockingEvent, + ConnectionPipeline::GetPG::BlockingEvent, + PGMap::PGCreationBlockingEvent, + PGPipeline::AwaitMap::BlockingEvent, + PG_OSDMapGate::OSDMapBlocker::BlockingEvent, + PGPipeline::WaitForActive::BlockingEvent, + PGActivationBlocker::BlockingEvent, + PGPipeline::RecoverMissing::BlockingEvent, + PGPipeline::GetOBC::BlockingEvent, + PGPipeline::Process::BlockingEvent, + PGPipeline::WaitRepop::BlockingEvent, + PGPipeline::SendReply::BlockingEvent, CompletionEvent > tracking_events; -- 2.39.5