crimson: add background recovery request and necessary interfaces for recovery/backfill
author    Samuel Just <sjust@redhat.com>
          Wed, 4 Mar 2020 09:52:32 +0000 (17:52 +0800)
committer Xuehan Xu <xxhdx1985126@163.com>
          Sun, 26 Apr 2020 07:46:35 +0000 (15:46 +0800)
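
Give BackgroundRecovery a Ref<PG> and the epoch it was started in:
do_recovery() now bails out if the PG has been reset since that epoch, and
otherwise drives PG::start_recovery_ops() for up to
osd_recovery_max_single_start ops per pass.

A minimal sketch of how do_recovery() is meant to be driven (the body of
BackgroundRecovery::start() is truncated in the diff below, so this loop is
an illustration under that assumption, not code from this patch):

    seastar::future<> BackgroundRecovery::start()
    {
      // Re-run do_recovery() until it reports that no further recovery
      // ops could be started for this PG (reset, or nothing left to do).
      return seastar::repeat([this] {
        return do_recovery().then([](bool more) {
          return more ? seastar::stop_iteration::no
                      : seastar::stop_iteration::yes;
        });
      });
    }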
Signed-off-by: Samuel Just <sjust@redhat.com>
Signed-off-by: Xuehan Xu <xxhdx1985126@163.com>
src/crimson/osd/osd_operations/background_recovery.cc
src/crimson/osd/osd_operations/background_recovery.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc
index 25f0934030ae8bd86d17cd8eb91c83fd674b3bae..705b1af6b8033cabd813c941daa2743fbdcf0f78 100644
--- a/src/crimson/osd/osd_operations/background_recovery.cc
+++ b/src/crimson/osd/osd_operations/background_recovery.cc
@@ -19,25 +19,36 @@ namespace {
 namespace crimson::osd {
 
 BackgroundRecovery::BackgroundRecovery(
+  Ref<PG> pg,
   ShardServices &ss,
-  PG &pg,
+  epoch_t epoch_started,
   crimson::osd::scheduler::scheduler_class_t scheduler_class)
-  : ss(ss), pg(pg), scheduler_class(scheduler_class)
+  : pg(pg), ss(ss), epoch_started(epoch_started),
+    scheduler_class(scheduler_class)
 {}
 
 seastar::future<bool> BackgroundRecovery::do_recovery()
 {
-  return seastar::make_ready_future<bool>(false);
+  if (pg->has_reset_since(epoch_started))
+    return seastar::make_ready_future<bool>(false);
+  return with_blocking_future(
+    pg->start_recovery_ops(
+      crimson::common::local_conf()->osd_recovery_max_single_start));
 }
 
 void BackgroundRecovery::print(std::ostream &lhs) const
 {
-  lhs << "BackgroundRecovery(" << pg.get_pgid() << ")";
+  lhs << "BackgroundRecovery(" << pg->get_pgid() << ")";
 }
 
 void BackgroundRecovery::dump_detail(Formatter *f) const
 {
-  f->dump_stream("pgid") << pg.get_pgid();
+  f->dump_stream("pgid") << pg->get_pgid();
+  f->open_object_section("recovery_detail");
+  {
+    // TODO pg->dump_recovery_state(f);
+  }
+  f->close_section();
 }
 
 seastar::future<> BackgroundRecovery::start()
diff --git a/src/crimson/osd/osd_operations/background_recovery.h b/src/crimson/osd/osd_operations/background_recovery.h
index 57ca7544e2a7242b7317c73f0d81a2d07a60cf79..73f336fc366ca2e3b01c03853bf8f6a45969c7e1 100644
--- a/src/crimson/osd/osd_operations/background_recovery.h
+++ b/src/crimson/osd/osd_operations/background_recovery.h
@@ -18,20 +18,22 @@ public:
   static constexpr OperationTypeCode type = OperationTypeCode::background_recovery;
 
   BackgroundRecovery(
+    Ref<PG> pg,
     ShardServices &ss,
-    PG &pg,
+    epoch_t epoch_started,
     crimson::osd::scheduler::scheduler_class_t scheduler_class);
 
   void print(std::ostream &) const final;
   void dump_detail(Formatter *f) const final;
   seastar::future<> start();
 private:
+  Ref<PG> pg;
   ShardServices &ss;
-  PG &pg;
+  epoch_t epoch_started;
   crimson::osd::scheduler::scheduler_class_t scheduler_class;
 
   auto get_scheduler_params() const {
-    return ceph::osd::scheduler::params_t{
+    return crimson::osd::scheduler::params_t{
       1, // cost
       0, // owner
       scheduler_class
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index 9ebaf121efcea11081431bbf13a480a4a081ba26..47681a08c4ec5af7e4fdb4cfa93f084e8ba10b6f 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -850,4 +850,267 @@ void PG::handle_rep_op_reply(crimson::net::Connection* conn,
   backend->got_rep_op_reply(m);
 }
 
+crimson::osd::blocking_future<bool> PG::start_recovery_ops(size_t max_to_start)
+{
+  assert(is_primary());
+  assert(is_peered());
+  assert(!peering_state.is_deleting());
+
+  if (!is_recovering() && !is_backfilling()) {
+    return crimson::osd::make_ready_blocking_future<bool>(false);
+  }
+
+  std::vector<crimson::osd::blocking_future<>> started;
+  started.reserve(max_to_start);
+  max_to_start -= start_primary_recovery_ops(max_to_start, &started);
+  if (max_to_start > 0) {
+    max_to_start -= start_replica_recovery_ops(max_to_start, &started);
+  }
+  if (max_to_start > 0) {
+    max_to_start -= start_backfill_ops(max_to_start, &started);
+  }
+
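+  // An exhausted budget means there may be more work left; the caller can
+  // use the result to decide whether to schedule another recovery pass.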
+  bool done = max_to_start == 0;
+  return crimson::osd::join_blocking_futures(std::move(started)).then([this, done] {
+    return seastar::make_ready_future<bool>(done);
+  });
+}
+
+size_t PG::start_primary_recovery_ops(
+  size_t max_to_start,
+  std::vector<crimson::osd::blocking_future<>> *out)
+{
+  if (!is_recovering()) {
+    return 0;
+  }
+
+  if (!peering_state.have_missing()) {
+    peering_state.local_recovery_complete();
+    return 0;
+  }
+
+  const auto &missing = peering_state.get_pg_log().get_missing();
+
+  logger().info(
+    "{} recovering {} in pg {}, missing {}",
+    __func__, 
+    backend->total_recovering(),
+    *this,
+    missing);
+
+  unsigned started = 0;
+  int skipped = 0;
+
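+  // Walk the missing set in version order, resuming from the point the
+  // previous pass reached (last_requested).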
+  map<version_t, hobject_t>::const_iterator p =
+    missing.get_rmissing().lower_bound(peering_state.get_pg_log().get_log().last_requested);
+  while (started < max_to_start && p != missing.get_rmissing().end()) {
+    // TODO: chain futures here to enable yielding to scheduler?
+    hobject_t soid;
+    version_t v = p->first;
+
+    auto it_objects = peering_state.get_pg_log().get_log().objects.find(p->second);
+    if (it_objects != peering_state.get_pg_log().get_log().objects.end()) {
+      // look at log!
+      pg_log_entry_t *latest = it_objects->second;
+      assert(latest->is_update() || latest->is_delete());
+      soid = latest->soid;
+    } else {
+      soid = p->second;
+    }
+    const pg_missing_item& item = missing.get_items().find(p->second)->second;
+    ++p;
+
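+    // Clones cannot be recovered before their head; track the head so we
+    // can tell whether it is itself missing or already being recovered.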
+    hobject_t head = soid.get_head();
+
+    logger().info(
+      "{} {} item.need {} {} {} {} {}",
+      __func__,
+      soid,
+      item.need,
+      missing.is_missing(soid) ? " (missing)":"",
+      missing.is_missing(head) ? " (missing head)":"",
+      backend->is_recovering(soid) ? " (recovering)":"",
+      backend->is_recovering(head) ? " (recovering head)":"");
+
+    // TODO: handle lost/unfound
+    if (!backend->is_recovering(soid)) {
+      if (backend->is_recovering(head)) {
+       ++skipped;
+      } else {
+       auto futopt = recover_missing(soid, item.need);
+       if (futopt) {
+         out->push_back(std::move(*futopt));
+         ++started;
+       } else {
+         ++skipped;
+       }
+      }
+    }
+
+    if (!skipped)
+      peering_state.set_last_requested(v);
+  }
+
+  logger().info(
+    "{} started {} skipped {}",
+    __func__,
+    started,
+    skipped);
+
+  return started;
+}
+
+std::vector<pg_shard_t> PG::get_replica_recovery_order() const
+{
+  std::vector<std::pair<unsigned int, pg_shard_t>> replicas_by_num_missing,
+    async_by_num_missing;
+  replicas_by_num_missing.reserve(
+    peering_state.get_acting_recovery_backfill().size() - 1);
+  for (auto &p: peering_state.get_acting_recovery_backfill()) {
+    if (p == peering_state.get_primary()) {
+      continue;
+    }
+    auto pm = peering_state.get_peer_missing().find(p);
+    assert(pm != peering_state.get_peer_missing().end());
+    auto nm = pm->second.num_missing();
+    if (nm != 0) {
+      if (peering_state.is_async_recovery_target(p)) {
+        async_by_num_missing.push_back(make_pair(nm, p));
+      } else {
+        replicas_by_num_missing.push_back(make_pair(nm, p));
+      }
+    }
+  }
+  // sort by number of missing objects, in ascending order.
+  auto func = [](const std::pair<unsigned int, pg_shard_t> &lhs,
+                 const std::pair<unsigned int, pg_shard_t> &rhs) {
+    return lhs.first < rhs.first;
+  };
+  // acting goes first
+  std::sort(replicas_by_num_missing.begin(), replicas_by_num_missing.end(), func);
+  // then async_recovery_targets
+  std::sort(async_by_num_missing.begin(), async_by_num_missing.end(), func);
+  replicas_by_num_missing.insert(replicas_by_num_missing.end(),
+    async_by_num_missing.begin(), async_by_num_missing.end());
+
+  std::vector<pg_shard_t> ret;
+  ret.reserve(replicas_by_num_missing.size());
+  for (auto p : replicas_by_num_missing) {
+    ret.push_back(p.second);
+  }
+  return ret;
+}
+
+size_t PG::start_replica_recovery_ops(
+  size_t max_to_start,
+  std::vector<crimson::osd::blocking_future<>> *out)
+{
+  if (!is_recovering()) {
+    return 0;
+  }
+  uint64_t started = 0;
+
+  assert(!peering_state.get_acting_recovery_backfill().empty());
+
+  auto recovery_order = get_replica_recovery_order();
+  for (auto &peer: recovery_order) {
+    assert(peer != peering_state.get_primary());
+    auto pm = peering_state.get_peer_missing().find(peer);
+    assert(pm != peering_state.get_peer_missing().end());
+
+    size_t m_sz = pm->second.num_missing();
+
+    logger().debug("{}: peer osd.{} missing {} objects", __func__, peer, m_sz);
+    logger().trace("{}: peer osd.{} missing {}", __func__,
+                  peer, pm->second.get_items());
+
+    // recover oldest first
+    const pg_missing_t &m(pm->second);
+    for (auto p = m.get_rmissing().begin();
+        p != m.get_rmissing().end() && started < max_to_start;
+        ++p) {
+      const auto &soid = p->second;
+
+      if (peering_state.get_missing_loc().is_unfound(soid)) {
+       logger().debug("{}: object {} still unfound", __func__, soid);
+       continue;
+      }
+
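+      // Objects beyond the peer's last_backfill belong to backfill rather
+      // than log-based recovery, so they must already be in flight.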
+      const pg_info_t &pi = peering_state.get_peer_info(peer);
+      if (soid > pi.last_backfill) {
+       if (!backend->is_recovering(soid)) {
+         logger().error(
+           "{}: object {} in missing set for backfill (last_backfill {})"
+           " but not in recovering",
+           __func__,
+           soid,
+           pi.last_backfill);
+         ceph_abort();
+       }
+       continue;
+      }
+
+      if (backend->is_recovering(soid)) {
+       logger().debug("{}: already recovering object {}", __func__, soid);
+       continue;
+      }
+
+      if (peering_state.get_missing_loc().is_deleted(soid)) {
+       logger().debug("{}: soid {} is a delete, removing", __func__, soid);
+       map<hobject_t,pg_missing_item>::const_iterator r =
+         m.get_items().find(soid);
+       started += prep_object_replica_deletes(
+         soid, r->second.need, out);
+       continue;
+      }
+
+      if (soid.is_snap() &&
+         peering_state.get_pg_log().get_missing().is_missing(
+           soid.get_head())) {
+       logger().debug("{}: head {} still missing on primary",
+                      __func__, soid.get_head());
+       continue;
+      }
+
+      if (peering_state.get_pg_log().get_missing().is_missing(soid)) {
+       logger().debug("{}: soid {} still missing on primary", __func__, soid);
+       continue;
+      }
+
+      logger().debug(
+       "{}: recover_object_replicas({})",
+       __func__,
+       soid);
+      map<hobject_t,pg_missing_item>::const_iterator r = m.get_items().find(
+       soid);
+      started += prep_object_replica_pushes(
+       soid, r->second.need, out);
+    }
+  }
+
+  return started;
+}
+
+size_t PG::start_backfill_ops(
+  size_t max_to_start,
+  std::vector<crimson::osd::blocking_future<>> *out)
+{
+  logger().debug(
+    "{}({}): bft={} lbs={} {}",
+    __func__,
+    max_to_start,
+    peering_state.get_backfill_targets(),
+    last_backfill_started,
+    new_backfill ? "new_backfill" : "");
+  assert(!peering_state.get_backfill_targets().empty());
+
+  ceph_abort("not implemented!");
+}
+
+std::optional<crimson::osd::blocking_future<>> PG::recover_missing(
+  const hobject_t &hoid, eversion_t need)
+{
+  return std::nullopt;
+}
+
+
 }
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index eeb0d1c665078b4b170affecb81c682421272db9..9c43df8b419b882f76c9cd8bfb3ab01180c316d4 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -24,6 +24,7 @@
 #include "crimson/osd/osd_operations/client_request.h"
 #include "crimson/osd/osd_operations/peering_event.h"
 #include "crimson/osd/osd_operations/replicated_request.h"
+#include "crimson/osd/osd_operations/background_recovery.h"
 #include "crimson/osd/shard_services.h"
 #include "crimson/osd/osdmap_gate.h"
 
@@ -96,6 +97,7 @@ public:
     return *backend;
   }
 
+  
   // EpochSource
   epoch_t get_osdmap_epoch() const final {
     return peering_state.get_osdmap_epoch();
@@ -342,15 +344,25 @@ public:
     return 0;
   }
 
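+  // Queue a BackgroundRecovery operation for this PG under the given
+  // scheduler class; it re-checks the PG's epoch before doing any work.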
+  void start_background_recovery(
+    crimson::osd::scheduler::scheduler_class_t klass) {
+    shard_services.start_operation<BackgroundRecovery>(
+      this,
+      shard_services,
+      get_osdmap_epoch(),
+      klass);
+  }
 
   void on_backfill_reserved() final {
-    ceph_assert(0 == "Not implemented");
+    start_background_recovery(
+      crimson::osd::scheduler::scheduler_class_t::background_best_effort);
   }
   void on_backfill_canceled() final {
     ceph_assert(0 == "Not implemented");
   }
   void on_recovery_reserved() final {
-    ceph_assert(0 == "Not implemented");
+    start_background_recovery(
+      crimson::osd::scheduler::scheduler_class_t::background_recovery);
   }
 
 
@@ -428,6 +440,15 @@ public:
   bool is_primary() const {
     return peering_state.is_primary();
   }
+  bool is_peered() const {
+    return peering_state.is_peered();
+  }
+  bool is_recovering() const {
+    return peering_state.is_recovering();
+  }
+  bool is_backfilling() const {
+    return peering_state.is_backfilling();
+  }
   pg_stat_t get_stats() {
     auto stats = peering_state.prepare_stats_for_publish(
       false,
@@ -550,7 +571,12 @@ private:
 
   PeeringState peering_state;
   eversion_t projected_last_update;
+public:
+  bool has_reset_since(epoch_t epoch) const {
+    return peering_state.pg_has_reset_since(epoch);
+  }
 
+private:
   class WaitForActiveBlocker : public BlockerT<WaitForActiveBlocker> {
     PG *pg;
 
@@ -573,7 +599,41 @@ private:
   friend class PGAdvanceMap;
   friend class PeeringEvent;
   friend class RepRequest;
-};
+public:
+  crimson::osd::blocking_future<bool> start_recovery_ops(size_t max_to_start);
+private:
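+  // Stub: report the search for unfound objects as already complete.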
+  seastar::future<bool> find_unfound() {
+    return seastar::make_ready_future<bool>(true);
+  }
+
+  bool new_backfill = false;
+  hobject_t last_backfill_started;
+
+  size_t start_primary_recovery_ops(
+    size_t max_to_start,
+    std::vector<crimson::osd::blocking_future<>> *out);
+  size_t start_replica_recovery_ops(
+    size_t max_to_start,
+    std::vector<crimson::osd::blocking_future<>> *out);
+  size_t start_backfill_ops(
+    size_t max_to_start,
+    std::vector<crimson::osd::blocking_future<>> *out);
+
+  std::vector<pg_shard_t> get_replica_recovery_order() const;
+  std::optional<crimson::osd::blocking_future<>> recover_missing(
+    const hobject_t &hoid, eversion_t need);
+
+  size_t prep_object_replica_deletes(
+    const hobject_t& soid,
+    eversion_t need,
+    std::vector<crimson::osd::blocking_future<>> *in_progress) {
+    return 0;
+  }
+
+  size_t prep_object_replica_pushes(
+    const hobject_t& soid,
+    eversion_t need,
+    std::vector<crimson::osd::blocking_future<>> *in_progress) {
+    return 0;
+  }
+};
 
 std::ostream& operator<<(std::ostream&, const PG& pg);