From: Samuel Just Date: Thu, 24 Oct 2024 23:39:46 +0000 (-0700) Subject: crimson: convert client_request, internal_client_request, snaptrim_event to use obc... X-Git-Tag: v20.0.0~524^2~11 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f655f7f58713c5468f3b0dba6b92d1d85683083d;p=ceph.git crimson: convert client_request, internal_client_request, snaptrim_event to use obc stages Signed-off-by: Samuel Just --- diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index ee78b5632100..fcd0f318db2e 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -101,7 +101,7 @@ PerShardPipeline &ClientRequest::get_pershard_pipeline( return shard_services.get_client_request_pipeline(); } -ClientRequest::PGPipeline &ClientRequest::client_pp(PG &pg) +CommonPGPipeline &ClientRequest::client_pp(PG &pg) { return pg.request_pg_pipeline; } @@ -140,6 +140,15 @@ ClientRequest::interruptible_future<> ClientRequest::with_pg_process_interruptib DEBUGDPP("{} start", *pgref, *this); PG &pg = *pgref; + + DEBUGDPP("{}.{}: entering wait_pg_ready stage", + *pgref, *this, this_instance_id); + // The prior stage is OrderedExclusive (PerShardPipeline::create_or_wait_pg) + // and wait_pg_ready is OrderedConcurrent. This transition, therefore, cannot + // block and using enter_stage_sync is legal and more efficient than + // enter_stage. + ihref.enter_stage_sync(client_pp(pg).wait_pg_ready, *this); + if (!m->get_hobj().get_key().empty()) { // There are no users of locator. It was used to ensure that multipart-upload // parts would end up in the same PG so that they could be clone_range'd into @@ -156,28 +165,22 @@ ClientRequest::interruptible_future<> ClientRequest::with_pg_process_interruptib DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id); co_return; } - DEBUGDPP("{}.{}: entering await_map stage", - *pgref, *this, this_instance_id); - co_await ihref.enter_stage(client_pp(pg).await_map, *this); - DEBUGDPP("{}.{}: entered await_map stage, waiting for map", - pg, *this, this_instance_id); + auto map_epoch = co_await interruptor::make_interruptible( ihref.enter_blocker( *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map, m->get_min_epoch(), nullptr)); - DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active", + DEBUGDPP("{}.{}: waited for epoch {}, waiting for active", pg, *this, this_instance_id, map_epoch); - co_await ihref.enter_stage(client_pp(pg).wait_for_active, *this); - - DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active", - pg, *this, this_instance_id); co_await interruptor::make_interruptible( ihref.enter_blocker( *this, pg.wait_for_active_blocker, &decltype(pg.wait_for_active_blocker)::wait)); + co_await ihref.enter_stage(client_pp(pg).get_obc, *this); + if (int res = op_info.set_from_op(&*m, *pg.get_osdmap()); res != 0) { co_await reply_op_error(pgref, res); @@ -345,7 +348,13 @@ ClientRequest::process_op( instance_handle_t &ihref, Ref pg, unsigned this_instance_id) { LOG_PREFIX(ClientRequest::process_op); - ihref.enter_stage_sync(client_pp(*pg).recover_missing, *this); + ihref.obc_orderer = pg->obc_loader.get_obc_orderer(m->get_hobj()); + auto obc_manager = pg->obc_loader.get_obc_manager( + *(ihref.obc_orderer), + m->get_hobj()); + co_await ihref.enter_stage( + ihref.obc_orderer->obc_pp().process, *this); + if (!pg->is_primary()) { DEBUGDPP( "Skipping recover_missings on non primary pg for soid {}", @@ -365,16 +374,6 @@ ClientRequest::process_op( } } - /** - * The previous stage of recover_missing is a concurrent phase. - * Checking for already_complete requests must done exclusively. - * Since get_obc is also an exclusive stage, we can merge both stages into - * a single stage and avoid stage switching overhead. - */ - DEBUGDPP("{}.{}: entering check_already_complete_get_obc", - *pg, *this, this_instance_id); - co_await ihref.enter_stage( - client_pp(*pg).check_already_complete_get_obc, *this); DEBUGDPP("{}.{}: checking already_complete", *pg, *this, this_instance_id); auto completed = co_await pg->already_complete(m->get_reqid()); @@ -402,12 +401,6 @@ ClientRequest::process_op( DEBUGDPP("{}.{}: past scrub blocker, getting obc", *pg, *this, this_instance_id); - auto obc_manager = pg->obc_loader.get_obc_manager(m->get_hobj()); - - // initiate load_and_lock in order, but wait concurrently - ihref.enter_stage_sync( - client_pp(*pg).lock_obc, *this); - int load_err = co_await pg->obc_loader.load_and_lock( obc_manager, pg->get_lock_type(op_info) ).si_then([]() -> int { @@ -425,13 +418,8 @@ ClientRequest::process_op( co_return; } - DEBUGDPP("{}.{}: got obc {}, entering process stage", + DEBUGDPP("{}.{}: obc {} loaded and locked, calling do_process", *pg, *this, this_instance_id, obc_manager.get_obc()->obs); - co_await ihref.enter_stage( - client_pp(*pg).process, *this); - - DEBUGDPP("{}.{}: in process stage, calling do_process", - *pg, *this, this_instance_id); co_await do_process( ihref, pg, obc_manager.get_obc(), this_instance_id ); @@ -553,12 +541,14 @@ ClientRequest::do_process( std::move(ox), m->ops); co_await std::move(submitted); } - co_await ihref.enter_stage(client_pp(*pg).wait_repop, *this); + co_await ihref.enter_stage( + ihref.obc_orderer->obc_pp().wait_repop, *this); co_await std::move(all_completed); } - co_await ihref.enter_stage(client_pp(*pg).send_reply, *this); + co_await ihref.enter_stage( + ihref.obc_orderer->obc_pp().send_reply, *this); if (ret) { int err = -ret->value(); diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index 6d1043e27835..bbf4ed33e451 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -11,6 +11,7 @@ #include "osd/osd_op_util.h" #include "crimson/net/Connection.h" #include "crimson/osd/object_context.h" +#include "crimson/osd/object_context_loader.h" #include "crimson/osd/osdmap_gate.h" #include "crimson/osd/osd_operation.h" #include "crimson/osd/osd_operations/client_request_common.h" @@ -93,20 +94,18 @@ public: // don't leave any references on the source core, so we just bypass it by using // intrusive_ptr instead. using ref_t = boost::intrusive_ptr; + std::optional obc_orderer; PipelineHandle handle; std::tuple< - PGPipeline::AwaitMap::BlockingEvent, + CommonPGPipeline::WaitPGReady::BlockingEvent, PG_OSDMapGate::OSDMapBlocker::BlockingEvent, - PGPipeline::WaitForActive::BlockingEvent, PGActivationBlocker::BlockingEvent, - PGPipeline::RecoverMissing::BlockingEvent, + CommonPGPipeline::GetOBC::BlockingEvent, + CommonOBCPipeline::Process::BlockingEvent, scrub::PGScrubber::BlockingEvent, - PGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent, - PGPipeline::LockOBC::BlockingEvent, - PGPipeline::Process::BlockingEvent, - PGPipeline::WaitRepop::BlockingEvent, - PGPipeline::SendReply::BlockingEvent, + CommonOBCPipeline::WaitRepop::BlockingEvent, + CommonOBCPipeline::SendReply::BlockingEvent, CompletionEvent > pg_tracking_events; @@ -293,7 +292,7 @@ private: unsigned this_instance_id); bool is_pg_op() const; - PGPipeline &client_pp(PG &pg); + CommonPGPipeline &client_pp(PG &pg); template using interruptible_errorator = diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc index 58f3bafe4109..b8f7646bc743 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -57,7 +57,12 @@ InternalClientRequest::with_interruption() LOG_PREFIX(InternalClientRequest::with_interruption); assert(pg->is_active()); - co_await enter_stage(client_pp().recover_missing); + obc_orderer = pg->obc_loader.get_obc_orderer(get_target_oid()); + auto obc_manager = pg->obc_loader.get_obc_manager( + *obc_orderer, + get_target_oid()); + + co_await enter_stage(obc_orderer->obc_pp().process); bool unfound = co_await do_recover_missing( pg, get_target_oid(), osd_reqid_t()); @@ -67,10 +72,8 @@ InternalClientRequest::with_interruption() std::make_error_code(std::errc::operation_canceled), fmt::format("{} is unfound, drop it!", get_target_oid())); } - co_await enter_stage( - client_pp().check_already_complete_get_obc); - DEBUGI("{}: getting obc lock", *this); + DEBUGI("{}: generating ops", *this); auto osd_ops = create_osd_ops(); @@ -80,21 +83,12 @@ InternalClientRequest::with_interruption() std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap()); assert(ret == 0); - auto obc_manager = pg->obc_loader.get_obc_manager(get_target_oid()); - - // initiate load_and_lock in order, but wait concurrently - enter_stage_sync(client_pp().lock_obc); - co_await pg->obc_loader.load_and_lock( obc_manager, pg->get_lock_type(op_info) ).handle_error_interruptible( crimson::ct_error::assert_all("unexpected error") ); - DEBUGDPP("{}: got obc {}, entering process stage", - *pg, *this, obc_manager.get_obc()->obs); - co_await enter_stage(client_pp().process); - auto params = get_do_osd_ops_params(); OpsExecuter ox( pg, obc_manager.get_obc(), op_info, params, params.get_connection(), @@ -114,6 +108,9 @@ InternalClientRequest::with_interruption() std::move(ox), osd_ops); co_await std::move(submitted); + + co_await enter_stage(obc_orderer->obc_pp().wait_repop); + co_await std::move(completed); DEBUGDPP("{}: complete", *pg, *this); diff --git a/src/crimson/osd/osd_operations/internal_client_request.h b/src/crimson/osd/osd_operations/internal_client_request.h index cc457f35412e..1cfde4ab080e 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.h +++ b/src/crimson/osd/osd_operations/internal_client_request.h @@ -4,6 +4,7 @@ #pragma once #include "crimson/common/type_helpers.h" +#include "crimson/osd/object_context_loader.h" #include "crimson/osd/osd_operation.h" #include "crimson/osd/osd_operations/client_request_common.h" #include "crimson/osd/pg.h" @@ -48,6 +49,7 @@ private: Ref pg; epoch_t start_epoch; OpInfo op_info; + std::optional obc_orderer; PipelineHandle handle; public: @@ -55,12 +57,8 @@ public: std::tuple< StartEvent, - CommonPGPipeline::WaitForActive::BlockingEvent, - PGActivationBlocker::BlockingEvent, - CommonPGPipeline::RecoverMissing::BlockingEvent, - CommonPGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent, - CommonPGPipeline::LockOBC::BlockingEvent, - CommonPGPipeline::Process::BlockingEvent, + CommonOBCPipeline::Process::BlockingEvent, + CommonOBCPipeline::WaitRepop::BlockingEvent, CompletionEvent > tracking_events; }; diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc index 459c98bb9c0f..f8fb7aef6f26 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.cc +++ b/src/crimson/osd/osd_operations/snaptrim_event.cc @@ -388,20 +388,24 @@ SnapTrimObjSubEvent::remove_or_update( SnapTrimObjSubEvent::snap_trim_obj_subevent_ret_t SnapTrimObjSubEvent::start() { + obc_orderer = pg->obc_loader.get_obc_orderer( + coid); + ceph_assert(pg->is_active_clean()); - auto exit_handle = seastar::defer([this] { - logger().debug("{}: exit", *this); - handle.exit(); + auto exit_handle = seastar::defer([this, opref = IRef(this)] { + logger().debug("{}: exit", *opref); + std::ignore = handle.complete().then([opref = std::move(opref)] {}); }); co_await enter_stage( - client_pp().check_already_complete_get_obc); + obc_orderer->obc_pp().process); logger().debug("{}: getting obc for {}", *this, coid); auto obc_manager = pg->obc_loader.get_obc_manager( + *obc_orderer, coid, false /* resolve_oid */); co_await pg->obc_loader.load_and_lock( @@ -413,7 +417,6 @@ SnapTrimObjSubEvent::start() logger().debug("{}: got obc={}", *this, obc_manager.get_obc()->get_oid()); - co_await enter_stage(client_pp().process); auto all_completed = interruptor::now(); { // as with PG::submit_executer, we need to build the pg log entries @@ -440,12 +443,11 @@ SnapTrimObjSubEvent::start() co_await std::move(submitted); } - co_await enter_stage(client_pp().wait_repop); + co_await enter_stage(obc_orderer->obc_pp().wait_repop); co_await std::move(all_completed); logger().debug("{}: completed", *this); - co_await interruptor::make_interruptible(handle.complete()); } void SnapTrimObjSubEvent::print(std::ostream &lhs) const diff --git a/src/crimson/osd/osd_operations/snaptrim_event.h b/src/crimson/osd/osd_operations/snaptrim_event.h index fc99138d62f4..a2b4d3575684 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.h +++ b/src/crimson/osd/osd_operations/snaptrim_event.h @@ -6,6 +6,7 @@ #include #include +#include "crimson/osd/object_context_loader.h" #include "crimson/osd/osdmap_gate.h" #include "crimson/osd/osd_operation.h" #include "crimson/common/subop_blocker.h" @@ -154,6 +155,7 @@ private: } Ref pg; + std::optional obc_orderer; PipelineHandle handle; osd_op_params_t osd_op_p; const hobject_t coid; @@ -165,9 +167,8 @@ public: std::tuple< StartEvent, - CommonPGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent, - CommonPGPipeline::Process::BlockingEvent, - CommonPGPipeline::WaitRepop::BlockingEvent, + CommonOBCPipeline::Process::BlockingEvent, + CommonOBCPipeline::WaitRepop::BlockingEvent, CompletionEvent > tracking_events; };