git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: shard PerShardState across cores
author Samuel Just <sjust@redhat.com>
Wed, 14 Sep 2022 02:09:58 +0000 (19:09 -0700)
committer Samuel Just <sjust@redhat.com>
Tue, 27 Sep 2022 02:35:41 +0000 (19:35 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/pg.h
src/crimson/osd/pg_shard_manager.cc
src/crimson/osd/pg_shard_manager.h
src/crimson/osd/shard_services.cc
src/crimson/osd/shard_services.h

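The change replaces the single ShardServices/OSDSingletonState instances with seastar::sharded<> containers: ShardServices (and its PerShardState) is started on every reactor core, while OSDSingletonState is started only on PRIMARY_CORE via start_single. For readers unfamiliar with the idiom, here is a minimal standalone sketch of seastar::sharded<> (illustrative names only, not crimson code; assumes a Seastar build). ShardServices plays the role of Service below, and PGShardManager::start()/stop() correspond to start()/stop() in the sketch.

// sharded_sketch.cc -- illustrative only, not part of this commit.
// Shows the seastar::sharded<> lifecycle: one instance per reactor core,
// started/stopped from one place, reached via invoke_on()/invoke_on_all().
#include <seastar/core/app-template.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>
#include <iostream>
#include <memory>

struct Service {
  int hits = 0;
  void ping() { ++hits; }
  // sharded<T>::stop() requires a stop() member returning a future
  seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    // sharded<> is neither copyable nor movable; keep it on the heap for
    // the duration of the future chain.
    auto owner = std::make_unique<seastar::sharded<Service>>();
    auto svc = owner.get();
    return svc->start(                // construct one Service per reactor core
    ).then([svc] {
      return svc->invoke_on_all([](Service &local) { local.ping(); });
    }).then([svc] {
      return svc->invoke_on(0, [](Service &primary) {   // core 0's instance
        std::cout << seastar::smp::count << " cores, core 0 hits: "
                  << primary.hits << "\n";
      });
    }).then([svc] {
      return svc->stop();             // must stop() before destruction
    }).finally([owner = std::move(owner)] {});
  });
}
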
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 3a9b709b0192bdb75296abb93ce2a96cedf20340..98e2d2103bc26235daaeb83624f2ab0f0ee4df02 100644
@@ -220,7 +220,9 @@ public:
     unsigned priority,
     PGPeeringEventURef on_grant,
     PGPeeringEventURef on_preempt) final {
-    shard_services.local_request_reservation(
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.local_request_reservation(
       pgid,
       on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
        start_peering_event_operation(std::move(*on_grant));
@@ -234,13 +236,17 @@ public:
 
   void update_local_background_io_priority(
     unsigned priority) final {
-    shard_services.local_update_priority(
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.local_update_priority(
       pgid,
       priority);
   }
 
   void cancel_local_background_io_reservation() final {
-    shard_services.local_cancel_reservation(
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.local_cancel_reservation(
       pgid);
   }
 
@@ -248,7 +254,9 @@ public:
     unsigned priority,
     PGPeeringEventURef on_grant,
     PGPeeringEventURef on_preempt) final {
-    shard_services.remote_request_reservation(
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.remote_request_reservation(
       pgid,
       on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
        start_peering_event_operation(std::move(*on_grant));
@@ -261,7 +269,9 @@ public:
   }
 
   void cancel_remote_recovery_reservation() final {
-    shard_services.remote_cancel_reservation(
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.remote_cancel_reservation(
       pgid);
   }
 
@@ -285,10 +295,14 @@ public:
     // Not needed yet
   }
   void queue_want_pg_temp(const std::vector<int> &wanted) final {
-    shard_services.queue_want_pg_temp(pgid.pgid, wanted);
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.queue_want_pg_temp(pgid.pgid, wanted);
   }
   void clear_want_pg_temp() final {
-    shard_services.remove_want_pg_temp(pgid.pgid);
+    // TODO -- we probably want to add a mechanism for blocking on this
+    // after handling the peering event
+    std::ignore = shard_services.remove_want_pg_temp(pgid.pgid);
   }
   void check_recovery_sources(const OSDMapRef& newmap) final {
     // Not needed yet
diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc
index fef77d6166a12ae67c92037374dc6c4a5d42ca7f..84b17e721f952033da5c8e0cc3b6ce3a3a79951f 100644
@@ -20,19 +20,24 @@ seastar::future<> PGShardManager::start(
   crimson::mgr::Client &mgrc,
   crimson::os::FuturizedStore &store)
 {
-  osd_singleton_state.reset(
-    new OSDSingletonState(whoami, cluster_msgr, public_msgr,
-                         monc, mgrc));
-  shard_services.reset(
-    new ShardServices(*osd_singleton_state, whoami, store));
-  return seastar::now();
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+  return osd_singleton_state.start_single(
+    whoami, std::ref(cluster_msgr), std::ref(public_msgr),
+    std::ref(monc), std::ref(mgrc)
+  ).then([this, whoami, &store] {
+    ceph::mono_time startup_time = ceph::mono_clock::now();
+    return shard_services.start(
+      std::ref(osd_singleton_state), whoami, startup_time, std::ref(store));
+  });
 }
 
 seastar::future<> PGShardManager::stop()
 {
-  shard_services.reset();
-  osd_singleton_state.reset();
-  return seastar::now();
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+  return shard_services.stop(
+  ).then([this] {
+    return osd_singleton_state.stop();
+  });
 }
 
 seastar::future<> PGShardManager::load_pgs()
@@ -76,29 +81,44 @@ seastar::future<> PGShardManager::load_pgs()
 
 seastar::future<> PGShardManager::stop_pgs()
 {
-  return get_local_state().stop_pgs();
+  return shard_services.invoke_on_all([](auto &local_service) {
+    return local_service.local_state.stop_pgs();
+  });
 }
 
 seastar::future<std::map<pg_t, pg_stat_t>>
 PGShardManager::get_pg_stats() const
 {
-  return seastar::make_ready_future<std::map<pg_t, pg_stat_t>>(
-    get_local_state().get_pg_stats());
+  return shard_services.map_reduce0(
+    [](auto &local) {
+      return local.local_state.get_pg_stats();
+    },
+    std::map<pg_t, pg_stat_t>(),
+    [](auto &&left, auto &&right) {
+      left.merge(std::move(right));
+      return std::move(left);
+    });
 }
 
 seastar::future<> PGShardManager::broadcast_map_to_pgs(epoch_t epoch)
 {
-  return get_local_state().broadcast_map_to_pgs(
-    get_shard_services(), epoch
-  ).then([this, epoch] {
+  return shard_services.invoke_on_all([epoch](auto &local_service) {
+    return local_service.local_state.broadcast_map_to_pgs(
+      local_service, epoch
+    );
+  }).then([this, epoch] {
     get_osd_singleton_state().osdmap_gate.got_map(epoch);
     return seastar::now();
   });
 }
 
 seastar::future<> PGShardManager::set_up_epoch(epoch_t e) {
-  local_state.set_up_epoch(e);
-  return seastar::now();
+  return shard_services.invoke_on_all(
+    seastar::smp_submit_to_options{},
+    [e](auto &local_service) {
+      local_service.local_state.set_up_epoch(e);
+      return seastar::now();
+    });
 }
 
 }
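
get_pg_stats() above is the aggregation case of the same idiom: map_reduce0 runs a mapper on every shard and merges the per-core results on the calling core. A hedged sketch, reusing the illustrative Service from the sketch above (not crimson code):

// Illustrative only: gather a per-core counter into one map, mirroring
// the shape of the map_reduce0 call in PGShardManager::get_pg_stats().
#include <map>

seastar::future<std::map<unsigned, int>> collect_hits(
  seastar::sharded<Service> &svc)
{
  return svc.map_reduce0(
    [](Service &local) {
      // mapper: runs on every core, returns that core's contribution
      return std::map<unsigned, int>{{seastar::this_shard_id(), local.hits}};
    },
    std::map<unsigned, int>(),                    // initial value
    [](std::map<unsigned, int> left, std::map<unsigned, int> right) {
      left.merge(std::move(right));               // reduce on the calling core
      return left;
    });
}
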
diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h
index 2135719fa062d7f3456eb4d5fa40e217e4d40550..66b289a073eb098c0cb304518af2837e3cd7569b 100644
@@ -21,8 +21,8 @@ namespace crimson::osd {
  * etc)
  */
 class PGShardManager {
-  std::unique_ptr<OSDSingletonState> osd_singleton_state;
-  std::unique_ptr<ShardServices> shard_services;
+  seastar::sharded<OSDSingletonState> osd_singleton_state;
+  seastar::sharded<ShardServices> shard_services;
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)          \
   template <typename... Args>                                  \
@@ -54,22 +54,53 @@ public:
     crimson::os::FuturizedStore &store);
   seastar::future<> stop();
 
-  auto &get_osd_singleton_state() { return *osd_singleton_state; }
-  auto &get_osd_singleton_state() const { return *osd_singleton_state; }
-  auto &get_shard_services() { return *shard_services; }
-  auto &get_shard_services() const { return *shard_services; }
-  auto &get_local_state() { return shard_services->local_state; }
-  auto &get_local_state() const { return shard_services->local_state; }
+  auto &get_osd_singleton_state() {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return osd_singleton_state.local();
+  }
+  auto &get_osd_singleton_state() const {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return osd_singleton_state.local();
+  }
+  auto &get_shard_services() {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return shard_services.local();
+  }
+  auto &get_shard_services() const {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    return shard_services.local();
+  }
+  auto &get_local_state() { return get_shard_services().local_state; }
+  auto &get_local_state() const { return get_shard_services().local_state; }
 
   seastar::future<> update_map(local_cached_map_t &&map) {
-    auto fmap = make_local_shared_foreign(std::move(map));
-    get_osd_singleton_state().update_map(fmap);
-    get_local_state().update_map(std::move(fmap));
-    return seastar::now();
+    get_osd_singleton_state().update_map(
+      make_local_shared_foreign(local_cached_map_t(map))
+    );
+    /* We need each core to get its own foreign_ptr<local_cached_map_t>.
+     * foreign_ptr can't be cheaply copied, so we make one for each core
+     * up front. */
+    return seastar::do_with(
+      std::vector<seastar::foreign_ptr<local_cached_map_t>>(),
+      [this, map](auto &fmaps) {
+       fmaps.resize(seastar::smp::count);
+       for (auto &i: fmaps) {
+         i = seastar::foreign_ptr(map);
+       }
+       return shard_services.invoke_on_all(
+         [&fmaps](auto &local) mutable {
+           local.local_state.update_map(
+             make_local_shared_foreign(
+               std::move(fmaps[seastar::this_shard_id()])
+             ));
+         });
+      });
   }
 
-  auto stop_registries() {
-    return get_local_state().stop_registry();
+  seastar::future<> stop_registries() {
+    return shard_services.invoke_on_all([](auto &local) {
+      return local.local_state.stop_registry();
+    });
   }
 
   FORWARD_TO_OSD_SINGLETON(send_pg_created)
@@ -106,14 +137,11 @@ public:
 
   template <typename F>
   auto with_remote_shard_state(core_id_t core, F &&f) {
-    ceph_assert(core == 0);
-    auto &local_state_ref = get_local_state();
-    auto &shard_services_ref = get_shard_services();
-    return seastar::smp::submit_to(
-      core,
-      [f=std::forward<F>(f), &local_state_ref, &shard_services_ref]() mutable {
+    return shard_services.invoke_on(
+      core, [f=std::move(f)](auto &target_shard_services) mutable {
        return std::invoke(
-         std::move(f), local_state_ref, shard_services_ref);
+         std::move(f), target_shard_services.local_state,
+         target_shard_services);
       });
   }
 
@@ -202,10 +230,14 @@ public:
    */
   template <typename F>
   seastar::future<> for_each_pg(F &&f) const {
-    for (auto &&pg: get_local_state().pg_map.get_pgs()) {
-      std::apply(f, pg);
-    }
-    return seastar::now();
+    return sharded_map_seq(
+      shard_services,
+      [f=std::move(f)](const auto &local_service) mutable {
+       for (auto &pg: local_service.local_state.pg_map.get_pgs()) {
+         std::apply(f, pg);
+       }
+       return seastar::now();
+      });
   }
 
   auto get_num_pgs() const {
diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc
index aadc55fea958ab1a3210e3dc4427b47122c68100..f9f0cb393b00f0c11f8ee6430a1cee6c20a874fc 100644
@@ -35,6 +35,7 @@ namespace crimson::osd {
 
 PerShardState::PerShardState(
   int whoami,
+  ceph::mono_time startup_time,
   crimson::os::FuturizedStore &store)
   : whoami(whoami),
     store(store),
@@ -42,7 +43,8 @@ PerShardState::PerShardState(
     obc_registry(crimson::common::local_conf()),
     next_tid(
       static_cast<ceph_tid_t>(seastar::this_shard_id()) <<
-      (std::numeric_limits<ceph_tid_t>::digits - 8))
+      (std::numeric_limits<ceph_tid_t>::digits - 8)),
+    startup_time(startup_time)
 {
   perf = build_osd_logger(&cct);
   cct.get_perfcounters_collection()->add(perf);
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
index 54b62dcb2d7f0456fe2f0319f517de0db21a1466..b5f9a388c615310678176714a412ece56f6fa2f8 100644
@@ -45,6 +45,9 @@ class BufferedRecoveryMessages;
 
 namespace crimson::osd {
 
+// seastar::sharded puts start_single on core 0
+constexpr core_id_t PRIMARY_CORE = 0;
+
 class PGShardManager;
 
 /**
@@ -132,9 +135,16 @@ class PerShardState {
   HeartbeatStampsRef get_hb_stamps(int peer);
   std::map<int, HeartbeatStampsRef> heartbeat_stamps;
 
+  // Time state
+  const ceph::mono_time startup_time;
+  ceph::signedspan get_mnow() const {
+    return ceph::mono_clock::now() - startup_time;
+  }
+
 public:
   PerShardState(
     int whoami,
+    ceph::mono_time startup_time,
     crimson::os::FuturizedStore &store);
 };
 
@@ -158,6 +168,7 @@ public:
     crimson::mon::Client &monc,
     crimson::mgr::Client &mgrc);
 
+private:
   const int whoami;
 
   crimson::common::CephContext cct;
@@ -214,19 +225,13 @@ public:
   seastar::future<> send_pg_temp();
 
   // TODO: add config to control mapping
-  PGShardMapping pg_to_shard_mapping{0, 1};
+  PGShardMapping pg_to_shard_mapping{0, seastar::smp::count};
 
   std::set<pg_t> pg_created;
   seastar::future<> send_pg_created(pg_t pgid);
   seastar::future<> send_pg_created();
   void prune_pg_created();
 
-  // Time state
-  ceph::mono_time startup_time = ceph::mono_clock::now();
-  ceph::signedspan get_mnow() const {
-    return ceph::mono_clock::now() - startup_time;
-  }
-
   struct DirectFinisher {
     void queue(Context *c) {
       c->complete(0);
@@ -263,7 +268,16 @@ class ShardServices : public OSDMapService {
   using local_cached_map_t = OSDMapService::local_cached_map_t;
 
   PerShardState local_state;
-  OSDSingletonState &osd_singleton_state;
+  seastar::sharded<OSDSingletonState> &osd_singleton_state;
+
+  template <typename F, typename... Args>
+  auto with_singleton(F &&f, Args&&... args) {
+    return osd_singleton_state.invoke_on(
+      PRIMARY_CORE,
+      std::forward<F>(f),
+      std::forward<Args>(args)...
+    );
+  }
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)          \
   template <typename... Args>                                  \
@@ -278,17 +292,23 @@ class ShardServices : public OSDMapService {
   }
 
 #define FORWARD_TO_LOCAL(METHOD) FORWARD(METHOD, METHOD, local_state)
-#define FORWARD_TO_OSD_SINGLETON(METHOD) \
-  FORWARD(METHOD, METHOD, osd_singleton_state)
 
-  template <typename F, typename... Args>
-  auto with_singleton(F &&f, Args&&... args) {
-    return std::invoke(f, osd_singleton_state, std::forward<Args>(args)...);
+#define FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, TARGET)                \
+  template <typename... Args>                                  \
+  auto METHOD(Args&&... args) {                                        \
+    return with_singleton(                                      \
+      [](auto &local_state, auto&&... args) {                   \
+        return local_state.TARGET(                              \
+         std::forward<decltype(args)>(args)...);               \
+      }, std::forward<Args>(args)...);                         \
   }
+#define FORWARD_TO_OSD_SINGLETON(METHOD) \
+  FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, METHOD)
+
 public:
   template <typename... PSSArgs>
   ShardServices(
-    OSDSingletonState &osd_singleton_state,
+    seastar::sharded<OSDSingletonState> &osd_singleton_state,
     PSSArgs&&... args)
     : local_state(std::forward<PSSArgs>(args)...),
       osd_singleton_state(osd_singleton_state) {}
@@ -387,7 +407,7 @@ public:
   FORWARD_TO_OSD_SINGLETON(send_pg_created)
   FORWARD_TO_OSD_SINGLETON(send_alive)
   FORWARD_TO_OSD_SINGLETON(send_pg_temp)
-  FORWARD_CONST(get_mnow, get_mnow, osd_singleton_state)
+  FORWARD_CONST(get_mnow, get_mnow, local_state)
   FORWARD_TO_LOCAL(get_hb_stamps)
 
   FORWARD(pg_created, pg_created, local_state.pg_map)
@@ -397,21 +417,60 @@ public:
   FORWARD(
     get_cached_obc, get_cached_obc, local_state.obc_registry)
 
-  FORWARD(
-    local_request_reservation, request_reservation,
-    osd_singleton_state.local_reserver)
-  FORWARD(
-    local_update_priority, update_priority,
-    osd_singleton_state.local_reserver)
-  FORWARD(
-    local_cancel_reservation, cancel_reservation,
-    osd_singleton_state.local_reserver)
-  FORWARD(
-    remote_request_reservation, request_reservation,
-    osd_singleton_state.remote_reserver)
-  FORWARD(
-    remote_cancel_reservation, cancel_reservation,
-    osd_singleton_state.remote_reserver)
+  FORWARD_TO_OSD_SINGLETON_TARGET(
+    local_update_priority,
+    local_reserver.update_priority)
+  FORWARD_TO_OSD_SINGLETON_TARGET(
+    local_cancel_reservation,
+    local_reserver.cancel_reservation)
+  FORWARD_TO_OSD_SINGLETON_TARGET(
+    remote_cancel_reservation,
+    remote_reserver.cancel_reservation)
+
+  Context *invoke_context_on_core(core_id_t core, Context *c) {
+    if (!c) return nullptr;
+    return new LambdaContext([core, c](int code) {
+      std::ignore = seastar::smp::submit_to(
+       core,
+       [c, code] {
+         c->complete(code);
+       });
+    });
+  }
+  seastar::future<> local_request_reservation(
+    spg_t item,
+    Context *on_reserved,
+    unsigned prio,
+    Context *on_preempt) {
+    return with_singleton(
+      [item, prio](OSDSingletonState &singleton,
+                  Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
+       return singleton.local_reserver.request_reservation(
+         item,
+         wrapped_on_reserved,
+         prio,
+         wrapped_on_preempt);
+      },
+      invoke_context_on_core(seastar::this_shard_id(), on_reserved),
+      invoke_context_on_core(seastar::this_shard_id(), on_preempt));
+  }
+  seastar::future<> remote_request_reservation(
+    spg_t item,
+    Context *on_reserved,
+    unsigned prio,
+    Context *on_preempt) {
+    return with_singleton(
+      [item, prio](OSDSingletonState &singleton,
+                  Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
+       return singleton.remote_reserver.request_reservation(
+         item,
+         wrapped_on_reserved,
+         prio,
+         wrapped_on_preempt);
+      },
+      invoke_context_on_core(seastar::this_shard_id(), on_reserved),
+      invoke_context_on_core(seastar::this_shard_id(), on_preempt));
+  }
 
 #undef FORWARD_CONST
 #undef FORWARD
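
The reservation helpers above follow a single shape: wrap the caller's Context callbacks with invoke_context_on_core() so they complete back on the calling shard, then run the actual reserver call on PRIMARY_CORE through with_singleton(). A minimal sketch of that call-core-0-and-notify-home pattern, again using the illustrative Service (not crimson code):

// Illustrative only: do work on core 0's instance, then deliver the
// completion back on whichever core asked, mirroring with_singleton()
// plus invoke_context_on_core() above.
seastar::future<> reserve_and_notify(seastar::sharded<Service> &svc) {
  const auto caller = seastar::this_shard_id();
  return svc.invoke_on(0, [caller](Service &primary) {
    primary.ping();                                // "take the reservation"
    return seastar::smp::submit_to(caller, [] {    // bounce the grant home
      std::cout << "granted, notified on core "
                << seastar::this_shard_id() << "\n";
    });
  });
}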