git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: refactor pg management for multicore
author    Samuel Just <sjust@redhat.com>
          Thu, 1 Sep 2022 23:22:59 +0000 (23:22 +0000)
committer Samuel Just <sjust@redhat.com>
          Tue, 27 Sep 2022 02:33:01 +0000 (19:33 -0700)
OSDSingletonState will now only be responsible for the spg_t->core
mapping for PGs; the individual PerShardStates will hold local
PGMaps.  PG management operations are now proxied from PGShardManager
to PerShardState.  Subsequent patches will shard PerShardState.

Signed-off-by: Samuel Just <sjust@redhat.com>
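
For orientation, the ownership split described above can be sketched as
follows.  This is an illustrative sketch only, not code from this commit:
it is modeled without Seastar so it builds standalone, and while the names
(PGShardMapping, PerShardState, with_remote_shard_state, pg_created) mirror
the patch, the types and method bodies are simplified stand-ins.

// Illustrative sketch only (not part of the commit): the ownership split this
// patch introduces, modeled without Seastar so it compiles standalone.
#include <cassert>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>

using core_id_t = unsigned;
using spg_t = std::string;                     // stand-in for the real spg_t
struct PG { spg_t pgid; };                     // stand-in for crimson::osd::PG

// OSDSingletonState keeps only the spg_t -> core mapping.
struct PGShardMapping {
  core_id_t maybe_create_pg(const spg_t &pgid) {
    return pg_to_core.emplace(pgid, 0 /* single core for now */).first->second;
  }
  core_id_t get_pg_mapping(const spg_t &pgid) const {
    return pg_to_core.at(pgid);
  }
  std::map<spg_t, core_id_t> pg_to_core;
};

// Each PerShardState owns its own local PG map.
struct PerShardState {
  std::map<spg_t, std::shared_ptr<PG>> pg_map;
};

// PGShardManager proxies PG management operations to the owning shard.
struct PGShardManager {
  PGShardMapping mapping;      // singleton state: pgid -> core
  PerShardState local_state;   // per-core state; only core 0 exists so far

  template <typename F>
  auto with_remote_shard_state(core_id_t core, F &&f) {
    // The real code submits f to the target core via seastar::smp::submit_to;
    // the patch still asserts core == 0, so a direct call models it here.
    assert(core == 0);
    return std::invoke(std::forward<F>(f), local_state);
  }

  void pg_created(const spg_t &pgid) {
    auto core = mapping.maybe_create_pg(pgid);
    with_remote_shard_state(core, [&](PerShardState &state) {
      state.pg_map.emplace(pgid, std::make_shared<PG>(PG{pgid}));
    });
  }

  std::shared_ptr<PG> with_pg(const spg_t &pgid) {
    auto core = mapping.get_pg_mapping(pgid);
    return with_remote_shard_state(core, [&](PerShardState &state) {
      return state.pg_map.at(pgid);
    });
  }
};

int main() {
  PGShardManager mgr;
  mgr.pg_created("1.0");
  std::cout << "pg " << mgr.with_pg("1.0")->pgid << " maps to core "
            << mgr.mapping.get_pg_mapping("1.0") << "\n";
}
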
src/crimson/osd/osd_operations/pg_advance_map.cc
src/crimson/osd/osd_operations/pg_advance_map.h
src/crimson/osd/pg_map.h
src/crimson/osd/pg_shard_manager.cc
src/crimson/osd/pg_shard_manager.h
src/crimson/osd/shard_services.cc
src/crimson/osd/shard_services.h

index 140dd63fb7e7cec9b689755489af7a6e06e6eb64..0bc7678642b8991e7588ffcf065d9e4ec0445b26 100644 (file)
@@ -6,8 +6,8 @@
 #include "include/types.h"
 #include "common/Formatter.h"
 #include "crimson/osd/pg.h"
-#include "crimson/osd/pg_shard_manager.h"
 #include "crimson/osd/osdmap_service.h"
+#include "crimson/osd/shard_services.h"
 #include "crimson/osd/osd_operations/pg_advance_map.h"
 #include "crimson/osd/osd_operation_external_tracking.h"
 #include "osd/PeeringState.h"
@@ -21,9 +21,9 @@ namespace {
 namespace crimson::osd {
 
 PGAdvanceMap::PGAdvanceMap(
-  PGShardManager &shard_manager, Ref<PG> pg, epoch_t to,
+  ShardServices &shard_services, Ref<PG> pg, epoch_t to,
   PeeringCtx &&rctx, bool do_init)
-  : shard_manager(shard_manager), pg(pg), from(std::nullopt), to(to),
+  : shard_services(shard_services), pg(pg), to(to),
     rctx(std::move(rctx)), do_init(do_init) {}
 
 PGAdvanceMap::~PGAdvanceMap() {}
@@ -71,7 +71,7 @@ seastar::future<> PGAdvanceMap::start()
       boost::make_counting_iterator(*from + 1),
       boost::make_counting_iterator(to + 1),
       [this](epoch_t next_epoch) {
-        return shard_manager.get_shard_services().get_map(next_epoch).then(
+        return shard_services.get_map(next_epoch).then(
           [this] (cached_map_t&& next_map) {
             logger().debug("{}: advancing map to {}",
                            *this, next_map->get_epoch());
@@ -81,21 +81,21 @@ seastar::future<> PGAdvanceMap::start()
         pg->handle_activate_map(rctx);
         logger().debug("{}: map activated", *this);
         if (do_init) {
-          shard_manager.pg_created(pg->get_pgid(), pg);
-          shard_manager.get_shard_services().inc_pg_num();
+          shard_services.pg_created(pg->get_pgid(), pg);
+          shard_services.inc_pg_num();
           logger().info("PGAdvanceMap::start new pg {}", *pg);
         }
         return seastar::when_all_succeed(
           pg->get_need_up_thru()
-         ? shard_manager.get_shard_services().send_alive(
+         ? shard_services.send_alive(
            pg->get_same_interval_since())
          : seastar::now(),
-          shard_manager.get_shard_services().dispatch_context(
+          shard_services.dispatch_context(
             pg->get_collection_ref(),
             std::move(rctx)));
       }).then_unpack([this] {
         logger().debug("{}: sending pg temp", *this);
-        return shard_manager.get_shard_services().send_pg_temp();
+        return shard_services.send_pg_temp();
       });
   }).then([this, ref=std::move(ref)] {
     logger().debug("{}: complete", *this);
index 3391dd690fb71bff6a30221715644e50f7fcf021..6ae5a97bc0ff585ae0fe695e63376da61eb04613 100644 (file)
@@ -17,7 +17,7 @@ namespace ceph {
 
 namespace crimson::osd {
 
-class PGShardManager;
+class ShardServices;
 class PG;
 
 class PGAdvanceMap : public PhasedOperationT<PGAdvanceMap> {
@@ -25,7 +25,7 @@ public:
   static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map;
 
 protected:
-  PGShardManager &shard_manager;
+  ShardServices &shard_services;
   Ref<PG> pg;
   PipelineHandle handle;
 
@@ -37,7 +37,7 @@ protected:
 
 public:
   PGAdvanceMap(
-    PGShardManager &shard_manager, Ref<PG> pg, epoch_t to,
+    ShardServices &shard_services, Ref<PG> pg, epoch_t to,
     PeeringCtx &&rctx, bool do_init);
   ~PGAdvanceMap();
 
index 68f85e5f556b6152877cdba04062ffd070253026..0eabf34eb6bc5ebd0843fc23e42e03e5612a4b92 100644 (file)
@@ -65,6 +65,8 @@ public:
     pg_to_core.erase(iter);
   }
 
+  size_t get_num_pgs() const { return pg_to_core.size(); }
+
   /// Map to cores in [min_core_mapping, core_mapping_limit)
   PGShardMapping(core_id_t min_core_mapping, core_id_t core_mapping_limit) {
     ceph_assert_always(min_core_mapping < core_mapping_limit);
@@ -72,6 +74,7 @@ public:
       core_to_num_pgs.emplace(i, 0);
     }
   }
+
 private:
   std::map<core_id_t, unsigned> core_to_num_pgs;
   std::map<spg_t, core_id_t> pg_to_core;
index 8597725eaf1729803f46f52d4395555b3294b0af..5571e2d0ee12dbfb562653f03a2bb5b3df0140c9 100644 (file)
@@ -2,6 +2,13 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "crimson/osd/pg_shard_manager.h"
+#include "crimson/osd/pg.h"
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_osd);
+  }
+}
 
 namespace crimson::osd {
 
@@ -18,6 +25,65 @@ PGShardManager::PGShardManager(
     shard_services(osd_singleton_state, local_state)
 {}
 
+seastar::future<> PGShardManager::load_pgs()
+{
+  return osd_singleton_state.store.list_collections(
+  ).then([this](auto colls) {
+    return seastar::parallel_for_each(
+      colls,
+      [this](auto coll) {
+       spg_t pgid;
+       if (coll.is_pg(&pgid)) {
+         auto core = osd_singleton_state.pg_to_shard_mapping.maybe_create_pg(
+           pgid);
+         return with_remote_shard_state(
+           core,
+           [pgid](
+             PerShardState &per_shard_state,
+             ShardServices &shard_services) {
+             return shard_services.load_pg(
+               pgid
+             ).then([pgid, &per_shard_state, &shard_services](auto &&pg) {
+               logger().info("load_pgs: loaded {}", pgid);
+               per_shard_state.pg_map.pg_loaded(pgid, std::move(pg));
+               shard_services.inc_pg_num();
+               return seastar::now();
+             });
+           });
+       } else if (coll.is_temp(&pgid)) {
+         logger().warn(
+           "found temp collection on crimson osd, should be impossible: {}",
+           coll);
+         ceph_assert(0 == "temp collection on crimson osd, should be impossible");
+         return seastar::now();
+       } else {
+         logger().warn("ignoring unrecognized collection: {}", coll);
+         return seastar::now();
+       }
+      });
+  });
+}
+
+seastar::future<> PGShardManager::stop_pgs()
+{
+  return local_state.stop_pgs();
+}
+
+std::map<pg_t, pg_stat_t> PGShardManager::get_pg_stats() const
+{
+  return local_state.get_pg_stats();
+}
+
+seastar::future<> PGShardManager::broadcast_map_to_pgs(epoch_t epoch)
+{
+  return local_state.broadcast_map_to_pgs(
+    shard_services, epoch
+  ).then([this, epoch] {
+    osd_singleton_state.osdmap_gate.got_map(epoch);
+    return seastar::now();
+  });
+}
+
 seastar::future<> PGShardManager::set_up_epoch(epoch_t e) {
   local_state.set_up_epoch(e);
   return seastar::now();
index e9138cd63b7133fff7a99d25f180ffe53ed9c838..f40ea25a6f24ae0f0c9056d9889ea08fcb6789c2 100644 (file)
@@ -82,24 +82,120 @@ public:
 
   seastar::future<> set_up_epoch(epoch_t e);
 
-  FORWARD(pg_created, pg_created, osd_singleton_state.pg_map)
-  auto load_pgs() {
-    return osd_singleton_state.load_pgs(shard_services);
+  template <typename F>
+  auto with_remote_shard_state(core_id_t core, F &&f) {
+    ceph_assert(core == 0);
+    auto &local_state_ref = local_state;
+    auto &shard_services_ref = shard_services;
+    return seastar::smp::submit_to(
+      core,
+      [f=std::forward<F>(f), &local_state_ref, &shard_services_ref]() mutable {
+       return std::invoke(
+         std::move(f), local_state_ref, shard_services_ref);
+      });
+  }
+
+  /// Runs opref on the appropriate core, creating the pg as necessary.
+  template <typename T>
+  seastar::future<> run_with_pg_maybe_create(
+    typename T::IRef op
+  ) {
+    ceph_assert(op->use_count() == 1);
+    auto &logger = crimson::get_logger(ceph_subsys_osd);
+    static_assert(T::can_create());
+    logger.debug("{}: can_create", *op);
+
+    auto core = osd_singleton_state.pg_to_shard_mapping.maybe_create_pg(
+      op->get_pgid());
+
+    local_state.registry.remove_from_registry(*op);
+    return with_remote_shard_state(
+      core,
+      [op=std::move(op)](
+       PerShardState &per_shard_state,
+       ShardServices &shard_services) mutable {
+       per_shard_state.registry.add_to_registry(*op);
+       auto &logger = crimson::get_logger(ceph_subsys_osd);
+       auto &opref = *op;
+       return opref.template with_blocking_event<
+         PGMap::PGCreationBlockingEvent
+         >([&shard_services, &opref](
+             auto &&trigger) {
+           return shard_services.get_or_create_pg(
+             std::move(trigger),
+             opref.get_pgid(), opref.get_epoch(),
+             std::move(opref.get_create_info()));
+         }).then([&logger, &shard_services, &opref](Ref<PG> pgref) {
+           logger.debug("{}: have_pg", opref);
+           return opref.with_pg(shard_services, pgref);
+         }).then([op=std::move(op)] {});
+      });
+  }
+
+  /// Runs opref on the appropriate core, waiting for pg as necessary
+  template <typename T>
+  seastar::future<> run_with_pg_maybe_wait(
+    typename T::IRef op
+  ) {
+    ceph_assert(op->use_count() == 1);
+    auto &logger = crimson::get_logger(ceph_subsys_osd);
+    static_assert(!T::can_create());
+    logger.debug("{}: !can_create", *op);
+
+    auto core = osd_singleton_state.pg_to_shard_mapping.maybe_create_pg(
+      op->get_pgid());
+
+    local_state.registry.remove_from_registry(*op);
+    return with_remote_shard_state(
+      core,
+      [op=std::move(op)](
+       PerShardState &per_shard_state,
+       ShardServices &shard_services) mutable {
+       per_shard_state.registry.add_to_registry(*op);
+       auto &logger = crimson::get_logger(ceph_subsys_osd);
+       auto &opref = *op;
+       return opref.template with_blocking_event<
+         PGMap::PGCreationBlockingEvent
+         >([&shard_services, &opref](
+             auto &&trigger) {
+           return shard_services.wait_for_pg(
+             std::move(trigger), opref.get_pgid());
+         }).then([&logger, &shard_services, &opref](Ref<PG> pgref) {
+           logger.debug("{}: have_pg", opref);
+           return opref.with_pg(shard_services, pgref);
+         }).then([op=std::move(op)] {});
+      });
   }
-  FORWARD_TO_OSD_SINGLETON(stop_pgs)
-  FORWARD_CONST(get_pg_stats, get_pg_stats, osd_singleton_state)
 
-  FORWARD_CONST(for_each_pg, for_each_pg, osd_singleton_state)
-  auto get_num_pgs() const { return osd_singleton_state.pg_map.get_pgs().size(); }
+  seastar::future<> load_pgs();
+  seastar::future<> stop_pgs();
+
+  std::map<pg_t, pg_stat_t> get_pg_stats() const;
 
-  auto broadcast_map_to_pgs(epoch_t epoch) {
-    return osd_singleton_state.broadcast_map_to_pgs(
-      *this, shard_services, epoch);
+  template <typename F>
+  void for_each_pg(F &&f) const {
+    for (auto &&pg: local_state.pg_map.get_pgs()) {
+      std::apply(f, pg);
+    }
+  }
+
+  auto get_num_pgs() const {
+    return osd_singleton_state.pg_to_shard_mapping.get_num_pgs();
   }
 
+  seastar::future<> broadcast_map_to_pgs(epoch_t epoch);
+
   template <typename F>
   auto with_pg(spg_t pgid, F &&f) {
-    return std::invoke(std::forward<F>(f), osd_singleton_state.get_pg(pgid));
+    core_id_t core = osd_singleton_state.pg_to_shard_mapping.get_pg_mapping(
+      pgid);
+    return with_remote_shard_state(
+      core,
+      [pgid, f=std::move(f)](auto &local_state, auto &local_service) mutable {
+       return std::invoke(
+         std::move(f),
+         local_state.pg_map.get_pg(pgid));
+      });
   }
 
   template <typename T, typename... Args>
@@ -108,8 +204,9 @@ public:
       std::forward<Args>(args)...);
     auto &logger = crimson::get_logger(ceph_subsys_osd);
     logger.debug("{}: starting {}", *op, __func__);
-    auto &opref = *op;
 
+    auto &opref = *op;
+    auto id = op->get_id();
     auto fut = opref.template enter_stage<>(
       opref.get_connection_pipeline().await_active
     ).then([this, &opref, &logger] {
@@ -135,37 +232,17 @@ public:
       logger.debug("{}: got map {}, entering get_pg", opref, epoch);
       return opref.template enter_stage<>(
        opref.get_connection_pipeline().get_pg);
-    }).then([this, &logger, &opref] {
+    }).then([this, &logger, &opref, op=std::move(op)]() mutable {
       logger.debug("{}: in get_pg", opref);
       if constexpr (T::can_create()) {
        logger.debug("{}: can_create", opref);
-       return opref.template with_blocking_event<
-         PGMap::PGCreationBlockingEvent
-         >([this, &opref](auto &&trigger) {
-           std::ignore = this; // avoid clang warning
-           return osd_singleton_state.get_or_create_pg(
-             *this,
-             shard_services,
-             std::move(trigger),
-             opref.get_pgid(), opref.get_epoch(),
-             std::move(opref.get_create_info()));
-         });
+       return run_with_pg_maybe_create<T>(std::move(op));
       } else {
        logger.debug("{}: !can_create", opref);
-       return opref.template with_blocking_event<
-         PGMap::PGCreationBlockingEvent
-         >([this, &opref](auto &&trigger) {
-           std::ignore = this; // avoid clang warning
-           return osd_singleton_state.wait_for_pg(
-             std::move(trigger), opref.get_pgid());
-         });
+       return run_with_pg_maybe_wait<T>(std::move(op));
       }
-    }).then([this, &logger, &opref](Ref<PG> pgref) {
-      logger.debug("{}: have_pg", opref);
-      return opref.with_pg(get_shard_services(), pgref);
-    }).then([op] { /* Retain refcount on op until completion */ });
-
-    return std::make_pair(std::move(op), std::move(fut));
+    });
+    return std::make_pair(id, std::move(fut));
   }
 };
 
index 0bf4ba485d3427cec29ee3b2826cf4191eaa3ff4..8975187ed3c12f15f6d1212023350ee1efa49b6c 100644 (file)
@@ -46,6 +46,49 @@ PerShardState::PerShardState(
   cct.get_perfcounters_collection()->add(recoverystate_perf);
 }
 
+seastar::future<> PerShardState::stop_pgs()
+{
+  return seastar::parallel_for_each(
+    pg_map.get_pgs(),
+    [](auto& p) {
+      return p.second->stop();
+    });
+}
+
+std::map<pg_t, pg_stat_t> PerShardState::get_pg_stats() const
+{
+  std::map<pg_t, pg_stat_t> ret;
+  for (auto [pgid, pg] : pg_map.get_pgs()) {
+    if (pg->is_primary()) {
+      auto stats = pg->get_stats();
+      // todo: update reported_epoch,reported_seq,last_fresh
+      stats.reported_epoch = osdmap->get_epoch();
+      ret.emplace(pgid.pgid, std::move(stats));
+    }
+  }
+  return ret;
+}
+
+seastar::future<> PerShardState::broadcast_map_to_pgs(
+  ShardServices &shard_services,
+  epoch_t epoch)
+{
+  auto &pgs = pg_map.get_pgs();
+  return seastar::parallel_for_each(
+    pgs.begin(), pgs.end(),
+    [=, &shard_services](auto& pg) {
+      return shard_services.start_operation<PGAdvanceMap>(
+       shard_services,
+       pg.second, epoch,
+       PeeringCtx{}, false).second;
+    });
+}
+
+Ref<PG> PerShardState::get_pg(spg_t pgid)
+{
+  return pg_map.get_pg(pgid);
+}
+
 OSDSingletonState::OSDSingletonState(
   int whoami,
   crimson::net::Messenger &cluster_msgr,
@@ -371,14 +414,13 @@ seastar::future<> OSDSingletonState::store_maps(ceph::os::Transaction& t,
     });
 }
 
-seastar::future<Ref<PG>> OSDSingletonState::make_pg(
-  ShardServices &shard_services,
+seastar::future<Ref<PG>> ShardServices::make_pg(
   OSDMapService::cached_map_t create_map,
   spg_t pgid,
   bool do_create)
 {
   using ec_profile_t = std::map<std::string, std::string>;
-  auto get_pool_info = [create_map, pgid, this] {
+  auto get_pool_info_for_pg = [create_map, pgid, this] {
     if (create_map->have_pg_pool(pgid.pool())) {
       pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
       std::string name = create_map->get_pool_name(pgid.pool());
@@ -395,51 +437,49 @@ seastar::future<Ref<PG>> OSDSingletonState::make_pg(
            std::move(ec_profile)));
     } else {
       // pool was deleted; grab final pg_pool_t off disk.
-      return get_meta_coll().load_final_pool_info(pgid.pool());
+      return get_pool_info(pgid.pool());
     }
   };
   auto get_collection = [pgid, do_create, this] {
     const coll_t cid{pgid};
     if (do_create) {
-      return store.create_new_collection(cid);
+      return get_store().create_new_collection(cid);
     } else {
-      return store.open_collection(cid);
+      return get_store().open_collection(cid);
     }
   };
   return seastar::when_all(
-    std::move(get_pool_info),
+    std::move(get_pool_info_for_pg),
     std::move(get_collection)
-  ).then([&shard_services, pgid, create_map, this] (auto&& ret) {
+  ).then([pgid, create_map, this](auto &&ret) {
     auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0());
     auto coll = std::move(std::get<1>(ret).get0());
     return seastar::make_ready_future<Ref<PG>>(
       new PG{
        pgid,
-       pg_shard_t{whoami, pgid.shard},
+       pg_shard_t{local_state.whoami, pgid.shard},
        std::move(coll),
        std::move(pool),
        std::move(name),
        create_map,
-       shard_services,
+       *this,
        ec_profile});
   });
 }
 
-seastar::future<Ref<PG>> OSDSingletonState::handle_pg_create_info(
-  PGShardManager &shard_manager,
-  ShardServices &shard_services,
+seastar::future<Ref<PG>> ShardServices::handle_pg_create_info(
   std::unique_ptr<PGCreateInfo> info) {
   return seastar::do_with(
     std::move(info),
-    [this, &shard_manager, &shard_services](auto &info)
+    [this](auto &info)
     -> seastar::future<Ref<PG>> {
-      return shard_services.get_map(info->epoch).then(
-       [&info, &shard_services, this](OSDMapService::cached_map_t startmap)
-       -> seastar::future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>> {
+      return get_map(info->epoch).then(
+       [&info, this](cached_map_t startmap)
+       -> seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
          const spg_t &pgid = info->pgid;
          if (info->by_mon) {
            int64_t pool_id = pgid.pgid.pool();
-           const pg_pool_t *pool = shard_services.get_map()->get_pg_pool(pool_id);
+           const pg_pool_t *pool = get_map()->get_pg_pool(pool_id);
            if (!pool) {
              logger().debug(
                "{} ignoring pgid {}, pool dne",
@@ -449,7 +489,8 @@ seastar::future<Ref<PG>> OSDSingletonState::handle_pg_create_info(
                std::tuple<Ref<PG>, OSDMapService::cached_map_t>
                >(std::make_tuple(Ref<PG>(), startmap));
            }
-           ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
+           ceph_assert(get_map()->require_osd_release >=
+                       ceph_release_t::octopus);
            if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
              // this ensures we do not process old creating messages after the
              // pool's initial pgs have been created (and pg are subsequently
@@ -463,13 +504,14 @@ seastar::future<Ref<PG>> OSDSingletonState::handle_pg_create_info(
                >(std::make_tuple(Ref<PG>(), startmap));
            }
          }
-         return make_pg(shard_services, startmap, pgid, true).then(
-           [startmap=std::move(startmap)](auto pg) mutable {
-             return seastar::make_ready_future<
-               std::tuple<Ref<PG>, OSDMapService::cached_map_t>
-               >(std::make_tuple(std::move(pg), std::move(startmap)));
-           });
-       }).then([this, &shard_manager, &shard_services, &info](auto&& ret)
+         return make_pg(
+           startmap, pgid, true
+         ).then([startmap=std::move(startmap)](auto pg) mutable {
+           return seastar::make_ready_future<
+             std::tuple<Ref<PG>, OSDMapService::cached_map_t>
+             >(std::make_tuple(std::move(pg), std::move(startmap)));
+         });
+       }).then([this, &info](auto &&ret)
                ->seastar::future<Ref<PG>> {
          auto [pg, startmap] = std::move(ret);
          if (!pg)
@@ -482,7 +524,7 @@ seastar::future<Ref<PG>> OSDSingletonState::handle_pg_create_info(
            info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);
 
          int role = startmap->calc_pg_role(
-           pg_shard_t(whoami, info->pgid.shard),
+           pg_shard_t(local_state.whoami, info->pgid.shard),
            acting);
 
          PeeringCtx rctx;
@@ -505,10 +547,10 @@ seastar::future<Ref<PG>> OSDSingletonState::handle_pg_create_info(
            info->past_intervals,
            rctx.transaction);
 
-         return shard_services.start_operation<PGAdvanceMap>(
-           shard_manager, pg, osdmap->get_epoch(), std::move(rctx), true
+         return start_operation<PGAdvanceMap>(
+           *this, pg, get_map()->get_epoch(), std::move(rctx), true
          ).second.then([pg=pg] {
-             return seastar::make_ready_future<Ref<PG>>(pg);
+           return seastar::make_ready_future<Ref<PG>>(pg);
          });
        });
     });
@@ -516,85 +558,46 @@ seastar::future<Ref<PG>> OSDSingletonState::handle_pg_create_info(
 
 
 seastar::future<Ref<PG>>
-OSDSingletonState::get_or_create_pg(
-  PGShardManager &shard_manager,
-  ShardServices &shard_services,
+ShardServices::get_or_create_pg(
   PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
   spg_t pgid,
   epoch_t epoch,
   std::unique_ptr<PGCreateInfo> info)
 {
   if (info) {
-    auto [fut, creating] = pg_map.wait_for_pg(std::move(trigger), pgid);
+    auto [fut, creating] = local_state.pg_map.wait_for_pg(
+      std::move(trigger), pgid);
     if (!creating) {
-      pg_map.set_creating(pgid);
+      local_state.pg_map.set_creating(pgid);
       (void)handle_pg_create_info(
-       shard_manager, shard_services, std::move(info));
+       std::move(info));
     }
     return std::move(fut);
   } else {
-    return seastar::make_ready_future<Ref<PG>>(pg_map.get_pg(pgid));
+    return seastar::make_ready_future<Ref<PG>>(
+      local_state.pg_map.get_pg(pgid));
   }
 }
 
-seastar::future<Ref<PG>> OSDSingletonState::wait_for_pg(
+seastar::future<Ref<PG>> ShardServices::wait_for_pg(
   PGMap::PGCreationBlockingEvent::TriggerI&& trigger, spg_t pgid)
 {
-  return pg_map.wait_for_pg(std::move(trigger), pgid).first;
-}
-
-Ref<PG> OSDSingletonState::get_pg(spg_t pgid)
-{
-  return pg_map.get_pg(pgid);
+  return local_state.pg_map.wait_for_pg(std::move(trigger), pgid).first;
 }
 
-seastar::future<> OSDSingletonState::load_pgs(
-  ShardServices &shard_services)
-{
-  return store.list_collections(
-  ).then([this, &shard_services](auto colls) {
-    return seastar::parallel_for_each(
-      colls,
-      [this, &shard_services](auto coll) {
-       spg_t pgid;
-       if (coll.is_pg(&pgid)) {
-         return load_pg(
-           shard_services,
-           pgid
-         ).then([pgid, this, &shard_services](auto &&pg) {
-           logger().info("load_pgs: loaded {}", pgid);
-           pg_map.pg_loaded(pgid, std::move(pg));
-           shard_services.inc_pg_num();
-           return seastar::now();
-         });
-       } else if (coll.is_temp(&pgid)) {
-         logger().warn(
-           "found temp collection on crimson osd, should be impossible: {}",
-           coll);
-         ceph_assert(0 == "temp collection on crimson osd, should be impossible");
-         return seastar::now();
-       } else {
-         logger().warn("ignoring unrecognized collection: {}", coll);
-         return seastar::now();
-       }
-      });
-  });
-}
+seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid)
 
-seastar::future<Ref<PG>> OSDSingletonState::load_pg(
-  ShardServices &shard_services,
-  spg_t pgid)
 {
   logger().debug("{}: {}", __func__, pgid);
 
-  return seastar::do_with(PGMeta(store, pgid), [](auto& pg_meta) {
+  return seastar::do_with(PGMeta(get_store(), pgid), [](auto& pg_meta) {
     return pg_meta.get_epoch();
-  }).then([&shard_services](epoch_t e) {
-    return shard_services.get_map(e);
-  }).then([pgid, this, &shard_services] (auto&& create_map) {
-    return make_pg(shard_services, std::move(create_map), pgid, false);
+  }).then([this](epoch_t e) {
+    return get_map(e);
+  }).then([pgid, this](auto&& create_map) {
+    return make_pg(std::move(create_map), pgid, false);
   }).then([this](Ref<PG> pg) {
-    return pg->read_state(&store).then([pg] {
+    return pg->read_state(&get_store()).then([pg] {
        return seastar::make_ready_future<Ref<PG>>(std::move(pg));
     });
   }).handle_exception([pgid](auto ep) {
@@ -604,47 +607,6 @@ seastar::future<Ref<PG>> OSDSingletonState::load_pg(
   });
 }
 
-seastar::future<> OSDSingletonState::stop_pgs()
-{
-  return seastar::parallel_for_each(
-    pg_map.get_pgs(),
-    [](auto& p) {
-      return p.second->stop();
-    });
-}
-
-std::map<pg_t, pg_stat_t> OSDSingletonState::get_pg_stats() const
-{
-  std::map<pg_t, pg_stat_t> ret;
-  for (auto [pgid, pg] : pg_map.get_pgs()) {
-    if (pg->is_primary()) {
-      auto stats = pg->get_stats();
-      // todo: update reported_epoch,reported_seq,last_fresh
-      stats.reported_epoch = osdmap->get_epoch();
-      ret.emplace(pgid.pgid, std::move(stats));
-    }
-  }
-  return ret;
-}
-
-seastar::future<> OSDSingletonState::broadcast_map_to_pgs(
-  PGShardManager &shard_manager,
-  ShardServices &shard_services,
-  epoch_t epoch)
-{
-  auto &pgs = pg_map.get_pgs();
-  return seastar::parallel_for_each(
-    pgs.begin(), pgs.end(),
-    [=, &shard_manager, &shard_services](auto& pg) {
-      return shard_services.start_operation<PGAdvanceMap>(
-       shard_manager, pg.second, epoch, PeeringCtx{}, false
-      ).second;
-    }).then([epoch, this] {
-      osdmap_gate.got_map(epoch);
-      return seastar::make_ready_future();
-    });
-}
-
 seastar::future<> ShardServices::dispatch_context_transaction(
   crimson::os::CollectionRef col, PeeringCtx &ctx) {
   if (ctx.transaction.empty()) {
index bc68f290f841a390000608bea64234b302edb4d7..29abcd2f7a38d5249be456d0089c26461fe0ae48 100644 (file)
@@ -90,6 +90,23 @@ class PerShardState {
     return registry.stop();
   }
 
+  // PGMap state
+  PGMap pg_map;
+
+  seastar::future<> stop_pgs();
+  std::map<pg_t, pg_stat_t> get_pg_stats() const;
+  seastar::future<> broadcast_map_to_pgs(
+    ShardServices &shard_services,
+    epoch_t epoch);
+
+  Ref<PG> get_pg(spg_t pgid);
+  template <typename F>
+  void for_each_pg(F &&f) const {
+    for (auto &pg : pg_map.get_pgs()) {
+      std::invoke(f, pg.first, pg.second);
+    }
+  }
+
   template <typename T, typename... Args>
   auto start_operation(Args&&... args) {
     if (__builtin_expect(stopping, false)) {
@@ -173,6 +190,10 @@ public:
     return *meta_coll;
   }
 
+  auto get_pool_info(int64_t poolid) {
+    return get_meta_coll().load_final_pool_info(poolid);
+  }
+
   // global pg temp state
   struct pg_temp_t {
     std::vector<int> acting;
@@ -188,6 +209,8 @@ public:
   void requeue_pg_temp();
   seastar::future<> send_pg_temp();
 
+  // TODO: add config to control mapping
+  PGShardMapping pg_to_shard_mapping{0, 1};
   unsigned num_pgs = 0;
   unsigned get_pg_num() const {
     return num_pgs;
@@ -238,46 +261,6 @@ public:
                     epoch_t e, bufferlist&& bl);
   seastar::future<> store_maps(ceph::os::Transaction& t,
                                epoch_t start, Ref<MOSDMap> m);
-
-  // PGMap state
-  PGMap pg_map;
-
-  seastar::future<Ref<PG>> make_pg(
-    ShardServices &shard_services,
-    cached_map_t create_map,
-    spg_t pgid,
-    bool do_create);
-  seastar::future<Ref<PG>> handle_pg_create_info(
-    PGShardManager &shard_manager,
-    ShardServices &shard_services,
-    std::unique_ptr<PGCreateInfo> info);
-  seastar::future<Ref<PG>> get_or_create_pg(
-    PGShardManager &shard_manager,
-    ShardServices &shard_services,
-    PGMap::PGCreationBlockingEvent::TriggerI&&,
-    spg_t pgid,
-    epoch_t epoch,
-    std::unique_ptr<PGCreateInfo> info);
-  seastar::future<Ref<PG>> wait_for_pg(
-    PGMap::PGCreationBlockingEvent::TriggerI&&, spg_t pgid);
-  Ref<PG> get_pg(spg_t pgid);
-  seastar::future<> load_pgs(ShardServices &shard_services);
-  seastar::future<Ref<PG>> load_pg(
-    ShardServices &shard_services,
-    spg_t pgid);
-  seastar::future<> stop_pgs();
-  std::map<pg_t, pg_stat_t> get_pg_stats() const;
-  seastar::future<> broadcast_map_to_pgs(
-    PGShardManager &shard_manager,
-    ShardServices &shard_services,
-    epoch_t epoch);
-
-  template <typename F>
-  void for_each_pg(F &&f) const {
-    for (auto &pg : pg_map.get_pgs()) {
-      std::invoke(f, pg.first, pg.second);
-    }
-  }
 };
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)          \
@@ -341,6 +324,22 @@ public:
     return *local_state.perf;
   }
 
+  // Local PG Management
+  seastar::future<Ref<PG>> make_pg(
+    cached_map_t create_map,
+    spg_t pgid,
+    bool do_create);
+  seastar::future<Ref<PG>> handle_pg_create_info(
+    std::unique_ptr<PGCreateInfo> info);
+  seastar::future<Ref<PG>> get_or_create_pg(
+    PGMap::PGCreationBlockingEvent::TriggerI&&,
+    spg_t pgid,
+    epoch_t epoch,
+    std::unique_ptr<PGCreateInfo> info);
+  seastar::future<Ref<PG>> wait_for_pg(
+    PGMap::PGCreationBlockingEvent::TriggerI&&, spg_t pgid);
+  seastar::future<Ref<PG>> load_pg(spg_t pgid);
+
   /// Dispatch and reset ctx transaction
   seastar::future<> dispatch_context_transaction(
     crimson::os::CollectionRef col, PeeringCtx &ctx);
@@ -376,6 +375,7 @@ public:
       });
   }
 
+  FORWARD_TO_OSD_SINGLETON(get_pool_info)
   FORWARD_TO_OSD_SINGLETON(get_pg_num)
   FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
 
@@ -392,6 +392,8 @@ public:
   FORWARD_CONST(get_mnow, get_mnow, osd_singleton_state)
   FORWARD_TO_OSD_SINGLETON(get_hb_stamps)
 
+  FORWARD(pg_created, pg_created, local_state.pg_map)
+
   FORWARD(
     maybe_get_cached_obc, maybe_get_cached_obc, local_state.obc_registry)
   FORWARD(