git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: move start_pg_operation to pg_shard_manager 47089/head
author: Samuel Just <sjust@redhat.com>
Tue, 12 Jul 2022 23:35:50 +0000 (23:35 +0000)
committer: Samuel Just <sjust@redhat.com>
Thu, 14 Jul 2022 00:58:29 +0000 (00:58 +0000)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operations/compound_peering_request.cc
src/crimson/osd/osd_operations/compound_peering_request.h
src/crimson/osd/pg_shard_manager.h

index 28a939d24d6d8fb72d0b9f5f9d4f25d8733c7873..c940196426ef6287670d6610b072b46e88fb625e 100644 (file)
@@ -678,7 +678,7 @@ OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
       return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
     case MSG_OSD_PG_CREATE2:
       shard_services.start_operation<CompoundPeeringRequest>(
-       *this,
+       pg_shard_manager,
        conn,
        m);
       return seastar::now();
@@ -968,7 +968,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
 seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
                                      Ref<MOSDOp> m)
 {
-  (void) start_pg_operation<ClientRequest>(
+  (void) pg_shard_manager.start_pg_operation<ClientRequest>(
     *this,
     conn,
     std::move(m));
@@ -980,7 +980,7 @@ seastar::future<> OSD::handle_update_log_missing(
   Ref<MOSDPGUpdateLogMissing> m)
 {
   m->decode_payload();
-  (void) start_pg_operation<LogMissingRequest>(
+  (void) pg_shard_manager.start_pg_operation<LogMissingRequest>(
     std::move(conn),
     std::move(m));
   return seastar::now();
@@ -991,7 +991,7 @@ seastar::future<> OSD::handle_update_log_missing_reply(
   Ref<MOSDPGUpdateLogMissingReply> m)
 {
   m->decode_payload();
-  (void) start_pg_operation<LogMissingRequestReply>(
+  (void) pg_shard_manager.start_pg_operation<LogMissingRequestReply>(
     std::move(conn),
     std::move(m));
   return seastar::now();
@@ -1027,7 +1027,7 @@ seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
                                     Ref<MOSDRepOp> m)
 {
   m->finish_decode();
-  std::ignore = start_pg_operation<RepRequest>(
+  std::ignore = pg_shard_manager.start_pg_operation<RepRequest>(
     std::move(conn),
     std::move(m));
   return seastar::now();
@@ -1061,7 +1061,7 @@ seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
     pg_shard_t from_shard{static_cast<int>(m->get_source().num()),
                           pgid.shard};
     PeeringState::RequestScrub scrub_request{m->deep, m->repair};
-    return start_pg_operation<RemotePeeringEvent>(
+    return pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
       conn,
       from_shard,
       pgid,
@@ -1081,7 +1081,8 @@ seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn,
 seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn,
                                   Ref<MOSDFastDispatchOp> m)
 {
-  std::ignore = start_pg_operation<RecoverySubRequest>(conn, std::move(m));
+  std::ignore = pg_shard_manager.start_pg_operation<RecoverySubRequest>(
+    conn, std::move(m));
   return seastar::now();
 }
 
@@ -1171,7 +1172,7 @@ seastar::future<> OSD::handle_peering_op(
   const int from = m->get_source().num();
   logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
   std::unique_ptr<PGPeeringEvent> evt(m->get_event());
-  (void) start_pg_operation<RemotePeeringEvent>(
+  (void) pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
     conn,
     pg_shard_t{from, m->get_spg().shard},
     m->get_spg(),
index ca7e59a62afb80e4236ef8aae453f3e12aef93e6..18ddd387b2f83879062934277d5357cdeade9037 100644 (file)
@@ -209,73 +209,6 @@ private:
 public:
   seastar::future<> send_beacon();
 
-  template <typename T, typename... Args>
-  auto start_pg_operation(Args&&... args) {
-    auto op = shard_services.get_registry().create_operation<T>(
-      std::forward<Args>(args)...);
-    auto &logger = crimson::get_logger(ceph_subsys_osd);
-    logger.debug("{}: starting {}", *op, __func__);
-    auto &opref = *op;
-
-    auto fut = opref.template enter_stage<>(
-      opref.get_connection_pipeline().await_active
-    ).then([this, &opref, &logger] {
-      logger.debug("{}: start_pg_operation in await_active stage", opref);
-      return pg_shard_manager.when_active();
-    }).then([&logger, &opref] {
-      logger.debug("{}: start_pg_operation active, entering await_map", opref);
-      return opref.template enter_stage<>(
-       opref.get_connection_pipeline().await_map);
-    }).then([this, &logger, &opref] {
-      logger.debug("{}: start_pg_operation await_map stage", opref);
-      using OSDMapBlockingEvent =
-       OSD_OSDMapGate::OSDMapBlocker::BlockingEvent;
-      return opref.template with_blocking_event<OSDMapBlockingEvent>(
-       [this, &opref](auto &&trigger) {
-         return pg_shard_manager.wait_for_map(
-           std::move(trigger),
-           opref.get_epoch(),
-           &shard_services
-         );
-       });
-    }).then([&logger, &opref](auto epoch) {
-      logger.debug("{}: got map {}, entering get_pg", opref, epoch);
-      return opref.template enter_stage<>(
-       opref.get_connection_pipeline().get_pg);
-    }).then([this, &logger, &opref] {
-      logger.debug("{}: in get_pg", opref);
-      if constexpr (T::can_create()) {
-       logger.debug("{}: can_create", opref);
-       return opref.template with_blocking_event<
-         PGMap::PGCreationBlockingEvent
-         >([this, &opref](auto &&trigger) {
-           std::ignore = this; // avoid clang warning
-           return pg_shard_manager.get_or_create_pg(
-             pg_shard_manager,
-             pg_shard_manager.get_shard_services(),
-             std::move(trigger),
-             opref.get_pgid(), opref.get_epoch(),
-             std::move(opref.get_create_info()));
-         });
-      } else {
-       logger.debug("{}: !can_create", opref);
-       return opref.template with_blocking_event<
-         PGMap::PGCreationBlockingEvent
-         >([this, &opref](auto &&trigger) {
-           std::ignore = this; // avoid clang warning
-           return pg_shard_manager.wait_for_pg(
-             std::move(trigger), opref.get_pgid());
-         });
-      }
-    }).then([this, &logger, &opref](Ref<PG> pgref) {
-      logger.debug("{}: have_pg", opref);
-      return opref.with_pg(shard_services, pgref);
-    }).then([op] { /* Retain refcount on op until completion */ });
-
-    return std::make_pair(std::move(op), std::move(fut));
-  }
-
-
 private:
   LogClient log_client;
   LogChannelRef clog;
index 37da9c5633fdbcc17c2e8992a5f86fd9e9d9e930..bea30f603c511caaee1f4be3b4682fe871ca1479 100644 (file)
@@ -177,9 +177,9 @@ protected:
   template <class OpT>
   friend class crimson::os::seastore::OperationProxyT;
 
-  // OSD::start_pg_operation needs access to enter_stage, we can make this
+  // PGShardManager::start_pg_operation needs access to enter_stage, we can make this
   // more sophisticated later on
-  friend class OSD;
+  friend class PGShardManager;
 };
 
 /**
index b630fc2b760023ef9db93fa88fdbe514a93ae894..5b32454c40fe3f6fcdd342948894bd21073685b9 100644 (file)
@@ -11,7 +11,7 @@
 
 #include "crimson/common/exception.h"
 #include "crimson/osd/pg.h"
-#include "crimson/osd/osd.h"
+#include "crimson/osd/pg_shard_manager.h"
 #include "crimson/osd/osd_operation_external_tracking.h"
 #include "crimson/osd/osd_operations/compound_peering_request.h"
 
@@ -61,7 +61,7 @@ public:
 };
 
 std::vector<crimson::OperationRef> handle_pg_create(
-  OSD &osd,
+  PGShardManager &pg_shard_manager,
   crimson::net::ConnectionRef conn,
   compound_state_ref state,
   Ref<MOSDPGCreate2> m)
@@ -85,7 +85,7 @@ std::vector<crimson::OperationRef> handle_pg_create(
         pgid, m->epoch,
         pi, history);
     } else {
-      auto op = osd.start_pg_operation<PeeringSubEvent>(
+      auto op = pg_shard_manager.start_pg_operation<PeeringSubEvent>(
          state,
          conn,
          pg_shard_t(),
@@ -106,9 +106,9 @@ std::vector<crimson::OperationRef> handle_pg_create(
 namespace crimson::osd {
 
 CompoundPeeringRequest::CompoundPeeringRequest(
-  OSD &osd,
+  PGShardManager &pg_shard_manager,
   crimson::net::ConnectionRef conn, Ref<Message> m)
-  : osd(osd),
+  : pg_shard_manager(pg_shard_manager),
     conn(conn),
     m(m)
 {}
@@ -132,7 +132,7 @@ seastar::future<> CompoundPeeringRequest::start()
     [&] {
       assert((m->get_type() == MSG_OSD_PG_CREATE2));
       return handle_pg_create(
-        osd,
+       pg_shard_manager,
        conn,
        state,
        boost::static_pointer_cast<MOSDPGCreate2>(m));
@@ -146,7 +146,8 @@ seastar::future<> CompoundPeeringRequest::start()
       return trigger.maybe_record_blocking(state->promise.get_future(), *blocker);
     }).then([this, blocker=std::move(blocker)](auto &&ctx) {
       logger().info("{}: sub events complete", *this);
-      return osd.get_shard_services().dispatch_context_messages(std::move(ctx));
+      return pg_shard_manager.get_shard_services(
+      ).dispatch_context_messages(std::move(ctx));
     }).then([this, ref=std::move(ref)] {
       track_event<CompletionEvent>();
       logger().info("{}: complete", *this);
index 5300095082a1d0f49955cd0fa76298ec946bcf9e..4e45de82353f825cf4ac727e720837e1503aa286 100644 (file)
@@ -13,7 +13,7 @@
 
 namespace crimson::osd {
 
-class OSD;
+class PGShardManager;
 class PG;
 
 using osd_id_t = int;
@@ -43,13 +43,13 @@ public:
   };
 
 private:
-  OSD &osd;
+  PGShardManager &pg_shard_manager;
   crimson::net::ConnectionRef conn;
   Ref<Message> m;
 
 public:
   CompoundPeeringRequest(
-    OSD &osd, crimson::net::ConnectionRef conn, Ref<Message> m);
+    PGShardManager &pg_shard_manager, crimson::net::ConnectionRef conn, Ref<Message> m);
 
   void print(std::ostream &) const final;
   void dump_detail(Formatter *f) const final;
index 1f759ac97b7a4de31f42665b4271aae2a2165a38..e473708df2133bd222fe95c4bcd0c57f2f438ecf 100644 (file)
@@ -84,8 +84,6 @@ public:
   FORWARD_TO_CORE(stop_pgs)
   FORWARD_CONST(get_pg_stats, get_pg_stats, core_state)
 
-  FORWARD_TO_CORE(get_or_create_pg)
-  FORWARD_TO_CORE(wait_for_pg)
   FORWARD_CONST(for_each_pg, for_each_pg, core_state)
   auto get_num_pgs() const { return core_state.pg_map.get_pgs().size(); }
 
@@ -98,6 +96,71 @@ public:
   auto with_pg(spg_t pgid, F &&f) {
     return std::invoke(std::forward<F>(f), core_state.get_pg(pgid));
   }
+
+  template <typename T, typename... Args>
+  auto start_pg_operation(Args&&... args) {
+    auto op = local_state.registry.create_operation<T>(
+      std::forward<Args>(args)...);
+    auto &logger = crimson::get_logger(ceph_subsys_osd);
+    logger.debug("{}: starting {}", *op, __func__);
+    auto &opref = *op;
+
+    auto fut = opref.template enter_stage<>(
+      opref.get_connection_pipeline().await_active
+    ).then([this, &opref, &logger] {
+      logger.debug("{}: start_pg_operation in await_active stage", opref);
+      return core_state.osd_state.when_active();
+    }).then([&logger, &opref] {
+      logger.debug("{}: start_pg_operation active, entering await_map", opref);
+      return opref.template enter_stage<>(
+       opref.get_connection_pipeline().await_map);
+    }).then([this, &logger, &opref] {
+      logger.debug("{}: start_pg_operation await_map stage", opref);
+      using OSDMapBlockingEvent =
+       OSD_OSDMapGate::OSDMapBlocker::BlockingEvent;
+      return opref.template with_blocking_event<OSDMapBlockingEvent>(
+       [this, &opref](auto &&trigger) {
+         std::ignore = this;
+         return core_state.osdmap_gate.wait_for_map(
+           std::move(trigger),
+           opref.get_epoch(),
+           &shard_services);
+       });
+    }).then([&logger, &opref](auto epoch) {
+      logger.debug("{}: got map {}, entering get_pg", opref, epoch);
+      return opref.template enter_stage<>(
+       opref.get_connection_pipeline().get_pg);
+    }).then([this, &logger, &opref] {
+      logger.debug("{}: in get_pg", opref);
+      if constexpr (T::can_create()) {
+       logger.debug("{}: can_create", opref);
+       return opref.template with_blocking_event<
+         PGMap::PGCreationBlockingEvent
+         >([this, &opref](auto &&trigger) {
+           std::ignore = this; // avoid clang warning
+           return core_state.get_or_create_pg(
+             *this,
+             shard_services,
+             std::move(trigger),
+             opref.get_pgid(), opref.get_epoch(),
+             std::move(opref.get_create_info()));
+         });
+      } else {
+       logger.debug("{}: !can_create", opref);
+       return opref.template with_blocking_event<
+         PGMap::PGCreationBlockingEvent
+         >([this, &opref](auto &&trigger) {
+           std::ignore = this; // avoid clang warning
+           return core_state.wait_for_pg(std::move(trigger), opref.get_pgid());
+         });
+      }
+    }).then([this, &logger, &opref](Ref<PG> pgref) {
+      logger.debug("{}: have_pg", opref);
+      return opref.with_pg(get_shard_services(), pgref);
+    }).then([op] { /* Retain refcount on op until completion */ });
+
+    return std::make_pair(std::move(op), std::move(fut));
+  }
 };
 
 }