From: Samuel Just Date: Wed, 11 Sep 2024 01:31:57 +0000 (+0000) Subject: crimson/.../internal_client_request: factor out with_interruption X-Git-Tag: v20.0.0~812^2~16 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=a091414c67ba9f1407c3756dd75ca2aa3b1074ac;p=ceph.git crimson/.../internal_client_request: factor out with_interruption Signed-off-by: Samuel Just --- diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc index b1224f6e25942..d4213928a3e46 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -50,6 +50,77 @@ CommonPGPipeline& InternalClientRequest::client_pp() return pg->request_pg_pipeline; } +InternalClientRequest::interruptible_future<> +InternalClientRequest::with_interruption() +{ + return enter_stage( + client_pp().wait_for_active + ).then_interruptible([this] { + return with_blocking_event([this] (auto&& trigger) { + return pg->wait_for_active_blocker.wait(std::move(trigger)); + }); + }).then_interruptible([this] { + return enter_stage( + client_pp().recover_missing); + }).then_interruptible([this] { + return do_recover_missing(pg, get_target_oid(), osd_reqid_t()); + }).then_interruptible([this](bool unfound) { + if (unfound) { + throw std::system_error( + std::make_error_code(std::errc::operation_canceled), + fmt::format("{} is unfound, drop it!", get_target_oid())); + } + return enter_stage( + client_pp().get_obc); + }).then_interruptible([this] () -> PG::load_obc_iertr::future<> { + LOG_PREFIX(InternalClientRequest::with_interruption); + DEBUGI("{}: getting obc lock", *this); + return seastar::do_with( + create_osd_ops(), + [this](auto& osd_ops) mutable { + LOG_PREFIX(InternalClientRequest::with_interruption); + DEBUGI("InternalClientRequest: got {} OSDOps to execute", + std::size(osd_ops)); + [[maybe_unused]] const int ret = op_info.set_from_op( + std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap()); + assert(ret == 0); + // call with_locked_obc() in order, but wait concurrently for loading. + enter_stage_sync(client_pp().lock_obc); + return pg->with_locked_obc( + get_target_oid(), op_info, + [&osd_ops, this](auto, auto obc) { + return enter_stage(client_pp().process + ).then_interruptible( + [obc=std::move(obc), &osd_ops, this] { + return pg->do_osd_ops( + std::move(obc), + osd_ops, + std::as_const(op_info), + get_do_osd_ops_params() + ).safe_then_unpack_interruptible( + [](auto submitted, auto all_completed) { + return all_completed.handle_error_interruptible( + crimson::ct_error::eagain::handle([] { + return seastar::now(); + })); + }, crimson::ct_error::eagain::handle([] { + return interruptor::now(); + }) + ); + }); + }); + }); + }).si_then([this] { + logger().debug("{}: complete", *this); + return handle.complete(); + }).handle_error_interruptible( + PG::load_obc_ertr::all_same_way([] { + return seastar::now(); + }) + ); +} + seastar::future<> InternalClientRequest::start() { track_event(); @@ -57,72 +128,7 @@ seastar::future<> InternalClientRequest::start() DEBUGI("{}: in repeat", *this); return interruptor::with_interruption([this]() mutable { - return enter_stage( - client_pp().wait_for_active - ).then_interruptible([this] { - return with_blocking_event([this] (auto&& trigger) { - return pg->wait_for_active_blocker.wait(std::move(trigger)); - }); - }).then_interruptible([this] { - return enter_stage( - client_pp().recover_missing); - }).then_interruptible([this] { - return do_recover_missing(pg, get_target_oid(), osd_reqid_t()); - }).then_interruptible([this](bool unfound) { - if (unfound) { - throw std::system_error( - std::make_error_code(std::errc::operation_canceled), - fmt::format("{} is unfound, drop it!", get_target_oid())); - } - return enter_stage( - client_pp().get_obc); - }).then_interruptible([this] () -> PG::load_obc_iertr::future<> { - LOG_PREFIX(InternalClientRequest::start); - DEBUGI("{}: getting obc lock", *this); - return seastar::do_with( - create_osd_ops(), - [this](auto& osd_ops) mutable { - LOG_PREFIX(InternalClientRequest::start); - DEBUGI("InternalClientRequest: got {} OSDOps to execute", - std::size(osd_ops)); - [[maybe_unused]] const int ret = op_info.set_from_op( - std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap()); - assert(ret == 0); - // call with_locked_obc() in order, but wait concurrently for loading. - enter_stage_sync(client_pp().lock_obc); - return pg->with_locked_obc( - get_target_oid(), op_info, - [&osd_ops, this](auto, auto obc) { - return enter_stage(client_pp().process - ).then_interruptible( - [obc=std::move(obc), &osd_ops, this] { - return pg->do_osd_ops( - std::move(obc), - osd_ops, - std::as_const(op_info), - get_do_osd_ops_params() - ).safe_then_unpack_interruptible( - [](auto submitted, auto all_completed) { - return all_completed.handle_error_interruptible( - crimson::ct_error::eagain::handle([] { - return seastar::now(); - })); - }, crimson::ct_error::eagain::handle([] { - return interruptor::now(); - }) - ); - }); - }); - }); - }).si_then([this] { - logger().debug("{}: complete", *this); - return handle.complete(); - }).handle_error_interruptible( - PG::load_obc_ertr::all_same_way([] { - return seastar::now(); - }) - ); + return with_interruption(); }, [](std::exception_ptr eptr) { return seastar::now(); }, pg, start_epoch).then([this] { diff --git a/src/crimson/osd/osd_operations/internal_client_request.h b/src/crimson/osd/osd_operations/internal_client_request.h index f198e58464338..2f3585013344d 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.h +++ b/src/crimson/osd/osd_operations/internal_client_request.h @@ -41,6 +41,8 @@ private: CommonPGPipeline& client_pp(); + InternalClientRequest::interruptible_future<> with_interruption(); + seastar::future<> do_process(); Ref pg;