git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd/pg: implement queue_check_readable() and recheck_readable()
author    Kefu Chai <kchai@redhat.com>
          Sun, 29 Sep 2019 17:11:42 +0000 (01:11 +0800)
committer Kefu Chai <kchai@redhat.com>
          Mon, 30 Sep 2019 08:25:02 +0000 (16:25 +0800)
see also 7aec060e0aff4588ff51f744dd194d1c0f7793c2

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
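
What the pg.cc hunks below add, in short: queue_check_readable() sleeps for
`delay` and then injects a LocalPeeringEvent carrying
PeeringState::CheckReadable back into the PG, passing last_peering_reset so
an event left over from a stale interval can be recognized and dropped; note
that the future returned by then() is not retained, so the chain runs
detached. Upstream PeeringState answers CheckReadable by calling back into
recheck_readable(), which compares the current monotonic time from
shard_services.get_mnow() against the prior interval's upper bound and the
current lease, clearing PG_STATE_WAIT and PG_STATE_LAGGY once they no longer
apply. A minimal standalone sketch of that clear-WAIT/clear-LAGGY decision,
using plain std::chrono types; LeaseState and recheck() are hypothetical
stand-ins for illustration, not ceph code:

    #include <chrono>
    #include <iostream>

    using namespace std::chrono_literals;
    using signedspan = std::chrono::nanoseconds;  // stand-in for ceph's signedspan

    struct LeaseState {
      bool wait = false;                     // analogue of PG_STATE_WAIT
      bool laggy = false;                    // analogue of PG_STATE_LAGGY
      signedspan prior_readable_until_ub{};  // upper bound from the prior interval
      signedspan readable_until{};           // current lease; zero() means none
    };

    // Mirrors the `changed` bookkeeping in PG::recheck_readable(): WAIT is
    // cleared once mnow has passed the prior interval's upper bound; LAGGY is
    // cleared only when a non-zero lease extends beyond mnow.
    bool recheck(LeaseState& s, signedspan mnow) {
      bool changed = false;
      if (s.wait && mnow >= s.prior_readable_until_ub) {
        s.wait = false;
        s.prior_readable_until_ub = signedspan::zero();
        changed = true;
      }
      if (s.laggy &&
          s.readable_until != signedspan::zero() &&
          mnow < s.readable_until) {
        s.laggy = false;
        changed = true;
      }
      return changed;
    }

    int main() {
      LeaseState s{true, true, 5s, 8s};
      std::cout << std::boolalpha << recheck(s, 6s) << '\n';  // true
      std::cout << s.wait << ' ' << s.laggy << '\n';          // false false
    }

In the real method any transition additionally triggers
publish_stats_to_osd(); requeueing ops blocked on readability is still a
TODO, as the comment in the hunk notes.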

diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index c6b4331721e29d97adf07bbf5ffba45fe6d9fe22..6de58af4e445c3e61fb4ec602fc12b02c189fe56 100644
 
 #include "crimson/net/Connection.h"
 #include "crimson/net/Messenger.h"
-#include "crimson/os/futurized_collection.h"
-#include "os/Transaction.h"
 #include "crimson/os/cyan_store.h"
-
+#include "crimson/os/futurized_collection.h"
 #include "crimson/osd/exceptions.h"
 #include "crimson/osd/pg_meta.h"
 #include "crimson/osd/pg_backend.h"
@@ -46,6 +44,16 @@ namespace {
   }
 }
 
+namespace std::chrono {
+std::ostream& operator<<(std::ostream& out, const signedspan& d)
+{
+  auto s = std::chrono::duration_cast<std::chrono::seconds>(d).count();
+  auto ns = std::abs((d % 1s).count());
+  fmt::print(out, "{}{}s", s, ns ? fmt::format(".{:0>9}", ns) : "");
+  return out;
+}
+}
+
 namespace ceph::osd {
 
 using ceph::common::local_conf;
@@ -129,6 +137,61 @@ bool PG::try_flush_or_schedule_async() {
   return false;
 }
 
+void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
+{
+  seastar::sleep(delay).then([last_peering_reset, this] {
+    shard_services.start_operation<LocalPeeringEvent>(
+      this,
+      shard_services,
+      pg_whoami,
+      pgid,
+      last_peering_reset,
+      last_peering_reset,
+      PeeringState::CheckReadable{});
+    });
+}
+
+void PG::recheck_readable()
+{
+  bool changed = false;
+  const auto mnow = shard_services.get_mnow();
+  if (peering_state.state_test(PG_STATE_WAIT)) {
+    auto prior_readable_until_ub = peering_state.get_prior_readable_until_ub();
+    if (mnow < prior_readable_until_ub) {
+      logger().info("{} will wait (mnow {} < prior_readable_until_ub {})",
+                   __func__, mnow, prior_readable_until_ub);
+    } else {
+      logger().info("{} no longer wait (mnow {} >= prior_readable_until_ub {})",
+                   __func__, mnow, prior_readable_until_ub);
+      peering_state.state_clear(PG_STATE_WAIT);
+      peering_state.clear_prior_readable_until_ub();
+      changed = true;
+    }
+  }
+  if (peering_state.state_test(PG_STATE_LAGGY)) {
+    auto readable_until = peering_state.get_readable_until();
+    if (readable_until == readable_until.zero()) {
+      logger().info("{} still laggy (mnow {}, readable_until zero)",
+                   __func__, mnow);
+    } else if (mnow >= readable_until) {
+      logger().info("{} still laggy (mnow {} >= readable_until {})",
+                   __func__, mnow, readable_until);
+    } else {
+      logger().info("{} no longer laggy (mnow {} < readable_until {})",
+                   __func__, mnow, readable_until);
+      peering_state.state_clear(PG_STATE_LAGGY);
+      changed = true;
+    }
+  }
+  if (changed) {
+    publish_stats_to_osd();
+    if (!peering_state.state_test(PG_STATE_WAIT) &&
+       !peering_state.state_test(PG_STATE_LAGGY)) {
+      // TODO: requeue ops waiting for readable
+    }
+  }
+}
+
 void PG::on_activate(interval_set<snapid_t>)
 {
   projected_last_update = peering_state.get_info().last_update;
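
The std::chrono::signedspan stream operator introduced at the top of this
file prints whole seconds and appends a zero-padded nine-digit nanosecond
fraction only when the sub-second remainder is non-zero. A self-contained
illustration of the same formatting; print_span and the signedspan alias
are assumptions made so the example compiles on its own:

    #include <chrono>
    #include <cstdlib>
    #include <iostream>
    #include <fmt/format.h>
    #include <fmt/ostream.h>

    using namespace std::chrono_literals;
    using signedspan = std::chrono::duration<int64_t, std::nano>;  // assumed alias

    // Same logic as the operator<< above: "<sec>s" or "<sec>.<9-digit-ns>s".
    std::ostream& print_span(std::ostream& out, const signedspan& d) {
      auto s = std::chrono::duration_cast<std::chrono::seconds>(d).count();
      auto ns = std::abs((d % 1s).count());
      fmt::print(out, "{}{}s", s, ns ? fmt::format(".{:0>9}", ns) : "");
      return out;
    }

    int main() {
      print_span(std::cout, 3s) << '\n';          // 3s
      print_span(std::cout, 3s + 500ns) << '\n';  // 3.000000500s
      print_span(std::cout, -1500ms) << '\n';     // -1.500000000s
    }

The sign rides on the truncated seconds component, as the last line shows;
a value strictly between -1s and 0 would print without its sign, since the
seconds component truncates to zero.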
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index b25c2f911fcc9546ada6da93d18d919aa475a1fe..6a87b701c1db8446c1622fc50f219edd6e7efe09 100644
@@ -258,12 +258,9 @@ public:
     // Not needed yet
   }
 
-  void queue_check_readable(epoch_t lpr, ceph::timespan delay) final {
-#warning write me
-  }
-  void recheck_readable() final {
-#warning write me
-  }
+  void queue_check_readable(epoch_t last_peering_reset,
+                           ceph::timespan delay) final;
+  void recheck_readable() final;
 
   void on_pool_change() final {
     // Not needed yet
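
The pg.h half of the change drops the two `#warning write me` stubs and
declares the overrides out-of-line, keeping their `final` qualifiers. For
orientation, a compilable sketch of the kind of listener contract these
overrides satisfy; PeeringListenerSketch and the epoch_t/timespan aliases
are hypothetical reductions, the real interface being
PeeringState::PeeringListener:

    #include <chrono>
    #include <cstdint>

    using epoch_t = uint32_t;                   // matches ceph's epoch_t typedef
    using timespan = std::chrono::nanoseconds;  // stand-in for ceph::timespan

    struct PeeringListenerSketch {
      virtual ~PeeringListenerSketch() = default;
      // Re-deliver a CheckReadable peering event after `delay`; the
      // last_peering_reset epoch lets a timer from an old interval be dropped.
      virtual void queue_check_readable(epoch_t last_peering_reset,
                                        timespan delay) = 0;
      // Re-evaluate WAIT/LAGGY against the current monotonic clock.
      virtual void recheck_readable() = 0;
    };

    int main() {}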