From d427b9029b6de03fd4d15b65bf3d6e07bacca42e Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 12 Jul 2022 23:35:50 +0000 Subject: [PATCH] crimson/osd: move start_pg_operation to pg_shard_manager Signed-off-by: Samuel Just --- src/crimson/osd/osd.cc | 17 ++--- src/crimson/osd/osd.h | 67 ------------------- src/crimson/osd/osd_operation.h | 4 +- .../compound_peering_request.cc | 15 +++-- .../osd_operations/compound_peering_request.h | 6 +- src/crimson/osd/pg_shard_manager.h | 67 ++++++++++++++++++- 6 files changed, 87 insertions(+), 89 deletions(-) diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 28a939d24d6d8..c940196426ef6 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -678,7 +678,7 @@ OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) return handle_osd_op(conn, boost::static_pointer_cast(m)); case MSG_OSD_PG_CREATE2: shard_services.start_operation( - *this, + pg_shard_manager, conn, m); return seastar::now(); @@ -968,7 +968,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first, seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn, Ref m) { - (void) start_pg_operation( + (void) pg_shard_manager.start_pg_operation( *this, conn, std::move(m)); @@ -980,7 +980,7 @@ seastar::future<> OSD::handle_update_log_missing( Ref m) { m->decode_payload(); - (void) start_pg_operation( + (void) pg_shard_manager.start_pg_operation( std::move(conn), std::move(m)); return seastar::now(); @@ -991,7 +991,7 @@ seastar::future<> OSD::handle_update_log_missing_reply( Ref m) { m->decode_payload(); - (void) start_pg_operation( + (void) pg_shard_manager.start_pg_operation( std::move(conn), std::move(m)); return seastar::now(); @@ -1027,7 +1027,7 @@ seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn, Ref m) { m->finish_decode(); - std::ignore = start_pg_operation( + std::ignore = pg_shard_manager.start_pg_operation( std::move(conn), std::move(m)); return seastar::now(); @@ -1061,7 +1061,7 @@ seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn, pg_shard_t from_shard{static_cast(m->get_source().num()), pgid.shard}; PeeringState::RequestScrub scrub_request{m->deep, m->repair}; - return start_pg_operation( + return pg_shard_manager.start_pg_operation( conn, from_shard, pgid, @@ -1081,7 +1081,8 @@ seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn, seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn, Ref m) { - std::ignore = start_pg_operation(conn, std::move(m)); + std::ignore = pg_shard_manager.start_pg_operation( + conn, std::move(m)); return seastar::now(); } @@ -1171,7 +1172,7 @@ seastar::future<> OSD::handle_peering_op( const int from = m->get_source().num(); logger().debug("handle_peering_op on {} from {}", m->get_spg(), from); std::unique_ptr evt(m->get_event()); - (void) start_pg_operation( + (void) pg_shard_manager.start_pg_operation( conn, pg_shard_t{from, m->get_spg().shard}, m->get_spg(), diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index ca7e59a62afb8..18ddd387b2f83 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -209,73 +209,6 @@ private: public: seastar::future<> send_beacon(); - template - auto start_pg_operation(Args&&... args) { - auto op = shard_services.get_registry().create_operation( - std::forward(args)...); - auto &logger = crimson::get_logger(ceph_subsys_osd); - logger.debug("{}: starting {}", *op, __func__); - auto &opref = *op; - - auto fut = opref.template enter_stage<>( - opref.get_connection_pipeline().await_active - ).then([this, &opref, &logger] { - logger.debug("{}: start_pg_operation in await_active stage", opref); - return pg_shard_manager.when_active(); - }).then([&logger, &opref] { - logger.debug("{}: start_pg_operation active, entering await_map", opref); - return opref.template enter_stage<>( - opref.get_connection_pipeline().await_map); - }).then([this, &logger, &opref] { - logger.debug("{}: start_pg_operation await_map stage", opref); - using OSDMapBlockingEvent = - OSD_OSDMapGate::OSDMapBlocker::BlockingEvent; - return opref.template with_blocking_event( - [this, &opref](auto &&trigger) { - return pg_shard_manager.wait_for_map( - std::move(trigger), - opref.get_epoch(), - &shard_services - ); - }); - }).then([&logger, &opref](auto epoch) { - logger.debug("{}: got map {}, entering get_pg", opref, epoch); - return opref.template enter_stage<>( - opref.get_connection_pipeline().get_pg); - }).then([this, &logger, &opref] { - logger.debug("{}: in get_pg", opref); - if constexpr (T::can_create()) { - logger.debug("{}: can_create", opref); - return opref.template with_blocking_event< - PGMap::PGCreationBlockingEvent - >([this, &opref](auto &&trigger) { - std::ignore = this; // avoid clang warning - return pg_shard_manager.get_or_create_pg( - pg_shard_manager, - pg_shard_manager.get_shard_services(), - std::move(trigger), - opref.get_pgid(), opref.get_epoch(), - std::move(opref.get_create_info())); - }); - } else { - logger.debug("{}: !can_create", opref); - return opref.template with_blocking_event< - PGMap::PGCreationBlockingEvent - >([this, &opref](auto &&trigger) { - std::ignore = this; // avoid clang warning - return pg_shard_manager.wait_for_pg( - std::move(trigger), opref.get_pgid()); - }); - } - }).then([this, &logger, &opref](Ref pgref) { - logger.debug("{}: have_pg", opref); - return opref.with_pg(shard_services, pgref); - }).then([op] { /* Retain refcount on op until completion */ }); - - return std::make_pair(std::move(op), std::move(fut)); - } - - private: LogClient log_client; LogChannelRef clog; diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index 37da9c5633fdb..bea30f603c511 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -177,9 +177,9 @@ protected: template friend class crimson::os::seastore::OperationProxyT; - // OSD::start_pg_operation needs access to enter_stage, we can make this + // PGShardManager::start_pg_operation needs access to enter_stage, we can make this // more sophisticated later on - friend class OSD; + friend class PGShardManager; }; /** diff --git a/src/crimson/osd/osd_operations/compound_peering_request.cc b/src/crimson/osd/osd_operations/compound_peering_request.cc index b630fc2b76002..5b32454c40fe3 100644 --- a/src/crimson/osd/osd_operations/compound_peering_request.cc +++ b/src/crimson/osd/osd_operations/compound_peering_request.cc @@ -11,7 +11,7 @@ #include "crimson/common/exception.h" #include "crimson/osd/pg.h" -#include "crimson/osd/osd.h" +#include "crimson/osd/pg_shard_manager.h" #include "crimson/osd/osd_operation_external_tracking.h" #include "crimson/osd/osd_operations/compound_peering_request.h" @@ -61,7 +61,7 @@ public: }; std::vector handle_pg_create( - OSD &osd, + PGShardManager &pg_shard_manager, crimson::net::ConnectionRef conn, compound_state_ref state, Ref m) @@ -85,7 +85,7 @@ std::vector handle_pg_create( pgid, m->epoch, pi, history); } else { - auto op = osd.start_pg_operation( + auto op = pg_shard_manager.start_pg_operation( state, conn, pg_shard_t(), @@ -106,9 +106,9 @@ std::vector handle_pg_create( namespace crimson::osd { CompoundPeeringRequest::CompoundPeeringRequest( - OSD &osd, + PGShardManager &pg_shard_manager, crimson::net::ConnectionRef conn, Ref m) - : osd(osd), + : pg_shard_manager(pg_shard_manager), conn(conn), m(m) {} @@ -132,7 +132,7 @@ seastar::future<> CompoundPeeringRequest::start() [&] { assert((m->get_type() == MSG_OSD_PG_CREATE2)); return handle_pg_create( - osd, + pg_shard_manager, conn, state, boost::static_pointer_cast(m)); @@ -146,7 +146,8 @@ seastar::future<> CompoundPeeringRequest::start() return trigger.maybe_record_blocking(state->promise.get_future(), *blocker); }).then([this, blocker=std::move(blocker)](auto &&ctx) { logger().info("{}: sub events complete", *this); - return osd.get_shard_services().dispatch_context_messages(std::move(ctx)); + return pg_shard_manager.get_shard_services( + ).dispatch_context_messages(std::move(ctx)); }).then([this, ref=std::move(ref)] { track_event(); logger().info("{}: complete", *this); diff --git a/src/crimson/osd/osd_operations/compound_peering_request.h b/src/crimson/osd/osd_operations/compound_peering_request.h index 5300095082a1d..4e45de82353f8 100644 --- a/src/crimson/osd/osd_operations/compound_peering_request.h +++ b/src/crimson/osd/osd_operations/compound_peering_request.h @@ -13,7 +13,7 @@ namespace crimson::osd { -class OSD; +class PGShardManager; class PG; using osd_id_t = int; @@ -43,13 +43,13 @@ public: }; private: - OSD &osd; + PGShardManager &pg_shard_manager; crimson::net::ConnectionRef conn; Ref m; public: CompoundPeeringRequest( - OSD &osd, crimson::net::ConnectionRef conn, Ref m); + PGShardManager &pg_shard_manager, crimson::net::ConnectionRef conn, Ref m); void print(std::ostream &) const final; void dump_detail(Formatter *f) const final; diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h index 1f759ac97b7a4..e473708df2133 100644 --- a/src/crimson/osd/pg_shard_manager.h +++ b/src/crimson/osd/pg_shard_manager.h @@ -84,8 +84,6 @@ public: FORWARD_TO_CORE(stop_pgs) FORWARD_CONST(get_pg_stats, get_pg_stats, core_state) - FORWARD_TO_CORE(get_or_create_pg) - FORWARD_TO_CORE(wait_for_pg) FORWARD_CONST(for_each_pg, for_each_pg, core_state) auto get_num_pgs() const { return core_state.pg_map.get_pgs().size(); } @@ -98,6 +96,71 @@ public: auto with_pg(spg_t pgid, F &&f) { return std::invoke(std::forward(f), core_state.get_pg(pgid)); } + + template + auto start_pg_operation(Args&&... args) { + auto op = local_state.registry.create_operation( + std::forward(args)...); + auto &logger = crimson::get_logger(ceph_subsys_osd); + logger.debug("{}: starting {}", *op, __func__); + auto &opref = *op; + + auto fut = opref.template enter_stage<>( + opref.get_connection_pipeline().await_active + ).then([this, &opref, &logger] { + logger.debug("{}: start_pg_operation in await_active stage", opref); + return core_state.osd_state.when_active(); + }).then([&logger, &opref] { + logger.debug("{}: start_pg_operation active, entering await_map", opref); + return opref.template enter_stage<>( + opref.get_connection_pipeline().await_map); + }).then([this, &logger, &opref] { + logger.debug("{}: start_pg_operation await_map stage", opref); + using OSDMapBlockingEvent = + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent; + return opref.template with_blocking_event( + [this, &opref](auto &&trigger) { + std::ignore = this; + return core_state.osdmap_gate.wait_for_map( + std::move(trigger), + opref.get_epoch(), + &shard_services); + }); + }).then([&logger, &opref](auto epoch) { + logger.debug("{}: got map {}, entering get_pg", opref, epoch); + return opref.template enter_stage<>( + opref.get_connection_pipeline().get_pg); + }).then([this, &logger, &opref] { + logger.debug("{}: in get_pg", opref); + if constexpr (T::can_create()) { + logger.debug("{}: can_create", opref); + return opref.template with_blocking_event< + PGMap::PGCreationBlockingEvent + >([this, &opref](auto &&trigger) { + std::ignore = this; // avoid clang warning + return core_state.get_or_create_pg( + *this, + shard_services, + std::move(trigger), + opref.get_pgid(), opref.get_epoch(), + std::move(opref.get_create_info())); + }); + } else { + logger.debug("{}: !can_create", opref); + return opref.template with_blocking_event< + PGMap::PGCreationBlockingEvent + >([this, &opref](auto &&trigger) { + std::ignore = this; // avoid clang warning + return core_state.wait_for_pg(std::move(trigger), opref.get_pgid()); + }); + } + }).then([this, &logger, &opref](Ref pgref) { + logger.debug("{}: have_pg", opref); + return opref.with_pg(get_shard_services(), pgref); + }).then([op] { /* Retain refcount on op until completion */ }); + + return std::make_pair(std::move(op), std::move(fut)); + } }; } -- 2.39.5