From 0697562bde4194c29ff911637ff13588813a0ef0 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 23 Jul 2024 22:10:58 -0700 Subject: [PATCH] crimson/osd: update snaps based on log entries on primary and replica Signed-off-by: Samuel Just Signed-off-by: Xuehan Xu --- src/crimson/osd/ec_backend.cc | 4 +- src/crimson/osd/ops_executer.cc | 8 -- src/crimson/osd/ops_executer.h | 21 +++-- .../osd/osd_operations/snaptrim_event.cc | 25 +++--- src/crimson/osd/pg.cc | 88 +++++++++++++++---- src/crimson/osd/pg.h | 6 +- src/crimson/osd/pg_backend.h | 3 +- src/crimson/osd/replicated_backend.cc | 21 +++-- 8 files changed, 124 insertions(+), 52 deletions(-) diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc index 26f7e5dedde0d..32eaaf02b3f37 100644 --- a/src/crimson/osd/ec_backend.cc +++ b/src/crimson/osd/ec_backend.cc @@ -32,6 +32,6 @@ ECBackend::submit_transaction(const std::set &pg_shards, std::vector&& log_entries) { // todo - return {seastar::now(), - seastar::make_ready_future()}; + return make_ready_future(seastar::now(), + seastar::make_ready_future()); } diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 7c46754866799..e5982680ca855 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -1020,14 +1020,6 @@ OpsExecuter::flush_clone_metadata( update_clone_overlap(); if (cloning_ctx) { std::move(*cloning_ctx).apply_to(log_entries, *obc); - const auto& coid = log_entries.front().soid; - const auto& cloned_snaps = obc->ssc->snapset.clone_snaps[coid.snap]; - maybe_snap_mapped = snap_map_clone( - coid, - std::set{std::begin(cloned_snaps), std::end(cloned_snaps)}, - snap_mapper, - osdriver, - txn); } if (snapc.seq > obc->ssc->snapset.seq) { // update snapset with latest snap context diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 834266ce68f05..812e246826688 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -272,6 +272,7 @@ private: OSDriver& osdriver, ceph::os::Transaction& txn); +public: static interruptible_future<> snap_map_remove( const hobject_t& soid, SnapMapper& snap_mapper, @@ -290,6 +291,7 @@ private: OSDriver& osdriver, ceph::os::Transaction& txn); +private: // this gizmo could be wrapped in std::optional for the sake of lazy // initialization. we don't need it for ops that doesn't have effect // TODO: verify the init overhead of chunked_fifo @@ -534,14 +536,17 @@ OpsExecuter::flush_changes_n_do_ops_effects( if (auto log_rit = log_entries.rbegin(); log_rit != log_entries.rend()) { ceph_assert(log_rit->version == osd_op_params->at_version); } - auto [submitted, all_completed] = - std::forward(mut_func)(std::move(txn), - std::move(obc), - std::move(*osd_op_params), - std::move(log_entries)); - return interruptor::make_ready_future( - std::move(submitted), - osd_op_ierrorator::future<>(std::move(all_completed))); + return std::forward(mut_func)(std::move(txn), + std::move(obc), + std::move(*osd_op_params), + std::move(log_entries) + ).then_interruptible([](auto p) { + auto &submitted = std::get<0>(p); + auto &all_completed = std::get<1>(p); + return interruptor::make_ready_future( + std::move(submitted), + osd_op_ierrorator::future<>(std::move(all_completed))); + }); }); } apply_stats(); diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc index f1247c3fa08f2..a8001e75c4c8f 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.cc +++ b/src/crimson/osd/osd_operations/snaptrim_event.cc @@ -409,20 +409,25 @@ SnapTrimObjSubEvent::start() logger().debug("{}: processing clone_obc={}", *this, clone_obc->get_oid()); return remove_or_update( clone_obc, head_obc - ).safe_then_interruptible([clone_obc, this](auto&& txn) mutable { - auto [submitted, all_completed] = pg->submit_transaction( + ).safe_then_interruptible( + [clone_obc, this](auto&& txn) mutable { + return pg->submit_transaction( std::move(clone_obc), std::move(txn), std::move(osd_op_p), - std::move(log_entries)); - return submitted.then_interruptible( - [this, all_completed=std::move(all_completed)]() mutable { - return enter_stage( - client_pp().wait_repop - ).then_interruptible([all_completed=std::move(all_completed)]() mutable{ - return std::move(all_completed); + std::move(log_entries) + ).then_interruptible([this](auto p) { + auto &submitted = std::get<0>(p); + auto &all_completed = std::get<1>(p); + return submitted.then_interruptible( + [this, all_completed=std::move(all_completed)]() mutable { + return enter_stage( + client_pp().wait_repop + ).then_interruptible([all_completed=std::move(all_completed)]() mutable{ + return std::move(all_completed); + }); }); - }); + }); }); }); }, diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 291385de64991..cd362a0c9f047 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -894,8 +894,9 @@ void PG::mutate_object( } } -std::tuple, - PG::interruptible_future<>> +PG::interruptible_future< + std::tuple, + PG::interruptible_future<>>> PG::submit_transaction( ObjectContextRef&& obc, ceph::os::Transaction&& txn, @@ -903,9 +904,10 @@ PG::submit_transaction( std::vector&& log_entries) { if (__builtin_expect(stopping, false)) { - return {seastar::make_exception_future<>( - crimson::common::system_shutdown_exception()), - seastar::now()}; + co_return std::make_tuple( + interruptor::make_interruptible(seastar::make_exception_future<>( + crimson::common::system_shutdown_exception())), + interruptor::now()); } epoch_t map_epoch = get_osdmap_epoch(); @@ -917,7 +919,7 @@ PG::submit_transaction( ceph_assert(log_entries.rbegin()->version >= projected_last_update); projected_last_update = log_entries.rbegin()->version; - auto [submitted, all_completed] = backend->submit_transaction( + auto [submitted, all_completed] = co_await backend->submit_transaction( peering_state.get_acting_recovery_backfill(), obc->obs.oi.soid, std::move(txn), @@ -925,16 +927,19 @@ PG::submit_transaction( peering_state.get_last_peering_reset(), map_epoch, std::move(log_entries)); - return std::make_tuple(std::move(submitted), all_completed.then_interruptible( - [this, last_complete=peering_state.get_info().last_complete, + co_return std::make_tuple( + std::move(submitted), + all_completed.then_interruptible( + [this, last_complete=peering_state.get_info().last_complete, at_version=osd_op_p.at_version](auto acked) { - for (const auto& peer : acked) { - peering_state.update_peer_last_complete_ondisk( - peer.shard, peer.last_complete_ondisk); - } - peering_state.complete_write(at_version, last_complete); - return seastar::now(); - })); + for (const auto& peer : acked) { + peering_state.update_peer_last_complete_ondisk( + peer.shard, peer.last_complete_ondisk); + } + peering_state.complete_write(at_version, last_complete); + return seastar::now(); + }) + ); } PG::interruptible_future<> PG::repair_object( @@ -1453,6 +1458,11 @@ PG::interruptible_future<> PG::handle_rep_op(Ref req) std::vector log_entries; decode(log_entries, p); update_stats(req->pg_stats); + + co_await update_snap_map( + log_entries, + txn); + log_operation(std::move(log_entries), req->pg_trim_to, req->version, @@ -1477,6 +1487,54 @@ PG::interruptible_future<> PG::handle_rep_op(Ref req) ); co_return; } + +PG::interruptible_future<> PG::update_snap_map( + const std::vector &log_entries, + ObjectStore::Transaction& t) +{ + LOG_PREFIX(PG::update_snap_map); + for (auto i = log_entries.cbegin(); i != log_entries.cend(); ++i) { + OSDriver::OSTransaction _t(osdriver.get_transaction(&t)); + if (i->soid.snap < CEPH_MAXSNAP) { + if (i->is_delete()) { + co_await OpsExecuter::snap_map_remove( + i->soid, + snap_mapper, + osdriver, + t); + } else if (i->is_update()) { + ceph_assert(i->snaps.length() > 0); + vector snaps; + bufferlist snapbl = i->snaps; + auto p = snapbl.cbegin(); + try { + decode(snaps, p); + } catch (...) { + ERRORDPP("Failed to decode snaps on {}", *this, *i); + snaps.clear(); + } + set _snaps(snaps.begin(), snaps.end()); + + if (i->is_clone() || i->is_promote()) { + co_await OpsExecuter::snap_map_clone( + i->soid, + _snaps, + snap_mapper, + osdriver, + t); + } else if (i->is_modify()) { + co_await OpsExecuter::snap_map_modify( + i->soid, + _snaps, + snap_mapper, + osdriver, + t); + } else { + ceph_assert(i->is_clean()); + } + } + } + } } void PG::log_operation( diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index b1e5e1fa22aa2..d4d3bb92e0034 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -591,6 +591,9 @@ public: interruptible_future<> handle_rep_op(Ref m); void update_stats(const pg_stat_t &stat); + interruptible_future<> update_snap_map( + const std::vector &log_entries, + ObjectStore::Transaction& t); void log_operation( std::vector&& logv, const eversion_t &trim_to, @@ -673,7 +676,8 @@ private: SuccessFunc&& success_func, FailureFunc&& failure_func); interruptible_future> do_pg_ops(Ref m); - std::tuple, interruptible_future<>> + interruptible_future< + std::tuple, interruptible_future<>>> submit_transaction( ObjectContextRef&& obc, ceph::os::Transaction&& txn, diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index 21dce24b899ec..fa1f1405ffe0f 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -60,9 +60,10 @@ public: using interruptible_future = ::crimson::interruptible::interruptible_future< ::crimson::osd::IOInterruptCondition, T>; - using rep_op_fut_t = + using rep_op_ret_t = std::tuple, interruptible_future>; + using rep_op_fut_t = interruptible_future; PGBackend(shard_id_t shard, CollectionRef coll, crimson::osd::ShardServices &shard_services, DoutPrefixProvider &dpp); diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index 6ec8d30d59678..cbb8c883e0752 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -5,6 +5,7 @@ #include "messages/MOSDRepOpReply.h" +#include "crimson/common/coroutine.h" #include "crimson/common/exception.h" #include "crimson/common/log.h" #include "crimson/os/futurized_store.h" @@ -38,13 +39,16 @@ ReplicatedBackend::_read(const hobject_t& hoid, ReplicatedBackend::rep_op_fut_t ReplicatedBackend::submit_transaction(const std::set& pg_shards, const hobject_t& hoid, - ceph::os::Transaction&& txn, - osd_op_params_t&& osd_op_p, + ceph::os::Transaction&& t, + osd_op_params_t&& opp, epoch_t min_epoch, epoch_t map_epoch, - std::vector&& log_entries) + std::vector&& logv) { LOG_PREFIX(ReplicatedBackend::submit_transaction); DEBUGDPP("object {}", dpp, hoid); + auto log_entries = std::move(logv); + auto txn = std::move(t); + auto osd_op_p = std::move(opp); const ceph_tid_t tid = shard_services.get_tid(); auto pending_txn = @@ -89,6 +93,8 @@ ReplicatedBackend::submit_transaction(const std::set& pg_shards, } } + co_await pg.update_snap_map(log_entries, txn); + pg.log_operation( std::move(log_entries), osd_op_p.pg_trim_to, @@ -99,8 +105,8 @@ ReplicatedBackend::submit_transaction(const std::set& pg_shards, false); auto all_completed = interruptor::make_interruptible( - shard_services.get_store().do_transaction(coll, std::move(txn)) - ).then_interruptible([FNAME, this, + shard_services.get_store().do_transaction(coll, std::move(txn)) + ).then_interruptible([FNAME, this, peers=pending_txn->second.weak_from_this()] { if (!peers) { // for now, only actingset_changed can cause peers @@ -117,13 +123,14 @@ ReplicatedBackend::submit_transaction(const std::set& pg_shards, }).then_interruptible([pending_txn, this] { auto acked_peers = std::move(pending_txn->second.acked_peers); pending_trans.erase(pending_txn); - return seastar::make_ready_future(std::move(acked_peers)); + return seastar::make_ready_future< + crimson::osd::acked_peers_t>(std::move(acked_peers)); }); auto sends_complete = seastar::when_all_succeed( sends->begin(), sends->end() ).finally([sends=std::move(sends)] {}); - return {std::move(sends_complete), std::move(all_completed)}; + co_return std::make_tuple(std::move(sends_complete), std::move(all_completed)); } void ReplicatedBackend::on_actingset_changed(bool same_primary) -- 2.39.5