git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: introduce pg_shard_manager to clarify shard-local vs osd-wide state
author     Samuel Just <sjust@redhat.com>
           Tue, 12 Jul 2022 22:35:44 +0000 (22:35 +0000)
committer  Samuel Just <sjust@redhat.com>
           Thu, 14 Jul 2022 00:58:25 +0000 (00:58 +0000)
This commit begins changing ShardServices into the interface by which
PGs access shard-local and OSD-wide state.  Future work will further
clarify this interface boundary and introduce machinery to mediate
cold-path access to state on remote shards.

Signed-off-by: Samuel Just <sjust@redhat.com>
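
To make the intent concrete, here is a minimal, self-contained sketch of the shape this change moves toward (toy types and methods, not the actual crimson classes): OSD-wide state and shard-local state live in separate objects, and a ShardServices facade forwards PG-facing calls to whichever side owns the resource, so PG code stops reaching into members such as local_reserver or obc_registry directly.

// Toy illustration only; the real classes live in src/crimson/osd/shard_services.h.
#include <iostream>
#include <string>

struct CoreState {              // OSD-wide resources (reservers, pg_temp, ...)
  void request_reservation(const std::string &pgid) {
    std::cout << "core: reservation requested for pg " << pgid << "\n";
  }
};

struct PerShardState {          // per-reactor resources (op registry, obc cache, ...)
  void put_historic(const std::string &op) {
    std::cout << "shard: recorded historic op " << op << "\n";
  }
};

// PGs only see this facade; it decides which side owns the data.
class ShardServices {
  CoreState &core_state;
  PerShardState &local_state;
public:
  ShardServices(CoreState &core, PerShardState &local)
    : core_state(core), local_state(local) {}
  void local_request_reservation(const std::string &pgid) {
    core_state.request_reservation(pgid);   // OSD-wide resource
  }
  void put_historic(const std::string &op) {
    local_state.put_historic(op);           // shard-local resource
  }
};

int main() {
  CoreState core;
  PerShardState local;
  ShardServices ss{core, local};
  ss.local_request_reservation("1.2a");
  ss.put_historic("client_request");
}

The hunks below follow exactly this pattern: PG call sites switch from shard_services.local_reserver.request_reservation(...) to shard_services.local_request_reservation(...), and similarly for the obc registry and op registry.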
14 files changed:
src/crimson/osd/CMakeLists.txt
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operation_external_tracking.h
src/crimson/osd/osd_operations/background_recovery.cc
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_shard_manager.cc [new file with mode: 0644]
src/crimson/osd/pg_shard_manager.h [new file with mode: 0644]
src/crimson/osd/recovery_backend.cc
src/crimson/osd/shard_services.cc
src/crimson/osd/shard_services.h

index cfa9f9d71ab019b307d0eaa0dcf876911d9f5064..779a5beca53ad8225139ada9adfd581819c001aa 100644 (file)
@@ -10,6 +10,7 @@ add_executable(crimson-osd
   pg_meta.cc
   replicated_backend.cc
   shard_services.cc
+  pg_shard_manager.cc
   object_context.cc
   ops_executer.cc
   osd_operation.cc
index b08c8dcfa0e73e4c8f4136593abe940448b9da3a..4c9d92fbfe3dfd14a7ec0cc31db4f33a511059c6 100644 (file)
@@ -90,7 +90,10 @@ OSD::OSD(int id, uint32_t nonce,
     monc{new crimson::mon::Client{*public_msgr, *this}},
     mgrc{new crimson::mgr::Client{*public_msgr, *this}},
     store{store},
-    shard_services{*this, whoami, *cluster_msgr, *public_msgr, *monc, *mgrc, store},
+    pg_shard_manager{
+      static_cast<OSDMapService&>(*this), whoami, *cluster_msgr,
+      *public_msgr, *monc, *mgrc, store},
+    shard_services{pg_shard_manager.get_shard_services()},
     heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}},
     // do this in background
     tick_timer{[this] {
@@ -329,8 +332,8 @@ seastar::future<> OSD::start()
     superblock = std::move(sb);
     return get_map(superblock.current_epoch);
   }).then([this](cached_map_t&& map) {
-    shard_services.update_map(map);
     osdmap_gate.got_map(map->get_epoch());
+    pg_shard_manager.update_map(map);
     osdmap = std::move(map);
     bind_epoch = osdmap->get_epoch();
     return load_pgs();
@@ -554,12 +557,15 @@ seastar::future<> OSD::start_asok_admin()
     asok->register_command(make_asok_hook<pg::QueryCommand>(*this));
     asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this));
     // ops commands
-    asok->register_command(make_asok_hook<DumpInFlightOpsHook>(
-      std::as_const(get_shard_services().registry)));
-    asok->register_command(make_asok_hook<DumpHistoricOpsHook>(
-      std::as_const(get_shard_services().registry)));
-    asok->register_command(make_asok_hook<DumpSlowestHistoricOpsHook>(
-      std::as_const(get_shard_services().registry)));
+    asok->register_command(
+      make_asok_hook<DumpInFlightOpsHook>(
+       std::as_const(get_shard_services().get_registry())));
+    asok->register_command(
+      make_asok_hook<DumpHistoricOpsHook>(
+       std::as_const(get_shard_services().get_registry())));
+    asok->register_command(
+      make_asok_hook<DumpSlowestHistoricOpsHook>(
+       std::as_const(get_shard_services().get_registry())));
   });
 }
 
@@ -578,7 +584,7 @@ seastar::future<> OSD::stop()
     return asok->stop().then([this] {
       return heartbeat->stop();
     }).then([this] {
-      return shard_services.stop();
+      return pg_shard_manager.stop_registries();
     }).then([this] {
       return store.umount();
     }).then([this] {
@@ -1150,7 +1156,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
                               [this](epoch_t cur) {
     return get_map(cur).then([this](cached_map_t&& o) {
       osdmap = std::move(o);
-      shard_services.update_map(osdmap);
+      pg_shard_manager.update_map(osdmap);
       if (up_epoch == 0 &&
           osdmap->is_up(whoami) &&
           osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
index 809f2dda0dced3dc8caf0cc5923ea987415bd75f..a6030e2ac91ec55874a171f2439963a1bce7ba3a 100644 (file)
@@ -20,8 +20,7 @@
 #include "crimson/mgr/client.h"
 #include "crimson/net/Dispatcher.h"
 #include "crimson/osd/osdmap_service.h"
-#include "crimson/osd/state.h"
-#include "crimson/osd/shard_services.h"
+#include "crimson/osd/pg_shard_manager.h"
 #include "crimson/osd/osdmap_gate.h"
 #include "crimson/osd/pg_map.h"
 #include "crimson/osd/osd_operations/peering_event.h"
@@ -111,7 +110,8 @@ class OSD final : public crimson::net::Dispatcher,
   void handle_authentication(const EntityName& name,
                             const AuthCapsInfo& caps) final;
 
-  crimson::osd::ShardServices shard_services;
+  crimson::osd::PGShardManager pg_shard_manager;
+  crimson::osd::ShardServices &shard_services;
 
   std::unique_ptr<Heartbeat> heartbeat;
   seastar::timer<seastar::lowres_clock> tick_timer;
@@ -218,7 +218,7 @@ public:
   OSD_OSDMapGate osdmap_gate;
 
   ShardServices &get_shard_services() {
-    return shard_services;
+    return pg_shard_manager.get_shard_services();
   }
 
   seastar::future<> consume_map(epoch_t epoch);
@@ -252,7 +252,7 @@ public:
 
   template <typename T, typename... Args>
   auto start_pg_operation(Args&&... args) {
-    auto op = shard_services.registry.create_operation<T>(
+    auto op = shard_services.get_registry().create_operation<T>(
       std::forward<Args>(args)...);
     auto &logger = crimson::get_logger(ceph_subsys_osd);
     logger.debug("{}: starting {}", *op, __func__);
index b77d0101e4201bd91e25fe3d2b7e9cfc4e744e98..a2eec6d39d59e931aef32924e93af571b3a5fd25 100644 (file)
@@ -241,8 +241,7 @@ struct HistoricBackend
 
   void handle(ClientRequest::CompletionEvent&, const Operation& op) override {
     if (crimson::common::local_conf()->osd_op_history_size) {
-      const auto& client_op = to_client_request(op);
-      client_op.osd.get_shard_services().registry.put_historic(client_op);
+      to_client_request(op).put_historic();
     }
   }
 };
index 6e8319e92a5b63aa573fb43fb20a715f75532228..116a9010d1c27f5243778908e67a2ce203f7a96e 100644 (file)
@@ -81,7 +81,7 @@ seastar::future<> BackgroundRecoveryT<T>::start()
   return maybe_delay.then([ref, this] {
     return this->template with_blocking_event<OperationThrottler::BlockingEvent>(
       [ref, this] (auto&& trigger) {
-      return ss.throttler.with_throttle_while(
+      return ss.with_throttle_while(
         std::move(trigger),
         this, get_scheduler_params(), [this] {
           return T::interruptor::with_interruption([this] {
index 7d3ea41b50ec927874ff4d25f435ae15afb76983..db531f7bcb242f01680f1bd9da0a44a2138968a8 100644 (file)
@@ -334,4 +334,9 @@ bool ClientRequest::is_misdirected(const PG& pg) const
   return true;
 }
 
+void ClientRequest::put_historic() const
+{
+  osd.get_shard_services().get_registry().put_historic(*this);
+}
+
 }
index 365420b4ecbce079afd1b2cf4dac8c0478469772..1d8b521afbe9d7c0b635bd0eaea849815fb94a19 100644 (file)
@@ -165,6 +165,8 @@ public:
   auto get_completed() const {
     return get_event<CompletionEvent>().get_timestamp();
   };
+
+  void put_historic() const;
 };
 
 }
index 203e4542ae3d808c2fb3eade9b0db6f201146556..cac17c2f7aad637fc6678f80cb6afc87a6dfe178 100644 (file)
@@ -999,7 +999,7 @@ PG::load_obc_iertr::future<>
 PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
 {
   auto [obc, existed] =
-    shard_services.obc_registry.get_cached_obc(std::move(oid));
+    shard_services.get_cached_obc(std::move(oid));
   return with_head_obc<State>(std::move(obc), existed, std::move(func));
 }
 
@@ -1026,7 +1026,7 @@ PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func)
       logger().error("with_clone_obc: {} clone not found", coid);
       return load_obc_ertr::make_ready_future<>();
     }
-    auto [clone, existed] = shard_services.obc_registry.get_cached_obc(*coid);
+    auto [clone, existed] = shard_services.get_cached_obc(*coid);
     return clone->template with_lock<State>(
       [coid=*coid, existed=existed,
        head=std::move(head), clone=std::move(clone),
index 236a92b30405b404adcc037ff7c82d1376df6473..7a5d6f7075ed6169b35afde4188ae817271e8cd7 100644 (file)
@@ -221,7 +221,7 @@ public:
     unsigned priority,
     PGPeeringEventURef on_grant,
     PGPeeringEventURef on_preempt) final {
-    shard_services.local_reserver.request_reservation(
+    shard_services.local_request_reservation(
       pgid,
       on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
        start_peering_event_operation(std::move(*on_grant));
@@ -235,13 +235,13 @@ public:
 
   void update_local_background_io_priority(
     unsigned priority) final {
-    shard_services.local_reserver.update_priority(
+    shard_services.local_update_priority(
       pgid,
       priority);
   }
 
   void cancel_local_background_io_reservation() final {
-    shard_services.local_reserver.cancel_reservation(
+    shard_services.local_cancel_reservation(
       pgid);
   }
 
@@ -249,7 +249,7 @@ public:
     unsigned priority,
     PGPeeringEventURef on_grant,
     PGPeeringEventURef on_preempt) final {
-    shard_services.remote_reserver.request_reservation(
+    shard_services.remote_request_reservation(
       pgid,
       on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
        start_peering_event_operation(std::move(*on_grant));
@@ -262,7 +262,7 @@ public:
   }
 
   void cancel_remote_recovery_reservation() final {
-    shard_services.remote_reserver.cancel_reservation(
+    shard_services.remote_cancel_reservation(
       pgid);
   }
 
diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc
new file mode 100644 (file)
index 0000000..5c00651
--- /dev/null
@@ -0,0 +1,28 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/osd/pg_shard_manager.h"
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace crimson::osd {
+
+PGShardManager::PGShardManager(
+  OSDMapService &osdmap_service,
+  const int whoami,
+  crimson::net::Messenger &cluster_msgr,
+  crimson::net::Messenger &public_msgr,
+  crimson::mon::Client &monc,
+  crimson::mgr::Client &mgrc,
+  crimson::os::FuturizedStore &store)
+  : core_state(whoami, osdmap_service, cluster_msgr, public_msgr,
+              monc, mgrc, store),
+    local_state(whoami),
+    shard_services(core_state, local_state)
+{}
+
+}
diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h
new file mode 100644 (file)
index 0000000..571f5cd
--- /dev/null
@@ -0,0 +1,52 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
+#include <seastar/core/sharded.hh>
+
+#include "crimson/osd/shard_services.h"
+#include "crimson/osd/pg_map.h"
+
+namespace crimson::osd {
+
+/**
+ * PGShardManager
+ *
+ * Manages all state required to partition PGs over seastar reactors
+ * as well as state required to route messages to pgs. Mediates access to
+ * shared resources required by PGs (objectstore, messenger, monclient,
+ * etc)
+ */
+class PGShardManager {
+  CoreState core_state;
+  PerShardState local_state;
+  ShardServices shard_services;
+
+public:
+  PGShardManager(
+    OSDMapService &osdmap_service,
+    const int whoami,
+    crimson::net::Messenger &cluster_msgr,
+    crimson::net::Messenger &public_msgr,
+    crimson::mon::Client &monc,
+    crimson::mgr::Client &mgrc,
+    crimson::os::FuturizedStore &store);
+
+  auto &get_shard_services() { return shard_services; }
+
+  void update_map(OSDMapService::cached_map_t map) {
+    core_state.update_map(map);
+    local_state.update_map(map);
+  }
+
+  auto stop_registries() {
+    return local_state.stop_registry();
+  }
+
+  FORWARD_TO_CORE(send_pg_created)
+};
+
+}
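
The header above establishes the ownership model: PGShardManager owns both state objects plus the ShardServices facade, fans update_map out to each, and is what the OSD now stops at shutdown via stop_registries(). The standalone sketch below (toy types again; the real constructor also takes the messengers, the mon/mgr clients, and the store) illustrates that lifecycle from the owner's point of view.

// Toy lifecycle sketch; mirrors the osd.cc hunks earlier in this diff.
#include <iostream>

struct CoreState {
  void update_map(int epoch) { std::cout << "core: new map " << epoch << "\n"; }
};

struct PerShardState {
  void update_map(int epoch) { std::cout << "shard: new map " << epoch << "\n"; }
  void stop_registry()       { std::cout << "shard: op registry stopped\n"; }
};

struct ShardServices {
  CoreState &core_state;
  PerShardState &local_state;
};

class PGShardManager {
  CoreState core_state;
  PerShardState local_state;
  ShardServices shard_services{core_state, local_state};
public:
  ShardServices &get_shard_services() { return shard_services; }
  void update_map(int epoch) {          // fan out, as in update_map() above
    core_state.update_map(epoch);
    local_state.update_map(epoch);
  }
  void stop_registries() { local_state.stop_registry(); }
};

int main() {
  PGShardManager mgr;                                // OSD constructs the manager
  ShardServices &ss = mgr.get_shard_services();      // and keeps a reference for PGs
  (void)ss;
  mgr.update_map(42);                                // on each committed OSDMap
  mgr.stop_registries();                             // on OSD::stop()
}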
index 24d7d00477d6fe5573a37545d25df319b9898cda..76ca1bd8b0dff1b8c1496d0548359b617e86b725 100644 (file)
@@ -179,7 +179,7 @@ RecoveryBackend::scan_for_backfill(
       -> interruptible_future<> {
       crimson::osd::ObjectContextRef obc;
       if (pg.is_primary()) {
-        obc = shard_services.obc_registry.maybe_get_cached_obc(object);
+        obc = shard_services.maybe_get_cached_obc(object);
       }
       if (obc) {
         if (obc->obs.exists) {
index b9d97b2b283b95ad5655c5691ce2a83ec88d35d8..71c3c96ab852011101103de6bf5cd94a75c56d34 100644 (file)
@@ -27,67 +27,49 @@ using std::vector;
 
 namespace crimson::osd {
 
-ShardServices::ShardServices(
-  OSDMapService &osdmap_service,
-  const int whoami,
-  crimson::net::Messenger &cluster_msgr,
-  crimson::net::Messenger &public_msgr,
-  crimson::mon::Client &monc,
-  crimson::mgr::Client &mgrc,
-  crimson::os::FuturizedStore &store)
-    : osdmap_service(osdmap_service),
-      whoami(whoami),
-      cluster_msgr(cluster_msgr),
-      public_msgr(public_msgr),
-      monc(monc),
-      mgrc(mgrc),
-      store(store),
-      throttler(crimson::common::local_conf()),
-      obc_registry(crimson::common::local_conf()),
-      local_reserver(
-       &cct,
-       &finisher,
-       crimson::common::local_conf()->osd_max_backfills,
-       crimson::common::local_conf()->osd_min_recovery_priority),
-      remote_reserver(
-       &cct,
-       &finisher,
-       crimson::common::local_conf()->osd_max_backfills,
-       crimson::common::local_conf()->osd_min_recovery_priority)
+PerShardState::PerShardState(
+  int whoami)
+  : whoami(whoami),
+    throttler(crimson::common::local_conf()),
+    obc_registry(crimson::common::local_conf())
 {
   perf = build_osd_logger(&cct);
   cct.get_perfcounters_collection()->add(perf);
 
   recoverystate_perf = build_recoverystate_perf(&cct);
   cct.get_perfcounters_collection()->add(recoverystate_perf);
-
-  crimson::common::local_conf().add_observer(this);
-}
-
-const char** ShardServices::get_tracked_conf_keys() const
-{
-  static const char* KEYS[] = {
-    "osd_max_backfills",
-    "osd_min_recovery_priority",
-    nullptr
-  };
-  return KEYS;
 }
 
-void ShardServices::handle_conf_change(const ConfigProxy& conf,
-                                      const std::set <std::string> &changed)
+CoreState::CoreState(
+  int whoami,
+  OSDMapService &osdmap_service,
+  crimson::net::Messenger &cluster_msgr,
+  crimson::net::Messenger &public_msgr,
+  crimson::mon::Client &monc,
+  crimson::mgr::Client &mgrc,
+  crimson::os::FuturizedStore &store)
+  : whoami(whoami),
+    osdmap_service(osdmap_service),
+    cluster_msgr(cluster_msgr),
+    public_msgr(public_msgr),
+    monc(monc),
+    mgrc(mgrc),
+    store(store),
+    local_reserver(
+      &cct,
+      &finisher,
+      crimson::common::local_conf()->osd_max_backfills,
+      crimson::common::local_conf()->osd_min_recovery_priority),
+    remote_reserver(
+      &cct,
+      &finisher,
+      crimson::common::local_conf()->osd_max_backfills,
+      crimson::common::local_conf()->osd_min_recovery_priority)
 {
-  if (changed.count("osd_max_backfills")) {
-    local_reserver.set_max(conf->osd_max_backfills);
-    remote_reserver.set_max(conf->osd_max_backfills);
-  }
-  if (changed.count("osd_min_recovery_priority")) {
-    local_reserver.set_min_priority(conf->osd_min_recovery_priority);
-    remote_reserver.set_min_priority(conf->osd_min_recovery_priority);
-  }
+  crimson::common::local_conf().add_observer(this);
 }
 
-seastar::future<> ShardServices::send_to_osd(
+seastar::future<> CoreState::send_to_osd(
   int peer, MessageURef m, epoch_t from_epoch)
 {
   if (osdmap->is_down(peer)) {
@@ -104,54 +86,20 @@ seastar::future<> ShardServices::send_to_osd(
   }
 }
 
-seastar::future<> ShardServices::dispatch_context_transaction(
-  crimson::os::CollectionRef col, PeeringCtx &ctx) {
-  if (ctx.transaction.empty()) {
-    logger().debug("ShardServices::dispatch_context_transaction: empty transaction");
-    return seastar::now();
-  }
-
-  logger().debug("ShardServices::dispatch_context_transaction: do_transaction ...");
-  auto ret = store.do_transaction(
-    col,
-    std::move(ctx.transaction));
-  ctx.reset_transaction();
-  return ret;
-}
-
-seastar::future<> ShardServices::dispatch_context_messages(
-  BufferedRecoveryMessages &&ctx)
-{
-  auto ret = seastar::parallel_for_each(std::move(ctx.message_map),
-    [this](auto& osd_messages) {
-      auto& [peer, messages] = osd_messages;
-      logger().debug("dispatch_context_messages sending messages to {}", peer);
-      return seastar::parallel_for_each(
-        std::move(messages), [=, peer=peer](auto& m) {
-        return send_to_osd(peer, std::move(m), osdmap->get_epoch());
-      });
-    });
-  ctx.message_map.clear();
-  return ret;
-}
-
-seastar::future<> ShardServices::dispatch_context(
-  crimson::os::CollectionRef col,
-  PeeringCtx &&ctx)
+seastar::future<> CoreState::osdmap_subscribe(version_t epoch, bool force_request)
 {
-  ceph_assert(col || ctx.transaction.empty());
-  return seastar::when_all_succeed(
-    dispatch_context_messages(
-      BufferedRecoveryMessages{ctx}),
-    col ? dispatch_context_transaction(col, ctx) : seastar::now()
-  ).then_unpack([] {
+  logger().info("{}({})", __func__, epoch);
+  if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
+      force_request) {
+    return monc.renew_subs();
+  } else {
     return seastar::now();
-  });
+  }
 }
 
-void ShardServices::queue_want_pg_temp(pg_t pgid,
-                                   const vector<int>& want,
-                                   bool forced)
+void CoreState::queue_want_pg_temp(pg_t pgid,
+                                      const vector<int>& want,
+                                      bool forced)
 {
   auto p = pg_temp_pending.find(pgid);
   if (p == pg_temp_pending.end() ||
@@ -161,13 +109,13 @@ void ShardServices::queue_want_pg_temp(pg_t pgid,
   }
 }
 
-void ShardServices::remove_want_pg_temp(pg_t pgid)
+void CoreState::remove_want_pg_temp(pg_t pgid)
 {
   pg_temp_wanted.erase(pgid);
   pg_temp_pending.erase(pgid);
 }
 
-void ShardServices::requeue_pg_temp()
+void CoreState::requeue_pg_temp()
 {
   unsigned old_wanted = pg_temp_wanted.size();
   unsigned old_pending = pg_temp_pending.size();
@@ -181,18 +129,7 @@ void ShardServices::requeue_pg_temp()
     pg_temp_wanted.size());
 }
 
-std::ostream& operator<<(
-  std::ostream& out,
-  const ShardServices::pg_temp_t& pg_temp)
-{
-  out << pg_temp.acting;
-  if (pg_temp.forced) {
-    out << " (forced)";
-  }
-  return out;
-}
-
-seastar::future<> ShardServices::send_pg_temp()
+seastar::future<> CoreState::send_pg_temp()
 {
   if (pg_temp_wanted.empty())
     return seastar::now();
@@ -218,17 +155,18 @@ seastar::future<> ShardServices::send_pg_temp()
     });
 }
 
-void ShardServices::update_map(cached_map_t new_osdmap)
-{
-  osdmap = std::move(new_osdmap);
-}
-
-ShardServices::cached_map_t &ShardServices::get_osdmap()
+std::ostream& operator<<(
+  std::ostream& out,
+  const CoreState::pg_temp_t& pg_temp)
 {
-  return osdmap;
+  out << pg_temp.acting;
+  if (pg_temp.forced) {
+    out << " (forced)";
+  }
+  return out;
 }
 
-seastar::future<> ShardServices::send_pg_created(pg_t pgid)
+seastar::future<> CoreState::send_pg_created(pg_t pgid)
 {
   logger().debug(__func__);
   auto o = get_osdmap();
@@ -237,7 +175,7 @@ seastar::future<> ShardServices::send_pg_created(pg_t pgid)
   return monc.send_message(crimson::make_message<MOSDPGCreated>(pgid));
 }
 
-seastar::future<> ShardServices::send_pg_created()
+seastar::future<> CoreState::send_pg_created()
 {
   logger().debug(__func__);
   auto o = get_osdmap();
@@ -248,7 +186,7 @@ seastar::future<> ShardServices::send_pg_created()
     });
 }
 
-void ShardServices::prune_pg_created()
+void CoreState::prune_pg_created()
 {
   logger().debug(__func__);
   auto o = get_osdmap();
@@ -265,18 +203,7 @@ void ShardServices::prune_pg_created()
   }
 }
 
-seastar::future<> ShardServices::osdmap_subscribe(version_t epoch, bool force_request)
-{
-  logger().info("{}({})", __func__, epoch);
-  if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
-      force_request) {
-    return monc.renew_subs();
-  } else {
-    return seastar::now();
-  }
-}
-
-HeartbeatStampsRef ShardServices::get_hb_stamps(int peer)
+HeartbeatStampsRef CoreState::get_hb_stamps(int peer)
 {
   auto [stamps, added] = heartbeat_stamps.try_emplace(peer);
   if (added) {
@@ -285,7 +212,7 @@ HeartbeatStampsRef ShardServices::get_hb_stamps(int peer)
   return stamps->second;
 }
 
-seastar::future<> ShardServices::send_alive(const epoch_t want)
+seastar::future<> CoreState::send_alive(const epoch_t want)
 {
   logger().info(
     "{} want={} up_thru_wanted={}",
@@ -314,4 +241,73 @@ seastar::future<> ShardServices::send_alive(const epoch_t want)
   }
 }
 
+const char** CoreState::get_tracked_conf_keys() const
+{
+  static const char* KEYS[] = {
+    "osd_max_backfills",
+    "osd_min_recovery_priority",
+    nullptr
+  };
+  return KEYS;
+}
+
+void CoreState::handle_conf_change(const ConfigProxy& conf,
+                                  const std::set <std::string> &changed)
+{
+  if (changed.count("osd_max_backfills")) {
+    local_reserver.set_max(conf->osd_max_backfills);
+    remote_reserver.set_max(conf->osd_max_backfills);
+  }
+  if (changed.count("osd_min_recovery_priority")) {
+    local_reserver.set_min_priority(conf->osd_min_recovery_priority);
+    remote_reserver.set_min_priority(conf->osd_min_recovery_priority);
+  }
+}
+
+seastar::future<> ShardServices::dispatch_context_transaction(
+  crimson::os::CollectionRef col, PeeringCtx &ctx) {
+  if (ctx.transaction.empty()) {
+    logger().debug("ShardServices::dispatch_context_transaction: empty transaction");
+    return seastar::now();
+  }
+
+  logger().debug("ShardServices::dispatch_context_transaction: do_transaction ...");
+  auto ret = get_store().do_transaction(
+    col,
+    std::move(ctx.transaction));
+  ctx.reset_transaction();
+  return ret;
+}
+
+seastar::future<> ShardServices::dispatch_context_messages(
+  BufferedRecoveryMessages &&ctx)
+{
+  auto ret = seastar::parallel_for_each(std::move(ctx.message_map),
+    [this](auto& osd_messages) {
+      auto& [peer, messages] = osd_messages;
+      logger().debug("dispatch_context_messages sending messages to {}", peer);
+      return seastar::parallel_for_each(
+        std::move(messages), [=, peer=peer](auto& m) {
+        return send_to_osd(peer, std::move(m), local_state.osdmap->get_epoch());
+      });
+    });
+  ctx.message_map.clear();
+  return ret;
+}
+
+seastar::future<> ShardServices::dispatch_context(
+  crimson::os::CollectionRef col,
+  PeeringCtx &&ctx)
+{
+  ceph_assert(col || ctx.transaction.empty());
+  return seastar::when_all_succeed(
+    dispatch_context_messages(
+      BufferedRecoveryMessages{ctx}),
+    col ? dispatch_context_transaction(col, ctx) : seastar::now()
+  ).then_unpack([] {
+    return seastar::now();
+  });
+}
+
+
 };
index 917c06303865ccb069ff0e564684e74caed8b839..d3df2c8f794c5f496ffcd25dc2f0eae961cee911 100644 (file)
@@ -13,7 +13,9 @@
 #include "crimson/os/futurized_collection.h"
 #include "osd/PeeringState.h"
 #include "crimson/osd/osdmap_service.h"
+#include "crimson/osd/osdmap_gate.h"
 #include "crimson/osd/object_context.h"
+#include "crimson/osd/state.h"
 #include "common/AsyncReserver.h"
 
 namespace crimson::net {
@@ -39,59 +41,44 @@ class BufferedRecoveryMessages;
 namespace crimson::osd {
 
 /**
- * Represents services available to each PG
+ * PerShardState
+ *
+ * Per-shard state holding instances local to each shard.
  */
-class ShardServices : public md_config_obs_t {
-  using cached_map_t = boost::local_shared_ptr<const OSDMap>;
-  OSDMapService &osdmap_service;
-  const int whoami;
-  crimson::net::Messenger &cluster_msgr;
-  crimson::net::Messenger &public_msgr;
-  crimson::mon::Client &monc;
-  crimson::mgr::Client &mgrc;
-  crimson::os::FuturizedStore &store;
+class PerShardState {
+  friend class ShardServices;
+  friend class PGShardManager;
 
+  const int whoami;
   crimson::common::CephContext cct;
 
   PerfCounters *perf = nullptr;
   PerfCounters *recoverystate_perf = nullptr;
 
-  const char** get_tracked_conf_keys() const final;
-  void handle_conf_change(const ConfigProxy& conf,
-                          const std::set <std::string> &changed) final;
-
-public:
-  ShardServices(
-    OSDMapService &osdmap_service,
-    const int whoami,
-    crimson::net::Messenger &cluster_msgr,
-    crimson::net::Messenger &public_msgr,
-    crimson::mon::Client &monc,
-    crimson::mgr::Client &mgrc,
-    crimson::os::FuturizedStore &store);
-
-  seastar::future<> send_to_osd(
-    int peer,
-    MessageURef m,
-    epoch_t from_epoch);
+  // Op Management
+  OSDOperationRegistry registry;
+  OperationThrottler throttler;
 
-  crimson::os::FuturizedStore &get_store() {
-    return store;
+  OSDMapService::cached_map_t osdmap;
+  OSDMapService::cached_map_t &get_osdmap() { return osdmap; }
+  void update_map(OSDMapService::cached_map_t new_osdmap) {
+    osdmap = std::move(new_osdmap);
   }
 
-  crimson::common::CephContext *get_cct() {
-    return &cct;
-  }
+  crimson::osd::ObjectContextRegistry obc_registry;
 
-  // OSDMapService
-  const OSDMapService &get_osdmap_service() const {
-    return osdmap_service;
+  // prevent creating new osd operations when system is shutting down,
+  // this is necessary because there are chances that a new operation
+  // is created, after the interruption of all ongoing operations, and
+  // creats and waits on a new and may-never-resolve future, in which
+  // case the shutdown may never succeed.
+  bool stopping = false;
+  seastar::future<> stop_registry() {
+    crimson::get_logger(ceph_subsys_osd).info("PerShardState::{}", __func__);
+    stopping = true;
+    return registry.stop();
   }
 
-  // Op Management
-  OSDOperationRegistry registry;
-  OperationThrottler throttler;
-
   template <typename T, typename... Args>
   auto start_operation(Args&&... args) {
     if (__builtin_expect(stopping, false)) {
@@ -107,50 +94,58 @@ public:
     return std::make_pair(std::move(op), std::move(fut));
   }
 
-  seastar::future<> stop() {
-    crimson::get_logger(ceph_subsys_osd).info("ShardServices::{}", __func__);
-    stopping = true;
-    return registry.stop();
-  }
+  PerShardState(int whoami);
+};
 
-  // Loggers
-  PerfCounters &get_recoverystate_perf_logger() {
-    return *recoverystate_perf;
-  }
-  PerfCounters &get_perf_logger() {
-    return *perf;
-  }
+/**
+ * CoreState
+ *
+ * OSD-wide singleton holding instances that need to be accessible
+ * from all PGs.
+ */
+class CoreState : public md_config_obs_t {
+  friend class ShardServices;
+  friend class PGShardManager;
+  CoreState(
+    int whoami,
+    OSDMapService &osdmap_service,
+    crimson::net::Messenger &cluster_msgr,
+    crimson::net::Messenger &public_msgr,
+    crimson::mon::Client &monc,
+    crimson::mgr::Client &mgrc,
+    crimson::os::FuturizedStore &store);
 
-  /// Dispatch and reset ctx transaction
-  seastar::future<> dispatch_context_transaction(
-    crimson::os::CollectionRef col, PeeringCtx &ctx);
+  const int whoami;
 
-  /// Dispatch and reset ctx messages
-  seastar::future<> dispatch_context_messages(
-    BufferedRecoveryMessages &&ctx);
+  crimson::common::CephContext cct;
 
-  /// Dispatch ctx and dispose of context
-  seastar::future<> dispatch_context(
-    crimson::os::CollectionRef col,
-    PeeringCtx &&ctx);
 
-  /// Dispatch ctx and dispose of ctx, transaction must be empty
-  seastar::future<> dispatch_context(
-    PeeringCtx &&ctx) {
-    return dispatch_context({}, std::move(ctx));
+  OSDMapService &osdmap_service;
+  OSDMapService::cached_map_t osdmap;
+  OSDMapService::cached_map_t &get_osdmap() { return osdmap; }
+  void update_map(OSDMapService::cached_map_t new_osdmap) {
+    osdmap = std::move(new_osdmap);
   }
 
-  // -- tids --
-  // for ops i issue
+  crimson::net::Messenger &cluster_msgr;
+  crimson::net::Messenger &public_msgr;
+
+  seastar::future<> send_to_osd(int peer, MessageURef m, epoch_t from_epoch);
+
+  crimson::mon::Client &monc;
+  seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
+
+  crimson::mgr::Client &mgrc;
+
+  crimson::os::FuturizedStore &store;
+
+  // tids for ops i issue
   unsigned int next_tid{0};
   ceph_tid_t get_tid() {
     return (ceph_tid_t)next_tid++;
   }
 
-  // PG Temp State
-private:
-  // TODO: hook into map processing and some kind of heartbeat/peering
-  // message processing
+  // global pg temp state
   struct pg_temp_t {
     std::vector<int> acting;
     bool forced = false;
@@ -158,28 +153,14 @@ private:
   std::map<pg_t, pg_temp_t> pg_temp_wanted;
   std::map<pg_t, pg_temp_t> pg_temp_pending;
   friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
-public:
+
   void queue_want_pg_temp(pg_t pgid, const std::vector<int>& want,
                          bool forced = false);
   void remove_want_pg_temp(pg_t pgid);
   void requeue_pg_temp();
   seastar::future<> send_pg_temp();
 
-  // Shard-local OSDMap
-private:
-  cached_map_t osdmap;
-public:
-  void update_map(cached_map_t new_osdmap);
-  cached_map_t &get_osdmap();
-
-  // PG Created State
-private:
-  std::set<pg_t> pg_created;
-public:
-  seastar::future<> send_pg_created(pg_t pgid);
-  seastar::future<> send_pg_created();
-  void prune_pg_created();
-
+  unsigned num_pgs = 0;
   unsigned get_pg_num() const {
     return num_pgs;
   }
@@ -190,41 +171,147 @@ public:
     --num_pgs;
   }
 
-  seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
+  std::set<pg_t> pg_created;
+  seastar::future<> send_pg_created(pg_t pgid);
+  seastar::future<> send_pg_created();
+  void prune_pg_created();
 
   // Time state
   ceph::mono_time startup_time = ceph::mono_clock::now();
   ceph::signedspan get_mnow() const {
     return ceph::mono_clock::now() - startup_time;
   }
+
   HeartbeatStampsRef get_hb_stamps(int peer);
   std::map<int, HeartbeatStampsRef> heartbeat_stamps;
 
-  crimson::osd::ObjectContextRegistry obc_registry;
-
-  // Async Reservers
-private:
-  unsigned num_pgs = 0;
-
   struct DirectFinisher {
     void queue(Context *c) {
       c->complete(0);
     }
   } finisher;
-  // prevent creating new osd operations when system is shutting down,
-  // this is necessary because there are chances that a new operation
-  // is created, after the interruption of all ongoing operations, and
-  // creats and waits on a new and may-never-resolve future, in which
-  // case the shutdown may never succeed.
-  bool stopping = false;
-public:
   AsyncReserver<spg_t, DirectFinisher> local_reserver;
   AsyncReserver<spg_t, DirectFinisher> remote_reserver;
 
-private:
   epoch_t up_thru_wanted = 0;
-public:
   seastar::future<> send_alive(epoch_t want);
+
+  const char** get_tracked_conf_keys() const final;
+  void handle_conf_change(
+    const ConfigProxy& conf,
+    const std::set <std::string> &changed) final;
+};
+
+#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)          \
+  template <typename... Args>                                  \
+  auto FROM_METHOD(Args&&... args) const {                     \
+    return TARGET.TO_METHOD(std::forward<Args>(args)...);      \
+  }
+
+#define FORWARD(FROM_METHOD, TO_METHOD, TARGET)                \
+  template <typename... Args>                                  \
+  auto FROM_METHOD(Args&&... args) {                           \
+    return TARGET.TO_METHOD(std::forward<Args>(args)...);      \
+  }
+
+#define FORWARD_TO_LOCAL(METHOD) FORWARD(METHOD, METHOD, local_state)
+#define FORWARD_TO_CORE(METHOD) FORWARD(METHOD, METHOD, core_state)
+
+/**
+ * Represents services available to each PG
+ */
+class ShardServices {
+  using cached_map_t = boost::local_shared_ptr<const OSDMap>;
+
+  CoreState &core_state;
+  PerShardState &local_state;
+public:
+  ShardServices(
+    CoreState &core_state,
+    PerShardState &local_state)
+    : core_state(core_state), local_state(local_state) {}
+
+  FORWARD_TO_CORE(send_to_osd)
+
+  crimson::os::FuturizedStore &get_store() {
+    return core_state.store;
+  }
+
+  crimson::common::CephContext *get_cct() {
+    return &(local_state.cct);
+  }
+
+  // OSDMapService
+  const OSDMapService &get_osdmap_service() const {
+    return core_state.osdmap_service;
+  }
+
+  template <typename T, typename... Args>
+  auto start_operation(Args&&... args) {
+    return local_state.start_operation<T>(std::forward<Args>(args)...);
+  }
+
+  auto &get_registry() { return local_state.registry; }
+
+  // Loggers
+  PerfCounters &get_recoverystate_perf_logger() {
+    return *local_state.recoverystate_perf;
+  }
+  PerfCounters &get_perf_logger() {
+    return *local_state.perf;
+  }
+
+  /// Dispatch and reset ctx transaction
+  seastar::future<> dispatch_context_transaction(
+    crimson::os::CollectionRef col, PeeringCtx &ctx);
+
+  /// Dispatch and reset ctx messages
+  seastar::future<> dispatch_context_messages(
+    BufferedRecoveryMessages &&ctx);
+
+  /// Dispatch ctx and dispose of context
+  seastar::future<> dispatch_context(
+    crimson::os::CollectionRef col,
+    PeeringCtx &&ctx);
+
+  /// Dispatch ctx and dispose of ctx, transaction must be empty
+  seastar::future<> dispatch_context(
+    PeeringCtx &&ctx) {
+    return dispatch_context({}, std::move(ctx));
+  }
+
+  FORWARD_TO_LOCAL(get_osdmap)
+  FORWARD_TO_CORE(get_pg_num)
+  FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
+
+  FORWARD_TO_CORE(osdmap_subscribe)
+  FORWARD_TO_CORE(get_tid)
+  FORWARD_TO_CORE(queue_want_pg_temp)
+  FORWARD_TO_CORE(remove_want_pg_temp)
+  FORWARD_TO_CORE(requeue_pg_temp)
+  FORWARD_TO_CORE(send_pg_created)
+  FORWARD_TO_CORE(inc_pg_num)
+  FORWARD_TO_CORE(dec_pg_num)
+  FORWARD_TO_CORE(send_alive)
+  FORWARD_TO_CORE(send_pg_temp)
+  FORWARD_CONST(get_mnow, get_mnow, core_state)
+  FORWARD_TO_CORE(get_hb_stamps)
+
+  FORWARD(
+    maybe_get_cached_obc, maybe_get_cached_obc, local_state.obc_registry)
+  FORWARD(
+    get_cached_obc, get_cached_obc, local_state.obc_registry)
+
+  FORWARD(
+    local_request_reservation, request_reservation, core_state.local_reserver)
+  FORWARD(
+    local_update_priority, update_priority, core_state.local_reserver)
+  FORWARD(
+    local_cancel_reservation, cancel_reservation, core_state.local_reserver)
+  FORWARD(
+    remote_request_reservation, request_reservation, core_state.remote_reserver)
+  FORWARD(
+    remote_cancel_reservation, cancel_reservation, core_state.remote_reserver)
 };
 
 }
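
Most of the new ShardServices surface is generated by the FORWARD* macros above: each use expands into a variadic template method that perfect-forwards its arguments to the matching method on core_state or local_state. The small self-contained program below reuses the same macro definitions against invented stand-in types (Core, Local, Facade) to show what a call such as shard_services.get_pg_num() actually resolves to.

// Stand-alone demo of the forwarding macros; Core/Local/Facade are stand-ins.
#include <iostream>
#include <utility>

#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)          \
  template <typename... Args>                                  \
  auto FROM_METHOD(Args&&... args) const {                     \
    return TARGET.TO_METHOD(std::forward<Args>(args)...);      \
  }

#define FORWARD(FROM_METHOD, TO_METHOD, TARGET)                \
  template <typename... Args>                                  \
  auto FROM_METHOD(Args&&... args) {                           \
    return TARGET.TO_METHOD(std::forward<Args>(args)...);      \
  }

#define FORWARD_TO_LOCAL(METHOD) FORWARD(METHOD, METHOD, local_state)
#define FORWARD_TO_CORE(METHOD) FORWARD(METHOD, METHOD, core_state)

struct Core {                           // stand-in for CoreState
  unsigned pgs = 0;
  void inc_pg_num() { ++pgs; }
  unsigned get_pg_num() const { return pgs; }
};

struct Local {                          // stand-in for PerShardState
  int get_osdmap_epoch() const { return 42; }
};

class Facade {                          // stand-in for ShardServices
  Core &core_state;
  Local &local_state;
public:
  Facade(Core &core, Local &local) : core_state(core), local_state(local) {}
  FORWARD_TO_CORE(inc_pg_num)           // expands to: template <typename... Args>
  FORWARD_TO_CORE(get_pg_num)           //   auto inc_pg_num(Args&&... args) { ... }
  FORWARD_TO_LOCAL(get_osdmap_epoch)
};

int main() {
  Core core;
  Local local;
  Facade facade{core, local};
  facade.inc_pg_num();
  std::cout << facade.get_pg_num() << " " << facade.get_osdmap_epoch() << "\n";  // 1 42
}

The renaming variants work the same way, just with a different FROM_METHOD/TO_METHOD pair and target, e.g. FORWARD(local_request_reservation, request_reservation, core_state.local_reserver) above.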