git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/pg_shard_manager: cleanups around the remote pg submission
author: Yingxin Cheng <yingxin.cheng@intel.com>
Thu, 14 Sep 2023 09:02:03 +0000 (17:02 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Thu, 2 Nov 2023 07:29:08 +0000 (15:29 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/osd/pg_shard_manager.h

index fb5e93392caf041bd3b35519a9adb7b9104a4e29..71a9cf1a9c804a562672775cda69c94a6b06aeae 100644 (file)
@@ -153,11 +153,11 @@ public:
       core_id_t core,
       typename T::IRef &&op,
       F &&f) {
+    ceph_assert(op->use_count() == 1);
     if (seastar::this_shard_id() == core) {
       auto &target_shard_services = shard_services.local();
       return std::invoke(
         std::move(f),
-        target_shard_services.local_state,
         target_shard_services,
         std::move(op));
     }
@@ -183,80 +183,54 @@ public:
   /// Runs opref on the appropriate core, creating the pg as necessary.
   template <typename T>
   seastar::future<> run_with_pg_maybe_create(
-    typename T::IRef op
+    typename T::IRef op,
+    ShardServices &target_shard_services
   ) {
-    ceph_assert(op->use_count() == 1);
-    auto &logger = crimson::get_logger(ceph_subsys_osd);
     static_assert(T::can_create());
-    logger.debug("{}: can_create", *op);
-
-    return get_pg_to_shard_mapping().maybe_create_pg(
-      op->get_pgid()
-    ).then([this, op = std::move(op)](auto core) mutable {
-      return this->template with_remote_shard_state_and_op<T>(
-        core, std::move(op),
-        [](ShardServices &shard_services,
-           typename T::IRef op) {
-       auto &logger = crimson::get_logger(ceph_subsys_osd);
-       auto &opref = *op;
-       return opref.template with_blocking_event<
-         PGMap::PGCreationBlockingEvent
-         >([&shard_services, &opref](
-             auto &&trigger) {
-           return shard_services.get_or_create_pg(
-             std::move(trigger),
-             opref.get_pgid(),
-             std::move(opref.get_create_info())
-           );
-         }).safe_then([&logger, &shard_services, &opref](Ref<PG> pgref) {
-           logger.debug("{}: have_pg", opref);
-           return opref.with_pg(shard_services, pgref);
-         }).handle_error(
-           crimson::ct_error::ecanceled::handle([&logger, &opref](auto) {
-             logger.debug("{}: pg creation canceled, dropping", opref);
-             return seastar::now();
-           })
-         ).then([op=std::move(op)] {});
-      });
-    });
+    auto &logger = crimson::get_logger(ceph_subsys_osd);
+    auto &opref = *op;
+    return opref.template with_blocking_event<
+      PGMap::PGCreationBlockingEvent
+    >([&target_shard_services, &opref](auto &&trigger) {
+      return target_shard_services.get_or_create_pg(
+        std::move(trigger),
+        opref.get_pgid(),
+        std::move(opref.get_create_info())
+      );
+    }).safe_then([&logger, &target_shard_services, &opref](Ref<PG> pgref) {
+      logger.debug("{}: have_pg", opref);
+      return opref.with_pg(target_shard_services, pgref);
+    }).handle_error(
+      crimson::ct_error::ecanceled::handle([&logger, &opref](auto) {
+        logger.debug("{}: pg creation canceled, dropping", opref);
+        return seastar::now();
+      })
+    ).then([op=std::move(op)] {});
   }
 
   /// Runs opref on the appropriate core, waiting for pg as necessary
   template <typename T>
   seastar::future<> run_with_pg_maybe_wait(
-    typename T::IRef op
+    typename T::IRef op,
+    ShardServices &target_shard_services
   ) {
-    ceph_assert(op->use_count() == 1);
-    auto &logger = crimson::get_logger(ceph_subsys_osd);
     static_assert(!T::can_create());
-    logger.debug("{}: !can_create", *op);
-
-    return get_pg_to_shard_mapping().maybe_create_pg(
-      op->get_pgid()
-    ).then([this, op = std::move(op)](auto core) mutable {
-      return this->template with_remote_shard_state_and_op<T>(
-        core, std::move(op),
-        [](ShardServices &shard_services,
-           typename T::IRef op) {
-       auto &logger = crimson::get_logger(ceph_subsys_osd);
-       auto &opref = *op;
-       return opref.template with_blocking_event<
-         PGMap::PGCreationBlockingEvent
-         >([&shard_services, &opref](
-             auto &&trigger) {
-           return shard_services.wait_for_pg(
-             std::move(trigger), opref.get_pgid());
-         }).safe_then([&logger, &shard_services, &opref](Ref<PG> pgref) {
-           logger.debug("{}: have_pg", opref);
-           return opref.with_pg(shard_services, pgref);
-         }).handle_error(
-           crimson::ct_error::ecanceled::handle([&logger, &opref](auto) {
-             logger.debug("{}: pg creation canceled, dropping", opref);
-             return seastar::now();
-           })
-         ).then([op=std::move(op)] {});
-      });
-    });
+    auto &logger = crimson::get_logger(ceph_subsys_osd);
+    auto &opref = *op;
+    return opref.template with_blocking_event<
+      PGMap::PGCreationBlockingEvent
+    >([&target_shard_services, &opref](auto &&trigger) {
+      return target_shard_services.wait_for_pg(
+        std::move(trigger), opref.get_pgid());
+    }).safe_then([&logger, &target_shard_services, &opref](Ref<PG> pgref) {
+      logger.debug("{}: have_pg", opref);
+      return opref.with_pg(target_shard_services, pgref);
+    }).handle_error(
+      crimson::ct_error::ecanceled::handle([&logger, &opref](auto) {
+        logger.debug("{}: pg creation canceled, dropping", opref);
+        return seastar::now();
+      })
+    ).then([op=std::move(op)] {});
   }
 
   seastar::future<> load_pgs(crimson::os::FuturizedStore& store);
@@ -364,16 +338,23 @@ public:
       logger.debug("{}: got map {}, entering get_pg", opref, epoch);
       return opref.template enter_stage<>(
        opref.get_connection_pipeline().get_pg);
-    }).then([this, &logger, &opref, op=std::move(op)]() mutable {
-      logger.debug("{}: in get_pg core {}", opref, seastar::this_shard_id());
-      logger.debug("{}: in get_pg", opref);
-      if constexpr (T::can_create()) {
-       logger.debug("{}: can_create", opref);
-       return run_with_pg_maybe_create<T>(std::move(op));
-      } else {
-       logger.debug("{}: !can_create", opref);
-       return run_with_pg_maybe_wait<T>(std::move(op));
-      }
+    }).then([this, &opref] {
+      return get_pg_to_shard_mapping().maybe_create_pg(opref.get_pgid());
+    }).then([this, &logger, op=std::move(op)](auto core) mutable {
+      logger.debug("{}: can_create={}, target-core={}",
+                   *op, T::can_create(), core);
+      return this->template with_remote_shard_state_and_op<T>(
+        core, std::move(op),
+        [this](ShardServices &target_shard_services,
+           typename T::IRef op) {
+        if constexpr (T::can_create()) {
+          return this->template run_with_pg_maybe_create<T>(
+              std::move(op), target_shard_services);
+        } else {
+          return this->template run_with_pg_maybe_wait<T>(
+              std::move(op), target_shard_services);
+        }
+      });
     });
     return std::make_pair(id, std::move(fut));
   }