From 184c1861f8182850ffcd8ffe5e36b3f2e847969c Mon Sep 17 00:00:00 2001 From: Xuehan Xu Date: Mon, 4 Nov 2024 17:30:10 +0800 Subject: [PATCH] crimson/osd/replicate_backend: add the skipped newly created clone object to the push queue after the clone request completes Fixes: https://tracker.ceph.com/issues/68808 Signed-off-by: Xuehan Xu --- src/crimson/osd/backfill_state.cc | 8 +++ src/crimson/osd/backfill_state.h | 9 +++ src/crimson/osd/ec_backend.cc | 1 + src/crimson/osd/ec_backend.h | 1 + src/crimson/osd/ops_executer.cc | 5 +- src/crimson/osd/ops_executer.h | 8 ++- .../osd/osd_operations/snaptrim_event.cc | 1 + src/crimson/osd/pg.cc | 21 ++++++- src/crimson/osd/pg.h | 7 +++ src/crimson/osd/pg_backend.h | 1 + src/crimson/osd/pg_recovery.h | 8 +-- src/crimson/osd/replicated_backend.cc | 61 +++++++++++++------ src/crimson/osd/replicated_backend.h | 2 + 13 files changed, 105 insertions(+), 28 deletions(-) diff --git a/src/crimson/osd/backfill_state.cc b/src/crimson/osd/backfill_state.cc index a77cbe8765249..0269627a2c83f 100644 --- a/src/crimson/osd/backfill_state.cc +++ b/src/crimson/osd/backfill_state.cc @@ -610,4 +610,12 @@ void BackfillState::ProgressTracker::complete_to( } } +void BackfillState::enqueue_standalone_push( + const hobject_t &obj, + const eversion_t &v, + const std::vector &peers) { + progress_tracker->enqueue_push(obj); + backfill_machine.backfill_listener.enqueue_push(obj, v, peers); +} + } // namespace crimson::osd diff --git a/src/crimson/osd/backfill_state.h b/src/crimson/osd/backfill_state.h index a49cbeaac068b..072c91e079d7f 100644 --- a/src/crimson/osd/backfill_state.h +++ b/src/crimson/osd/backfill_state.h @@ -304,6 +304,15 @@ public: backfill_machine.process_event(*std::move(evt)); } + void enqueue_standalone_push( + const hobject_t &obj, + const eversion_t &v, + const std::vector &peers); + + bool is_triggered() const { + return backfill_machine.triggering_event() != nullptr; + } + hobject_t get_last_backfill_started() const { return last_backfill_started; } diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc index 32eaaf02b3f37..007d0bf35f3d8 100644 --- a/src/crimson/osd/ec_backend.cc +++ b/src/crimson/osd/ec_backend.cc @@ -26,6 +26,7 @@ ECBackend::_read(const hobject_t& hoid, ECBackend::rep_op_fut_t ECBackend::submit_transaction(const std::set &pg_shards, const hobject_t& hoid, + crimson::osd::ObjectContextRef&& new_clone, ceph::os::Transaction&& txn, osd_op_params_t&& osd_op_p, epoch_t min_epoch, epoch_t max_epoch, diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h index 90a7e2b1f4d7f..b14c78c9fc4a0 100644 --- a/src/crimson/osd/ec_backend.h +++ b/src/crimson/osd/ec_backend.h @@ -28,6 +28,7 @@ private: rep_op_fut_t submit_transaction(const std::set &pg_shards, const hobject_t& hoid, + crimson::osd::ObjectContextRef&& new_clone, ceph::os::Transaction&& txn, osd_op_params_t&& req, epoch_t min_epoch, epoch_t max_epoch, diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 4e735c3b4cb96..97b241fdce40b 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -940,6 +940,7 @@ std::unique_ptr OpsExecuter::execute_clone( }; encode(cloned_snaps, cloning_ctx->log_entry.snaps); cloning_ctx->log_entry.clean_regions.mark_data_region_dirty(0, initial_obs.oi.size); + cloning_ctx->clone_obc = clone_obc; return cloning_ctx; } @@ -966,7 +967,7 @@ void OpsExecuter::update_clone_overlap() { void OpsExecuter::CloningContext::apply_to( std::vector& log_entries, - ObjectContext& processed_obc) && + ObjectContext& processed_obc) { log_entry.mtime = processed_obc.obs.oi.mtime; log_entries.insert(log_entries.begin(), std::move(log_entry)); @@ -983,7 +984,7 @@ OpsExecuter::flush_clone_metadata( assert(!txn.empty()); update_clone_overlap(); if (cloning_ctx) { - std::move(*cloning_ctx).apply_to(log_entries, *obc); + cloning_ctx->apply_to(log_entries, *obc); } if (snapc.seq > obc->ssc->snapset.seq) { // update snapset with latest snap context diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 3c8b961d4b9a3..94b64ccebb165 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -197,10 +197,11 @@ private: struct CloningContext { SnapSet new_snapset; pg_log_entry_t log_entry; + ObjectContextRef clone_obc; void apply_to( std::vector& log_entries, - ObjectContext& processed_obc) &&; + ObjectContext& processed_obc); }; std::unique_ptr cloning_ctx; @@ -520,7 +521,10 @@ OpsExecuter::flush_changes_n_do_ops_effects( std::move(txn), std::move(obc), std::move(*osd_op_params), - std::move(log_entries)); + std::move(log_entries), + cloning_ctx + ? std::move(cloning_ctx->clone_obc) + : nullptr); submitted = std::move(_submitted); all_completed = std::move(_all_completed); diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc index c5bdcae47f237..8cab612568217 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.cc +++ b/src/crimson/osd/osd_operations/snaptrim_event.cc @@ -435,6 +435,7 @@ SnapTrimObjSubEvent::process_and_submit(ObjectContextRef head_obc, auto [submitted, all_completed] = co_await pg->submit_transaction( std::move(clone_obc), + nullptr, std::move(txn), std::move(osd_op_p), std::move(log_entries) diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 544ffb6a6853d..1e2988efbbe9e 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -907,11 +907,23 @@ void PG::mutate_object( } } +void PG::enqueue_push_for_backfill( + const hobject_t &obj, + const eversion_t &v, + const std::vector &peers) +{ + assert(recovery_handler); + assert(recovery_handler->backfill_state); + auto backfill_state = recovery_handler->backfill_state.get(); + backfill_state->enqueue_standalone_push(obj, v, peers); +} + PG::interruptible_future< std::tuple, PG::interruptible_future<>>> PG::submit_transaction( ObjectContextRef&& obc, + ObjectContextRef&& new_clone, ceph::os::Transaction&& txn, osd_op_params_t&& osd_op_p, std::vector&& log_entries) @@ -940,6 +952,7 @@ PG::submit_transaction( auto [submitted, all_completed] = co_await backend->submit_transaction( peering_state.get_acting_recovery_backfill(), obc->obs.oi.soid, + std::move(new_clone), std::move(txn), std::move(osd_op_p), peering_state.get_last_peering_reset(), @@ -948,8 +961,8 @@ PG::submit_transaction( co_return std::make_tuple( std::move(submitted), all_completed.then_interruptible( - [this, at_version, - last_complete=peering_state.get_info().last_complete](auto acked) { + [this, last_complete=peering_state.get_info().last_complete, at_version] + (auto acked) { for (const auto& peer : acked) { peering_state.update_peer_last_complete_ondisk( peer.shard, peer.last_complete_ondisk); @@ -1154,11 +1167,13 @@ PG::submit_executer_fut PG::submit_executer( [FNAME, this](auto&& txn, auto&& obc, auto&& osd_op_p, - auto&& log_entries) { + auto&& log_entries, + auto&& new_clone) { DEBUGDPP("object {} submitting txn", *this, obc->get_oid()); mutate_object(obc, txn, osd_op_p); return submit_transaction( std::move(obc), + std::move(new_clone), std::move(txn), std::move(osd_op_p), std::move(log_entries)); diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 632683690a2ea..15aeec0e4f35c 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -45,6 +45,7 @@ class MQuery; class OSDMap; class PGBackend; +class ReplicatedBackend; class PGPeeringEvent; class osd_op_params_t; @@ -678,6 +679,7 @@ private: std::tuple, interruptible_future<>>> submit_transaction( ObjectContextRef&& obc, + ObjectContextRef&& new_clone, ceph::os::Transaction&& txn, osd_op_params_t&& oop, std::vector&& log_entries); @@ -885,6 +887,10 @@ private: friend class SnapTrimObjSubEvent; private: + void enqueue_push_for_backfill( + const hobject_t &obj, + const eversion_t &v, + const std::vector &peers); void mutate_object( ObjectContextRef& obc, ceph::os::Transaction& txn, @@ -913,6 +919,7 @@ private: private: friend class IOInterruptCondition; + friend class ::ReplicatedBackend; struct log_update_t { std::set waiting_on; seastar::shared_promise<> all_committed; diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index fa1f1405ffe0f..813218983fdf7 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -414,6 +414,7 @@ public: virtual rep_op_fut_t submit_transaction(const std::set &pg_shards, const hobject_t& hoid, + crimson::osd::ObjectContextRef&& new_clone, ceph::os::Transaction&& txn, osd_op_params_t&& osd_op_p, epoch_t min_epoch, epoch_t max_epoch, diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h index 705b3176b9790..657e6d3e888c7 100644 --- a/src/crimson/osd/pg_recovery.h +++ b/src/crimson/osd/pg_recovery.h @@ -45,6 +45,10 @@ public: seastar::future<> stop() { return seastar::now(); } void on_pg_clean(); + void enqueue_push( + const hobject_t& obj, + const eversion_t& v, + const std::vector &peers) final; private: PGRecoveryListener* pg; size_t start_primary_recovery_ops( @@ -108,10 +112,6 @@ private: const hobject_t& end) final; void request_primary_scan( const hobject_t& begin) final; - void enqueue_push( - const hobject_t& obj, - const eversion_t& v, - const std::vector &peers) final; void enqueue_drop( const pg_shard_t& target, const hobject_t& obj, diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index 681d4c9a81740..f09cd147ea9e4 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -44,6 +44,7 @@ MURef ReplicatedBackend::new_repop_msg( epoch_t min_epoch, epoch_t map_epoch, const std::vector &log_entries, + bool send_op, ceph_tid_t tid) { ceph_assert(pg_shard != whoami); @@ -57,7 +58,7 @@ MURef ReplicatedBackend::new_repop_msg( min_epoch, tid, osd_op_p.at_version); - if (pg.should_send_op(pg_shard, hoid)) { + if (send_op) { m->set_data(encoded_txn); } else { ceph::os::Transaction t; @@ -73,18 +74,21 @@ MURef ReplicatedBackend::new_repop_msg( } ReplicatedBackend::rep_op_fut_t -ReplicatedBackend::submit_transaction(const std::set& pg_shards, - const hobject_t& hoid, - ceph::os::Transaction&& t, - osd_op_params_t&& opp, - epoch_t min_epoch, epoch_t map_epoch, - std::vector&& logv) +ReplicatedBackend::submit_transaction( + const std::set &pg_shards, + const hobject_t& hoid, + crimson::osd::ObjectContextRef &&new_clone, + ceph::os::Transaction&& t, + osd_op_params_t&& opp, + epoch_t min_epoch, epoch_t map_epoch, + std::vector&& logv) { LOG_PREFIX(ReplicatedBackend::submit_transaction); DEBUGDPP("object {}", dpp, hoid); auto log_entries = std::move(logv); auto txn = std::move(t); auto osd_op_p = std::move(opp); + auto _new_clone = std::move(new_clone); const ceph_tid_t tid = shard_services.get_tid(); auto pending_txn = @@ -96,18 +100,34 @@ ReplicatedBackend::submit_transaction(const std::set& pg_shards, le.mark_unrollbackable(); } + std::vector to_push_clone; auto sends = std::make_unique>>(); - for (auto pg_shard : pg_shards) { - if (pg_shard != whoami) { - auto m = new_repop_msg( + for (auto &pg_shard : pg_shards) { + if (pg_shard == whoami) { + continue; + } + MURef m; + if (pg.should_send_op(pg_shard, hoid)) { + m = new_repop_msg( + pg_shard, hoid, encoded_txn, osd_op_p, + min_epoch, map_epoch, log_entries, true, tid); + } else { + m = new_repop_msg( pg_shard, hoid, encoded_txn, osd_op_p, - min_epoch, map_epoch, log_entries, tid); - pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}}); - // TODO: set more stuff. e.g., pg_states - sends->emplace_back( - shard_services.send_to_osd( - pg_shard.osd, std::move(m), map_epoch)); + min_epoch, map_epoch, log_entries, false, tid); + if (_new_clone && pg.is_missing_on_peer(pg_shard, hoid)) { + // The head is in the push queue but hasn't been pushed yet. + // We need to ensure that the newly created clone will be + // pushed as well, otherwise we might skip it. + // See: https://tracker.ceph.com/issues/68808 + to_push_clone.push_back(pg_shard); + } } + pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}}); + // TODO: set more stuff. e.g., pg_states + sends->emplace_back( + shard_services.send_to_osd( + pg_shard.osd, std::move(m), map_epoch)); } co_await pg.update_snap_map(log_entries, txn); @@ -137,9 +157,16 @@ ReplicatedBackend::submit_transaction(const std::set& pg_shards, return seastar::now(); } return peers->all_committed.get_shared_future(); - }).then_interruptible([pending_txn, this] { + }).then_interruptible([pending_txn, this, _new_clone, + to_push_clone=std::move(to_push_clone)] { auto acked_peers = std::move(pending_txn->second.acked_peers); pending_trans.erase(pending_txn); + if (_new_clone && !to_push_clone.empty()) { + pg.enqueue_push_for_backfill( + _new_clone->obs.oi.soid, + _new_clone->obs.oi.version, + to_push_clone); + } return seastar::make_ready_future< crimson::osd::acked_peers_t>(std::move(acked_peers)); }); diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h index dccc69da49aaa..d5844b23a0c88 100644 --- a/src/crimson/osd/replicated_backend.h +++ b/src/crimson/osd/replicated_backend.h @@ -35,6 +35,7 @@ private: rep_op_fut_t submit_transaction( const std::set &pg_shards, const hobject_t& hoid, + crimson::osd::ObjectContextRef&& new_clone, ceph::os::Transaction&& txn, osd_op_params_t&& osd_op_p, epoch_t min_epoch, epoch_t max_epoch, @@ -68,6 +69,7 @@ private: epoch_t min_epoch, epoch_t map_epoch, const std::vector &log_entries, + bool send_op, ceph_tid_t tid); seastar::future<> request_committed( -- 2.39.5