git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: add pg log based recovery machinery
authorXuehan Xu <xxhdx1985126@163.com>
Wed, 4 Mar 2020 09:56:45 +0000 (17:56 +0800)
committerXuehan Xu <xxhdx1985126@163.com>
Sun, 26 Apr 2020 07:46:35 +0000 (15:46 +0800)
Signed-off-by: Xuehan Xu <xxhdx1985126@163.com>
16 files changed:
src/crimson/osd/CMakeLists.txt
src/crimson/osd/osd_operations/background_recovery.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.cc
src/crimson/osd/pg_backend.h
src/crimson/osd/pg_recovery.cc [new file with mode: 0644]
src/crimson/osd/pg_recovery.h [new file with mode: 0644]
src/crimson/osd/pg_recovery_listener.h [new file with mode: 0644]
src/crimson/osd/recovery_backend.cc [new file with mode: 0644]
src/crimson/osd/recovery_backend.h [new file with mode: 0644]
src/crimson/osd/replicated_backend.cc
src/crimson/osd/replicated_recovery_backend.cc [new file with mode: 0644]
src/crimson/osd/replicated_recovery_backend.h [new file with mode: 0644]
src/osd/PeeringState.cc
src/osd/PeeringState.h

index a4298c97fe7fea54c306e92244daa5db0ef41d2c..ff5ba3aec6caaf85925dd1d9830b04cf3fb403f6 100644 (file)
@@ -19,6 +19,9 @@ add_executable(crimson-osd
   osd_operations/pg_advance_map.cc
   osd_operations/replicated_request.cc
   osd_operations/background_recovery.cc
+  pg_recovery.cc
+  recovery_backend.cc
+  replicated_recovery_backend.cc
   scheduler/scheduler.cc
   scheduler/mclock_scheduler.cc
   osdmap_gate.cc
index 705b1af6b8033cabd813c941daa2743fbdcf0f78..5148d46a65c43cbbc336c75443b3c0904fe018b9 100644 (file)
@@ -32,7 +32,7 @@ seastar::future<bool> BackgroundRecovery::do_recovery()
   if (pg->has_reset_since(epoch_started))
     return seastar::make_ready_future<bool>(false);
   return with_blocking_future(
-    pg->start_recovery_ops(
+    pg->get_recovery_handler()->start_recovery_ops(
       crimson::common::local_conf()->osd_recovery_max_single_start));
 }
 
@@ -57,8 +57,14 @@ seastar::future<> BackgroundRecovery::start()
 
   IRef ref = this;
   return ss.throttler.with_throttle_while(
-    this, get_scheduler_params(), [ref, this] {
+    this, get_scheduler_params(), [this] {
       return do_recovery();
+    }).handle_exception_type([ref, this](const std::system_error& err) {
+      if (err.code() == std::make_error_code(std::errc::interrupted)) {
+       logger().debug("{} recovery interrupted: {}", *pg, err.what());
+       return seastar::now();
+      }
+      return seastar::make_exception_future<>(err);
     });
 }
 
index 47681a08c4ec5af7e4fdb4cfa93f084e8ba10b6f..8ee14f17741a382048fa0fb855a57f17090af2e0 100644 (file)
@@ -37,6 +37,8 @@
 #include "crimson/osd/ops_executer.h"
 #include "crimson/osd/osd_operations/osdop_params.h"
 #include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/pg_recovery.h"
+#include "crimson/osd/replicated_recovery_backend.h"
 
 namespace {
   seastar::logger& logger() {
@@ -98,6 +100,11 @@ PG::PG(
        coll_ref,
        shard_services,
        profile)),
+    recovery_backend(
+      std::make_unique<ReplicatedRecoveryBackend>(
+       *this, shard_services, coll_ref, backend.get())),
+    recovery_handler(
+      std::make_unique<PGRecovery>(this)),
     peering_state(
       shard_services.get_cct(),
       pg_shard,
@@ -248,6 +255,8 @@ void PG::on_activate_complete()
       get_osdmap_epoch(),
       PeeringState::RequestBackfill{});
   } else {
+    logger().debug("{}: no need to recover or backfill, AllReplicasRecovered"
+                  " for pg: {}", __func__, pgid);
     shard_services.start_operation<LocalPeeringEvent>(
       this,
       shard_services,
@@ -417,7 +426,7 @@ void PG::do_peering_event(
   PGPeeringEvent& evt, PeeringCtx &rctx)
 {
   if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) {
-    logger().debug("{} handling {}", __func__, evt.get_desc());
+    logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
     do_peering_event(evt.get_event(), rctx);
   } else {
     logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
@@ -850,267 +859,4 @@ void PG::handle_rep_op_reply(crimson::net::Connection* conn,
   backend->got_rep_op_reply(m);
 }
 
-crimson::osd::blocking_future<bool> PG::start_recovery_ops(size_t max_to_start)
-{
-  assert(is_primary());
-  assert(is_peered());
-  assert(!peering_state.is_deleting());
-
-  if (!is_recovering() && !is_backfilling()) {
-    return crimson::osd::make_ready_blocking_future<bool>(false);
-  }
-
-  std::vector<crimson::osd::blocking_future<>> started;
-  started.reserve(max_to_start);
-  max_to_start -= start_primary_recovery_ops(max_to_start, &started);
-  if (max_to_start > 0) {
-    max_to_start -= start_replica_recovery_ops(max_to_start, &started);
-  }
-  if (max_to_start > 0) {
-    max_to_start -= start_backfill_ops(max_to_start, &started);
-  }
-
-  bool done = max_to_start == 0;
-  return crimson::osd::join_blocking_futures(std::move(started)).then([this, done] {
-    return seastar::make_ready_future<bool>(done);
-  });
-}
-
-size_t PG::start_primary_recovery_ops(
-  size_t max_to_start,
-  std::vector<crimson::osd::blocking_future<>> *out)
-{
-  if (!is_recovering()) {
-    return 0;
-  }
-
-  if (!peering_state.have_missing()) {
-    peering_state.local_recovery_complete();
-    return 0;
-  }
-
-  const auto &missing = peering_state.get_pg_log().get_missing();
-
-  logger().info(
-    "{} recovering {} in pg {}, missing {}",
-    __func__, 
-    backend->total_recovering(),
-    *this,
-    missing);
-
-  unsigned started = 0;
-  int skipped = 0;
-
-  map<version_t, hobject_t>::const_iterator p =
-    missing.get_rmissing().lower_bound(peering_state.get_pg_log().get_log().last_requested);
-  while (started < max_to_start && p != missing.get_rmissing().end()) {
-    // TODO: chain futures here to enable yielding to scheduler?
-    hobject_t soid;
-    version_t v = p->first;
-
-    auto it_objects = peering_state.get_pg_log().get_log().objects.find(p->second);
-    if (it_objects != peering_state.get_pg_log().get_log().objects.end()) {
-      // look at log!
-      pg_log_entry_t *latest = it_objects->second;
-      assert(latest->is_update() || latest->is_delete());
-      soid = latest->soid;
-    } else {
-      soid = p->second;
-    }
-    const pg_missing_item& item = missing.get_items().find(p->second)->second;
-    ++p;
-
-    hobject_t head = soid.get_head();
-
-    logger().info(
-      "{} {} item.need {} {} {} {} {}",
-      __func__,
-      soid,
-      item.need,
-      missing.is_missing(soid) ? " (missing)":"",
-      missing.is_missing(head) ? " (missing head)":"",
-      backend->is_recovering(soid) ? " (recovering)":"",
-      backend->is_recovering(head) ? " (recovering head)":"");
-
-    // TODO: handle lost/unfound
-    if (!backend->is_recovering(soid)) {
-      if (backend->is_recovering(head)) {
-       ++skipped;
-      } else {
-       auto futopt = recover_missing(soid, item.need);
-       if (futopt) {
-         out->push_back(std::move(*futopt));
-         ++started;
-       } else {
-         ++skipped;
-       }
-      }
-    }
-
-    if (!skipped)
-      peering_state.set_last_requested(v);
-  }
-
-  logger().info(
-    "{} started {} skipped {}",
-    __func__,
-    started,
-    skipped);
-
-  return started;
-}
-
-std::vector<pg_shard_t> PG::get_replica_recovery_order() const
-{
-  std::vector<std::pair<unsigned int, pg_shard_t>> replicas_by_num_missing,
-    async_by_num_missing;
-  replicas_by_num_missing.reserve(
-    peering_state.get_acting_recovery_backfill().size() - 1);
-  for (auto &p: peering_state.get_acting_recovery_backfill()) {
-    if (p == peering_state.get_primary()) {
-      continue;
-    }
-    auto pm = peering_state.get_peer_missing().find(p);
-    assert(pm != peering_state.get_peer_missing().end());
-    auto nm = pm->second.num_missing();
-    if (nm != 0) {
-      if (peering_state.is_async_recovery_target(p)) {
-        async_by_num_missing.push_back(make_pair(nm, p));
-      } else {
-        replicas_by_num_missing.push_back(make_pair(nm, p));
-      }
-    }
-  }
-  // sort by number of missing objects, in ascending order.
-  auto func = [](const std::pair<unsigned int, pg_shard_t> &lhs,
-                 const std::pair<unsigned int, pg_shard_t> &rhs) {
-    return lhs.first < rhs.first;
-  };
-  // acting goes first
-  std::sort(replicas_by_num_missing.begin(), replicas_by_num_missing.end(), func);
-  // then async_recovery_targets
-  std::sort(async_by_num_missing.begin(), async_by_num_missing.end(), func);
-  replicas_by_num_missing.insert(replicas_by_num_missing.end(),
-    async_by_num_missing.begin(), async_by_num_missing.end());
-
-  std::vector<pg_shard_t> ret;
-  ret.reserve(replicas_by_num_missing.size());
-  for (auto p : replicas_by_num_missing) {
-    ret.push_back(p.second);
-  }
-  return ret;
-}
-
-size_t PG::start_replica_recovery_ops(
-  size_t max_to_start,
-  std::vector<crimson::osd::blocking_future<>> *out)
-{
-  if (!is_recovering()) {
-    return 0;
-  }
-  uint64_t started = 0;
-
-  assert(!peering_state.get_acting_recovery_backfill().empty());
-
-  auto recovery_order = get_replica_recovery_order();
-  for (auto &peer: recovery_order) {
-    assert(peer != peering_state.get_primary());
-    auto pm = peering_state.get_peer_missing().find(peer);
-    assert(pm != peering_state.get_peer_missing().end());
-
-    size_t m_sz = pm->second.num_missing();
-
-    logger().debug("{}: peer osd.{} missing {} objects", __func__, peer, m_sz);
-    logger().trace("{}: peer osd.{} missing {}", __func__,
-                  peer, pm->second.get_items());
-
-    // recover oldest first
-    const pg_missing_t &m(pm->second);
-    for (auto p = m.get_rmissing().begin();
-        p != m.get_rmissing().end() && started < max_to_start;
-        ++p) {
-      const auto &soid = p->second;
-
-      if (peering_state.get_missing_loc().is_unfound(soid)) {
-       logger().debug("{}: object {} still unfound", __func__, soid);
-       continue;
-      }
-
-      const pg_info_t &pi = peering_state.get_peer_info(peer);
-      if (soid > pi.last_backfill) {
-       if (!backend->is_recovering(soid)) {
-         logger().error(
-           "{}: object {} in missing set for backfill (last_backfill {})"
-           " but not in recovering",
-           __func__,
-           soid,
-           pi.last_backfill);
-         ceph_abort();
-       }
-       continue;
-      }
-
-      if (backend->is_recovering(soid)) {
-       logger().debug("{}: already recovering object {}", __func__, soid);
-       continue;
-      }
-
-      if (peering_state.get_missing_loc().is_deleted(soid)) {
-       logger().debug("{}: soid {} is a delete, removing", __func__, soid);
-       map<hobject_t,pg_missing_item>::const_iterator r =
-         m.get_items().find(soid);
-       started += prep_object_replica_deletes(
-         soid, r->second.need, out);
-       continue;
-      }
-
-      if (soid.is_snap() &&
-         peering_state.get_pg_log().get_missing().is_missing(
-           soid.get_head())) {
-       logger().debug("{}: head {} still missing on primary",
-                      __func__, soid.get_head());
-       continue;
-      }
-
-      if (peering_state.get_pg_log().get_missing().is_missing(soid)) {
-       logger().debug("{}: soid {} still missing on primary", __func__, soid);
-       continue;
-      }
-
-      logger().debug(
-       "{}: recover_object_replicas({})",
-       __func__,
-       soid);
-      map<hobject_t,pg_missing_item>::const_iterator r = m.get_items().find(
-       soid);
-      started += prep_object_replica_pushes(
-       soid, r->second.need, out);
-    }
-  }
-
-  return started;
-}
-
-size_t PG::start_backfill_ops(
-  size_t max_to_start,
-  std::vector<crimson::osd::blocking_future<>> *out)
-{
-  logger().debug(
-    "{}({}): bft={} lbs={} {}",
-    __func__,
-    peering_state.get_backfill_targets(),
-    last_backfill_started,
-    new_backfill ? "" : "new_backfill");
-  assert(!peering_state.get_backfill_targets().empty());
-
-  ceph_abort("not implemented!");
-}
-
-std::optional<crimson::osd::blocking_future<>> PG::recover_missing(
-      const hobject_t &hoid, eversion_t need)
-{
-    return std::nullopt;
-}
-
-
 }
index 9c43df8b419b882f76c9cd8bfb3ab01180c316d4..e5f20c701b93da111bd89cb214d47eed8be52bf9 100644 (file)
@@ -27,6 +27,9 @@
 #include "crimson/osd/osd_operations/background_recovery.h"
 #include "crimson/osd/shard_services.h"
 #include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/pg_recovery.h"
+#include "crimson/osd/pg_recovery_listener.h"
+#include "crimson/osd/recovery_backend.h"
 
 class OSDMap;
 class MQuery;
@@ -52,6 +55,7 @@ class ClientRequest;
 class PG : public boost::intrusive_ref_counter<
   PG,
   boost::thread_unsafe_counter>,
+  public PGRecoveryListener,
   PeeringState::PeeringListener,
   DoutPrefixProvider
 {
@@ -82,11 +86,11 @@ public:
 
   ~PG();
 
-  const pg_shard_t& get_pg_whoami() const {
+  const pg_shard_t& get_pg_whoami() const final {
     return pg_whoami;
   }
 
-  const spg_t& get_pgid() const {
+  const spg_t& get_pgid() const final {
     return pgid;
   }
 
@@ -96,8 +100,6 @@ public:
   const PGBackend& get_backend() const {
     return *backend;
   }
-
-  
   // EpochSource
   epoch_t get_osdmap_epoch() const final {
     return peering_state.get_osdmap_epoch();
@@ -199,7 +201,9 @@ public:
        start_peering_event_operation(std::move(*event));
       });
   }
-
+  std::vector<pg_shard_t> get_replica_recovery_order() const final {
+    return peering_state.get_replica_recovery_order();
+  }
   void request_local_background_io_reservation(
     unsigned priority,
     PGPeeringEventRef on_grant,
@@ -303,7 +307,7 @@ public:
     // Not needed yet
   }
   void on_change(ceph::os::Transaction &t) final {
-    // Not needed yet
+    recovery_backend->on_peering_interval_change(t);
   }
   void on_activate(interval_set<snapid_t> to_trim) final;
   void on_activate_complete() final;
@@ -344,24 +348,15 @@ public:
     return 0;
   }
 
-  void start_background_recovery(
-    crimson::osd::scheduler::scheduler_class_t klass) {
-    shard_services.start_operation<BackgroundRecovery>(
-      this,
-      shard_services,
-      get_osdmap_epoch(),
-      klass);
-  }
-
   void on_backfill_reserved() final {
-    start_background_recovery(
+    recovery_handler->start_background_recovery(
       crimson::osd::scheduler::scheduler_class_t::background_best_effort);
   }
   void on_backfill_canceled() final {
     ceph_assert(0 == "Not implemented");
   }
   void on_recovery_reserved() final {
-    start_background_recovery(
+    recovery_handler->start_background_recovery(
       crimson::osd::scheduler::scheduler_class_t::background_recovery);
   }
 
@@ -437,16 +432,16 @@ public:
 
 
   // Utility
-  bool is_primary() const {
+  bool is_primary() const final {
     return peering_state.is_primary();
   }
-  bool is_peered() const {
+  bool is_peered() const final {
     return peering_state.is_peered();
   }
-  bool is_recovering() const {
+  bool is_recovering() const final {
     return peering_state.is_recovering();
   }
-  bool is_backfilling() const {
+  bool is_backfilling() const final {
     return peering_state.is_backfilling();
   }
   pg_stat_t get_stats() {
@@ -565,17 +560,69 @@ public:
     return eversion_t(projected_last_update.epoch,
                      ++projected_last_update.version);
   }
-
+  ShardServices& get_shard_services() final {
+    return shard_services;
+  }
 private:
   std::unique_ptr<PGBackend> backend;
+  std::unique_ptr<RecoveryBackend> recovery_backend;
+  std::unique_ptr<PGRecovery> recovery_handler;
 
   PeeringState peering_state;
   eversion_t projected_last_update;
 public:
-  bool has_reset_since(epoch_t epoch) const {
+  RecoveryBackend* get_recovery_backend() final {
+    return recovery_backend.get();
+  }
+  PGRecovery* get_recovery_handler() final {
+    return recovery_handler.get();
+  }
+  PeeringState& get_peering_state() final {
+    return peering_state;
+  }
+  bool has_reset_since(epoch_t epoch) const final {
     return peering_state.pg_has_reset_since(epoch);
   }
 
+  const pg_missing_tracker_t& get_local_missing() const {
+    return peering_state.get_pg_log().get_missing();
+  }
+  epoch_t get_last_peering_reset() const {
+    return peering_state.get_last_peering_reset();
+  }
+  const set<pg_shard_t> &get_acting_recovery_backfill() const {
+    return peering_state.get_acting_recovery_backfill();
+  }
+  void begin_peer_recover(pg_shard_t peer, const hobject_t oid) {
+    peering_state.begin_peer_recover(peer, oid);
+  }
+  uint64_t min_peer_features() const {
+    return peering_state.get_min_peer_features();
+  }
+  const map<hobject_t, set<pg_shard_t>>&
+  get_missing_loc_shards() const {
+    return peering_state.get_missing_loc().get_missing_locs();
+  }
+  const map<pg_shard_t, pg_missing_t> &get_shard_missing() const {
+    return peering_state.get_peer_missing();
+  }
+  const pg_missing_const_i* get_shard_missing(pg_shard_t shard) const {
+    if (shard == pg_whoami)
+      return &get_local_missing();
+    else {
+      auto it = peering_state.get_peer_missing().find(shard);
+      if (it == peering_state.get_peer_missing().end())
+       return nullptr;
+      else
+       return &it->second;
+    }
+  }
+  int get_recovery_op_priority() const {
+    int64_t pri = 0;
+    get_pool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
+    return  pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority;
+  }
+
 private:
   class WaitForActiveBlocker : public BlockerT<WaitForActiveBlocker> {
     PG *pg;
@@ -599,41 +646,23 @@ private:
   friend class PGAdvanceMap;
   friend class PeeringEvent;
   friend class RepRequest;
-public:
-  crimson::osd::blocking_future<bool> start_recovery_ops(size_t max_to_start);
 private:
   seastar::future<bool> find_unfound() {
     return seastar::make_ready_future<bool>(true);
   }
 
-  bool new_backfill;
-  hobject_t last_backfill_started;
-
-  size_t start_primary_recovery_ops(
-    size_t max_to_start,
-    std::vector<crimson::osd::blocking_future<>> *out);
-  size_t start_replica_recovery_ops(
-    size_t max_to_start,
-    std::vector<crimson::osd::blocking_future<>> *out);
-  size_t start_backfill_ops(
-    size_t max_to_start,
-    std::vector<crimson::osd::blocking_future<>> *out);
-
-  std::vector<pg_shard_t> get_replica_recovery_order() const;
-  std::optional<crimson::osd::blocking_future<>> recover_missing(
-    const hobject_t &hoid, eversion_t need);
-
-  size_t prep_object_replica_deletes(
-    const hobject_t& soid,
-    eversion_t need,
-    std::vector<crimson::osd::blocking_future<>> *in_progress);
-
-  size_t prep_object_replica_pushes(
-    const hobject_t& soid,
-    eversion_t need,
-    std::vector<crimson::osd::blocking_future<>> *in_progress) {
-    return 0;
+  bool is_missing_object(const hobject_t& soid) const {
+    return peering_state.get_pg_log().get_missing().get_items().count(soid);
+  }
+  bool is_unreadable_object(const hobject_t &oid) const final {
+    return is_missing_object(oid) ||
+      !peering_state.get_missing_loc().readable_with_acting(
+       oid, get_actingset());
+  }
+  const set<pg_shard_t> &get_actingset() const {
+    return peering_state.get_actingset();
   }
+};
 
 std::ostream& operator<<(std::ostream&, const PG& pg);
 
index 72f32f61881692c66841f8216f7668070fddefd4..a079ff5e9991533ab02c2e5e7f458d295455c014 100644 (file)
 #include "os/Transaction.h"
 #include "common/Clock.h"
 
-#include "crimson/os/cyanstore/cyan_object.h"
 #include "crimson/os/futurized_collection.h"
 #include "crimson/os/futurized_store.h"
 #include "crimson/osd/osd_operation.h"
 #include "replicated_backend.h"
+#include "replicated_recovery_backend.h"
 #include "ec_backend.h"
 #include "exceptions.h"
 
@@ -199,6 +199,7 @@ PGBackend::read(const object_info_t& oi,
   return _read(oi.soid, offset, length, flags).safe_then(
     [&oi](auto&& bl) -> read_errorator::future<ceph::bufferlist> {
       if (const bool is_fine = _read_verify_data(oi, bl); is_fine) {
+       logger().debug("read: data length: {}", bl.length());
         return read_errorator::make_ready_future<bufferlist>(std::move(bl));
       } else {
         return crimson::ct_error::object_corrupted::make();
@@ -480,6 +481,13 @@ maybe_get_omap_vals(
   }
 }
 
+seastar::future<ceph::bufferlist> PGBackend::omap_get_header(
+  crimson::os::CollectionRef& c,
+  const ghobject_t& oid)
+{
+  return store->omap_get_header(c, oid);
+}
+
 seastar::future<> PGBackend::omap_get_keys(
   const ObjectState& os,
   OSDOp& osd_op) const
@@ -622,3 +630,21 @@ seastar::future<> PGBackend::omap_set_vals(
   os.oi.clear_omap_digest();
   return seastar::now();
 }
+
+seastar::future<struct stat> PGBackend::stat(
+  CollectionRef c,
+  const ghobject_t& oid) const
+{
+  return store->stat(c, oid);
+}
+
+seastar::future<std::map<uint64_t, uint64_t>>
+PGBackend::fiemap(
+  CollectionRef c,
+  const ghobject_t& oid,
+  uint64_t off,
+  uint64_t len)
+{
+  return store->fiemap(c, oid, off, len);
+}
+
index 13dfb30cff3acc0065897c0c2e12d917e71e5afd..289c67f27bd6ff6eae5b324d36fb6322d0fb6fcd 100644 (file)
@@ -7,13 +7,16 @@
 #include <memory>
 #include <string>
 #include <boost/smart_ptr/local_shared_ptr.hpp>
+#include <boost/container/flat_set.hpp>
 
 #include "crimson/os/futurized_store.h"
 #include "crimson/os/futurized_collection.h"
 #include "crimson/osd/acked_peers.h"
+#include "crimson/osd/pg.h"
 #include "crimson/common/shared_lru.h"
 #include "osd/osd_types.h"
 #include "crimson/osd/object_context.h"
+#include "crimson/osd/osd_operation.h"
 #include "crimson/osd/osd_operations/osdop_params.h"
 
 struct hobject_t;
@@ -36,6 +39,8 @@ protected:
   using ll_read_errorator = crimson::os::FuturizedStore::read_errorator;
 
 public:
+  using load_metadata_ertr = crimson::errorator<
+    crimson::ct_error::object_corrupted>;
   PGBackend(shard_id_t shard, CollectionRef coll, crimson::os::FuturizedStore* store);
   virtual ~PGBackend() = default;
   static std::unique_ptr<PGBackend> create(pg_t pgid,
@@ -44,7 +49,8 @@ public:
                                           crimson::os::CollectionRef coll,
                                           crimson::osd::ShardServices& shard_services,
                                           const ec_profile_t& ec_profile);
-
+  using attrs_t =
+    std::map<std::string, ceph::bufferptr, std::less<>>;
   using read_errorator = ll_read_errorator::extend<
     crimson::ct_error::object_corrupted>;
   read_errorator::future<ceph::bufferlist> read(
@@ -97,6 +103,14 @@ public:
   get_attr_errorator::future<ceph::bufferptr> getxattr(
     const hobject_t& soid,
     std::string_view key) const;
+  seastar::future<struct stat> stat(
+    CollectionRef c,
+    const ghobject_t& oid) const;
+  seastar::future<std::map<uint64_t, uint64_t>> fiemap(
+    CollectionRef c,
+    const ghobject_t& oid,
+    uint64_t off,
+    uint64_t len);
 
   // OMAP
   seastar::future<> omap_get_keys(
@@ -112,17 +126,16 @@ public:
     ObjectState& os,
     const OSDOp& osd_op,
     ceph::os::Transaction& trans);
+  seastar::future<ceph::bufferlist> omap_get_header(
+    crimson::os::CollectionRef& c,
+    const ghobject_t& oid);
 
   virtual void got_rep_op_reply(const MOSDRepOpReply&) {}
-
 protected:
   const shard_id_t shard;
   CollectionRef coll;
   crimson::os::FuturizedStore* store;
-
 public:
-  using load_metadata_ertr = crimson::errorator<
-    crimson::ct_error::object_corrupted>;
   struct loaded_object_md_t {
     ObjectState os;
     std::optional<SnapSet> ss;
@@ -146,4 +159,5 @@ private:
                      const osd_op_params_t& osd_op_p,
                      epoch_t min_epoch, epoch_t max_epoch,
                      std::vector<pg_log_entry_t>&& log_entries) = 0;
+  friend class ReplicatedRecoveryBackend;
 };
diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc
new file mode 100644 (file)
index 0000000..af63df6
--- /dev/null
@@ -0,0 +1,389 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+
+#include "crimson/common/type_helpers.h"
+#include "crimson/osd/osd_operations/background_recovery.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/pg_backend.h"
+#include "crimson/osd/pg_recovery.h"
+
+#include "messages/MOSDPGPull.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDPGRecoveryDelete.h"
+#include "messages/MOSDPGRecoveryDeleteReply.h"
+
+#include "osd/osd_types.h"
+#include "osd/PeeringState.h"
+
+void PGRecovery::start_background_recovery(
+  crimson::osd::scheduler::scheduler_class_t klass)
+{
+  using BackgroundRecovery = crimson::osd::BackgroundRecovery;
+  pg->get_shard_services().start_operation<BackgroundRecovery>(
+    static_cast<crimson::osd::PG*>(pg),
+    pg->get_shard_services(),
+    pg->get_osdmap_epoch(),
+    klass);
+}
+
+crimson::osd::blocking_future<bool>
+PGRecovery::start_recovery_ops(size_t max_to_start)
+{
+  assert(pg->is_primary());
+  assert(pg->is_peered());
+  assert(!pg->get_peering_state().is_deleting());
+
+  if (!pg->is_recovering() && !pg->is_backfilling()) {
+    return crimson::osd::make_ready_blocking_future<bool>(false);
+  }
+
+  std::vector<crimson::osd::blocking_future<>> started;
+  started.reserve(max_to_start);
+  max_to_start -= start_primary_recovery_ops(max_to_start, &started);
+  if (max_to_start > 0) {
+    max_to_start -= start_replica_recovery_ops(max_to_start, &started);
+  }
+  if (max_to_start > 0) {
+    max_to_start -= start_backfill_ops(max_to_start, &started);
+  }
+  return crimson::osd::join_blocking_futures(std::move(started)).then(
+    [this] {
+    bool done = !pg->get_peering_state().needs_recovery();
+    if (done) {
+      crimson::get_logger(ceph_subsys_osd).debug("start_recovery_ops: AllReplicasRecovered for pg: {}",
+                    pg->get_pgid());
+      using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
+      pg->get_shard_services().start_operation<LocalPeeringEvent>(
+       static_cast<crimson::osd::PG*>(pg),
+       pg->get_shard_services(),
+       pg->get_pg_whoami(),
+       pg->get_pgid(),
+       pg->get_osdmap_epoch(),
+       pg->get_osdmap_epoch(),
+       PeeringState::AllReplicasRecovered{});
+    }
+    return seastar::make_ready_future<bool>(!done);
+  });
+}
+
+size_t PGRecovery::start_primary_recovery_ops(
+  size_t max_to_start,
+  std::vector<crimson::osd::blocking_future<>> *out)
+{
+  if (!pg->is_recovering()) {
+    return 0;
+  }
+
+  if (!pg->get_peering_state().have_missing()) {
+    pg->get_peering_state().local_recovery_complete();
+    return 0;
+  }
+
+  const auto &missing = pg->get_peering_state().get_pg_log().get_missing();
+
+  crimson::get_logger(ceph_subsys_osd).info(
+    "{} recovering {} in pg {}, missing {}",
+    __func__,
+    pg->get_recovery_backend()->total_recovering(),
+    *static_cast<crimson::osd::PG*>(pg),
+    missing);
+
+  unsigned started = 0;
+  int skipped = 0;
+
+  map<version_t, hobject_t>::const_iterator p =
+    missing.get_rmissing().lower_bound(pg->get_peering_state().get_pg_log().get_log().last_requested);
+  while (started < max_to_start && p != missing.get_rmissing().end()) {
+    // TODO: chain futures here to enable yielding to scheduler?
+    hobject_t soid;
+    version_t v = p->first;
+
+    auto it_objects = pg->get_peering_state().get_pg_log().get_log().objects.find(p->second);
+    if (it_objects != pg->get_peering_state().get_pg_log().get_log().objects.end()) {
+      // look at log!
+      pg_log_entry_t *latest = it_objects->second;
+      assert(latest->is_update() || latest->is_delete());
+      soid = latest->soid;
+    } else {
+      soid = p->second;
+    }
+    const pg_missing_item& item = missing.get_items().find(p->second)->second;
+    ++p;
+
+    hobject_t head = soid.get_head();
+
+    crimson::get_logger(ceph_subsys_osd).info(
+      "{} {} item.need {} {} {} {} {}",
+      __func__,
+      soid,
+      item.need,
+      missing.is_missing(soid) ? " (missing)":"",
+      missing.is_missing(head) ? " (missing head)":"",
+      pg->get_recovery_backend()->is_recovering(soid) ? " (recovering)":"",
+      pg->get_recovery_backend()->is_recovering(head) ? " (recovering head)":"");
+
+    // TODO: handle lost/unfound
+    if (!pg->get_recovery_backend()->is_recovering(soid)) {
+      if (pg->get_recovery_backend()->is_recovering(head)) {
+       ++skipped;
+      } else {
+       auto futopt = recover_missing(soid, item.need);
+       if (futopt) {
+         out->push_back(std::move(*futopt));
+         ++started;
+       } else {
+         ++skipped;
+       }
+      }
+    }
+
+    if (!skipped)
+      pg->get_peering_state().set_last_requested(v);
+  }
+
+  crimson::get_logger(ceph_subsys_osd).info(
+    "{} started {} skipped {}",
+    __func__,
+    started,
+    skipped);
+
+  return started;
+}
+
+size_t PGRecovery::start_replica_recovery_ops(
+  size_t max_to_start,
+  std::vector<crimson::osd::blocking_future<>> *out)
+{
+  if (!pg->is_recovering()) {
+    return 0;
+  }
+  uint64_t started = 0;
+
+  assert(!pg->get_peering_state().get_acting_recovery_backfill().empty());
+
+  auto recovery_order = get_replica_recovery_order();
+  for (auto &peer : recovery_order) {
+    assert(peer != pg->get_peering_state().get_primary());
+    auto pm = pg->get_peering_state().get_peer_missing().find(peer);
+    assert(pm != pg->get_peering_state().get_peer_missing().end());
+
+    size_t m_sz = pm->second.num_missing();
+
+    crimson::get_logger(ceph_subsys_osd).debug(
+       "{}: peer osd.{} missing {} objects",
+       __func__,
+       peer,
+       m_sz);
+    crimson::get_logger(ceph_subsys_osd).trace(
+       "{}: peer osd.{} missing {}", __func__,
+       peer, pm->second.get_items());
+
+    // recover oldest first
+    const pg_missing_t &m(pm->second);
+    for (auto p = m.get_rmissing().begin();
+        p != m.get_rmissing().end() && started < max_to_start;
+        ++p) {
+      const auto &soid = p->second;
+
+      if (pg->get_peering_state().get_missing_loc().is_unfound(soid)) {
+       crimson::get_logger(ceph_subsys_osd).debug(
+           "{}: object {} still unfound", __func__, soid);
+       continue;
+      }
+
+      const pg_info_t &pi = pg->get_peering_state().get_peer_info(peer);
+      if (soid > pi.last_backfill) {
+       if (!pg->get_recovery_backend()->is_recovering(soid)) {
+         crimson::get_logger(ceph_subsys_osd).error(
+           "{}: object {} in missing set for backfill (last_backfill {})"
+           " but not in recovering",
+           __func__,
+           soid,
+           pi.last_backfill);
+         ceph_abort();
+       }
+       continue;
+      }
+
+      if (pg->get_recovery_backend()->is_recovering(soid)) {
+       crimson::get_logger(ceph_subsys_osd).debug(
+           "{}: already recovering object {}", __func__, soid);
+       continue;
+      }
+
+      if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
+       crimson::get_logger(ceph_subsys_osd).debug(
+           "{}: soid {} is a delete, removing", __func__, soid);
+       map<hobject_t,pg_missing_item>::const_iterator r =
+         m.get_items().find(soid);
+       started += prep_object_replica_deletes(
+         soid, r->second.need, out);
+       continue;
+      }
+
+      if (soid.is_snap() &&
+         pg->get_peering_state().get_pg_log().get_missing().is_missing(
+           soid.get_head())) {
+       crimson::get_logger(ceph_subsys_osd).debug(
+           "{}: head {} still missing on primary",
+           __func__, soid.get_head());
+       continue;
+      }
+
+      if (pg->get_peering_state().get_pg_log().get_missing().is_missing(soid)) {
+       crimson::get_logger(ceph_subsys_osd).debug(
+           "{}: soid {} still missing on primary", __func__, soid);
+       continue;
+      }
+
+      crimson::get_logger(ceph_subsys_osd).debug(
+       "{}: recover_object_replicas({})",
+       __func__,
+       soid);
+      map<hobject_t,pg_missing_item>::const_iterator r = m.get_items().find(
+       soid);
+      started += prep_object_replica_pushes(
+       soid, r->second.need, out);
+    }
+  }
+
+  return started;
+}
+
+// Kick off up to max_to_start backfill operations, appending their
+// blocking futures to *out.  Backfill is not implemented in crimson yet:
+// reaching this path (which requires non-empty backfill targets) aborts.
+size_t PGRecovery::start_backfill_ops(
+  size_t max_to_start,
+  std::vector<crimson::osd::blocking_future<>> *out)
+{
+  assert(!pg->get_peering_state().get_backfill_targets().empty());
+
+  ceph_abort("not implemented!");
+}
+
+std::optional<crimson::osd::blocking_future<>> PGRecovery::recover_missing(
+  const hobject_t &soid, eversion_t need)
+{
+  if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
+    return pg->get_recovery_backend()->get_recovering(soid).make_blocking_future(
+       pg->get_recovery_backend()->recover_delete(soid, need));
+  } else {
+    return pg->get_recovery_backend()->get_recovering(soid).make_blocking_future(
+      pg->get_recovery_backend()->recover_object(soid, need).handle_exception(
+       [=, soid = std::move(soid)] (auto e) {
+       on_failed_recover({ pg->get_pg_whoami() }, soid, need);
+       return seastar::make_ready_future<>();
+      })
+    );
+  }
+}
+
+// Queue a delete-push of soid to the replicas; when it completes,
+// account one recovered object and finish global recovery for it.
+// Always starts exactly one op.
+size_t PGRecovery::prep_object_replica_deletes(
+  const hobject_t& soid,
+  eversion_t need,
+  std::vector<crimson::osd::blocking_future<>> *in_progress)
+{
+  auto* backend = pg->get_recovery_backend();
+  // register (or look up) the per-object blocker first, as before
+  auto& waiter = backend->get_recovering(soid);
+  auto fut = backend->push_delete(soid, need).then([=] {
+    object_stat_sum_t stat_diff;
+    stat_diff.num_objects_recovered = 1;
+    on_global_recover(soid, stat_diff, true);
+    return seastar::make_ready_future<>();
+  });
+  in_progress->push_back(waiter.make_blocking_future(std::move(fut)));
+  return 1;
+}
+
+// Queue a push of soid to the replicas that are missing it.  On failure,
+// record the object as missing on the peers so recovery can retry later.
+// Always starts exactly one op.
+size_t PGRecovery::prep_object_replica_pushes(
+  const hobject_t& soid,
+  eversion_t need,
+  std::vector<crimson::osd::blocking_future<>> *in_progress)
+{
+  in_progress->push_back(
+    pg->get_recovery_backend()->get_recovering(soid).make_blocking_future(
+      pg->get_recovery_backend()->recover_object(soid, need).handle_exception(
+       // explicit captures: the previous [=, soid = std::move(soid)]
+       // moved from a const reference, which silently copies anyway
+       [this, soid, need] (auto e) {
+       on_failed_recover({ pg->get_pg_whoami() }, soid, need);
+       return seastar::make_ready_future<>();
+      })
+    )
+  );
+  return 1;
+}
+
+// Record that soid has been recovered locally: update the peering state's
+// missing set / last_complete via recover_got() (staged into t), and on
+// the primary refresh the cached object state and downgrade the obc lock
+// so readers blocked on this object can proceed.
+void PGRecovery::on_local_recover(
+  const hobject_t& soid,
+  const ObjectRecoveryInfo& recovery_info,
+  const bool is_delete,
+  ceph::os::Transaction& t)
+{
+  pg->get_peering_state().recover_got(soid,
+      recovery_info.version, is_delete, t);
+
+  if (pg->is_primary()) {
+    if (!is_delete) {
+      auto& obc = pg->get_recovery_backend()->get_recovering(soid).obc; //TODO: move to pg backend?
+      obc->obs.exists = true;
+      obc->obs.oi = recovery_info.oi;
+      // the obc was loaded holding the excl lock; release it and take
+      // the recovery read lock instead
+      obc->put_lock_type(RWState::RWEXCL);
+      assert(obc->get_recovery_read());
+    }
+    if (!pg->is_unreadable_object(soid)) {
+      // wake any client ops waiting for this object to become readable
+      pg->get_recovery_backend()->get_recovering(soid).set_readable();
+    }
+  }
+}
+
+// soid is now recovered everywhere: account it in the peering state,
+// release the recovery read lock (deletes never took one), wake all
+// waiters, and drop the per-object recovery bookkeeping.
+void PGRecovery::on_global_recover (
+  const hobject_t& soid,
+  const object_stat_sum_t& stat_diff,
+  const bool is_delete)
+{
+  pg->get_peering_state().object_recovered(soid, stat_diff);
+  auto* backend = pg->get_recovery_backend();
+  auto& waiter = backend->get_recovering(soid);
+  if (!is_delete) {
+    waiter.obc->drop_recovery_read();
+  }
+  waiter.set_recovered();
+  backend->remove_recovering(soid);
+}
+
+// A push/pull involving these shards failed: force soid back into the
+// missing set of every shard except ourselves so recovery retries it.
+void PGRecovery::on_failed_recover(
+  const set<pg_shard_t>& from,
+  const hobject_t& soid,
+  const eversion_t& v)
+{
+  for (const auto& shard : from) {
+    if (shard == pg->get_pg_whoami()) {
+      continue;
+    }
+    pg->get_peering_state().force_object_missing(shard, soid, v);
+  }
+}
+
+// A replica finished recovering oid at recovery_info.version; forward the
+// event to the peering state so the peer's missing set is updated.
+void PGRecovery::on_peer_recover(
+  pg_shard_t peer,
+  const hobject_t &oid,
+  const ObjectRecoveryInfo &recovery_info)
+{
+  auto& log = crimson::get_logger(ceph_subsys_osd);
+  log.debug("{}: {}, {} on {}", __func__, oid,
+            recovery_info.version, peer);
+  pg->get_peering_state().on_peer_recover(peer, oid, recovery_info.version);
+}
+
+// A pushed object was persisted at `epoch`; advance last_complete_ondisk
+// unless the PG has gone through an interval change since the push was
+// issued (in which case the commit no longer applies).
+void PGRecovery::_committed_pushed_object(epoch_t epoch,
+                             eversion_t last_complete)
+{
+  if (pg->has_reset_since(epoch)) {
+    crimson::get_logger(ceph_subsys_osd).debug(
+       "{} pg has changed, not touching last_complete_ondisk",
+       __func__);
+  } else {
+    pg->get_peering_state().recovery_committed_to(last_complete);
+  }
+}
diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h
new file mode 100644 (file)
index 0000000..ccc4623
--- /dev/null
@@ -0,0 +1,79 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/pg_recovery_listener.h"
+#include "crimson/osd/scheduler/scheduler.h"
+#include "crimson/osd/shard_services.h"
+
+#include "osd/object_state.h"
+
+class PGBackend;
+
+// Drives log-based (missing-set) recovery for one PG.  Owns no state
+// beyond a pointer to its PG (via the PGRecoveryListener interface);
+// per-object progress lives in the PG's RecoveryBackend.
+class PGRecovery {
+public:
+  PGRecovery(PGRecoveryListener* pg) : pg(pg) {}
+  virtual ~PGRecovery() {}
+  // Schedule a BackgroundRecovery operation with the given scheduler class.
+  void start_background_recovery(
+    crimson::osd::scheduler::scheduler_class_t klass);
+
+  // Start up to max_to_start recovery ops; resolves to whether the PG
+  // still needs further recovery passes.
+  crimson::osd::blocking_future<bool> start_recovery_ops(size_t max_to_start);
+
+private:
+  PGRecoveryListener* pg;  // non-owning; the PG outlives its handler
+  // The three phases, each appending blocking futures to *out and
+  // returning how many ops it started:
+  size_t start_primary_recovery_ops(
+    size_t max_to_start,
+    std::vector<crimson::osd::blocking_future<>> *out);
+  size_t start_replica_recovery_ops(
+    size_t max_to_start,
+    std::vector<crimson::osd::blocking_future<>> *out);
+  size_t start_backfill_ops(
+    size_t max_to_start,
+    std::vector<crimson::osd::blocking_future<>> *out);
+
+  std::vector<pg_shard_t> get_replica_recovery_order() const {
+    return pg->get_replica_recovery_order();
+  }
+  // Recover one object missing on the primary (pull or local delete).
+  std::optional<crimson::osd::blocking_future<>> recover_missing(
+    const hobject_t &soid, eversion_t need);
+  // Propagate a delete / a push of soid to replicas; each returns the
+  // number of ops started (currently always 1).
+  size_t prep_object_replica_deletes(
+    const hobject_t& soid,
+    eversion_t need,
+    std::vector<crimson::osd::blocking_future<>> *in_progress);
+  size_t prep_object_replica_pushes(
+    const hobject_t& soid,
+    eversion_t need,
+    std::vector<crimson::osd::blocking_future<>> *in_progress);
+
+  // Callbacks invoked by the recovery backend as objects progress:
+  void on_local_recover(
+    const hobject_t& soid,
+    const ObjectRecoveryInfo& recovery_info,
+    bool is_delete,
+    ceph::os::Transaction& t);
+  void on_global_recover (
+    const hobject_t& soid,
+    const object_stat_sum_t& stat_diff,
+    bool is_delete);
+  void on_failed_recover(
+    const set<pg_shard_t>& from,
+    const hobject_t& soid,
+    const eversion_t& v);
+  void on_peer_recover(
+    pg_shard_t peer,
+    const hobject_t &oid,
+    const ObjectRecoveryInfo &recovery_info);
+  // Commit callback: advances last_complete_ondisk if the PG has not
+  // reset since `epoch`.
+  void _committed_pushed_object(epoch_t epoch,
+                               eversion_t last_complete);
+  friend class ReplicatedRecoveryBackend;
+  // Message plumbing for recovery traffic (declared here; presumably
+  // forwarded to the backend -- definitions are elsewhere).
+  seastar::future<> handle_pull(Ref<MOSDPGPull> m);
+  seastar::future<> handle_push(Ref<MOSDPGPush> m);
+  seastar::future<> handle_push_reply(Ref<MOSDPGPushReply> m);
+  seastar::future<> handle_recovery_delete(Ref<MOSDPGRecoveryDelete> m);
+  seastar::future<> handle_recovery_delete_reply(
+      Ref<MOSDPGRecoveryDeleteReply> m);
+  seastar::future<> handle_pull_response(Ref<MOSDPGPush> m);
+};
diff --git a/src/crimson/osd/pg_recovery_listener.h b/src/crimson/osd/pg_recovery_listener.h
new file mode 100644 (file)
index 0000000..2946f93
--- /dev/null
@@ -0,0 +1,33 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "common/hobject.h"
+#include "include/types.h"
+#include "osd/osd_types.h"
+
+namespace crimson::osd {
+  class ShardServices;
+};
+
+class RecoveryBackend;
+class PGRecovery;
+
+// Abstract view of a PG as seen by the recovery machinery.  Breaks the
+// header dependency between PGRecovery and the concrete crimson::osd::PG:
+// PG implements this interface and hands itself to its PGRecovery.
+class PGRecoveryListener {
+public:
+  virtual crimson::osd::ShardServices& get_shard_services() = 0;
+  virtual PGRecovery* get_recovery_handler() = 0;
+  virtual epoch_t get_osdmap_epoch() const = 0;
+  virtual bool is_primary() const = 0;
+  virtual bool is_peered() const = 0;
+  virtual bool is_recovering() const = 0;
+  virtual bool is_backfilling() const = 0;
+  virtual PeeringState& get_peering_state() = 0;
+  virtual const pg_shard_t& get_pg_whoami() const = 0;
+  virtual const spg_t& get_pgid() const = 0;
+  virtual RecoveryBackend* get_recovery_backend() = 0;
+  virtual bool is_unreadable_object(const hobject_t&) const = 0;
+  // True if the PG went through an interval change after the given epoch.
+  virtual bool has_reset_since(epoch_t) const = 0;
+  // Replica shards in the order recovery should visit them.
+  virtual std::vector<pg_shard_t> get_replica_recovery_order() const = 0;
+};
diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc
new file mode 100644 (file)
index 0000000..31ae27f
--- /dev/null
@@ -0,0 +1,44 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/osd/recovery_backend.h"
+#include "crimson/osd/pg.h"
+
+#include "osd/osd_types.h"
+
+namespace {
+  // file-local shortcut to the OSD subsystem logger
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_osd);
+  }
+}
+
+// Build a uniquely named temporary object used to stage recovery data for
+// `target`.  The name encodes the pgid, the version being recovered, the
+// current interval, and the target's snap id, so concurrent recoveries
+// and interval changes never collide.
+hobject_t RecoveryBackend::get_temp_recovery_object(
+  const hobject_t& target,
+  eversion_t version)
+{
+  ostringstream name;
+  name << "temp_recovering_" << pg.get_info().pgid << "_" << version
+    << "_" << pg.get_info().history.same_interval_since << "_" << target.snap;
+  auto temp_oid = target.make_temp_hobject(name.str());
+  logger().debug("{} {}", __func__, temp_oid);
+  return temp_oid;
+}
+
+// Abort all in-flight recovery state (e.g. on a new peering interval):
+// stage removal of every temp object into t, fail all waiters with an
+// interrupted error, and drop the bookkeeping maps.
+void RecoveryBackend::clean_up(ceph::os::Transaction& t,
+                              const std::string& why)
+{
+  for (auto& soid : temp_contents) {
+    t.remove(pg.get_collection_ref()->get_cid(),
+             ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard));
+  }
+  temp_contents.clear();
+
+  for (auto& [soid, recovery_waiter] : recovering) {
+    if (recovery_waiter.obc) {
+      recovery_waiter.obc->drop_recovery_read();
+      // NOTE(review): interrupt() is only invoked when an obc was loaded;
+      // waiters on entries without an obc are dropped without an error --
+      // confirm this is intentional
+      recovery_waiter.interrupt(why);
+    }
+  }
+  recovering.clear();
+}
+
diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h
new file mode 100644 (file)
index 0000000..461ae62
--- /dev/null
@@ -0,0 +1,155 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+
+#include "crimson/common/type_helpers.h"
+#include "crimson/os/futurized_store.h"
+#include "crimson/os/futurized_collection.h"
+#include "crimson/osd/object_context.h"
+#include "crimson/osd/shard_services.h"
+
+#include "osd/osd_types.h"
+
+namespace crimson::osd{
+  class PG;
+}
+
+class PGBackend;
+
+// Base class for per-PG recovery backends.  Tracks per-object recovery
+// progress (`recovering`) and temp objects, and defines the operations a
+// concrete backend (e.g. ReplicatedRecoveryBackend) must implement.
+class RecoveryBackend {
+protected:
+  class WaitForObjectRecovery;
+public:
+  RecoveryBackend(crimson::osd::PG& pg,
+                 crimson::osd::ShardServices& shard_services,
+                 crimson::os::CollectionRef coll,
+                 PGBackend* backend)
+    : pg{pg},
+      shard_services{shard_services},
+      store{&shard_services.get_store()},
+      coll{coll},
+      backend{backend} {}
+  virtual ~RecoveryBackend() {}
+  // Look up -- creating on first use -- the recovery tracker for soid.
+  WaitForObjectRecovery& get_recovering(const hobject_t& soid) {
+    return recovering[soid];
+  }
+  void remove_recovering(const hobject_t& soid) {
+    recovering.erase(soid);
+  }
+  bool is_recovering(const hobject_t& soid) {
+    return recovering.count(soid) != 0;
+  }
+  uint64_t total_recovering() {
+    return recovering.size();
+  }
+
+  // Dispatch an incoming recovery message (push/pull/delete traffic).
+  virtual seastar::future<> handle_recovery_op(
+    Ref<MOSDFastDispatchOp> m) = 0;
+
+  // Recover soid at version `need`: pull it if missing locally, then
+  // push it to peers that are missing it.
+  virtual seastar::future<> recover_object(
+    const hobject_t& soid,
+    eversion_t need) = 0;
+  // Replay a delete of soid locally and propagate it to replicas.
+  virtual seastar::future<> recover_delete(
+    const hobject_t& soid,
+    eversion_t need) = 0;
+  // Tell replicas missing soid to remove it.
+  virtual seastar::future<> push_delete(
+    const hobject_t& soid,
+    eversion_t need) = 0;
+
+  void on_peering_interval_change(ceph::os::Transaction& t) {
+    clean_up(t, "new peering interval");
+  }
+protected:
+  crimson::osd::PG& pg;
+  crimson::osd::ShardServices& shard_services;
+  crimson::os::FuturizedStore* store;
+  crimson::os::CollectionRef coll;
+  PGBackend* backend;  // non-owning; object read/write primitives
+
+  // State of an in-flight pull of one object from a peer.
+  struct PullInfo {
+    pg_shard_t from;
+    hobject_t soid;
+    ObjectRecoveryProgress recovery_progress;
+    ObjectRecoveryInfo recovery_info;
+    crimson::osd::ObjectContextRef head_ctx;
+    crimson::osd::ObjectContextRef obc;
+    object_stat_sum_t stat;
+    bool is_complete() const {
+      return recovery_progress.is_complete(recovery_info);
+    }
+  };
+
+  // State of an in-flight push of one object to one peer.
+  struct PushInfo {
+    ObjectRecoveryProgress recovery_progress;
+    ObjectRecoveryInfo recovery_info;
+    crimson::osd::ObjectContextRef obc;
+    object_stat_sum_t stat;
+  };
+
+  // Per-object recovery tracker.  Operations that need the object wait on
+  // the shared promises below; recovery code resolves them as milestones
+  // are reached.  Also acts as a Blocker for operation introspection.
+  class WaitForObjectRecovery : public crimson::osd::BlockerT<WaitForObjectRecovery> {
+    seastar::shared_promise<> readable, recovered, pulled;
+    std::map<pg_shard_t, seastar::shared_promise<>> pushes;
+  public:
+    static constexpr const char* type_name = "WaitForObjectRecovery";
+
+    crimson::osd::ObjectContextRef obc;
+    PullInfo pi;
+    std::map<pg_shard_t, PushInfo> pushing;
+
+    seastar::future<> wait_for_readable() {
+      return readable.get_shared_future();
+    }
+    seastar::future<> wait_for_pushes(pg_shard_t shard) {
+      return pushes[shard].get_shared_future();
+    }
+    seastar::future<> wait_for_recovered() {
+      return recovered.get_shared_future();
+    }
+    seastar::future<> wait_for_pull() {
+      return pulled.get_shared_future();
+    }
+    void set_readable() {
+      readable.set_value();
+    }
+    void set_recovered() {
+      recovered.set_value();
+    }
+    void set_pushed(pg_shard_t shard) {
+      pushes[shard].set_value();
+    }
+    void set_pulled() {
+      pulled.set_value();
+    }
+    // Fail every outstanding waiter with an interrupted error.
+    // NOTE(review): set_exception on a promise whose value was already
+    // set is not valid -- confirm callers only interrupt unresolved
+    // trackers.
+    void interrupt(const std::string& why) {
+      readable.set_exception(std::system_error(
+           std::make_error_code(std::errc::interrupted), why));
+      recovered.set_exception(std::system_error(
+           std::make_error_code(std::errc::interrupted), why));
+      pulled.set_exception(std::system_error(
+           std::make_error_code(std::errc::interrupted), why));
+      for (auto& [pg_shard, pr] : pushes) {
+       pr.set_exception(std::system_error(
+             std::make_error_code(std::errc::interrupted), why));
+      }
+    }
+    void dump_detail(Formatter* f) const {
+    }
+  };
+  // All objects currently being recovered, keyed by oid.  std::map keeps
+  // references into it stable across inserts/erases of other entries.
+  std::map<hobject_t, WaitForObjectRecovery> recovering;
+  hobject_t get_temp_recovery_object(
+    const hobject_t& target,
+    eversion_t version);
+
+  // Temp objects created for staging; removed wholesale by clean_up().
+  boost::container::flat_set<hobject_t> temp_contents;
+
+  void add_temp_obj(const hobject_t &oid) {
+    temp_contents.insert(oid);
+  }
+  void clear_temp_obj(const hobject_t &oid) {
+    temp_contents.erase(oid);
+  }
+  // Abort all in-flight recovery (see definition for details).
+  void clean_up(ceph::os::Transaction& t, const std::string& why);
+};
index 76f4539fa865a3e29a0b04f4aa52444d3ebbdf90..c9667e4927875906e75f52d458bd66f349a15855 100644 (file)
@@ -6,10 +6,8 @@
 #include "messages/MOSDRepOpReply.h"
 
 #include "crimson/common/log.h"
-#include "crimson/os/cyanstore/cyan_object.h"
 #include "crimson/os/futurized_store.h"
 #include "crimson/osd/shard_services.h"
-#include "crimson/osd/pg.h"
 #include "osd/PeeringState.h"
 
 namespace {
diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc
new file mode 100644 (file)
index 0000000..4e0992c
--- /dev/null
@@ -0,0 +1,1083 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+#include <seastar/core/future.hh>
+#include <seastar/core/do_with.hh>
+
+#include "crimson/osd/pg.h"
+#include "crimson/osd/pg_backend.h"
+#include "replicated_recovery_backend.h"
+
+#include "msg/Message.h"
+
+namespace {
+  // file-local shortcut to the OSD subsystem logger
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_osd);
+  }
+}
+
+// Recover soid at version `need`: (1) if the primary itself is missing
+// the object, pull it from a peer that has it; (2) load/populate the obc;
+// (3) build and send pushes to every shard that needs the object; (4) on
+// success mark global recovery, on failure drop the tracker and rethrow.
+seastar::future<> ReplicatedRecoveryBackend::recover_object(
+  const hobject_t& soid,
+  eversion_t need)
+{
+  logger().debug("{}: {}, {}", __func__, soid, need);
+  // reference into std::map stays valid across later map mutations
+  auto& recovery_waiter = recovering[soid];
+  return seastar::do_with(std::map<pg_shard_t, PushOp>(), get_shards_to_push(soid),
+    [this, soid, need, &recovery_waiter](auto& pops, auto& shards) {
+    // stage 1: pull the object locally if we are missing it ourselves
+    return [this, soid, need, &recovery_waiter] {
+      pg_missing_tracker_t local_missing = pg.get_local_missing();
+      if (local_missing.is_missing(soid)) {
+       PullOp po;
+       auto& pi = recovery_waiter.pi;
+       prepare_pull(po, pi, soid, need);
+       auto msg = make_message<MOSDPGPull>();
+       msg->from = pg.get_pg_whoami();
+       msg->set_priority(pg.get_recovery_op_priority());
+       msg->pgid = pg.get_pgid();
+       msg->map_epoch = pg.get_osdmap_epoch();
+       msg->min_epoch = pg.get_last_peering_reset();
+       std::vector<PullOp> pulls;
+       pulls.push_back(po);
+       msg->set_pulls(&pulls);
+       return shard_services.send_to_osd(pi.from.osd,
+                                          std::move(msg),
+                                          pg.get_osdmap_epoch()).then(
+         [&recovery_waiter] {
+         // resolved by handle_pull_response when the pull completes
+         return recovery_waiter.wait_for_pull();
+       });
+      } else {
+       return seastar::make_ready_future<>();
+      }
+    }().then([this, &pops, &shards, soid, need, &recovery_waiter]() mutable {
+      // stage 2: make sure we hold an obc with the recovery read lock
+      return [this, &recovery_waiter, soid] {
+       if (!recovery_waiter.obc) {
+         return pg.get_or_load_head_obc(soid).safe_then(
+           [this, &recovery_waiter](auto p) {
+           auto& [obc, existed] = p;
+           logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
+           recovery_waiter.obc = obc;
+           if (!existed) {
+             // obc is loaded with excl lock
+             recovery_waiter.obc->put_lock_type(RWState::RWEXCL);
+           }
+           assert(recovery_waiter.obc->get_recovery_read());
+           return seastar::make_ready_future<>();
+         }, crimson::osd::PG::load_obc_ertr::all_same_way(
+             [this, &recovery_waiter, soid](const std::error_code& e) {
+             // load failed (e.g. object absent locally): fall back to a
+             // fresh cached obc so pushes can still be built
+             auto [obc, existed] =
+                 shard_services.obc_registry.get_cached_obc(soid);
+             logger().debug("recover_object: load failure of obc: {}",
+                 obc->obs.oi.soid);
+             recovery_waiter.obc = obc;
+             // obc is loaded with excl lock
+             recovery_waiter.obc->put_lock_type(RWState::RWEXCL);
+             assert(recovery_waiter.obc->get_recovery_read());
+             return seastar::make_ready_future<>();
+           })
+         );
+       }
+       return seastar::now();
+      }().then([this, soid, need, &pops, &shards] {
+       // stage 3a: build a PushOp per target shard into pops
+       return prep_push(soid, need, &pops, shards);
+      });
+    }).handle_exception([this, soid, &recovery_waiter](auto e) {
+      // NOTE(review): this shadows the captured &recovery_waiter, which
+      // is therefore unused here -- looks intentional but worth tidying
+      auto& recovery_waiter = recovering[soid];
+      if (recovery_waiter.obc)
+       recovery_waiter.obc->drop_recovery_read();
+      recovering.erase(soid);
+      return seastar::make_exception_future<>(e);
+    }).then([this, &pops, &shards, soid] {
+      // stage 3b: send the pushes and wait for every ack
+      return seastar::parallel_for_each(shards,
+       [this, &pops, soid](auto shard) {
+       auto msg = make_message<MOSDPGPush>();
+       msg->from = pg.get_pg_whoami();
+       msg->pgid = pg.get_pgid();
+       msg->map_epoch = pg.get_osdmap_epoch();
+       msg->min_epoch = pg.get_last_peering_reset();
+       msg->set_priority(pg.get_recovery_op_priority());
+       msg->pushes.push_back(pops[shard->first]);
+       return shard_services.send_to_osd(shard->first.osd, std::move(msg),
+                                         pg.get_osdmap_epoch()).then(
+         [this, soid, shard] {
+         return recovering[soid].wait_for_pushes(shard->first);
+       });
+      });
+    }).then([this, soid, &recovery_waiter] {
+      // stage 4: finish -- success marks global recovery, failure tears
+      // down the tracker and surfaces an error
+      bool error = recovering[soid].pi.recovery_progress.error;
+      if (!error) {
+       auto push_info = recovering[soid].pushing.begin();
+       object_stat_sum_t stat = {};
+       if (push_info != recovering[soid].pushing.end()) {
+         stat = push_info->second.stat;
+       } else {
+         // no push happened, take pull_info's stat
+         stat = recovering[soid].pi.stat;
+       }
+       pg.get_recovery_handler()->on_global_recover(soid, stat, false);
+       return seastar::make_ready_future<>();
+      } else {
+       auto& recovery_waiter = recovering[soid];
+       if (recovery_waiter.obc)
+         recovery_waiter.obc->drop_recovery_read();
+       recovering.erase(soid);
+       return seastar::make_exception_future<>(
+           std::runtime_error(fmt::format("Errors during pushing for {}", soid)));
+      }
+    });
+  });
+}
+
+// Tell every acting/backfill shard that is missing soid to delete it, and
+// wait for all of their acks (delivered via set_pushed on the tracker).
+seastar::future<> ReplicatedRecoveryBackend::push_delete(
+  const hobject_t& soid,
+  eversion_t need)
+{
+  logger().debug("{}: {}, {}", __func__, soid, need);
+  // ensure a WaitForObjectRecovery entry exists (operator[] inserts)
+  recovering[soid];
+  epoch_t min_epoch = pg.get_last_peering_reset();
+
+  assert(pg.get_acting_recovery_backfill().size() > 0);
+  return seastar::parallel_for_each(pg.get_acting_recovery_backfill(),
+    [this, soid, need, min_epoch](pg_shard_t shard) {
+    if (shard == pg.get_pg_whoami())
+      return seastar::make_ready_future<>();
+    auto iter = pg.get_shard_missing().find(shard);
+    if (iter == pg.get_shard_missing().end())
+      return seastar::make_ready_future<>();
+    if (iter->second.is_missing(soid)) {
+      logger().debug("{} will remove {} from {}", __func__, soid, shard);
+      pg.begin_peer_recover(shard, soid);
+      spg_t target_pg = spg_t(pg.get_info().pgid.pgid, shard.shard);
+      auto msg = make_message<MOSDPGRecoveryDelete>(
+         pg.get_pg_whoami(), target_pg, pg.get_osdmap_epoch(), min_epoch);
+
+      msg->set_priority(pg.get_recovery_op_priority());
+      msg->objects.push_back(std::make_pair(soid, need));
+      return shard_services.send_to_osd(shard.osd, std::move(msg),
+                                       pg.get_osdmap_epoch()).then(
+       [this, soid, shard] {
+       // resolved when the replica's delete-reply arrives
+       return recovering[soid].wait_for_pushes(shard);
+      });
+    }
+    return seastar::make_ready_future<>();
+  });
+}
+
+// Replica side of push_delete: apply the requested delete locally, then
+// reply to the primary with the same object list.
+seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete(
+  Ref<MOSDPGRecoveryDelete> m)
+{
+  logger().debug("{}: {}", __func__, *m);
+
+  auto& p = m->objects.front(); //TODO: only one delete per message for now.
+  // &p stays valid through the continuation because m is captured by value
+  return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch()).then(
+    [this, m, &p] {
+    auto reply = make_message<MOSDPGRecoveryDeleteReply>();
+    reply->from = pg.get_pg_whoami();
+    reply->set_priority(m->get_priority());
+    reply->pgid = spg_t(pg.get_info().pgid.pgid, m->from.shard);
+    reply->map_epoch = m->map_epoch;
+    reply->min_epoch = m->min_epoch;
+    reply->objects = m->objects;
+    return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch());
+  });
+}
+
+// Stage the local-recovery side effects of soid into a transaction,
+// persist it, then advance last_complete_ondisk (a no-op if the PG has
+// reset since epoch_frozen).
+seastar::future<> ReplicatedRecoveryBackend::on_local_recover_persist(
+  const hobject_t& soid,
+  const ObjectRecoveryInfo& _recovery_info,
+  bool is_delete,
+  epoch_t epoch_frozen)
+{
+  logger().debug("{}", __func__);
+  ceph::os::Transaction t;
+  pg.get_recovery_handler()->on_local_recover(soid, _recovery_info, is_delete, t);
+  // capture last_complete now, before the commit completes
+  // (previously &soid and &_recovery_info were also captured by
+  // reference but never used -- dropped to avoid dangling references)
+  return shard_services.get_store().do_transaction(coll, std::move(t)).then(
+    [this, epoch_frozen,
+    last_complete = pg.get_info().last_complete] {
+    pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
+    return seastar::make_ready_future<>();
+  });
+}
+
+// Apply a delete of soid locally: if the object exists, remove it via the
+// backend; either way record the recovery in the pull-info and persist
+// the bookkeeping.  The metadata-load error path deliberately proceeds
+// the same way (the object simply is not there to remove).
+seastar::future<> ReplicatedRecoveryBackend::local_recover_delete(
+  const hobject_t& soid,
+  eversion_t need,
+  epoch_t epoch_to_freeze)
+{
+  logger().debug("{}: {}, {}", __func__, soid, need);
+  return backend->load_metadata(soid).safe_then([this]
+    (auto lomt) {
+    if (lomt->os.exists) {
+      return seastar::do_with(ceph::os::Transaction(),
+       [this, lomt = std::move(lomt)](auto& txn) {
+       // NOTE(review): lomt->os is passed by reference while the
+       // enclosing lambda owns lomt -- confirm do_with keeps this
+       // closure alive until the chain resolves
+       return backend->remove(lomt->os, txn).then([this, &txn]() mutable {
+         return shard_services.get_store().do_transaction(coll,
+                                                          std::move(txn));
+       });
+      });
+    }
+    return seastar::make_ready_future<>();
+  }).safe_then([this, soid, epoch_to_freeze, need] {
+    auto& recovery_waiter = recovering[soid];
+    auto& pi = recovery_waiter.pi;
+    pi.recovery_info.soid = soid;
+    pi.recovery_info.version = need;
+    return on_local_recover_persist(soid, pi.recovery_info,
+                                   true, epoch_to_freeze);
+  }, PGBackend::load_metadata_ertr::all_same_way(
+      [this, soid, epoch_to_freeze, need] (auto e) {
+      // metadata missing: nothing to remove, still record the delete
+      auto& recovery_waiter = recovering[soid];
+      auto& pi = recovery_waiter.pi;
+      pi.recovery_info.soid = soid;
+      pi.recovery_info.version = need;
+      return on_local_recover_persist(soid, pi.recovery_info,
+                                     true, epoch_to_freeze);
+    })
+  );
+}
+
+seastar::future<> ReplicatedRecoveryBackend::recover_delete(
+  const hobject_t &soid, eversion_t need)
+{
+  logger().debug("{}: {}, {}", __func__, soid, need);
+
+  epoch_t cur_epoch = pg.get_osdmap_epoch();
+  return seastar::do_with(object_stat_sum_t(),
+    [this, soid, need, cur_epoch](auto& stat_diff) {
+    return local_recover_delete(soid, need, cur_epoch).then(
+      [this, &stat_diff, cur_epoch, soid, need] {
+      if (!pg.has_reset_since(cur_epoch)) {
+       bool object_missing = false;
+       for (const auto& shard : pg.get_acting_recovery_backfill()) {
+         if (shard == pg.get_pg_whoami())
+           continue;
+         if (pg.get_shard_missing(shard)->is_missing(soid)) {
+           logger().debug("{}: soid {} needs to deleted from replca {}",
+                          __func__,
+                          soid,
+                          shard);
+           object_missing = true;
+           break;
+         }
+       }
+
+       if (!object_missing) {
+         stat_diff.num_objects_recovered = 1;
+         return seastar::make_ready_future<>();
+       } else {
+         return push_delete(soid, need);
+       }
+      }
+      return seastar::make_ready_future<>();
+    }).then([this, soid, &stat_diff] {
+      pg.get_recovery_handler()->on_global_recover(soid, stat_diff, true);
+      return seastar::make_ready_future<>();
+    });
+  });
+}
+
+// For each target shard, compute the data subset it needs (full object,
+// intersected with the peer's dirty regions on octopus+), record a
+// PushInfo on the tracker, and build the PushOp into *pops.
+seastar::future<> ReplicatedRecoveryBackend::prep_push(
+  const hobject_t& soid,
+  eversion_t need,
+  std::map<pg_shard_t, PushOp>* pops,
+  const std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator>& shards)
+{
+  logger().debug("{}: {}, {}", __func__, soid, need);
+
+  return seastar::do_with(std::map<pg_shard_t, interval_set<uint64_t>>(),
+    [this, soid, need, pops, &shards](auto& data_subsets) {
+    return seastar::parallel_for_each(shards,
+      [this, soid, need, pops, &data_subsets](auto pg_shard) mutable {
+      pops->emplace(pg_shard->first, PushOp());
+      auto& recovery_waiter = recovering[soid];
+      auto& obc = recovery_waiter.obc;
+      auto& data_subset = data_subsets[pg_shard->first];
+
+      // start from the whole object ...
+      if (obc->obs.oi.size) {
+       data_subset.insert(0, obc->obs.oi.size);
+      }
+      // ... and narrow to the peer's dirty regions when supported
+      const auto& missing = pg.get_shard_missing().find(pg_shard->first)->second;
+      if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) {
+       const auto it = missing.get_items().find(soid);
+       assert(it != missing.get_items().end());
+       data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
+       logger().debug("calc_head_subsets {} data_subset {}", soid, data_subset);
+      }
+
+      logger().debug("prep_push: {} to {}", soid, pg_shard->first);
+      auto& pi = recovery_waiter.pushing[pg_shard->first];
+      pg.begin_peer_recover(pg_shard->first, soid);
+      const auto pmissing_iter = pg.get_shard_missing().find(pg_shard->first);
+      const auto missing_iter = pmissing_iter->second.get_items().find(soid);
+      assert(missing_iter != pmissing_iter->second.get_items().end());
+
+      pi.obc = obc;
+      pi.recovery_info.size = obc->obs.oi.size;
+      pi.recovery_info.copy_subset = data_subset;
+      pi.recovery_info.soid = soid;
+      pi.recovery_info.oi = obc->obs.oi;
+      pi.recovery_info.version = obc->obs.oi.version;
+      pi.recovery_info.object_exist =
+       missing_iter->second.clean_regions.object_is_exist();
+      pi.recovery_progress.omap_complete =
+       !missing_iter->second.clean_regions.omap_is_dirty() &&
+       HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS);
+
+      return build_push_op(pi.recovery_info, pi.recovery_progress,
+                          &pi.stat, &(*pops)[pg_shard->first]).then(
+       [this, soid, pg_shard](auto new_progress) {
+       // re-fetch through the map: the tracker may have been touched
+       // by concurrent continuations
+       auto& recovery_waiter = recovering[soid];
+       auto& pi = recovery_waiter.pushing[pg_shard->first];
+       pi.recovery_progress = new_progress;
+       return seastar::make_ready_future<>();
+      });
+    });
+  });
+}
+
+// Fill in a PullOp (wire request) and PullInfo (local tracking) for
+// pulling soid from the first shard known to have it.  Callers must only
+// invoke this when soid is in the local missing set.
+void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, PullInfo& pi,
+  const hobject_t& soid,
+  eversion_t need) {
+  logger().debug("{}: {}, {}", __func__, soid, need);
+
+  pg_missing_tracker_t local_missing = pg.get_local_missing();
+  // NOTE(review): missing_iter is dereferenced below without an end()
+  // check, and m[soid] default-inserts if soid has no known location --
+  // both rely on the caller's precondition; confirm
+  const auto missing_iter = local_missing.get_items().find(soid);
+  auto m = pg.get_missing_loc_shards();
+  pg_shard_t fromshard = *(m[soid].begin());
+
+  //TODO: skipped snap objects case for now
+  // request the whole object, narrowed to dirty regions on octopus+
+  po.recovery_info.copy_subset.insert(0, (uint64_t) -1);
+  if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS))
+    po.recovery_info.copy_subset.intersection_of(
+       missing_iter->second.clean_regions.get_dirty_regions());
+  po.recovery_info.size = ((uint64_t) -1);
+  po.recovery_info.object_exist =
+    missing_iter->second.clean_regions.object_is_exist();
+  po.recovery_info.soid = soid;
+  po.soid = soid;
+  po.recovery_progress.data_complete = false;
+  po.recovery_progress.omap_complete =
+    !missing_iter->second.clean_regions.omap_is_dirty() &&
+    HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS);
+  po.recovery_progress.data_recovered_to = 0;
+  po.recovery_progress.first = true;
+
+  // mirror the request into the local pull-tracking state
+  pi.from = fromshard;
+  pi.soid = soid;
+  pi.recovery_info = po.recovery_info;
+  pi.recovery_progress = po.recovery_progress;
+}
+
+// Build a PushOp describing the next chunk of `recovery_info.soid` to
+// send to a peer.  On the first pass (progress.first) the omap header
+// and all xattrs are read and an object_info_t is decoded from OI_ATTR
+// to learn the object version.  Omap entries and object data are then
+// appended until the osd_recovery_max_chunk byte budget (`available`)
+// or the per-chunk omap entry limit is exhausted; the returned
+// ObjectRecoveryProgress records where the next chunk must resume.
+// `stat`, if non-null, accumulates recovery statistics.
+seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op(
+    const ObjectRecoveryInfo& recovery_info,
+    const ObjectRecoveryProgress& progress,
+    object_stat_sum_t* stat,
+    PushOp* pop
+  ) {
+  logger().debug("{}", __func__);
+  return seastar::do_with(ObjectRecoveryProgress(progress),
+                         object_info_t(),
+                         uint64_t(crimson::common::local_conf()
+                           ->osd_recovery_max_chunk),
+                         eversion_t(),
+    [this, &recovery_info, &progress, stat, pop]
+    (auto& new_progress, auto& oi, auto& available, auto& v) {
+    return [this, &recovery_info, &progress, &new_progress, &oi, pop, &v] {
+      // first chunk: read the omap header + xattrs and learn the version
+      if (progress.first) {
+       v = recovery_info.version;
+       return backend->omap_get_header(coll, ghobject_t(recovery_info.soid))
+         .then([this, &recovery_info, pop](auto bl) {
+         pop->omap_header.claim_append(bl);
+         return store->get_attrs(coll, ghobject_t(recovery_info.soid));
+       }).safe_then([this, &oi, pop, &new_progress, &v](auto attrs) mutable {
+         //pop->attrset = attrs;
+         for (auto p : attrs) {
+           pop->attrset[p.first].push_back(p.second);
+         }
+         logger().debug("build_push_op: {}", pop->attrset[OI_ATTR]);
+         oi.decode(pop->attrset[OI_ATTR]);
+         new_progress.first = false;
+         // fall back to the on-disk version if the caller gave none
+         if (v == eversion_t()) {
+           v = oi.version;
+         }
+         return seastar::make_ready_future<>();
+       }, crimson::os::FuturizedStore::get_attrs_ertr::all_same_way(
+           [] (const std::error_code& e) {
+           return seastar::make_exception_future<>(e);
+         })
+       );
+      }
+      return seastar::make_ready_future<>();
+    }().then([this, &recovery_info] {
+      return shard_services.get_store().get_omap_iterator(coll,
+               ghobject_t(recovery_info.soid));
+    }).then([this, &progress, &available, &new_progress, pop](auto iter) {
+      if (!progress.omap_complete) {
+       // resume omap streaming at the saved cursor; stop when the
+       // iterator ends, the entry-count cap is reached, or the byte
+       // budget is spent
+       return iter->lower_bound(progress.omap_recovered_to).then(
+         [this, iter, &new_progress, pop, &available](int ret) {
+         return seastar::repeat([this, iter, &new_progress, pop, &available] {
+           if (!iter->valid()) {
+             new_progress.omap_complete = true;
+             return seastar::make_ready_future<seastar::stop_iteration>(
+                       seastar::stop_iteration::yes);
+           }
+           if (!pop->omap_entries.empty()
+               && ((crimson::common::local_conf()->osd_recovery_max_omap_entries_per_chunk > 0
+                   && pop->omap_entries.size()
+                   >= crimson::common::local_conf()->osd_recovery_max_omap_entries_per_chunk)
+                 || available <= iter->key().size() + iter->value().length())) {
+             new_progress.omap_recovered_to = iter->key();
+             return seastar::make_ready_future<seastar::stop_iteration>(
+                       seastar::stop_iteration::yes);
+           }
+           pop->omap_entries.insert(make_pair(iter->key(), iter->value()));
+           if ((iter->key().size() + iter->value().length()) <= available)
+             available -= (iter->key().size() + iter->value().length());
+           else
+             available = 0;
+           return iter->next().then([](int r) {
+             return seastar::stop_iteration::no;
+           });
+         });
+       });
+      }
+      return seastar::make_ready_future<>();
+    }).then([this, &recovery_info, &progress, &available, &new_progress, pop] {
+      logger().debug("build_push_op: available: {}, copy_subset: {}",
+                    available, recovery_info.copy_subset);
+      if (available > 0) {
+       if (!recovery_info.copy_subset.empty()) {
+         // intersect copy_subset with the extents fiemap says exist on
+         // disk, then take the next span of at most `available` bytes
+         return seastar::do_with(interval_set<uint64_t>(recovery_info.copy_subset),
+           [this, &recovery_info, &progress, &available, pop, &new_progress]
+           (auto& copy_subset) {
+           return backend->fiemap(coll, ghobject_t(recovery_info.soid),
+                         0, copy_subset.range_end()).then(
+             [&copy_subset](auto m) {
+             interval_set<uint64_t> fiemap_included(std::move(m));
+             copy_subset.intersection_of(fiemap_included);
+             return seastar::make_ready_future<>();
+           }).then([this, &recovery_info, &progress,
+             &copy_subset, &available, pop, &new_progress] {
+             pop->data_included.span_of(copy_subset, progress.data_recovered_to,
+                                        available);
+             if (pop->data_included.empty()) // zero filled section, skip to end!
+               new_progress.data_recovered_to =
+                 recovery_info.copy_subset.range_end();
+             else
+               new_progress.data_recovered_to = pop->data_included.range_end();
+             return seastar::make_ready_future<>();
+           }).handle_exception([&copy_subset](auto e) {
+             copy_subset.clear();
+             return seastar::make_ready_future<>();
+           });
+         });
+       } else {
+         return seastar::now();
+       }
+      } else {
+       pop->data_included.clear();
+       return seastar::make_ready_future<>();
+      }
+    }).then([this, &recovery_info, &progress, &oi, pop] {
+      //TODO: there's no readv in cyan_store yet, use read temporarily.
+      return store->readv(coll, ghobject_t{oi.soid}, pop->data_included, 0);
+    }).safe_then([this, &recovery_info, &progress,
+      &new_progress, &oi, stat, pop, &v]
+      (auto bl) {
+      pop->data.claim_append(bl);
+      if (new_progress.is_complete(recovery_info)) {
+       new_progress.data_complete = true;
+       if (stat)
+         stat->num_objects_recovered++;
+      } else if (progress.first && progress.omap_complete) {
+      // If omap is not changed, we need recovery omap
+      // when recovery cannot be completed once
+       new_progress.omap_complete = false;
+      }
+      if (stat) {
+       stat->num_keys_recovered += pop->omap_entries.size();
+       stat->num_bytes_recovered += pop->data.length();
+      }
+      pop->version = v;
+      pop->soid = recovery_info.soid;
+      pop->recovery_info = recovery_info;
+      pop->after_progress = new_progress;
+      pop->before_progress = progress;
+      logger().debug("build_push_op: pop version: {}, pop data length: {}",
+                    pop->version, pop->data.length());
+      return seastar::make_ready_future<ObjectRecoveryProgress>
+               (std::move(new_progress));
+    }, PGBackend::read_errorator::all_same_way([](auto e) {
+       logger().debug("build_push_op: read exception");
+       return seastar::make_exception_future<ObjectRecoveryProgress>(e);
+      })
+    );
+  });
+}
+
+// Return iterators into the shard-missing map for every peer in the
+// acting/recovery/backfill set (excluding ourselves) that is missing
+// `soid` and therefore needs a push.
+std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator>
+ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t& soid)
+{
+  std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator> shards;
+  assert(pg.get_acting_recovery_backfill().size() > 0);
+  for (set<pg_shard_t>::iterator i =
+      pg.get_acting_recovery_backfill().begin();
+      i != pg.get_acting_recovery_backfill().end();
+      ++i) {
+    if (*i == pg.get_pg_whoami())
+      continue;
+    pg_shard_t peer = *i;
+    map<pg_shard_t, pg_missing_t>::const_iterator j =
+      pg.get_shard_missing().find(peer);
+    // every acting_recovery_backfill peer must have a missing set entry
+    assert(j != pg.get_shard_missing().end());
+    if (j->second.is_missing(soid)) {
+      shards.push_back(j);
+    }
+  }
+  return shards;
+}
+
+// Serve an MOSDPGPull from the primary: for every requested object,
+// stat it, adjust size/copy_subset on the first chunk, build a PushOp
+// with the next chunk and send it back in an MOSDPGPush.  A stat/read
+// failure is reported by pushing a zeroed version, which the primary
+// treats as "this peer does not have the object".
+seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
+{
+  logger().debug("{}: {}", __func__, *m);
+  vector<PullOp> pulls;
+  m->take_pulls(&pulls);
+  return seastar::do_with(std::move(pulls),
+    [this, m, from = m->from](auto& pulls) {
+    return seastar::parallel_for_each(pulls, [this, m, from](auto& pull_op) {
+      const hobject_t& soid = pull_op.soid;
+      return seastar::do_with(PushOp(),
+       [this, &soid, &pull_op, from](auto& pop) {
+       logger().debug("handle_pull: {}", soid);
+       return backend->stat(coll, ghobject_t(soid)).then(
+         [this, &pull_op, &pop](auto st) {
+         ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
+         ObjectRecoveryProgress &progress = pull_op.recovery_progress;
+         if (progress.first && recovery_info.size == ((uint64_t) -1)) {
+           // Adjust size and copy_subset
+           recovery_info.size = st.st_size;
+           if (st.st_size) {
+             interval_set<uint64_t> object_range;
+             object_range.insert(0, st.st_size);
+             recovery_info.copy_subset.intersection_of(object_range);
+           } else {
+             recovery_info.copy_subset.clear();
+           }
+           assert(recovery_info.clone_subset.empty());
+         }
+         // no stats are gathered on the pushing side of a pull
+         return build_push_op(recovery_info, progress, nullptr, &pop);
+       }).handle_exception([soid, &pop](auto e) {
+         // signal "we don't have it" with zeroed versions
+         pop.recovery_info.version = eversion_t();
+         pop.version = eversion_t();
+         pop.soid = soid;
+         return seastar::make_ready_future<ObjectRecoveryProgress>();
+       }).then([this, &pop, &pull_op, from](auto new_progress) {
+         auto msg = make_message<MOSDPGPush>();
+         msg->from = pg.get_pg_whoami();
+         msg->pgid = pg.get_pgid();
+         msg->map_epoch = pg.get_osdmap_epoch();
+         msg->min_epoch = pg.get_last_peering_reset();
+         msg->set_priority(pg.get_recovery_op_priority());
+         msg->pushes.push_back(pop);
+         return shard_services.send_to_osd(from.osd, std::move(msg),
+                                           pg.get_osdmap_epoch());
+       });
+      });
+    });
+  });
+}
+
+// Apply one PushOp received in answer to a pull we sent.  Trims the
+// received data to our copy_subset, persists it via submit_push_data,
+// and either finishes local recovery of the object (returns true) or
+// fills `response` with the next PullOp to request (returns false).
+// NOTE(review): relies on recovering[] already holding a PullInfo for
+// the object (set up by prepare_pull) -- verify against callers.
+seastar::future<bool> ReplicatedRecoveryBackend::_handle_pull_response(
+  pg_shard_t from,
+  PushOp& pop,
+  PullOp* response,
+  ceph::os::Transaction* t)
+{
+  logger().debug("handle_pull_response {} {} data.size() is {} data_included: {}",
+      pop.recovery_info, pop.after_progress, pop.data.length(), pop.data_included);
+
+  const hobject_t &hoid = pop.soid;
+  auto& recovery_waiter = recovering[hoid];
+  auto& pi = recovery_waiter.pi;
+  if (pi.recovery_info.size == (uint64_t(-1))) {
+    pi.recovery_info.size = pop.recovery_info.size;
+    pi.recovery_info.copy_subset.intersection_of(
+       pop.recovery_info.copy_subset);
+  }
+
+  // If primary doesn't have object info and didn't know version
+  if (pi.recovery_info.version == eversion_t())
+    pi.recovery_info.version = pop.version;
+
+  bool first = pi.recovery_progress.first;
+
+  // first response: load (or create) the object context and decode the
+  // object_info_t the pusher sent in OI_ATTR
+  return [this, &pi, first, &recovery_waiter, &pop] {
+    if (first) {
+      return pg.get_or_load_head_obc(pi.recovery_info.soid).safe_then(
+       [this, &pi, &recovery_waiter, &pop](auto p) {
+       auto& [obc, existed] = p;
+       pi.obc = obc;
+       recovery_waiter.obc = obc;
+       obc->obs.oi.decode(pop.attrset[OI_ATTR]);
+       pi.recovery_info.oi = obc->obs.oi;
+       return seastar::make_ready_future<>();
+      }, crimson::osd::PG::load_obc_ertr::all_same_way(
+         [this, &pi](const std::error_code& e) {
+         auto [obc, existed] = shard_services.obc_registry.get_cached_obc(
+                                   pi.recovery_info.soid);
+         pi.obc = obc;
+         return seastar::make_ready_future<>();
+       })
+      );
+    }
+    return seastar::make_ready_future<>();
+  }().then([this, first, &pi, &pop, t, response]() mutable {
+    return seastar::do_with(interval_set<uint64_t>(),
+                           bufferlist(),
+                           interval_set<uint64_t>(),
+                           [this, &pop, &pi, first, t, response]
+                           (auto& data_zeros, auto& data,
+                            auto& usable_intervals) {
+      data = pop.data;
+      ceph::bufferlist usable_data;
+      // keep only the bytes that fall inside our copy_subset
+      trim_pushed_data(pi.recovery_info.copy_subset, pop.data_included, data,
+         &usable_intervals, &usable_data);
+      data.claim(usable_data);
+      pi.recovery_progress = pop.after_progress;
+      logger().debug("new recovery_info {}, new progress {}",
+         pi.recovery_info, pi.recovery_progress);
+      // the range the pusher advanced over but sent no bytes for must
+      // be zero-filled locally
+      uint64_t z_offset = pop.before_progress.data_recovered_to;
+      uint64_t z_length = pop.after_progress.data_recovered_to
+         - pop.before_progress.data_recovered_to;
+      if (z_length)
+       data_zeros.insert(z_offset, z_length);
+      bool complete = pi.is_complete();
+      bool clear_omap = !pop.before_progress.omap_complete;
+      return submit_push_data(pi.recovery_info, first, complete, clear_omap,
+         data_zeros, usable_intervals, data, pop.omap_header,
+         pop.attrset, pop.omap_entries, t).then(
+       [this, response, &pi, &pop, &data, complete, t] {
+       pi.stat.num_keys_recovered += pop.omap_entries.size();
+       pi.stat.num_bytes_recovered += data.length();
+
+       if (complete) {
+         pi.stat.num_objects_recovered++;
+         pg.get_recovery_handler()->on_local_recover(pop.soid, recovering[pop.soid].pi.recovery_info,
+                             false, *t);
+         return seastar::make_ready_future<bool>(true);
+       } else {
+         response->soid = pop.soid;
+         response->recovery_info = pi.recovery_info;
+         response->recovery_progress = pi.recovery_progress;
+         return seastar::make_ready_future<bool>(false);
+       }
+      });
+    });
+  });
+}
+
+// Handle an MOSDPGPush that answers one of our pulls.  A zeroed version
+// means the peer does not have the object, which fails the recovery.
+// Only the first PushOp in the message is processed (one push per
+// message for now); on partial progress a follow-up MOSDPGPull is sent.
+seastar::future<> ReplicatedRecoveryBackend::handle_pull_response(
+  Ref<MOSDPGPush> m)
+{
+  const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now.
+  if (pop.version == eversion_t()) {
+    // replica doesn't have it!
+    pg.get_recovery_handler()->on_failed_recover({ m->from }, pop.soid,
+       get_recovering(pop.soid).pi.recovery_info.version);
+    return seastar::make_exception_future<>(
+       std::runtime_error(fmt::format(
+           "Error on pushing side {} when pulling obj {}",
+           m->from, pop.soid)));
+  }
+
+  logger().debug("{}: {}", __func__, *m);
+  return seastar::do_with(PullOp(), [this, m](auto& response) {
+    // keep the message alive in do_with for the whole chain
+    return seastar::do_with(ceph::os::Transaction(), m.get(),
+      [this, &response](auto& t, auto& m) {
+      pg_shard_t from = m->from;
+      PushOp& pop = m->pushes[0]; // only one push per message for now
+      return _handle_pull_response(from, pop, &response, &t).then(
+       [this, &t](bool complete) {
+       epoch_t epoch_frozen = pg.get_osdmap_epoch();
+       return shard_services.get_store().do_transaction(coll, std::move(t))
+         .then([this, epoch_frozen, complete,
+         last_complete = pg.get_info().last_complete] {
+         pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
+         return seastar::make_ready_future<bool>(complete);
+       });
+      });
+    }).then([this, m, &response](bool complete) {
+      if (complete) {
+       auto& pop = m->pushes[0];
+       recovering[pop.soid].set_pulled();
+       return seastar::make_ready_future<>();
+      } else {
+       // partial chunk applied -- request the next one
+       auto reply = make_message<MOSDPGPull>();
+       reply->from = pg.get_pg_whoami();
+       reply->set_priority(m->get_priority());
+       reply->pgid = pg.get_info().pgid;
+       reply->map_epoch = m->map_epoch;
+       reply->min_epoch = m->min_epoch;
+       vector<PullOp> vec = { std::move(response) };
+       reply->set_pulls(&vec);
+       return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch());
+      }
+    });
+  });
+}
+
+// Apply one PushOp pushed to us as a replica: queue the writes for this
+// chunk into `t` (via submit_push_data) and fill `response` with the
+// reply for the primary.  On the final chunk, notify the recovery
+// handler of the locally recovered object.
+seastar::future<> ReplicatedRecoveryBackend::_handle_push(
+  pg_shard_t from,
+  const PushOp &pop,
+  PushReplyOp *response,
+  ceph::os::Transaction *t)
+{
+  logger().debug("{}", __func__);
+
+  return seastar::do_with(interval_set<uint64_t>(),
+                         bufferlist(),
+    [this, &pop, t, response](auto& data_zeros, auto& data) {
+    data = pop.data;
+    bool first = pop.before_progress.first;
+    bool complete = pop.after_progress.data_complete
+      && pop.after_progress.omap_complete;
+    bool clear_omap = !pop.before_progress.omap_complete;
+    // range advanced over without data must be zero-filled locally
+    uint64_t z_offset = pop.before_progress.data_recovered_to;
+    uint64_t z_length = pop.after_progress.data_recovered_to
+      - pop.before_progress.data_recovered_to;
+    if (z_length)
+      data_zeros.insert(z_offset, z_length);
+    response->soid = pop.recovery_info.soid;
+
+    return submit_push_data(pop.recovery_info, first, complete, clear_omap,
+       data_zeros, pop.data_included, data, pop.omap_header, pop.attrset,
+       pop.omap_entries, t).then([this, complete, &data_zeros, &pop, t] {
+      if (complete) {
+       pg.get_recovery_handler()->on_local_recover(pop.recovery_info.soid,
+                           pop.recovery_info, false, *t);
+      }
+    });
+  });
+}
+
+// Handle an MOSDPGPush.  On the primary a push is really the response
+// to one of our pulls, so delegate to handle_pull_response; otherwise
+// apply the (single) PushOp, commit the transaction, and acknowledge
+// with an MOSDPGPushReply.
+seastar::future<> ReplicatedRecoveryBackend::handle_push(
+  Ref<MOSDPGPush> m)
+{
+  if (pg.is_primary()) {
+    return handle_pull_response(m);
+  }
+
+  logger().debug("{}: {}", __func__, *m);
+  return seastar::do_with(PushReplyOp(), [this, m](auto& response) {
+    const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now
+    return seastar::do_with(ceph::os::Transaction(),
+      [this, m, &pop, &response](auto& t) {
+      return _handle_push(m->from, pop, &response, &t).then(
+       [this, &t] {
+       epoch_t epoch_frozen = pg.get_osdmap_epoch();
+       return shard_services.get_store().do_transaction(coll, std::move(t)).then(
+         [this, epoch_frozen, last_complete = pg.get_info().last_complete] {
+         //TODO: this should be grouped with pg.on_local_recover somehow.
+         pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
+       });
+      });
+    }).then([this, m, &pop, &response]() mutable {
+      auto reply = make_message<MOSDPGPushReply>();
+      reply->from = pg.get_pg_whoami();
+      reply->set_priority(m->get_priority());
+      reply->pgid = pg.get_info().pgid;
+      reply->map_epoch = m->map_epoch;
+      reply->min_epoch = m->min_epoch;
+      std::vector<PushReplyOp> replies = { std::move(response) };
+      reply->replies.swap(replies);
+      return shard_services.send_to_osd(m->from.osd,
+         std::move(reply), pg.get_osdmap_epoch());
+    });
+  });
+}
+
+// Process one PushReplyOp from `peer`.  Returns true when pushing this
+// object to the peer is finished (or was unknown/errored), false when a
+// follow-up PushOp chunk was prepared in `reply` and must be sent.
+seastar::future<bool> ReplicatedRecoveryBackend::_handle_push_reply(
+  pg_shard_t peer,
+  const PushReplyOp &op,
+  PushOp *reply)
+{
+  const hobject_t& soid = op.soid;
+  logger().debug("{}, soid {}, from {}", __func__, soid, peer);
+  auto recovering_iter = recovering.find(soid);
+  if (recovering_iter == recovering.end()
+      || !recovering_iter->second.pushing.count(peer)) {
+    logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer);
+    return seastar::make_ready_future<bool>(true);
+  } else {
+    auto& pi = recovering_iter->second.pushing[peer];
+    return [this, &pi, &soid, reply, peer, recovering_iter] {
+      bool error = pi.recovery_progress.error;
+      if (!pi.recovery_progress.data_complete && !error) {
+       return build_push_op(pi.recovery_info, pi.recovery_progress,
+           &pi.stat, reply).then([&pi] (auto new_progress) {
+         pi.recovery_progress = new_progress;
+         return seastar::make_ready_future<bool>(false);
+       });
+      }
+      if (!error)
+       pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info);
+      recovering_iter->second.set_pushed(peer);
+      return seastar::make_ready_future<bool>(true);
+    // NOTE: do not capture the local `soid` by reference here -- this
+    // handler may run after this function's frame is gone.  `pi` and
+    // `recovering_iter` reference the long-lived `recovering` map and
+    // remain valid.
+    }().handle_exception([this, recovering_iter, &pi, peer] (auto e) {
+      pi.recovery_progress.error = true;
+      recovering_iter->second.set_pushed(peer);
+      return seastar::make_ready_future<bool>(true);
+    });
+  }
+}
+
+// Handle an MOSDPGPushReply from a replica: either the object is fully
+// pushed there, or we build and send the next chunk.  Only the first
+// reply in the message is processed (one reply per message for now).
+seastar::future<> ReplicatedRecoveryBackend::handle_push_reply(
+  Ref<MOSDPGPushReply> m)
+{
+  logger().debug("{}: {}", __func__, *m);
+  auto from = m->from;
+  auto& push_reply = m->replies[0]; //TODO: only one reply per message
+
+  // NOTE(review): `push_reply` references m->replies[0], but `m` itself
+  // is not kept alive by the continuations below (only `&push_reply` is
+  // captured, and it is unused after the synchronous call) -- verify
+  // the message lifetime if the captured reference is ever dereferenced.
+  return seastar::do_with(PushOp(), [this, &push_reply, from](auto& pop) {
+    return _handle_push_reply(from, push_reply, &pop).then(
+      [this, &push_reply, &pop, from](bool finished) {
+      if (!finished) {
+       auto msg = make_message<MOSDPGPush>();
+       msg->from = pg.get_pg_whoami();
+       msg->pgid = pg.get_pgid();
+       msg->map_epoch = pg.get_osdmap_epoch();
+       msg->min_epoch = pg.get_last_peering_reset();
+       msg->set_priority(pg.get_recovery_op_priority());
+       msg->pushes.push_back(pop);
+       return shard_services.send_to_osd(from.osd, std::move(msg), pg.get_osdmap_epoch());
+      }
+      return seastar::make_ready_future<>();
+    });
+  });
+}
+
+// Restrict the received intervals/data to the part we actually want
+// (copy_subset).  Fast path: everything received is wanted and is
+// passed through unchanged.  Otherwise walk each received extent,
+// intersect it with copy_subset, and copy the matching byte ranges out
+// of data_received into *data_usable.
+void ReplicatedRecoveryBackend::trim_pushed_data(
+  const interval_set<uint64_t> &copy_subset,
+  const interval_set<uint64_t> &intervals_received,
+  ceph::bufferlist data_received,
+  interval_set<uint64_t> *intervals_usable,
+  bufferlist *data_usable)
+{
+  logger().debug("{}", __func__);
+  if (intervals_received.subset_of(copy_subset)) {
+    *intervals_usable = intervals_received;
+    *data_usable = data_received;
+    return;
+  }
+
+  intervals_usable->intersection_of(copy_subset, intervals_received);
+
+  // `off` tracks each extent's starting offset inside data_received
+  uint64_t off = 0;
+  for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
+      p != intervals_received.end(); ++p) {
+    interval_set<uint64_t> x;
+    x.insert(p.get_start(), p.get_len());
+    x.intersection_of(copy_subset);
+    for (interval_set<uint64_t>::const_iterator q = x.begin(); q != x.end();
+       ++q) {
+      bufferlist sub;
+      uint64_t data_off = off + (q.get_start() - p.get_start());
+      sub.substr_of(data_received, data_off, q.get_len());
+      data_usable->claim_append(sub);
+    }
+    off += p.get_len();
+  }
+}
+
+// Queue the storage operations for one received chunk into `t`.  Unless
+// the transfer fits a single chunk (first && complete), data is staged
+// in a temp recovery object that is renamed over the real object once
+// the last chunk arrives.  `data_zeros` punches holes for ranges the
+// pusher advanced over without sending bytes.
+seastar::future<> ReplicatedRecoveryBackend::submit_push_data(
+  const ObjectRecoveryInfo &recovery_info,
+  bool first,
+  bool complete,
+  bool clear_omap,
+  interval_set<uint64_t> &data_zeros,
+  const interval_set<uint64_t> &intervals_included,
+  bufferlist data_included,
+  bufferlist omap_header,
+  const map<string, bufferlist> &attrs,
+  const map<string, bufferlist> &omap_entries,
+  ObjectStore::Transaction *t)
+{
+  logger().debug("{}", __func__);
+  hobject_t target_oid;
+  if (first && complete) {
+    target_oid = recovery_info.soid;
+  } else {
+    target_oid = get_temp_recovery_object(recovery_info.soid,
+                                         recovery_info.version);
+    if (first) {
+      logger().debug("{}: Adding oid {} in the temp collection",
+         __func__, target_oid);
+      add_temp_obj(target_oid);
+    }
+  }
+
+  // first chunk: (re)create the target, restore attr/omap state and
+  // clone still-valid overlap from any pre-existing local object
+  return [this, &recovery_info, first, complete, t,
+    &omap_header, &attrs, &omap_entries, target_oid, clear_omap] {
+    if (first) {
+      if (!complete) {
+       t->remove(coll->get_cid(), ghobject_t(target_oid));
+       t->touch(coll->get_cid(), ghobject_t(target_oid));
+       bufferlist bv = attrs.at(OI_ATTR);
+       object_info_t oi(bv);
+       t->set_alloc_hint(coll->get_cid(), ghobject_t(target_oid),
+                         oi.expected_object_size,
+                         oi.expected_write_size,
+                         oi.alloc_hint_flags);
+      } else {
+        if (!recovery_info.object_exist) {
+         t->remove(coll->get_cid(), ghobject_t(target_oid));
+          t->touch(coll->get_cid(), ghobject_t(target_oid));
+          bufferlist bv = attrs.at(OI_ATTR);
+          object_info_t oi(bv);
+          t->set_alloc_hint(coll->get_cid(), ghobject_t(target_oid),
+                            oi.expected_object_size,
+                            oi.expected_write_size,
+                            oi.alloc_hint_flags);
+        }
+        //remove xattr and update later if overwrite on original object
+        t->rmattrs(coll->get_cid(), ghobject_t(target_oid));
+        //if need update omap, clear the previous content first
+        if (clear_omap)
+          t->omap_clear(coll->get_cid(), ghobject_t(target_oid));
+      }
+
+      t->truncate(coll->get_cid(), ghobject_t(target_oid), recovery_info.size);
+      if (omap_header.length())
+       t->omap_setheader(coll->get_cid(), ghobject_t(target_oid), omap_header);
+
+      return store->stat(coll, ghobject_t(recovery_info.soid)).then (
+       [this, &recovery_info, complete, t, target_oid,
+       omap_header = std::move(omap_header), &attrs, &omap_entries] (auto st) {
+       //TODO: pg num bytes counting
+       if (!complete) {
+         //clone overlap content in local object
+         if (recovery_info.object_exist) {
+           uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
+           interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
+           if (local_size) {
+             local_intervals_included.insert(0, local_size);
+             local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset);
+             local_intervals_included.subtract(local_intervals_excluded);
+           }
+           for (interval_set<uint64_t>::const_iterator q = local_intervals_included.begin();
+               q != local_intervals_included.end();
+               ++q) {
+             logger().debug(" clone_range {} {}~{}",
+                 recovery_info.soid, q.get_start(), q.get_len());
+             t->clone_range(coll->get_cid(), ghobject_t(recovery_info.soid), ghobject_t(target_oid),
+                 q.get_start(), q.get_len(), q.get_start());
+           }
+         }
+       }
+       return seastar::make_ready_future<>();
+      });
+    }
+    return seastar::make_ready_future<>();
+  }().then([this, &data_zeros, &recovery_info, &intervals_included, t, target_oid,
+    &omap_entries, &attrs, data_included, complete, first] {
+    uint64_t off = 0;
+    uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
+    // Punch zeros for data, if fiemap indicates nothing but it is marked dirty
+    if (data_zeros.size() > 0) {
+      data_zeros.intersection_of(recovery_info.copy_subset);
+      assert(intervals_included.subset_of(data_zeros));
+      data_zeros.subtract(intervals_included);
+
+      logger().debug("submit_push_data recovering object {} copy_subset: {} "
+         "intervals_included: {} data_zeros: {}",
+         recovery_info.soid, recovery_info.copy_subset,
+         intervals_included, data_zeros);
+
+      for (auto p = data_zeros.begin(); p != data_zeros.end(); ++p)
+       t->zero(coll->get_cid(), ghobject_t(target_oid), p.get_start(), p.get_len());
+    }
+    logger().debug("submit_push_data: test");
+    // write each received extent at its offset in the target object
+    for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
+       p != intervals_included.end();
+       ++p) {
+      bufferlist bit;
+      bit.substr_of(data_included, off, p.get_len());
+      logger().debug("submit_push_data: test1");
+      t->write(coll->get_cid(), ghobject_t(target_oid),
+         p.get_start(), p.get_len(), bit, fadvise_flags);
+      off += p.get_len();
+    }
+
+    if (!omap_entries.empty())
+      t->omap_setkeys(coll->get_cid(), ghobject_t(target_oid), omap_entries);
+    if (!attrs.empty())
+      t->setattrs(coll->get_cid(), ghobject_t(target_oid), attrs);
+
+    if (complete) {
+      if (!first) {
+       // last of several chunks: move the temp object into place
+       logger().debug("{}: Removing oid {} from the temp collection",
+           __func__, target_oid);
+       clear_temp_obj(target_oid);
+       t->remove(coll->get_cid(), ghobject_t(recovery_info.soid));
+       t->collection_move_rename(coll->get_cid(), ghobject_t(target_oid),
+                                 coll->get_cid(), ghobject_t(recovery_info.soid));
+      }
+      submit_push_complete(recovery_info, t);
+    }
+    logger().debug("submit_push_data: done");
+    return seastar::make_ready_future<>();
+  });
+}
+
+// Final step of a completed push: clone the clone_subset ranges from
+// objects we already have locally into the recovered object.
+void ReplicatedRecoveryBackend::submit_push_complete(
+  const ObjectRecoveryInfo &recovery_info,
+  ObjectStore::Transaction *t)
+{
+  for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
+      recovery_info.clone_subset.begin();
+      p != recovery_info.clone_subset.end(); ++p) {
+    for (interval_set<uint64_t>::const_iterator q = p->second.begin();
+       q != p->second.end(); ++q) {
+      logger().debug(" clone_range {} {}~{}", p->first, q.get_start(), q.get_len());
+      t->clone_range(coll->get_cid(), ghobject_t(p->first), ghobject_t(recovery_info.soid),
+         q.get_start(), q.get_len(), q.get_start());
+    }
+  }
+}
+
+// A replica acknowledged a delete: record the object as recovered on
+// that peer and mark the push finished.  Only the first entry of the
+// message is handled.
+seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete_reply(
+  Ref<MOSDPGRecoveryDeleteReply> m)
+{
+  auto& p = m->objects.front();
+  hobject_t soid = p.first;
+  ObjectRecoveryInfo recovery_info;
+  recovery_info.version = p.second;
+  pg.get_recovery_handler()->on_peer_recover(m->from, soid, recovery_info);
+  get_recovering(soid).set_pushed(m->from);
+  return seastar::now();
+}
+
+// Dispatch an incoming recovery message to its specific handler based
+// on the message type; unknown types fail the returned future.
+seastar::future<> ReplicatedRecoveryBackend::handle_recovery_op(Ref<MOSDFastDispatchOp> m)
+{
+  switch (m->get_header().type) {
+  case MSG_OSD_PG_PULL:
+    return handle_pull(boost::static_pointer_cast<MOSDPGPull>(m));
+  case MSG_OSD_PG_PUSH:
+    return handle_push(boost::static_pointer_cast<MOSDPGPush>(m));
+  case MSG_OSD_PG_PUSH_REPLY:
+    return handle_push_reply(
+       boost::static_pointer_cast<MOSDPGPushReply>(m));
+  case MSG_OSD_PG_RECOVERY_DELETE:
+    return handle_recovery_delete(
+       boost::static_pointer_cast<MOSDPGRecoveryDelete>(m));
+  case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+    return handle_recovery_delete_reply(
+       boost::static_pointer_cast<MOSDPGRecoveryDeleteReply>(m));
+  default:
+    return seastar::make_exception_future<>(
+       std::invalid_argument(fmt::format("invalid request type: {}",
+                                         m->get_header().type)));
+  }
+}
+
diff --git a/src/crimson/osd/replicated_recovery_backend.h b/src/crimson/osd/replicated_recovery_backend.h
new file mode 100644 (file)
index 0000000..467d7a5
--- /dev/null
@@ -0,0 +1,108 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/osd/recovery_backend.h"
+
+#include "messages/MOSDPGPull.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDPGRecoveryDelete.h"
+#include "messages/MOSDPGRecoveryDeleteReply.h"
+#include "os/ObjectStore.h"
+
+// Log-based (delta) recovery for replicated pools in crimson: drives
+// the object push/pull recovery protocol (MOSDPGPull / MOSDPGPush /
+// MOSDPGPushReply and the recovery-delete variants) on top of the
+// generic RecoveryBackend state (recovering map, temp objects, store).
+class ReplicatedRecoveryBackend : public RecoveryBackend {
+public:
+  ReplicatedRecoveryBackend(crimson::osd::PG& pg,
+                           crimson::osd::ShardServices& shard_services,
+                           crimson::os::CollectionRef coll,
+                           PGBackend* backend)
+    : RecoveryBackend(pg, shard_services, coll, backend) {}
+  seastar::future<> handle_recovery_op(
+    Ref<MOSDFastDispatchOp> m) final;
+
+  seastar::future<> recover_object(
+    const hobject_t& soid,
+    eversion_t need) final;
+  seastar::future<> recover_delete(
+    const hobject_t& soid,
+    eversion_t need) final;
+  seastar::future<> push_delete(
+    const hobject_t& soid,
+    eversion_t need) final;
+protected:
+  seastar::future<> handle_pull(
+    Ref<MOSDPGPull> m);
+  seastar::future<> handle_pull_response(
+    Ref<MOSDPGPush> m);
+  seastar::future<> handle_push(
+    Ref<MOSDPGPush> m);
+  seastar::future<> handle_push_reply(
+    Ref<MOSDPGPushReply> m);
+  seastar::future<> handle_recovery_delete(
+    Ref<MOSDPGRecoveryDelete> m);
+  seastar::future<> handle_recovery_delete_reply(
+    Ref<MOSDPGRecoveryDeleteReply> m);
+  seastar::future<> prep_push(
+    const hobject_t& soid,
+    eversion_t need,
+    std::map<pg_shard_t, PushOp>* pops,
+    const std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator>& shards);
+  void prepare_pull(
+    PullOp& po,
+    PullInfo& pi,
+    const hobject_t& soid,
+    eversion_t need);
+  std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator> get_shards_to_push(
+    const hobject_t& soid);
+  seastar::future<ObjectRecoveryProgress> build_push_op(
+    const ObjectRecoveryInfo& recovery_info,
+    const ObjectRecoveryProgress& progress,
+    object_stat_sum_t* stat,
+    PushOp* pop);
+  seastar::future<bool> _handle_pull_response(
+    pg_shard_t from,
+    PushOp& pop,
+    PullOp* response,
+    ceph::os::Transaction* t);
+  void trim_pushed_data(
+    const interval_set<uint64_t> &copy_subset,
+    const interval_set<uint64_t> &intervals_received,
+    ceph::bufferlist data_received,
+    interval_set<uint64_t> *intervals_usable,
+    bufferlist *data_usable);
+  seastar::future<> submit_push_data(
+    const ObjectRecoveryInfo &recovery_info,
+    bool first,
+    bool complete,
+    bool clear_omap,
+    interval_set<uint64_t> &data_zeros,
+    const interval_set<uint64_t> &intervals_included,
+    ceph::bufferlist data_included,
+    ceph::bufferlist omap_header,
+    const std::map<string, bufferlist> &attrs,
+    const std::map<string, bufferlist> &omap_entries,
+    ceph::os::Transaction *t);
+  void submit_push_complete(
+    const ObjectRecoveryInfo &recovery_info,
+    ObjectStore::Transaction *t);
+  seastar::future<> _handle_push(
+    pg_shard_t from,
+    const PushOp &pop,
+    PushReplyOp *response,
+    ceph::os::Transaction *t);
+  seastar::future<bool> _handle_push_reply(
+    pg_shard_t peer,
+    const PushReplyOp &op,
+    PushOp *reply);
+  seastar::future<> on_local_recover_persist(
+    const hobject_t& soid,
+    const ObjectRecoveryInfo& _recovery_info,
+    bool is_delete,
+    epoch_t epoch_to_freeze);
+  seastar::future<> local_recover_delete(
+    const hobject_t& soid,
+    eversion_t need,
+    epoch_t epoch_frozen);
+};
index 324cff0f5f8fd653cc8a394bfeef2361fc9b0da9..ac42d9c281851ddeeb2ee8ca1bfe8dc35252bede 100644 (file)
@@ -7055,3 +7055,45 @@ ostream &operator<<(ostream &out, const PeeringState &ps) {
   }
   return out;
 }
