crimson/osd/replicated_backend: add the skipped newly created clone
object to the push queue after the clone request completes  (60598/head)

author    Xuehan Xu <xuxuehan@qianxin.com>
          Mon, 4 Nov 2024 09:30:10 +0000 (17:30 +0800)
committer Xuehan Xu <xuxuehan@qianxin.com>
          Sun, 17 Nov 2024 11:15:21 +0000 (19:15 +0800)

Fixes: https://tracker.ceph.com/issues/68808
Signed-off-by: Xuehan Xu <xuxuehan@qianxin.com>
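
The race this commit fixes: the head object is already in the backfill push
queue but has not been pushed to the backfill target yet; a write that arrives
meanwhile creates a clone, but the op is not sent to that peer
(should_send_op() is false for it), and nothing enqueues the clone for a push,
so the peer ends up missing it. Below is a minimal, self-contained model of
the scenario and of the fix's effect; every name in it is a hypothetical
stand-in, not crimson's API.

    #include <iostream>
    #include <set>
    #include <string>
    #include <vector>

    struct Peer {
      std::set<std::string> objects;   // objects the peer already has
    };

    int main() {
      Peer backfill_target;                       // head not pushed yet
      std::vector<std::string> push_queue{"head"};

      // A write clones "head" -> "clone_1" while backfill is in flight.
      // Without the fix, nothing enqueues "clone_1" for this peer:
      bool peer_missing_head = !backfill_target.objects.count("head");
      bool new_clone_created = true;
      if (new_clone_created && peer_missing_head) {
        // the fix: schedule a standalone push of the clone as well
        push_queue.push_back("clone_1");
      }
      for (const auto& obj : push_queue)
        std::cout << "push " << obj << '\n';      // head, clone_1
    }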
13 files changed:
src/crimson/osd/backfill_state.cc
src/crimson/osd/backfill_state.h
src/crimson/osd/ec_backend.cc
src/crimson/osd/ec_backend.h
src/crimson/osd/ops_executer.cc
src/crimson/osd/ops_executer.h
src/crimson/osd/osd_operations/snaptrim_event.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.h
src/crimson/osd/pg_recovery.h
src/crimson/osd/replicated_backend.cc
src/crimson/osd/replicated_backend.h

index a77cbe87652494905b4a69226645848acc9156f5..0269627a2c83f1ac3e3d26fab5f2397ad8146f32 100644 (file)
@@ -610,4 +610,12 @@ void BackfillState::ProgressTracker::complete_to(
   }
 }
 
+void BackfillState::enqueue_standalone_push(
+  const hobject_t &obj,
+  const eversion_t &v,
+  const std::vector<pg_shard_t> &peers) {
+  progress_tracker->enqueue_push(obj);
+  backfill_machine.backfill_listener.enqueue_push(obj, v, peers);
+}
+
 } // namespace crimson::osd
index a49cbeaac068bd04c59905a887e3717b94c4d803..072c91e079d7f0bc998b9da16cafc3aa69af9570 100644 (file)
@@ -304,6 +304,15 @@ public:
     backfill_machine.process_event(*std::move(evt));
   }
 
+  void enqueue_standalone_push(
+    const hobject_t &obj,
+    const eversion_t &v,
+    const std::vector<pg_shard_t> &peers);
+
+  bool is_triggered() const {
+    return backfill_machine.triggering_event() != nullptr;
+  }
+
   hobject_t get_last_backfill_started() const {
     return last_backfill_started;
   }
index 32eaaf02b3f3723822d11dcd82808a78283b0697..007d0bf35f3d83c8b529426ea45497aa0d17618f 100644 (file)
@@ -26,6 +26,7 @@ ECBackend::_read(const hobject_t& hoid,
 ECBackend::rep_op_fut_t
 ECBackend::submit_transaction(const std::set<pg_shard_t> &pg_shards,
                               const hobject_t& hoid,
+                             crimson::osd::ObjectContextRef&& new_clone,
                               ceph::os::Transaction&& txn,
                               osd_op_params_t&& osd_op_p,
                               epoch_t min_epoch, epoch_t max_epoch,
index 90a7e2b1f4d7fafa63c0ec10ef81512cbc313e15..b14c78c9fc4a093324f1e8a718e55017e4f8a45c 100644 (file)
@@ -28,6 +28,7 @@ private:
   rep_op_fut_t
   submit_transaction(const std::set<pg_shard_t> &pg_shards,
                     const hobject_t& hoid,
+                    crimson::osd::ObjectContextRef&& new_clone,
                     ceph::os::Transaction&& txn,
                     osd_op_params_t&& req,
                     epoch_t min_epoch, epoch_t max_epoch,
index 4e735c3b4cb96c37c0fe958b4f3f563889c86e22..97b241fdce40bb6c0df02aaa24e695834d90fa93 100644 (file)
@@ -940,6 +940,7 @@ std::unique_ptr<OpsExecuter::CloningContext> OpsExecuter::execute_clone(
   };
   encode(cloned_snaps, cloning_ctx->log_entry.snaps);
   cloning_ctx->log_entry.clean_regions.mark_data_region_dirty(0, initial_obs.oi.size);
+  cloning_ctx->clone_obc = clone_obc;
 
   return cloning_ctx;
 }
@@ -966,7 +967,7 @@ void OpsExecuter::update_clone_overlap() {
 
 void OpsExecuter::CloningContext::apply_to(
   std::vector<pg_log_entry_t>& log_entries,
-  ObjectContext& processed_obc) &&
+  ObjectContext& processed_obc)
 {
   log_entry.mtime = processed_obc.obs.oi.mtime;
   log_entries.insert(log_entries.begin(), std::move(log_entry));
@@ -983,7 +984,7 @@ OpsExecuter::flush_clone_metadata(
   assert(!txn.empty());
   update_clone_overlap();
   if (cloning_ctx) {
-    std::move(*cloning_ctx).apply_to(log_entries, *obc);
+    cloning_ctx->apply_to(log_entries, *obc);
   }
   if (snapc.seq > obc->ssc->snapset.seq) {
      // update snapset with latest snap context
index 3c8b961d4b9a34262316cf6aca3737a0723439de..94b64ccebb165575151132fea0c49bb8ece46edd 100644 (file)
@@ -197,10 +197,11 @@ private:
   struct CloningContext {
     SnapSet new_snapset;
     pg_log_entry_t log_entry;
+    ObjectContextRef clone_obc;
 
     void apply_to(
       std::vector<pg_log_entry_t>& log_entries,
-      ObjectContext& processed_obc) &&;
+      ObjectContext& processed_obc);
   };
   std::unique_ptr<CloningContext> cloning_ctx;
 
@@ -520,7 +521,10 @@ OpsExecuter::flush_changes_n_do_ops_effects(
       std::move(txn),
       std::move(obc),
       std::move(*osd_op_params),
-      std::move(log_entries));
+      std::move(log_entries),
+      cloning_ctx
+       ? std::move(cloning_ctx->clone_obc)
+       : nullptr);
 
     submitted = std::move(_submitted);
     all_completed = std::move(_all_completed);
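
The CloningContext hunks above (ops_executer.cc and ops_executer.h) drop the
`&&` ref-qualifier from apply_to(), so the call site changes from
`std::move(*cloning_ctx).apply_to(...)` to `cloning_ctx->apply_to(...)`: the
context now has to outlive the call, because its new clone_obc member is moved
out separately when flush_changes_n_do_ops_effects() hands it to
submit_transaction. A standalone reminder of what that qualifier does (generic
C++, nothing crimson-specific):

    #include <utility>

    struct Ctx {
      int payload = 42;
      void consume() && {}   // old style: callable only as std::move(c).consume()
      void apply() {}        // new style: callable on an lvalue, c stays usable
    };

    int main() {
      Ctx c;
      std::move(c).consume();        // fine: rvalue
      // c.consume();                // would not compile: c is an lvalue
      c.apply();                     // fine either way
      int p = std::move(c.payload);  // a member can still be moved out later
      (void)p;
    }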
index c5bdcae47f237f2cb9d9d64bf1848f1cad2d61ed..8cab612568217a1c617ce3879fc1851dc1482c4a 100644 (file)
@@ -435,6 +435,7 @@ SnapTrimObjSubEvent::process_and_submit(ObjectContextRef head_obc,
 
   auto [submitted, all_completed] = co_await pg->submit_transaction(
          std::move(clone_obc),
+         nullptr,
          std::move(txn),
          std::move(osd_op_p),
          std::move(log_entries)
index 544ffb6a6853d31f06972e56fba394cde93d27e7..1e2988efbbe9e75aa12e4a5af9ecc6271b38f050 100644 (file)
@@ -907,11 +907,23 @@ void PG::mutate_object(
   }
 }
 
+void PG::enqueue_push_for_backfill(
+  const hobject_t &obj,
+  const eversion_t &v,
+  const std::vector<pg_shard_t> &peers)
+{
+  assert(recovery_handler);
+  assert(recovery_handler->backfill_state);
+  auto backfill_state = recovery_handler->backfill_state.get();
+  backfill_state->enqueue_standalone_push(obj, v, peers);
+}
+
 PG::interruptible_future<
   std::tuple<PG::interruptible_future<>,
              PG::interruptible_future<>>>
 PG::submit_transaction(
   ObjectContextRef&& obc,
+  ObjectContextRef&& new_clone,
   ceph::os::Transaction&& txn,
   osd_op_params_t&& osd_op_p,
   std::vector<pg_log_entry_t>&& log_entries)
@@ -940,6 +952,7 @@ PG::submit_transaction(
   auto [submitted, all_completed] = co_await backend->submit_transaction(
       peering_state.get_acting_recovery_backfill(),
       obc->obs.oi.soid,
+      std::move(new_clone),
       std::move(txn),
       std::move(osd_op_p),
       peering_state.get_last_peering_reset(),
@@ -948,8 +961,8 @@ PG::submit_transaction(
   co_return std::make_tuple(
     std::move(submitted),
     all_completed.then_interruptible(
-      [this, at_version,
-      last_complete=peering_state.get_info().last_complete](auto acked) {
+      [this, last_complete=peering_state.get_info().last_complete, at_version]
+      (auto acked) {
       for (const auto& peer : acked) {
         peering_state.update_peer_last_complete_ondisk(
           peer.shard, peer.last_complete_ondisk);
@@ -1154,11 +1167,13 @@ PG::submit_executer_fut PG::submit_executer(
     [FNAME, this](auto&& txn,
                  auto&& obc,
                  auto&& osd_op_p,
-                 auto&& log_entries) {
+                 auto&& log_entries,
+                  auto&& new_clone) {
       DEBUGDPP("object {} submitting txn", *this, obc->get_oid());
       mutate_object(obc, txn, osd_op_p);
       return submit_transaction(
        std::move(obc),
+        std::move(new_clone),
        std::move(txn),
        std::move(osd_op_p),
        std::move(log_entries));
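
Together with the ec_backend, pg_backend, and ops_executer hunks, this threads
the freshly created clone's ObjectContext down the write path as a new
`new_clone` parameter: OpsExecuter hands it to PG::submit_executer,
PG::submit_transaction forwards it to the backend, and callers that never
create a clone (the snap-trim path above) pass nullptr. A minimal sketch of
the forwarding pattern, with stand-in types in place of the crimson ones:

    #include <iostream>
    #include <memory>
    #include <string>

    using ObjectContextRef = std::shared_ptr<std::string>;  // stand-in type

    void backend_submit(ObjectContextRef&& new_clone) {
      if (new_clone)   // only the backend decides whether a push is needed
        std::cout << "may need standalone push of " << *new_clone << '\n';
    }

    void pg_submit(ObjectContextRef&& new_clone) {
      backend_submit(std::move(new_clone));  // PG just forwards
    }

    int main() {
      pg_submit(std::make_shared<std::string>("clone_1"));  // write path
      pg_submit(nullptr);                                   // snap-trim path
    }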
index 632683690a2ea2a27f2f03bfe010f189e09bf863..15aeec0e4f35c0fd93c3b73d3ec91a603cb7f6b3 100644 (file)
@@ -45,6 +45,7 @@
 class MQuery;
 class OSDMap;
 class PGBackend;
+class ReplicatedBackend;
 class PGPeeringEvent;
 class osd_op_params_t;
 
@@ -678,6 +679,7 @@ private:
     std::tuple<interruptible_future<>, interruptible_future<>>>
   submit_transaction(
     ObjectContextRef&& obc,
+    ObjectContextRef&& new_clone,
     ceph::os::Transaction&& txn,
     osd_op_params_t&& oop,
     std::vector<pg_log_entry_t>&& log_entries);
@@ -885,6 +887,10 @@ private:
   friend class SnapTrimObjSubEvent;
 private:
 
+  void enqueue_push_for_backfill(
+    const hobject_t &obj,
+    const eversion_t &v,
+    const std::vector<pg_shard_t> &peers);
   void mutate_object(
     ObjectContextRef& obc,
     ceph::os::Transaction& txn,
@@ -913,6 +919,7 @@ private:
 
 private:
   friend class IOInterruptCondition;
+  friend class ::ReplicatedBackend;
   struct log_update_t {
     std::set<pg_shard_t> waiting_on;
     seastar::shared_promise<> all_committed;
index fa1f1405ffe0f4fcfa9d65f94bcc657379583558..813218983fdf75e1d78009962d0b9f7bf043fb97 100644 (file)
@@ -414,6 +414,7 @@ public:
   virtual rep_op_fut_t
   submit_transaction(const std::set<pg_shard_t> &pg_shards,
                     const hobject_t& hoid,
+                    crimson::osd::ObjectContextRef&& new_clone,
                     ceph::os::Transaction&& txn,
                     osd_op_params_t&& osd_op_p,
                     epoch_t min_epoch, epoch_t max_epoch,
index 705b3176b97900fe14c090f8ca7c2e092b2d17e8..657e6d3e888c7385a15fae9f6799b8f158f046a4 100644 (file)
@@ -45,6 +45,10 @@ public:
 
   seastar::future<> stop() { return seastar::now(); }
   void on_pg_clean();
+  void enqueue_push(
+    const hobject_t& obj,
+    const eversion_t& v,
+    const std::vector<pg_shard_t> &peers) final;
 private:
   PGRecoveryListener* pg;
   size_t start_primary_recovery_ops(
@@ -108,10 +112,6 @@ private:
     const hobject_t& end) final;
   void request_primary_scan(
     const hobject_t& begin) final;
-  void enqueue_push(
-    const hobject_t& obj,
-    const eversion_t& v,
-    const std::vector<pg_shard_t> &peers) final;
   void enqueue_drop(
     const pg_shard_t& target,
     const hobject_t& obj,
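
enqueue_push() moves from PGRecovery's private BackfillListener overrides into
its public section, and pg.h gains `friend class ::ReplicatedBackend`, so the
backend can reach the private PG::enqueue_push_for_backfill() hook without
widening PG's public API. A toy illustration of that friend-based access
pattern (hypothetical names, not the real classes):

    #include <iostream>

    class Backend;  // forward declaration, mirrors 'class ReplicatedBackend;'

    class Pg {
      void enqueue_push_for_backfill() { std::cout << "clone queued\n"; }
      friend class Backend;   // only the backend may call the private hook
    };

    class Backend {
    public:
      void on_all_committed(Pg& pg) { pg.enqueue_push_for_backfill(); }
    };

    int main() {
      Pg pg;
      Backend b;
      b.on_all_committed(pg);
    }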
index 681d4c9a817407537fa840f3e0242b06fc0a863d..f09cd147ea9e4e2234341c534c6432e9b6b0b72d 100644 (file)
@@ -44,6 +44,7 @@ MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg(
   epoch_t min_epoch,
   epoch_t map_epoch,
   const std::vector<pg_log_entry_t> &log_entries,
+  bool send_op,
   ceph_tid_t tid)
 {
   ceph_assert(pg_shard != whoami);
@@ -57,7 +58,7 @@ MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg(
     min_epoch,
     tid,
     osd_op_p.at_version);
-  if (pg.should_send_op(pg_shard, hoid)) {
+  if (send_op) {
     m->set_data(encoded_txn);
   } else {
     ceph::os::Transaction t;
@@ -73,18 +74,21 @@ MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg(
 }
 
 ReplicatedBackend::rep_op_fut_t
-ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
-                                      const hobject_t& hoid,
-                                      ceph::os::Transaction&& t,
-                                      osd_op_params_t&& opp,
-                                      epoch_t min_epoch, epoch_t map_epoch,
-                                     std::vector<pg_log_entry_t>&& logv)
+ReplicatedBackend::submit_transaction(
+  const std::set<pg_shard_t> &pg_shards,
+  const hobject_t& hoid,
+  crimson::osd::ObjectContextRef &&new_clone,
+  ceph::os::Transaction&& t,
+  osd_op_params_t&& opp,
+  epoch_t min_epoch, epoch_t map_epoch,
+  std::vector<pg_log_entry_t>&& logv)
 {
   LOG_PREFIX(ReplicatedBackend::submit_transaction);
   DEBUGDPP("object {}", dpp, hoid);
   auto log_entries = std::move(logv);
   auto txn = std::move(t);
   auto osd_op_p = std::move(opp);
+  auto _new_clone = std::move(new_clone);
 
   const ceph_tid_t tid = shard_services.get_tid();
   auto pending_txn =
@@ -96,18 +100,34 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
     le.mark_unrollbackable();
   }
 
+  std::vector<pg_shard_t> to_push_clone;
   auto sends = std::make_unique<std::vector<seastar::future<>>>();
-  for (auto pg_shard : pg_shards) {
-    if (pg_shard != whoami) {
-      auto m = new_repop_msg(
+  for (auto &pg_shard : pg_shards) {
+    if (pg_shard == whoami) {
+      continue;
+    }
+    MURef<MOSDRepOp> m;
+    if (pg.should_send_op(pg_shard, hoid)) {
+      m = new_repop_msg(
+       pg_shard, hoid, encoded_txn, osd_op_p,
+       min_epoch, map_epoch, log_entries, true, tid);
+    } else {
+      m = new_repop_msg(
        pg_shard, hoid, encoded_txn, osd_op_p,
-       min_epoch, map_epoch, log_entries, tid);
-      pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
-      // TODO: set more stuff. e.g., pg_states
-      sends->emplace_back(
-       shard_services.send_to_osd(
-         pg_shard.osd, std::move(m), map_epoch));
+       min_epoch, map_epoch, log_entries, false, tid);
+      if (_new_clone && pg.is_missing_on_peer(pg_shard, hoid)) {
+       // The head is in the push queue but hasn't been pushed yet.
+       // We need to ensure that the newly created clone will be 
+       // pushed as well, otherwise we might skip it.
+       // See: https://tracker.ceph.com/issues/68808
+       to_push_clone.push_back(pg_shard);
+      }
     }
+    pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
+    // TODO: set more stuff. e.g., pg_states
+    sends->emplace_back(
+      shard_services.send_to_osd(
+       pg_shard.osd, std::move(m), map_epoch));
   }
 
   co_await pg.update_snap_map(log_entries, txn);
@@ -137,9 +157,16 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
       return seastar::now();
     }
     return peers->all_committed.get_shared_future();
-  }).then_interruptible([pending_txn, this] {
+  }).then_interruptible([pending_txn, this, _new_clone,
+                       to_push_clone=std::move(to_push_clone)] {
     auto acked_peers = std::move(pending_txn->second.acked_peers);
     pending_trans.erase(pending_txn);
+    if (_new_clone && !to_push_clone.empty()) {
+      pg.enqueue_push_for_backfill(
+       _new_clone->obs.oi.soid,
+       _new_clone->obs.oi.version,
+       to_push_clone);
+    }
     return seastar::make_ready_future<
       crimson::osd::acked_peers_t>(std::move(acked_peers));
   });
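
The reworked per-shard loop above is the core of the fix: shards for which
should_send_op() holds get the full encoded transaction; backfill targets get
an empty repop (for ordering only), and when this request created a clone and
the head is missing on such a peer, the peer is recorded in to_push_clone.
Once all_committed resolves, those peers receive the clone via
pg.enqueue_push_for_backfill(). A condensed, hypothetical restatement of that
decision logic:

    #include <iostream>
    #include <vector>

    struct Shard { int id; bool should_send_op; bool missing_obj; };

    int main() {
      bool have_new_clone = true;
      std::vector<Shard> shards{{1, true, false}, {2, false, true}};
      std::vector<int> to_push_clone;

      for (const auto& s : shards) {
        if (s.should_send_op) {
          std::cout << "shard " << s.id << ": full repop\n";
        } else {
          std::cout << "shard " << s.id << ": empty repop (backfill target)\n";
          if (have_new_clone && s.missing_obj)
            to_push_clone.push_back(s.id);     // remember: push the clone later
        }
      }
      // ...after every replica has committed:
      for (int id : to_push_clone)
        std::cout << "enqueue_push_for_backfill -> shard " << id << '\n';
    }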
index dccc69da49aaa1bbd64b3db7998bdce13827524e..d5844b23a0c886fc410c411ae9797dcda5ce28d1 100644 (file)
@@ -35,6 +35,7 @@ private:
   rep_op_fut_t submit_transaction(
     const std::set<pg_shard_t> &pg_shards,
     const hobject_t& hoid,
+    crimson::osd::ObjectContextRef&& new_clone,
     ceph::os::Transaction&& txn,
     osd_op_params_t&& osd_op_p,
     epoch_t min_epoch, epoch_t max_epoch,
@@ -68,6 +69,7 @@ private:
     epoch_t min_epoch,
     epoch_t map_epoch,
     const std::vector<pg_log_entry_t> &log_entries,
+    bool send_op,
     ceph_tid_t tid);
 
   seastar::future<> request_committed(