From: Samuel Just Date: Wed, 7 Feb 2024 01:19:37 +0000 (-0800) Subject: crimson/.../client_request: convert process_op to coroutine X-Git-Tag: v19.1.1~356^2~3 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=aac772f39fb50928bfefec09ea1ebcaa538a720e;p=ceph.git crimson/.../client_request: convert process_op to coroutine Signed-off-by: Samuel Just (cherry picked from commit a4905fb3e2f2400656f0b436275e9fa745f14cc1) --- diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 2f71cdacd3a87..2514c08993ff5 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -231,74 +231,73 @@ ClientRequest::process_op( instance_handle_t &ihref, Ref pg, unsigned this_instance_id) { LOG_PREFIX(ClientRequest::process_op); - return ihref.enter_stage( + co_await ihref.enter_stage( client_pp(*pg).recover_missing, *this - ).then_interruptible([pg, this]() mutable { - return recover_missings(pg, m->get_hobj(), snaps_need_to_recover()); - }).then_interruptible([FNAME, this, pg, this_instance_id, &ihref]() mutable { - DEBUGDPP("{}.{}: checking already_complete", + ); + co_await recover_missings(pg, m->get_hobj(), snaps_need_to_recover()); + + DEBUGDPP("{}.{}: checking already_complete", + *pg, *this, this_instance_id); + auto completed = co_await pg->already_complete(m->get_reqid()); + + if (completed) { + DEBUGDPP("{}.{}: already completed, sending reply", *pg, *this, this_instance_id); - return pg->already_complete(m->get_reqid()).then_interruptible( - [FNAME, this, pg, this_instance_id, &ihref](auto completed) mutable - -> PG::load_obc_iertr::future<> { - if (completed) { - DEBUGDPP("{}.{}: already completed, sending reply", - *pg, *this, this_instance_id); - auto reply = crimson::make_message( - m.get(), completed->err, pg->get_osdmap_epoch(), - CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false); - reply->set_reply_versions(completed->version, completed->user_version); - // TODO: gate the crosscore sending - return get_foreign_connection().send_with_throttling(std::move(reply)); - } else { - DEBUGDPP("{}.{}: not completed, entering get_obc stage", - *pg, *this, this_instance_id); - return ihref.enter_stage(client_pp(*pg).get_obc, *this - ).then_interruptible( - [FNAME, this, pg, this_instance_id, &ihref]() mutable - -> PG::load_obc_iertr::future<> { - DEBUGDPP("{}.{}: entered get_obc stage, about to wait_scrub", + auto reply = crimson::make_message( + m.get(), completed->err, pg->get_osdmap_epoch(), + CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false); + reply->set_reply_versions(completed->version, completed->user_version); + // TODO: gate the crosscore sending + co_await interruptor::make_interruptible( + get_foreign_connection().send_with_throttling(std::move(reply)) + ); + co_return; + } + + DEBUGDPP("{}.{}: not completed, entering get_obc stage", + *pg, *this, this_instance_id); + co_await ihref.enter_stage(client_pp(*pg).get_obc, *this); + + DEBUGDPP("{}.{}: entered get_obc stage, about to wait_scrub", + *pg, *this, this_instance_id); + if (int res = op_info.set_from_op(&*m, *pg->get_osdmap()); + res != 0) { + co_await reply_op_error(pg, res); + co_return; + } + co_await ihref.enter_blocker( + *this, pg->scrubber, &decltype(pg->scrubber)::wait_scrub, + m->get_hobj()); + + DEBUGDPP("{}.{}: past scrub blocker, getting obc", + *pg, *this, this_instance_id); + co_await pg->with_locked_obc( + m->get_hobj(), op_info, + [FNAME, this, pg, this_instance_id, &ihref] ( + auto head, auto obc + ) -> interruptible_future<> { + DEBUGDPP("{}.{}: got obc {}, entering process stage", + *pg, *this, this_instance_id, obc->obs); + return ihref.enter_stage( + client_pp(*pg).process, *this + ).then_interruptible( + [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable { + DEBUGDPP("{}.{}: in process stage, calling do_process", *pg, *this, this_instance_id); - if (int res = op_info.set_from_op(&*m, *pg->get_osdmap()); - res != 0) { - return reply_op_error(pg, res); - } - return ihref.enter_blocker( - *this, - pg->scrubber, - &decltype(pg->scrubber)::wait_scrub, - m->get_hobj() - ).then_interruptible( - [FNAME, this, pg, this_instance_id, &ihref]() mutable { - DEBUGDPP("{}.{}: past scrub blocker, getting obc", - *pg, *this, this_instance_id); - return pg->with_locked_obc( - m->get_hobj(), op_info, - [FNAME, this, pg, this_instance_id, &ihref]( - auto head, auto obc) mutable { - DEBUGDPP("{}.{}: got obc {}, entering process stage", - *pg, *this, this_instance_id, obc->obs); - return ihref.enter_stage( - client_pp(*pg).process, *this - ).then_interruptible( - [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable { - DEBUGDPP("{}.{}: in process stage, calling do_process", - *pg, *this, this_instance_id); - return do_process(ihref, pg, obc, this_instance_id); - }); - }); - }); - }); - } - }); - }).handle_error_interruptible( + return do_process(ihref, pg, obc, this_instance_id); + }); + } + ).handle_error_interruptible( PG::load_obc_ertr::all_same_way( - [FNAME, this, pg=std::move(pg), this_instance_id](const auto &code) { - DEBUGDPP("{}.{}: saw error code {}", - *pg, *this, this_instance_id, code); - assert(code.value() > 0); - return reply_op_error(pg, -code.value()); - })); + [FNAME, this, pg=std::move(pg), this_instance_id]( + const auto &code + ) -> interruptible_future<> { + DEBUGDPP("{}.{}: saw error code {}", + *pg, *this, this_instance_id, code); + assert(code.value() > 0); + return reply_op_error(pg, -code.value()); + }) + ); } ClientRequest::interruptible_future<>