+
+// Order the replicas that still need recovery: acting-set shards first,
+// then async recovery targets, each group sorted by ascending number of
+// missing objects so the least-behind peers are recovered first.
+std::vector<pg_shard_t> PeeringState::get_replica_recovery_order() const
+{
+  std::vector<std::pair<unsigned int, pg_shard_t>> replicas_by_num_missing,
+    async_by_num_missing;
+  replicas_by_num_missing.reserve(get_acting_recovery_backfill().size() - 1);
+  for (auto &p : get_acting_recovery_backfill()) {
+    if (p == get_primary()) {
+      continue;
+    }
+    auto pm = get_peer_missing().find(p);
+    assert(pm != get_peer_missing().end());
+    auto nm = pm->second.num_missing();
+    if (nm != 0) {
+      if (is_async_recovery_target(p)) {
+       async_by_num_missing.push_back(make_pair(nm, p));
+      } else {
+       replicas_by_num_missing.push_back(make_pair(nm, p));
+      }
+    }
+  }
+  // sort by number of missing objects, in ascending order.
+  auto func = [](const std::pair<unsigned int, pg_shard_t> &lhs,
+                const std::pair<unsigned int, pg_shard_t> &rhs) {
+    return lhs.first < rhs.first;
+  };
+  // acting goes first
+  std::sort(replicas_by_num_missing.begin(), replicas_by_num_missing.end(), func);
+  // then async_recovery_targets
+  std::sort(async_by_num_missing.begin(), async_by_num_missing.end(), func);
+  replicas_by_num_missing.insert(replicas_by_num_missing.end(),
+    async_by_num_missing.begin(), async_by_num_missing.end());
+
+  std::vector<pg_shard_t> ret;
+  ret.reserve(replicas_by_num_missing.size());
+  for (auto p : replicas_by_num_missing) {
+    ret.push_back(p.second);
+  }
+  return ret;
+}
+
+
index ec3e365843ce3200bb58440acf3c0a8c31a4e322..5a765e03017c62b7c570aa46ed7d5f894267a9ee 100644 (file)
@@ -1739,6 +1739,9 @@ public:
   /// Updates info.hit_set to hset_history, does not dirty
   void update_hset(const pg_hit_set_history_t &hset_history);
 
+  /// Get all pg_shards that needs recovery
+  std::vector<pg_shard_t> get_replica_recovery_order() const;
+
   /**
    * update_history
    *