From bc5d15e7fc5e1c86ea59d3a789c544873ae25fb3 Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Fri, 9 Dec 2022 01:13:30 +0000 Subject: [PATCH] crimson/osd: simplify informing SnapMapper about new mappings Signed-off-by: Radoslaw Zarzynski --- src/crimson/osd/ops_executer.cc | 138 +++++----- src/crimson/osd/ops_executer.h | 46 ++-- .../osd/osd_operations/snaptrim_event.cc | 237 +++++++++--------- .../osd/osd_operations/snaptrim_event.h | 2 +- 4 files changed, 197 insertions(+), 226 deletions(-) diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 544328f4df7..6306a0ac1d7 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -807,6 +807,9 @@ void OpsExecuter::fill_op_params_bump_pg_version() std::vector OpsExecuter::prepare_transaction( const std::vector& ops) { + // let's ensure we don't need to inform SnapMapper about this particular + // entry. + assert(obc->obs.oi.soid.snap >= CEPH_MAXSNAP); std::vector log_entries; log_entries.emplace_back( obc->obs.exists ? @@ -828,100 +831,55 @@ std::vector OpsExecuter::prepare_transaction( return log_entries; } -void OpsExecuter::snap_map_remove( +OpsExecuter::interruptible_future<> OpsExecuter::snap_map_remove( const hobject_t& soid, SnapMapper& snap_mapper, - SnapMapperTransaction& txn) + OSDriver& osdriver, + ceph::os::Transaction& txn) { logger().debug("{}: soid {}", __func__, soid); - const auto r = snap_mapper.remove_oid(soid, &txn); - if (r) { - logger().error("{}: remove_oid {} failed with {}", - __func__, soid, r); - } - // On removal tolerate missing key corruption - assert(r == 0 || r == -ENOENT); + return interruptor::async([soid, &snap_mapper, + _t=osdriver.get_transaction(&txn)]() mutable { + const auto r = snap_mapper.remove_oid(soid, &_t); + if (r) { + logger().error("{}: remove_oid {} failed with {}", + __func__, soid, r); + } + // On removal tolerate missing key corruption + assert(r == 0 || r == -ENOENT); + }); } -void OpsExecuter::snap_map_modify( +OpsExecuter::interruptible_future<> OpsExecuter::snap_map_modify( const hobject_t& soid, const std::set& snaps, SnapMapper& snap_mapper, - SnapMapperTransaction& txn) + OSDriver& osdriver, + ceph::os::Transaction& txn) { logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps); - assert(std::size(snaps) > 0); - [[maybe_unused]] const auto r = snap_mapper.update_snaps( - soid, snaps, 0, &txn); - assert(r == 0); + return interruptor::async([soid, snaps, &snap_mapper, + _t=osdriver.get_transaction(&txn)]() mutable { + assert(std::size(snaps) > 0); + [[maybe_unused]] const auto r = snap_mapper.update_snaps( + soid, snaps, 0, &_t); + assert(r == 0); + }); } -void OpsExecuter::snap_map_clone( +OpsExecuter::interruptible_future<> OpsExecuter::snap_map_clone( const hobject_t& soid, const std::set& snaps, SnapMapper& snap_mapper, - SnapMapperTransaction& txn) -{ - logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps); - assert(std::size(snaps) > 0); - snap_mapper.add_oid(soid, snaps, &txn); -} - -OpsExecuter::interruptible_future<> OpsExecuter::flush_snap_map( - const std::vector& log_entries, - SnapMapper& snap_mapper, OSDriver& osdriver, ceph::os::Transaction& txn) { - logger().debug("{} log_entries.size()={}", - __func__, std::size(log_entries)); - for (const auto& le : log_entries) { - if (le.soid.snap >= CEPH_MAXSNAP) { - logger().debug("{} {} >= CEPH_MAXSNAP", - __func__, le.soid); - continue; - } - return interruptor::async([_t=osdriver.get_transaction(&txn), - &le, &snap_mapper]() mutable { - if (le.is_delete()) { - logger().debug("flush_snap_map: is_delete()"); - snap_mapper.remove_oid( - le.soid, - &_t); - } else if (le.is_update()) { - assert(le.snaps.length() > 0); - std::vector snaps; - ceph::bufferlist snapbl = le.snaps; - auto p = snapbl.cbegin(); - try { - decode(snaps, p); - } catch (...) { - logger().error("flush_snap_map: decode snaps failure on {}", le); - snaps.clear(); - } - std::set _snaps(snaps.begin(), snaps.end()); - if (le.is_clone() || le.is_promote()) { - logger().debug("flush_snap_map: le.is_clone() || le.is_promote()"); - snap_mapper.add_oid( - le.soid, - _snaps, - &_t); - } else if (le.is_modify()) { - logger().debug("flush_snap_map: is_modify()"); - int r = snap_mapper.update_snaps( - le.soid, - _snaps, - 0, - &_t); - assert(r == 0); - } else { - assert(le.is_clean()); - logger().debug("flush_snap_map: is_clean()"); - } - } - }); - } - return seastar::now(); + logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps); + return interruptor::async([soid, snaps, &snap_mapper, + _t=osdriver.get_transaction(&txn)]() mutable { + assert(std::size(snaps) > 0); + snap_mapper.add_oid(soid, snaps, &_t); + }); } // Defined here because there is a circular dependency between OpsExecuter and PG @@ -1017,15 +975,29 @@ void OpsExecuter::CloningContext::apply_to( processed_obc.ssc->snapset = std::move(new_snapset); } -void OpsExecuter::flush_clone_metadata( - std::vector& log_entries) +OpsExecuter::interruptible_future> +OpsExecuter::flush_clone_metadata( + std::vector&& log_entries, + SnapMapper& snap_mapper, + OSDriver& osdriver, + ceph::os::Transaction& txn) { assert(!txn.empty()); + auto maybe_snap_mapped = interruptor::now(); if (cloning_ctx) { osd_op_params->at_version = pg->next_version(); - std::move(*cloning_ctx).apply_to(osd_op_params->at_version, - log_entries, - *obc); + std::move(*cloning_ctx).apply_to( + osd_op_params->at_version, + log_entries, + *obc); + const auto& coid = log_entries.back().soid; + const auto& cloned_snaps = obc->ssc->snapset.clone_snaps[coid.snap]; + maybe_snap_mapped = snap_map_clone( + coid, + std::set{std::begin(cloned_snaps), std::end(cloned_snaps)}, + snap_mapper, + osdriver, + txn); } if (snapc.seq > obc->ssc->snapset.seq) { // update snapset with latest snap context @@ -1034,6 +1006,12 @@ void OpsExecuter::flush_clone_metadata( } logger().debug("{} done, initial snapset={}, new snapset={}", __func__, obc->obs.oi.soid, obc->ssc->snapset); + return std::move( + maybe_snap_mapped + ).then_interruptible([log_entries=std::move(log_entries)]() mutable { + return interruptor::make_ready_future>( + std::move(log_entries)); + }); } // TODO: make this static diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index b6b976ab024..df0f67c3f94 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -34,8 +34,6 @@ struct ObjectState; struct OSDOp; class OSDriver; class SnapMapper; -using SnapMapperTransaction = - MapCacher::Transaction; namespace crimson::osd { class PG; @@ -249,29 +247,29 @@ private: && snapc.snaps[0] > initial_obc.ssc->snapset.seq; // existing obj is old } - void flush_clone_metadata( - std::vector& log_entries); - - interruptible_future<> flush_snap_map( - const std::vector& log_entries, + interruptible_future> flush_clone_metadata( + std::vector&& log_entries, SnapMapper& snap_mapper, OSDriver& osdriver, ceph::os::Transaction& txn); - static void snap_map_remove( + static interruptible_future<> snap_map_remove( const hobject_t& soid, SnapMapper& snap_mapper, - SnapMapperTransaction& txn); - static void snap_map_modify( + OSDriver& osdriver, + ceph::os::Transaction& txn); + static interruptible_future<> snap_map_modify( const hobject_t& soid, const std::set& snaps, SnapMapper& snap_mapper, - SnapMapperTransaction& txn); - static void snap_map_clone( + OSDriver& osdriver, + ceph::os::Transaction& txn); + static interruptible_future<> snap_map_clone( const hobject_t& soid, const std::set& snaps, SnapMapper& snap_mapper, - SnapMapperTransaction& txn); + OSDriver& osdriver, + ceph::os::Transaction& txn); // this gizmo could be wrapped in std::optional for the sake of lazy // initialization. we don't need it for ops that doesn't have effect @@ -505,16 +503,14 @@ OpsExecuter::flush_changes_n_do_ops_effects( if (user_modify) { osd_op_params->user_at_version = osd_op_params->at_version.version; } - auto log_entries = prepare_transaction(ops); - flush_clone_metadata(log_entries); - auto maybe_snap_mapped = flush_snap_map(std::as_const(log_entries), - snap_mapper, - osdriver, - txn); - apply_stats(); - maybe_mutated = maybe_snap_mapped.then_interruptible([mut_func=std::move(mut_func), - log_entries=std::move(log_entries), - this]() mutable { + maybe_mutated = flush_clone_metadata( + prepare_transaction(ops), + snap_mapper, + osdriver, + txn + ).then_interruptible([mut_func=std::move(mut_func), + this](auto&& log_entries) mutable { + apply_stats(); auto [submitted, all_completed] = std::forward(mut_func)(std::move(txn), std::move(obc), @@ -531,7 +527,9 @@ OpsExecuter::flush_changes_n_do_ops_effects( return maybe_mutated; } else { return maybe_mutated.then_unpack_interruptible( - [this, pg=std::move(pg)](auto&& submitted, auto&& all_completed) mutable { + // need extra ref pg due to apply_stats() which can be executed after + // informing snap mapper + [this, pg=this->pg](auto&& submitted, auto&& all_completed) mutable { return interruptor::make_ready_future( std::move(submitted), all_completed.safe_then_interruptible([this, pg=std::move(pg)] { diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc index 960af069e0c..7a3165bd3c2 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.cc +++ b/src/crimson/osd/osd_operations/snaptrim_event.cc @@ -6,7 +6,6 @@ #include "crimson/osd/osd_operations/snaptrim_event.h" #include "crimson/osd/ops_executer.h" #include "crimson/osd/pg.h" -#include "include/expected.hpp" namespace { seastar::logger& logger() { @@ -213,31 +212,25 @@ seastar::future<> SnapTrimObjSubEvent::start() }); } -tl::expected +SnapTrimObjSubEvent::interruptible_future< + SnapTrimObjSubEvent::remove_or_update_ret_t> SnapTrimObjSubEvent::remove_or_update( ObjectContextRef obc, ObjectContextRef head_obc) { - ceph::os::Transaction txn{}; - std::vector log_entries{}; - - SnapSet& snapset = obc->ssc->snapset; - auto citer = snapset.clone_snaps.find(coid.snap); - if (citer == snapset.clone_snaps.end()) { + auto citer = obc->ssc->snapset.clone_snaps.find(coid.snap); + if (citer == obc->ssc->snapset.clone_snaps.end()) { logger().error("{}: No clone_snaps in snapset {} for object {}", *this, snapset, coid); - return tl::unexpected{-ENOENT}; } const auto& old_snaps = citer->second; if (old_snaps.empty()) { logger().error("{}: no object info snaps for object {}", *this, coid); - return tl::unexpected{-ENOENT}; } if (snapset.seq == 0) { logger().error("{}: no snapset.seq for object {}", *this, coid); - return tl::unexpected{-ENOENT}; } const OSDMapRef& osdmap = pg->get_osdmap(); std::set new_snaps; @@ -255,12 +248,16 @@ SnapTrimObjSubEvent::remove_or_update( if (p == snapset.clones.end()) { logger().error("{}: Snap {} not in clones", *this, coid.snap); - return tl::unexpected{-ENOENT}; } } + + return seastar::do_with(ceph::os::Transaction{}, [=, this](auto&& txn) { + std::vector log_entries{}; + int64_t num_objects_before_trim = delta_stats.num_objects; osd_op_p.at_version = pg->next_version(); object_info_t &coi = obc->obs.oi; + auto ret = interruptor::now(); if (new_snaps.empty()) { // remove clone logger().info("{}: {} snaps {} -> {} ... deleting", @@ -322,8 +319,7 @@ SnapTrimObjSubEvent::remove_or_update( coi = object_info_t(coid); - auto smtxn = pg->osdriver.get_transaction(&txn); - OpsExecuter::snap_map_remove(coid, pg->snap_mapper, smtxn); + ret = OpsExecuter::snap_map_remove(coid, pg->snap_mapper, pg->osdriver, txn); } else { // save adjusted snaps for this object logger().info("{}: {} snaps {} -> {}", @@ -353,93 +349,98 @@ SnapTrimObjSubEvent::remove_or_update( coi.mtime, 0} ); - auto smtxn = pg->osdriver.get_transaction(&txn); - OpsExecuter::snap_map_modify(coid, new_snaps, pg->snap_mapper, smtxn); + ret = OpsExecuter::snap_map_modify(coid, new_snaps, pg->snap_mapper, pg->osdriver, txn); } - - osd_op_p.at_version = pg->next_version(); - - // save head snapset - logger().debug("{}: {} new snapset {} on {}", - *this, coid, snapset, head_obc->obs.oi); - const auto head_oid = coid.get_head(); - if (snapset.clones.empty() && head_obc->obs.oi.is_whiteout()) { - // NOTE: this arguably constitutes minor interference with the - // tiering agent if this is a cache tier since a snap trim event - // is effectively evicting a whiteout we might otherwise want to - // keep around. - logger().info("{}: {} removing {}", - *this, coid, head_oid); - log_entries.emplace_back( - pg_log_entry_t{ - pg_log_entry_t::DELETE, - head_oid, - osd_op_p.at_version, - head_obc->obs.oi.version, - 0, - osd_reqid_t(), - coi.mtime, // will be replaced in `apply_to()` - 0} - ); - logger().info("{}: remove snap head", *this); - object_info_t& oi = head_obc->obs.oi; - delta_stats.num_objects--; - if (oi.is_dirty()) { - delta_stats.num_objects_dirty--; + return std::move(ret).then_interruptible( + [&txn, &snapset, &coi, num_objects_before_trim, log_entries=std::move(log_entries), head_obc=std::move(head_obc), this] mutable { + osd_op_p.at_version = pg->next_version(); + + // save head snapset + logger().debug("{}: {} new snapset {} on {}", + *this, coid, snapset, head_obc->obs.oi); + const auto head_oid = coid.get_head(); + if (snapset.clones.empty() && head_obc->obs.oi.is_whiteout()) { + // NOTE: this arguably constitutes minor interference with the + // tiering agent if this is a cache tier since a snap trim event + // is effectively evicting a whiteout we might otherwise want to + // keep around. + logger().info("{}: {} removing {}", + *this, coid, head_oid); + log_entries.emplace_back( + pg_log_entry_t{ + pg_log_entry_t::DELETE, + head_oid, + osd_op_p.at_version, + head_obc->obs.oi.version, + 0, + osd_reqid_t(), + coi.mtime, // will be replaced in `apply_to()` + 0} + ); + logger().info("{}: remove snap head", *this); + object_info_t& oi = head_obc->obs.oi; + delta_stats.num_objects--; + if (oi.is_dirty()) { + delta_stats.num_objects_dirty--; + } + if (oi.is_omap()) { + delta_stats.num_objects_omap--; + } + if (oi.is_whiteout()) { + logger().debug("{}: trimming whiteout on {}", + *this, oi.soid); + delta_stats.num_whiteouts--; + } + head_obc->obs.exists = false; + head_obc->obs.oi = object_info_t(head_oid); + txn.remove(pg->get_collection_ref()->get_cid(), + ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD}); + } else { + snapset.snaps.clear(); + logger().info("{}: writing updated snapset on {}, snapset is {}", + *this, head_oid, snapset); + log_entries.emplace_back( + pg_log_entry_t{ + pg_log_entry_t::MODIFY, + head_oid, + osd_op_p.at_version, + head_obc->obs.oi.version, + 0, + osd_reqid_t(), + coi.mtime, + 0} + ); + + head_obc->obs.oi.prior_version = head_obc->obs.oi.version; + head_obc->obs.oi.version = osd_op_p.at_version; + + std::map> attrs; + ceph::bufferlist bl; + encode(snapset, bl); + attrs[SS_ATTR] = std::move(bl); + + bl.clear(); + encode(head_obc->obs.oi, bl, + pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr)); + attrs[OI_ATTR] = std::move(bl); + txn.setattrs( + pg->get_collection_ref()->get_cid(), + ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD}, + attrs); } - if (oi.is_omap()) { - delta_stats.num_objects_omap--; - } - if (oi.is_whiteout()) { - logger().debug("{}: trimming whiteout on {}", - *this, oi.soid); - delta_stats.num_whiteouts--; - } - head_obc->obs.exists = false; - head_obc->obs.oi = object_info_t(head_oid); - txn.remove(pg->get_collection_ref()->get_cid(), - ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD}); - } else { - snapset.snaps.clear(); - logger().info("{}: writing updated snapset on {}, snapset is {}", - *this, head_oid, snapset); - log_entries.emplace_back( - pg_log_entry_t{ - pg_log_entry_t::MODIFY, - head_oid, - osd_op_p.at_version, - head_obc->obs.oi.version, - 0, - osd_reqid_t(), - coi.mtime, - 0} - ); - - head_obc->obs.oi.prior_version = head_obc->obs.oi.version; - head_obc->obs.oi.version = osd_op_p.at_version; - - std::map> attrs; - ceph::bufferlist bl; - encode(snapset, bl); - attrs[SS_ATTR] = std::move(bl); - - bl.clear(); - encode(head_obc->obs.oi, bl, - pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr)); - attrs[OI_ATTR] = std::move(bl); - txn.setattrs( - pg->get_collection_ref()->get_cid(), - ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD}, - attrs); - } - // Stats reporting - Set number of objects trimmed - if (num_objects_before_trim > delta_stats.num_objects) { - int64_t num_objects_trimmed = - num_objects_before_trim - delta_stats.num_objects; - //add_objects_trimmed_count(num_objects_trimmed); - } - return std::make_pair(std::move(txn), std::move(log_entries)); + // Stats reporting - Set number of objects trimmed + if (num_objects_before_trim > delta_stats.num_objects) { + //int64_t num_objects_trimmed = + // num_objects_before_trim - delta_stats.num_objects; + //add_objects_trimmed_count(num_objects_trimmed); + } + }).then_interruptible( + [txn=std::move(txn), log_entries=std::move(log_entries)] () mutable { + return interruptor::make_ready_future( + std::make_pair(std::move(txn), std::move(log_entries))); + }); + }); } seastar::future<> SnapTrimObjSubEvent::with_pg( @@ -472,29 +473,23 @@ seastar::future<> SnapTrimObjSubEvent::with_pg( pp().process ).then_interruptible([this, clone_obc=std::move(clone_obc)]() mutable { logger().debug("{}: processing clone_obc={}", *this, clone_obc); - auto head_obc = clone_obc->head; - return interruptor::async([=, this]() mutable { - if (auto ret = remove_or_update(clone_obc, head_obc); - !ret.has_value()) { - logger().error("{}: trimmig error {}", - *this, ret.error()); - //pg->state_set(PG_STATE_SNAPTRIM_ERROR); - } else { - auto [txn, log_entries] = std::move(ret).value(); - auto [submitted, all_completed] = pg->submit_transaction( - std::move(clone_obc), - std::move(txn), - std::move(osd_op_p), - std::move(log_entries)); - submitted.get(); - all_completed.get(); - } - }).then_interruptible([this] { - return enter_stage( - wait_repop - ); - }).then_interruptible([this, clone_obc=std::move(clone_obc)] { - return PG::load_obc_iertr::now(); + return remove_or_update( + clone_obc, clone_obc->head + ).then_unpack_interruptible([clone_obc, this] + (auto&& txn, auto&& log_entries) mutable { + auto [submitted, all_completed] = pg->submit_transaction( + std::move(clone_obc), + std::move(txn), + std::move(osd_op_p), + std::move(log_entries)); + return submitted.then_interruptible( + [all_completed=std::move(all_completed), this] () mutable { + return enter_stage( + wait_repop + ).then_interruptible([all_completed=std::move(all_completed)] () mutable { + return std::move(all_completed); + }); + }); }); }); }).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] { diff --git a/src/crimson/osd/osd_operations/snaptrim_event.h b/src/crimson/osd/osd_operations/snaptrim_event.h index 58eacb35a74..5c2f9fd70de 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.h +++ b/src/crimson/osd/osd_operations/snaptrim_event.h @@ -131,7 +131,7 @@ private: using remove_or_update_ret_t = std::pair>; - tl::expected + interruptible_future remove_or_update(ObjectContextRef obc, ObjectContextRef head_obc); // we don't need to synchronize with other instances started by -- 2.39.5