]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: wire up local and remote async reservers
authorSamuel Just <sjust@redhat.com>
Fri, 20 Sep 2019 22:00:14 +0000 (15:00 -0700)
committerXuehan Xu <xxhdx1985126@163.com>
Sun, 26 Apr 2020 07:46:35 +0000 (15:46 +0800)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/pg.h
src/crimson/osd/shard_services.cc
src/crimson/osd/shard_services.h

index 464d4e2bccb1e45bd83976a74a59f599fa49b299..eeb0d1c665078b4b170affecb81c682421272db9 100644 (file)
@@ -10,6 +10,7 @@
 #include <boost/smart_ptr/local_shared_ptr.hpp>
 #include <seastar/core/future.hh>
 #include <seastar/core/shared_future.hh>
+#include <seastar/core/sleep.hh>
 
 #include "common/dout.h"
 #include "crimson/net/Fwd.h"
@@ -174,51 +175,85 @@ public:
     // will be needed for unblocking IO operations/peering
   }
 
+  template <typename T>
+  void start_peering_event_operation(T &&evt) {
+    shard_services.start_operation<LocalPeeringEvent>(
+      this,
+      shard_services,
+      pg_whoami,
+      pgid,
+      std::forward<T>(evt));
+  }
+
   void schedule_event_after(
     PGPeeringEventRef event,
     float delay) final {
-    ceph_assert(0 == "Not implemented yet");
+    // TODO: this is kind of a hack -- once the start_operation call
+    // happens, the operation will be registered, but during the delay
+    // it's just a dangling future.  It would be nice for the
+    // operation machinery to have something to take care of this.
+    (void)seastar::sleep(std::chrono::milliseconds(std::lround(delay*1000))).then(
+      [this, event=std::move(event)]() {
+       start_peering_event_operation(std::move(*event));
+      });
   }
 
   void request_local_background_io_reservation(
     unsigned priority,
     PGPeeringEventRef on_grant,
     PGPeeringEventRef on_preempt) final {
-    ceph_assert(0 == "Not implemented yet");
+    shard_services.local_reserver.request_reservation(
+      pgid,
+      on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
+       start_peering_event_operation(std::move(*on_grant));
+      }) : nullptr,
+      priority,
+      on_preempt ? make_lambda_context(
+       [this, on_preempt=std::move(on_preempt)] (int) {
+       start_peering_event_operation(std::move(*on_preempt));
+      }) : nullptr);
   }
 
   void update_local_background_io_priority(
     unsigned priority) final {
-    ceph_assert(0 == "Not implemented yet");
+    shard_services.local_reserver.update_priority(
+      pgid,
+      priority);
   }
 
   void cancel_local_background_io_reservation() final {
-    // Not implemented yet, but gets called on exit() from some states
+    shard_services.local_reserver.cancel_reservation(
+      pgid);
   }
 
   void request_remote_recovery_reservation(
     unsigned priority,
     PGPeeringEventRef on_grant,
     PGPeeringEventRef on_preempt) final {
-    ceph_assert(0 == "Not implemented yet");
+    shard_services.remote_reserver.request_reservation(
+      pgid,
+      on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
+       start_peering_event_operation(std::move(*on_grant));
+      }) : nullptr,
+      priority,
+      on_preempt ? make_lambda_context(
+       [this, on_preempt=std::move(on_preempt)] (int) {
+       start_peering_event_operation(std::move(*on_preempt));
+      }) : nullptr);
   }
 
   void cancel_remote_recovery_reservation() final {
-    // Not implemented yet, but gets called on exit() from some states
+    shard_services.remote_reserver.cancel_reservation(
+      pgid);
   }
 
   void schedule_event_on_commit(
     ceph::os::Transaction &t,
     PGPeeringEventRef on_commit) final {
     t.register_on_commit(
-      new LambdaContext(
-       [this, on_commit=std::move(on_commit)](int r){
-         shard_services.start_operation<LocalPeeringEvent>(
-           this,
-           shard_services,
-           pg_whoami,
-           pgid,
-           std::move(*on_commit));
+      make_lambda_context(
+       [this, on_commit=std::move(on_commit)](int) {
+         start_peering_event_operation(std::move(*on_commit));
        }));
   }
 
index 3804280b712ab0d7214171c748a0778e0a570416..b89fd0b8310f24156e9c414d5243fbb13b98f884 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "osd/osd_perf_counters.h"
 #include "osd/PeeringState.h"
+#include "crimson/common/config_proxy.h"
 #include "crimson/mgr/client.h"
 #include "crimson/mon/MonClient.h"
 #include "crimson/net/Messenger.h"
@@ -38,13 +39,48 @@ ShardServices::ShardServices(
       monc(monc),
       mgrc(mgrc),
       store(store),
-      obc_registry(crimson::common::local_conf())
+      obc_registry(crimson::common::local_conf()),
+      local_reserver(
+       &cct,
+       &finisher,
+       crimson::common::local_conf()->osd_max_backfills,
+       crimson::common::local_conf()->osd_min_recovery_priority),
+      remote_reserver(
+       &cct,
+       &finisher,
+       crimson::common::local_conf()->osd_max_backfills,
+       crimson::common::local_conf()->osd_min_recovery_priority)
 {
   perf = build_osd_logger(&cct);
   cct.get_perfcounters_collection()->add(perf);
 
   recoverystate_perf = build_recoverystate_perf(&cct);
   cct.get_perfcounters_collection()->add(recoverystate_perf);
+
+  crimson::common::local_conf().add_observer(this);
+}
+
+const char** ShardServices::get_tracked_conf_keys() const
+{
+  static const char* KEYS[] = {
+    "osd_max_backfills",
+    "osd_min_recovery_priority",
+    nullptr
+  };
+  return KEYS;
+}
+
+void ShardServices::handle_conf_change(const ConfigProxy& conf,
+                                      const std::set <std::string> &changed)
+{
+  if (changed.count("osd_max_backfills")) {
+    local_reserver.set_max(conf->osd_max_backfills);
+    remote_reserver.set_max(conf->osd_max_backfills);
+  }
+  if (changed.count("osd_min_recovery_priority")) {
+    local_reserver.set_min_priority(conf->osd_min_recovery_priority);
+    remote_reserver.set_min_priority(conf->osd_min_recovery_priority);
+  }
 }
 
 seastar::future<> ShardServices::send_to_osd(
index 66ba0702714f8fc6d88ab2cb993930207cc95d31..67d03eff09ec62934f7195d77136263c15631d30 100644 (file)
@@ -13,6 +13,7 @@
 #include "osd/PeeringState.h"
 #include "crimson/osd/osdmap_service.h"
 #include "crimson/osd/object_context.h"
+#include "common/AsyncReserver.h"
 
 namespace crimson::net {
   class Messenger;
@@ -39,7 +40,7 @@ namespace crimson::osd {
 /**
  * Represents services available to each PG
  */
-class ShardServices {
+class ShardServices : public md_config_obs_t {
   using cached_map_t = boost::local_shared_ptr<const OSDMap>;
   OSDMapService &osdmap_service;
   crimson::net::Messenger &cluster_msgr;
@@ -53,6 +54,9 @@ class ShardServices {
   PerfCounters *perf = nullptr;
   PerfCounters *recoverystate_perf = nullptr;
 
+  const char** get_tracked_conf_keys() const final;
+  void handle_conf_change(const ConfigProxy& conf,
+                          const std::set <std::string> &changed) final;
 public:
   ShardServices(
     OSDMapService &osdmap_service,
@@ -172,8 +176,18 @@ public:
 
   crimson::osd::ObjectContextRegistry obc_registry;
 
+  // Async Reservers
 private:
   unsigned num_pgs = 0;
+
+  struct DirectFinisher {
+    void queue(Context *c) {
+      c->complete(0);
+    }
+  } finisher;
+public:
+  AsyncReserver<spg_t, DirectFinisher> local_reserver;
+  AsyncReserver<spg_t, DirectFinisher> remote_reserver;
 };
 
 }