crimson/osd: enable PGShardMapping access from all cores
author chunmei <chunmei.liu@intel.com>
Fri, 31 Mar 2023 02:49:33 +0000 (02:49 +0000)
committer Matan Breizman <mbreizma@redhat.com>
Tue, 17 Oct 2023 16:18:11 +0000 (16:18 +0000)
Previously, all accesses (including lookups) had to occur on core 0.  Now that
we want to be able to dispatch from all cores, we need PGShardMapping to be
accessible from all cores.  To that end, we now proxy updates to core 0, but maintain
local copies of the map so that cores can perform local lookups.

Signed-off-by: chunmei <chunmei.liu@intel.com>
(cherry picked from commit a3052969bfcfd9329b2baf7883765642d09ff038)
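
A minimal sketch of the pattern described above, with illustrative names rather
than the actual Ceph classes: every core keeps its own copy of the map for
synchronous local reads, while writes are proxied to core 0 and then replicated
to the other cores through Seastar's sharded-service API (the same shape as
PGShardMapping in pg_map.h below).

#include <seastar/core/sharded.hh>
#include <seastar/core/future.hh>
#include <map>

class replicated_map : public seastar::peering_sharded_service<replicated_map> {
  std::map<int, int> kv;   // every core owns an identical copy
public:
  // Reads are core-local and synchronous -- no cross-core hop.
  int get(int key) const {
    auto it = kv.find(key);
    return it == kv.end() ? -1 : it->second;
  }
  // Writes run on core 0 first, then fan out to all other cores.
  seastar::future<> put(int key, int value) {
    return container().invoke_on(0, [key, value](replicated_map &primary) {
      primary.kv[key] = value;
      return primary.container().invoke_on_others(
        [key, value](replicated_map &replica) {
          replica.kv[key] = value;
        });
    });
  }
  seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

// Usage (inside a running Seastar reactor; hypothetical wiring):
//   seastar::sharded<replicated_map> maps;
//   co_await maps.start();
//   co_await maps.local().put(1, 42);   // proxied through core 0, fanned out
//   int v = maps.local().get(1);        // resolved on the calling core
//   co_await maps.stop();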

src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/pg_map.h
src/crimson/osd/pg_shard_manager.cc
src/crimson/osd/pg_shard_manager.h
src/crimson/osd/shard_services.h

src/crimson/osd/osd.cc
index 9148f26a3d71f1a2f4290f96294431cbdd5768d6..c597fe92f7fa2ebc8f54ea6fd45a8ce12723f7aa 100644 (file)
@@ -357,13 +357,16 @@ seastar::future<> OSD::start()
   startup_time = ceph::mono_clock::now();
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   return store.start().then([this] {
-    return osd_singleton_state.start_single(
-      whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
-      std::ref(*monc), std::ref(*mgrc)
+    return pg_to_shard_mappings.start(0, seastar::smp::count
     ).then([this] {
+      return osd_singleton_state.start_single(
+        whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
+        std::ref(*monc), std::ref(*mgrc));
+    }).then([this] {
       ceph::mono_time startup_time = ceph::mono_clock::now();
       return shard_services.start(
         std::ref(osd_singleton_state),
+        std::ref(pg_to_shard_mappings),
         whoami,
         startup_time,
         osd_singleton_state.local().perf,
@@ -373,7 +376,8 @@ seastar::future<> OSD::start()
       return shard_dispatchers.start(
         std::ref(*this),
         whoami,
-        std::ref(store));
+        std::ref(store),
+        std::ref(pg_to_shard_mappings));
     });
   }).then([this] {
     heartbeat.reset(new Heartbeat{
@@ -681,6 +685,8 @@ seastar::future<> OSD::stop()
       return shard_services.stop();
     }).then([this] {
       return osd_singleton_state.stop();
+    }).then([this] {
+      return pg_to_shard_mappings.stop();
     }).then([fut=std::move(gate_close_fut)]() mutable {
       return std::move(fut);
     }).then([this] {
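
The start()/stop() changes above follow the usual Seastar rule that a sharded
dependency is started before the services that reference it and stopped after
them. A self-contained sketch of that ordering with toy types (not the OSD
classes, and simplified relative to the real chain):

#include <seastar/core/sharded.hh>
#include <seastar/core/future.hh>
#include <functional>

struct mapping {
  seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

struct service {
  mapping &map;                       // per-shard reference to the dependency
  explicit service(mapping &m) : map(m) {}
  seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

seastar::future<> start_all(seastar::sharded<mapping> &maps,
                            seastar::sharded<service> &svcs) {
  // std::ref(sharded<T>) is unwrapped by start() into each shard's local instance.
  return maps.start().then([&maps, &svcs] {
    return svcs.start(std::ref(maps));
  });
}

seastar::future<> stop_all(seastar::sharded<mapping> &maps,
                           seastar::sharded<service> &svcs) {
  // Reverse order: dependents go down first, the shared dependency last.
  return svcs.stop().then([&maps] { return maps.stop(); });
}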
src/crimson/osd/osd.h
index 1093cab34105d1caf3169bedbc0cf721e7474f6f..4f811c059ad65b14ec76a2e89c63c961fbe1bd06 100644 (file)
@@ -69,8 +69,10 @@ public:
     ShardDispatcher(
       OSD& osd,
       int whoami,
-      crimson::os::FuturizedStore& store)
-    : pg_shard_manager(osd.osd_singleton_state, osd.shard_services),
+      crimson::os::FuturizedStore& store,
+      PGShardMapping& pg_to_shard_mapping)
+    : pg_shard_manager(osd.osd_singleton_state,
+                       osd.shard_services, pg_to_shard_mapping),
       osd(osd),
       whoami(whoami),
       store(store) {}
@@ -185,6 +187,7 @@ public:
   void handle_authentication(const EntityName& name,
                             const AuthCapsInfo& caps) final;
 
+  seastar::sharded<PGShardMapping> pg_to_shard_mappings;
   seastar::sharded<OSDSingletonState> osd_singleton_state;
   seastar::sharded<ShardServices> shard_services;
   seastar::sharded<ShardDispatcher> shard_dispatchers;
src/crimson/osd/pg_map.h
index f4b38ae45f6c81cb97fdcd059e2b7a19a411697b..3269de43497f24dfc76163a84bef012199fa9c5a 100644 (file)
@@ -21,9 +21,11 @@ class PG;
 /**
  * PGShardMapping
  *
- * Maps pgs to shards.
+ * Maintains a mapping from spg_t to the core containing that PG.  Internally, each
+ * core has a local copy of the mapping to enable core-local lookups.  Updates
+ * are proxied to core 0, and then back out to all other cores -- see maybe_create_pg.
  */
-class PGShardMapping {
+class PGShardMapping : public seastar::peering_sharded_service<PGShardMapping> {
 public:
   /// Returns mapping if present, NULL_CORE otherwise
   core_id_t get_pg_mapping(spg_t pgid) {
@@ -33,44 +35,69 @@ public:
   }
 
   /// Returns mapping for pgid, creates new one if it doesn't already exist
-  core_id_t maybe_create_pg(spg_t pgid, core_id_t core = NULL_CORE) {
-    auto [insert_iter, inserted] = pg_to_core.emplace(pgid, core);
-    if (!inserted) {
-      ceph_assert_always(insert_iter->second != NULL_CORE);
+  seastar::future<core_id_t> maybe_create_pg(
+    spg_t pgid,
+    core_id_t core = NULL_CORE) {
+    auto find_iter = pg_to_core.find(pgid);
+    if (find_iter != pg_to_core.end()) {
+      ceph_assert_always(find_iter->second != NULL_CORE);
       if (core != NULL_CORE) {
-       ceph_assert_always(insert_iter->second == core);
+        ceph_assert_always(find_iter->second == core);
       }
-      return insert_iter->second;
+      return seastar::make_ready_future<core_id_t>(find_iter->second);
     } else {
-      ceph_assert_always(core_to_num_pgs.size() > 0);
-      std::map<core_id_t, unsigned>::iterator core_iter;
-      if (core == NULL_CORE) {
-        core_iter = std::min_element(
-          core_to_num_pgs.begin(),
-          core_to_num_pgs.end(),
-          [](const auto &left, const auto &right) {
-            return left.second < right.second;
+      return container().invoke_on(0,[pgid, core]
+        (auto &primary_mapping) {
+        auto [insert_iter, inserted] = primary_mapping.pg_to_core.emplace(pgid, core);
+        ceph_assert_always(inserted);
+        ceph_assert_always(primary_mapping.core_to_num_pgs.size() > 0);
+        std::map<core_id_t, unsigned>::iterator core_iter;
+        if (core == NULL_CORE) {
+          core_iter = std::min_element(
+            primary_mapping.core_to_num_pgs.begin(),
+            primary_mapping.core_to_num_pgs.end(),
+              [](const auto &left, const auto &right) {
+              return left.second < right.second;
+          });
+        } else {
+          core_iter = primary_mapping.core_to_num_pgs.find(core);
+        }
+        ceph_assert_always(primary_mapping.core_to_num_pgs.end() != core_iter);
+        insert_iter->second = core_iter->first;
+        core_iter->second++;
+        return primary_mapping.container().invoke_on_others(
+          [pgid = insert_iter->first, core = insert_iter->second]
+          (auto &other_mapping) {
+          ceph_assert_always(core != NULL_CORE);
+          auto [insert_iter, inserted] = other_mapping.pg_to_core.emplace(pgid, core);
+          ceph_assert_always(inserted);
         });
-      } else {
-       core_iter = core_to_num_pgs.find(core);
-      }
-      ceph_assert_always(core_to_num_pgs.end() != core_iter);
-      insert_iter->second = core_iter->first;
-      core_iter->second++;
-      return insert_iter->second;
+      }).then([this, pgid] {
+        auto find_iter = pg_to_core.find(pgid);
+        return seastar::make_ready_future<core_id_t>(find_iter->second);
+      });
     }
   }
 
   /// Remove pgid
-  void remove_pg(spg_t pgid) {
-    auto iter = pg_to_core.find(pgid);
-    ceph_assert_always(iter != pg_to_core.end());
-    ceph_assert_always(iter->second != NULL_CORE);
-    auto count_iter = core_to_num_pgs.find(iter->second);
-    ceph_assert_always(count_iter != core_to_num_pgs.end());
-    ceph_assert_always(count_iter->second > 0);
-    --(count_iter->second);
-    pg_to_core.erase(iter);
+  seastar::future<> remove_pg(spg_t pgid) {
+    return container().invoke_on(0, [pgid](auto &primary_mapping) {
+      auto iter = primary_mapping.pg_to_core.find(pgid);
+      ceph_assert_always(iter != primary_mapping.pg_to_core.end());
+      ceph_assert_always(iter->second != NULL_CORE);
+      auto count_iter = primary_mapping.core_to_num_pgs.find(iter->second);
+      ceph_assert_always(count_iter != primary_mapping.core_to_num_pgs.end());
+      ceph_assert_always(count_iter->second > 0);
+      --(count_iter->second);
+      primary_mapping.pg_to_core.erase(iter);
+      return primary_mapping.container().invoke_on_others(
+        [pgid](auto &other_mapping) {
+        auto iter = other_mapping.pg_to_core.find(pgid);
+        ceph_assert_always(iter != other_mapping.pg_to_core.end());
+        ceph_assert_always(iter->second != NULL_CORE);
+        other_mapping.pg_to_core.erase(iter);
+      });
+    });
   }
 
   size_t get_num_pgs() const { return pg_to_core.size(); }
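
With the interface above, lookups stay synchronous while creation and removal
become futures. A brief usage sketch (hypothetical wiring; assumes a started
seastar::sharded<PGShardMapping> and the types and includes already available
in this header):

seastar::future<> track_and_check(seastar::sharded<PGShardMapping> &mappings,
                                  spg_t pgid) {
  // Creation resolves on core 0 and is replicated to every other core.
  return mappings.local().maybe_create_pg(pgid
  ).then([&mappings, pgid](core_id_t core) {
    // By the time this continuation runs, the calling core's copy has been
    // updated by invoke_on_others, so the lookup is purely local.
    ceph_assert_always(mappings.local().get_pg_mapping(pgid) == core);
    // Removal is likewise proxied to core 0 and fanned back out.
    return mappings.local().remove_pg(pgid);
  });
}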
src/crimson/osd/pg_shard_manager.cc
index e0ef237489145a3f679f659d62639581c01d0e27..e586a40890af855d3adf7127618227c18fbc1b55 100644 (file)
@@ -23,12 +23,12 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
         auto[coll, shard_core] = coll_core;
        spg_t pgid;
        if (coll.is_pg(&pgid)) {
-         auto core = get_osd_singleton_state(
-         ).pg_to_shard_mapping.maybe_create_pg(
-           pgid, shard_core);
-         return with_remote_shard_state(
-           core,
-           [pgid](
+          return pg_to_shard_mapping.maybe_create_pg(
+            pgid, shard_core
+          ).then([this, pgid] (auto core) {
+            return this->template with_remote_shard_state(
+              core,
+              [pgid](
              PerShardState &per_shard_state,
              ShardServices &shard_services) {
              return shard_services.load_pg(
@@ -39,6 +39,7 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
                return seastar::now();
              });
            });
+          });
        } else if (coll.is_temp(&pgid)) {
          logger().warn(
            "found temp collection on crimson osd, should be impossible: {}",
src/crimson/osd/pg_shard_manager.h
index 9526ddcd05af55c4fb38311c078f736473e71629..8333b1f483b6417ca02b9fdab4c7640247d25e18 100644 (file)
@@ -26,6 +26,7 @@ namespace crimson::osd {
 class PGShardManager {
   seastar::sharded<OSDSingletonState> &osd_singleton_state;
   seastar::sharded<ShardServices> &shard_services;
+  PGShardMapping &pg_to_shard_mapping;
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)          \
   template <typename... Args>                                  \
@@ -48,9 +49,11 @@ public:
 
   PGShardManager(
     seastar::sharded<OSDSingletonState> &osd_singleton_state,
-    seastar::sharded<ShardServices> &shard_services)
+    seastar::sharded<ShardServices> &shard_services,
+    PGShardMapping &pg_to_shard_mapping)
   : osd_singleton_state(osd_singleton_state),
-    shard_services(shard_services) {}
+    shard_services(shard_services),
+    pg_to_shard_mapping(pg_to_shard_mapping) {}
 
   auto &get_osd_singleton_state() {
     ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
@@ -188,15 +191,15 @@ public:
     static_assert(T::can_create());
     logger.debug("{}: can_create", *op);
 
-    auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
-      op->get_pgid());
-
     get_local_state().registry.remove_from_registry(*op);
-    return with_remote_shard_state_and_op<T>(
-      core, std::move(op),
-      [](PerShardState &per_shard_state,
-         ShardServices &shard_services,
-         typename T::IRef op) {
+    return pg_to_shard_mapping.maybe_create_pg(
+      op->get_pgid()
+    ).then([this, op = std::move(op)](auto core) mutable {
+      return this->template with_remote_shard_state_and_op<T>(
+        core, std::move(op),
+        [](PerShardState &per_shard_state,
+           ShardServices &shard_services,
+           typename T::IRef op) {
        per_shard_state.registry.add_to_registry(*op);
        auto &logger = crimson::get_logger(ceph_subsys_osd);
        auto &opref = *op;
@@ -219,6 +222,7 @@ public:
            })
          ).then([op=std::move(op)] {});
       });
+    });
   }
 
   /// Runs opref on the appropriate core, waiting for pg as necessary
@@ -232,15 +236,15 @@ public:
     static_assert(!T::can_create());
     logger.debug("{}: !can_create", *op);
 
-     auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
-      op->get_pgid());
-
     get_local_state().registry.remove_from_registry(*op);
-    return with_remote_shard_state_and_op<T>(
-      core, std::move(op),
-      [](PerShardState &per_shard_state,
-         ShardServices &shard_services,
-         typename T::IRef op) {
+    return pg_to_shard_mapping.maybe_create_pg(
+      op->get_pgid()
+    ).then([this, op = std::move(op)](auto core) mutable {
+      return this->template with_remote_shard_state_and_op<T>(
+        core, std::move(op),
+        [](PerShardState &per_shard_state,
+           ShardServices &shard_services,
+           typename T::IRef op) {
        per_shard_state.registry.add_to_registry(*op);
        auto &logger = crimson::get_logger(ceph_subsys_osd);
        auto &opref = *op;
@@ -260,6 +264,7 @@ public:
            })
          ).then([op=std::move(op)] {});
       });
+    });
   }
 
   seastar::future<> load_pgs(crimson::os::FuturizedStore& store);
@@ -308,20 +313,19 @@ public:
    */
   template <typename F>
   void for_each_pgid(F &&f) const {
-    return get_osd_singleton_state().pg_to_shard_mapping.for_each_pgid(
+    return pg_to_shard_mapping.for_each_pgid(
       std::forward<F>(f));
   }
 
   auto get_num_pgs() const {
-    return get_osd_singleton_state().pg_to_shard_mapping.get_num_pgs();
+    return pg_to_shard_mapping.get_num_pgs();
   }
 
   seastar::future<> broadcast_map_to_pgs(epoch_t epoch);
 
   template <typename F>
   auto with_pg(spg_t pgid, F &&f) {
-    core_id_t core = get_osd_singleton_state(
-    ).pg_to_shard_mapping.get_pg_mapping(pgid);
+    core_id_t core = pg_to_shard_mapping.get_pg_mapping(pgid);
     return with_remote_shard_state(
       core,
       [pgid, f=std::move(f)](auto &local_state, auto &local_service) mutable {
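
with_pg() above now resolves the owning core from the calling core's local copy
of the map and only then crosses cores. A generic sketch of that dispatch shape
(illustrative helper using seastar::smp::submit_to, not the Ceph
with_remote_shard_state machinery):

#include <seastar/core/smp.hh>
#include <utility>

template <typename Func>
auto dispatch_to_owner(PGShardMapping &local_mapping, spg_t pgid, Func &&f) {
  // Synchronous, core-local lookup -- every core holds a copy of the map.
  core_id_t core = local_mapping.get_pg_mapping(pgid);
  // Single cross-core hop to the core that owns the PG.
  return seastar::smp::submit_to(core, std::forward<Func>(f));
}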
src/crimson/osd/shard_services.h
index d73f6b350271ccd65010d9c3ecf4fb10f668b9d9..c29cb83769f8d1ba45c50303bd4bb78fa49c0a9d 100644 (file)
@@ -281,9 +281,6 @@ private:
   void requeue_pg_temp();
   seastar::future<> send_pg_temp();
 
-  // TODO: add config to control mapping
-  PGShardMapping pg_to_shard_mapping{0, seastar::smp::count};
-
   std::set<pg_t> pg_created;
   seastar::future<> send_pg_created(pg_t pgid);
   seastar::future<> send_pg_created();
@@ -327,6 +324,7 @@ class ShardServices : public OSDMapService {
 
   PerShardState local_state;
   seastar::sharded<OSDSingletonState> &osd_singleton_state;
+  PGShardMapping& pg_to_shard_mapping;
 
   template <typename F, typename... Args>
   auto with_singleton(F &&f, Args&&... args) {
@@ -369,9 +367,11 @@ public:
   template <typename... PSSArgs>
   ShardServices(
     seastar::sharded<OSDSingletonState> &osd_singleton_state,
+    PGShardMapping& pg_to_shard_mapping,
     PSSArgs&&... args)
     : local_state(std::forward<PSSArgs>(args)...),
-      osd_singleton_state(osd_singleton_state) {}
+      osd_singleton_state(osd_singleton_state),
+      pg_to_shard_mapping(pg_to_shard_mapping) {}
 
   FORWARD_TO_OSD_SINGLETON(send_to_osd)
 
@@ -381,10 +381,7 @@ public:
 
   auto remove_pg(spg_t pgid) {
     local_state.pg_map.remove_pg(pgid);
-    return with_singleton(
-      [pgid](auto &osstate) {
-      osstate.pg_to_shard_mapping.remove_pg(pgid);
-    });
+    return pg_to_shard_mapping.remove_pg(pgid);
   }
 
   crimson::common::CephContext *get_cct() {