From: Matty Williams Date: Mon, 18 May 2026 09:09:32 +0000 (+0100) Subject: osd: Hook up omap operations in EC pools X-Git-Tag: v21.0.1^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6c023483422ba4fcedccf19cfca986c44c5ca657;p=ceph.git osd: Hook up omap operations in EC pools Add pool flag to determine if omap operations are supported in a pool. - Currently disabled in EC pools (will later be enabled for Fast EC pools) Require all osds to have umbrella or later release version to enable pool flag. Change recovery reads to use journal updates. Clear the journal for a new epoch. Set omap_complete accurately before recovery. Encode omap updates and add entry to journal. Decode omap updates, apply updates to object store, then remove from journal. Change omap reads in PrimaryLogPG to use PGBackend functions, including omap updates from journal. Assisted-by: Bob Used for debugging and copying patterns (e.g. implementing REPLACE type to match MODIFY). Fixes: https://tracker.ceph.com/issues/74188 Signed-off-by: Matty Williams --- diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index c0ffa4053bc..7e06215a6eb 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -53,6 +53,7 @@ add_executable(crimson-osd ${PROJECT_SOURCE_DIR}/src/osd/ECUtil.cc ${PROJECT_SOURCE_DIR}/src/osd/ECUtilL.cc ${PROJECT_SOURCE_DIR}/src/osd/ECTransaction.cc + ${PROJECT_SOURCE_DIR}/src/osd/ECOmapJournal.cc ${PROJECT_SOURCE_DIR}/src/osd/osd_op_util.cc ${PROJECT_SOURCE_DIR}/src/osd/OSDCap.cc ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc index 532ecd446a4..368d29c7147 100644 --- a/src/crimson/osd/ec_backend.cc +++ b/src/crimson/osd/ec_backend.cc @@ -105,6 +105,7 @@ ECBackend::_read(const hobject_t& hoid, struct ECCrimsonOp : ECCommon::RMWPipeline::Op { PGTransactionUPtr t; + const PGLog &pg_log; static PGTransactionUPtr transate_transaction( ceph::os::Transaction&& t, @@ -236,9 +237,11 @@ struct ECCrimsonOp : ECCommon::RMWPipeline::Op { ECCrimsonOp(ceph::os::Transaction&& t, crimson::osd::ObjectContextRef &&obc, - ECCommon::RMWPipeline& rmw_pipeline) + const PGLog &pg_log, + ECCommon::RMWPipeline& rmw_pipeline) : Op(rmw_pipeline), - t(transate_transaction(std::move(t), std::move(obc))) { + t(transate_transaction(std::move(t), std::move(obc))), + pg_log(pg_log) { } void generate_transactions( @@ -249,7 +252,8 @@ struct ECCrimsonOp : ECCommon::RMWPipeline::Op { shard_id_map *transactions, DoutPrefixProvider *dpp, const OSDMapRef &osdmap, - bool &first_write_in_interval) final + bool &first_write_in_interval, + ECOmapJournal &ec_omap_journal) final { assert(t); ECTransaction::generate_transactions( @@ -266,7 +270,9 @@ struct ECCrimsonOp : ECCommon::RMWPipeline::Op { &temp_cleared, dpp, osdmap, - first_write_in_interval); + first_write_in_interval, + ec_omap_journal, + pg_log); } bool skip_transaction( @@ -302,12 +308,14 @@ ECBackend::submit_transaction(const std::set &pg_shards, ceph::os::Transaction&& txn, osd_op_params_t&& osd_op_p, epoch_t min_epoch, epoch_t max_epoch, - std::vector&& log_entries) + std::vector&& log_entries, + const PGLog &pg_log) { const hobject_t& hoid = obc->obs.oi.soid; logger().debug("{} hoid={} obc->attr_cache={}", __func__, hoid, obc->attr_cache); auto op = - std::make_unique(std::move(txn), std::move(obc), rmw_pipeline); + std::make_unique( + std::move(txn), std::move(obc), pg_log, rmw_pipeline); op->hoid = hoid; //op->delta_stats = delta_stats; op->version = osd_op_p.at_version; diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h index 8a13cff2ae5..cb76e4413c0 100644 --- a/src/crimson/osd/ec_backend.h +++ b/src/crimson/osd/ec_backend.h @@ -67,12 +67,13 @@ private: uint32_t flags) final; rep_op_fut_t submit_transaction(const std::set &pg_shards, - crimson::osd::ObjectContextRef&& obc, - crimson::osd::ObjectContextRef&& new_clone, - ceph::os::Transaction&& txn, - osd_op_params_t&& req, - epoch_t min_epoch, epoch_t max_epoch, - std::vector&& log_entries) final; + crimson::osd::ObjectContextRef&& obc, + crimson::osd::ObjectContextRef&& new_clone, + ceph::os::Transaction&& txn, + osd_op_params_t&& req, + epoch_t min_epoch, epoch_t max_epoch, + std::vector&& log_entries, + const PGLog &pg_log) final; seastar::future<> request_committed(const osd_reqid_t& reqid, const eversion_t& version) final { return seastar::now(); diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 09770190f80..d54cee09ae2 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1052,7 +1052,8 @@ PG::submit_transaction( std::move(osd_op_p), peering_state.get_last_peering_reset(), map_epoch, - std::move(log_entries)); + std::move(log_entries), + get_log()); co_return std::make_tuple( std::move(submitted), std::move(all_completed) diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 1c847b03b8c..e24427a6a0d 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -623,6 +623,9 @@ public: void trim(const pg_log_entry_t &entry) override { // TODO } + void trim_after_remove(const pg_log_entry_t &entry) override { + // TODO + } void partial_write(pg_info_t *info, eversion_t previous_version, const pg_log_entry_t &entry diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index f49e182fca5..ceecc354dce 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -26,6 +26,7 @@ struct hobject_t; struct ECListener; +class PGLog; namespace ceph::os { class Transaction; @@ -431,12 +432,13 @@ public: virtual rep_op_fut_t submit_transaction(const std::set &pg_shards, - crimson::osd::ObjectContextRef&& obc, - crimson::osd::ObjectContextRef&& new_clone, - ceph::os::Transaction&& txn, - osd_op_params_t&& osd_op_p, - epoch_t min_epoch, epoch_t max_epoch, - std::vector&& log_entries) = 0; + crimson::osd::ObjectContextRef&& obc, + crimson::osd::ObjectContextRef&& new_clone, + ceph::os::Transaction&& txn, + osd_op_params_t&& osd_op_p, + epoch_t min_epoch, epoch_t max_epoch, + std::vector&& log_entries, + const PGLog &pg_log) = 0; virtual void got_rep_op_reply(const MOSDRepOpReply&) {} virtual seastar::future<> stop() = 0; diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index 89feb1e9136..66bb3034e16 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -101,7 +101,8 @@ ReplicatedBackend::submit_transaction( ceph::os::Transaction&& t, osd_op_params_t&& opp, epoch_t min_epoch, epoch_t map_epoch, - std::vector&& logv) + std::vector&& logv, + const PGLog &pg_log) { LOG_PREFIX(ReplicatedBackend::submit_transaction); const hobject_t& hoid = obc->obs.oi.soid; diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h index a726699ac00..c23860f1b10 100644 --- a/src/crimson/osd/replicated_backend.h +++ b/src/crimson/osd/replicated_backend.h @@ -53,7 +53,8 @@ private: ceph::os::Transaction&& txn, osd_op_params_t&& osd_op_p, epoch_t min_epoch, epoch_t max_epoch, - std::vector&& log_entries) final; + std::vector&& log_entries, + const PGLog &pg_log) final; const pg_t pgid; class pending_on_t : public seastar::weakly_referencable { public: diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index 3d9a054af82..d12ab9d8fc5 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -422,6 +422,28 @@ void ECBackend::handle_sub_write( switcher->clear_temp_objs(op.temp_removed); dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl; + + // Update EC omap journal on non-primary shards from log entries + // This ensures the journal has the correct generation info when transactions are applied + if (get_parent()->get_pool().supports_omap()) { + for (auto &&e: op.log_entries) { + if (e.is_delete() || e.is_lost_delete() || e.is_replace() || (e.is_clone() && !e.soid.is_snap())) { + if (!op.backfill_or_async_recovery) { + ec_omap_journal.append_delete(e.soid, e.version.version, e.is_lost_delete()); + dout(20) << __func__ << " appending delete to journal: " + << (e.is_clone() ? "clone" : "delete") + << " " << e.soid << " version=" << e.version.version + << " lost_delete=" << e.is_lost_delete() << dendl; + } else { + dout(20) << __func__ << " skipping journal append_delete during backfill/recovery: " + << (e.is_clone() ? "clone" : "delete") + << " " << e.soid << " version=" << e.version.version + << " lost_delete=" << e.is_lost_delete() << dendl; + } + } + } + } + // flag set to true during async recovery bool async = false; pg_missing_tracker_t pmissing = get_parent()->get_local_missing(); @@ -435,6 +457,16 @@ void ECBackend::handle_sub_write( dout(30) << " entry is_delete " << e.is_delete() << dendl; } } + + dout(20) << __func__ << " log_operation: " + << "log_entries.size=" << op.log_entries.size() + << " updated_hit_set_history=" << (op.updated_hit_set_history ? "present" : "none") + << " trim_to=" << op.trim_to + << " roll_forward_to=" << op.pg_committed_to + << " pg_committed_to=" << op.pg_committed_to + << " transaction_applied=" << !op.backfill_or_async_recovery + << " async=" << async + << dendl; get_parent()->log_operation( std::move(op.log_entries), op.updated_hit_set_history, @@ -630,10 +662,11 @@ void ECBackend::handle_sub_read( continue; } - int r = switcher->store->omap_get_header( + int r = omap_get_header( switcher->ch, ghobject_t(*i, ghobject_t::NO_GEN, shard), - &reply->omap_headers_read[*i], false); + &reply->omap_headers_read[*i], false, + switcher->store); if (r < 0) { reply->attrs_read.erase(*i); reply->omap_headers_read.erase(*i); @@ -661,7 +694,7 @@ void ECBackend::handle_sub_read( reply->omaps_complete[hoid] = false; uint64_t available = max_bytes; - const auto result = switcher->store->omap_iterate( + const auto result = omap_iterate( switcher->ch, ghobject_t(hoid, ghobject_t::NO_GEN, shard), ObjectStore::omap_iter_seek_t{ @@ -684,7 +717,7 @@ void ECBackend::handle_sub_read( current_batch.insert(make_pair(key, val_bl)); available -= std::min(available, num_new_bytes); return ObjectStore::omap_iter_ret_t::NEXT; - }); + }, switcher->store); if (result < 0) { reply->attrs_read.erase(hoid); @@ -1032,6 +1065,7 @@ void ECBackend::check_recovery_sources(const OSDMapRef &osdmap) { } void ECBackend::on_change() { + ec_omap_journal.clear_all(); rmw_pipeline.on_change(); read_pipeline.on_change(); rmw_pipeline.on_change2(); @@ -1076,7 +1110,8 @@ struct ECClassicalOp : ECCommon::RMWPipeline::Op { shard_id_map *transactions, DoutPrefixProvider *dpp, const OSDMapRef &osdmap, - bool& first_write_in_interval) final { + bool& first_write_in_interval, + ECOmapJournal &ec_omap_journal) final { ceph_assert(t); ECTransaction::generate_transactions( t.get(), @@ -1092,7 +1127,9 @@ struct ECClassicalOp : ECCommon::RMWPipeline::Op { &temp_cleared, dpp, osdmap, - first_write_in_interval); + first_write_in_interval, + ec_omap_journal, + pipeline->get_parent()->get_log()); } bool skip_transaction( @@ -1605,6 +1642,10 @@ int ECBackend::omap_iterate ( const OmapIterFunction &f, ///< [in] function to call for each key/value pair ObjectStore *store ) { + if (!get_parent()->get_pool().supports_omap()) { + return -EOPNOTSUPP; + } + // Updates in update_map take priority over removed_ranges auto [update_map, removed_ranges] = ec_omap_journal.get_value_updates(oid.hobj); @@ -1674,6 +1715,10 @@ int ECBackend::omap_get_values( std::map *out, ///< [out] returned key/values ObjectStore *store ) { + if (!get_parent()->get_pool().supports_omap()) { + return -EOPNOTSUPP; + } + auto [update_map, removed_ranges] = ec_omap_journal.get_value_updates(oid.hobj); set keys_still_to_get; @@ -1702,11 +1747,20 @@ int ECBackend::omap_get_header( const bool allow_eio, ///< [in] don't assert on eio ObjectStore *store ) { + if (!get_parent()->get_pool().supports_omap()) { + return -EOPNOTSUPP; + } + std::optional header_from_journal = ec_omap_journal.get_updated_header(oid.hobj); if (header_from_journal) { *header = *header_from_journal; + dout(20) << __func__ << ": oid=" << oid + << " from_journal=true header_size=" << header->length() << dendl; } else { + header->clear(); store->omap_get_header(c_, oid, header, allow_eio); + dout(20) << __func__ << ": oid=" << oid + << " from_journal=false header_size=" << header->length() << dendl; } return 0; } @@ -1718,6 +1772,10 @@ int ECBackend::omap_get( std::map *out, /// < [out] Key to value map ObjectStore *store ) { + if (!get_parent()->get_pool().supports_omap()) { + return -EOPNOTSUPP; + } + // Update map takes priority over removed_ranges auto [update_map, removed_ranges] = ec_omap_journal.get_value_updates(oid.hobj); const auto updated_header = ec_omap_journal.get_updated_header(oid.hobj); @@ -1730,6 +1788,11 @@ int ECBackend::omap_get( // Update header if present if (updated_header) { *header = *updated_header; + dout(20) << __func__ << ": oid=" << oid + << " updated_header=true header_size=" << header->length() << dendl; + } else { + dout(20) << __func__ << ": oid=" << oid + << " updated_header=false header_size=" << header->length() << dendl; } // Remove keys in removed_ranges @@ -1758,6 +1821,10 @@ int ECBackend::omap_check_keys( std::set *out, ///< [out] Subset of keys defined on oid ObjectStore *store ) { + if (!get_parent()->get_pool().supports_omap()) { + return -EOPNOTSUPP; + } + // Update map takes priority over removed_ranges auto [update_map, removed_ranges] = ec_omap_journal.get_value_updates(oid.hobj); auto updated_header = ec_omap_journal.get_updated_header(oid.hobj); diff --git a/src/osd/ECCommon.cc b/src/osd/ECCommon.cc index bbbbe6e41cd..9982b0ffc05 100644 --- a/src/osd/ECCommon.cc +++ b/src/osd/ECCommon.cc @@ -887,7 +887,8 @@ void ECCommon::RMWPipeline::cache_ready(Op &op) { &trans, get_parent()->get_dpp(), get_osdmap(), - first_write_in_interval); + first_write_in_interval, + ec_backend.ec_omap_journal); dout(20) << __func__ << ": written: " << written << ", op: " << op << dendl; @@ -1025,7 +1026,8 @@ struct ECDummyOp final : ECCommon::RMWPipeline::Op { shard_id_map *transactions, DoutPrefixProvider *dpp, const OSDMapRef &osdmap, - bool &first_write_in_interval + bool &first_write_in_interval, + ECOmapJournal &ec_omap_journal ) override { // NOP, as -- in contrast to ECClassicalOp -- there is no // transaction involved diff --git a/src/osd/ECCommon.h b/src/osd/ECCommon.h index 68b45d89a30..1da3390d205 100644 --- a/src/osd/ECCommon.h +++ b/src/osd/ECCommon.h @@ -590,7 +590,8 @@ struct ECCommon { shard_id_map *transactions, DoutPrefixProvider *dpp, const OSDMapRef &osdmap, - bool &first_write_in_interval) = 0; + bool &first_write_in_interval, + ECOmapJournal &ec_omap_journal) = 0; virtual bool skip_transaction( std::set &pending_roll_forward, diff --git a/src/osd/ECTransaction.cc b/src/osd/ECTransaction.cc index fedda8ca493..8368a219040 100644 --- a/src/osd/ECTransaction.cc +++ b/src/osd/ECTransaction.cc @@ -22,6 +22,7 @@ #include "ECUtil.h" #include "os/ObjectStore.h" #include "common/inline_variant.h" +#include "PGLog.h" #ifndef WITH_CRIMSON #include "osd/osd_internal_types.h" @@ -407,6 +408,25 @@ void ECTransaction::Generate::process_init() { ghobject_t(oid, ghobject_t::NO_GEN, shard)); } + // Check ECOmapJournal first to see if there are pending omap updates + // This avoids costly PG log traversal when not necessary + if (osdmap->get_pg_pool(pgid.pool())->supports_omap() && + ec_omap_journal.has_omap_updates(cop.source)) { + // There are incomplete omap updates which need to be applied to the clone + eversion_t can_rollback_to = pg_log.get_can_rollback_to(); + OmapCloneVisitor omap_visitor(transactions, pgid, cop.source, oid, sinfo, ec_omap_journal, dpp); + + for (auto &log_entry : get_incomplete_ec_omap_log_entries(cop.source, can_rollback_to)) { + // Only apply updates after can_rollback_to version + if (log_entry->version > can_rollback_to && log_entry->mod_desc.can_rollback()) { + log_entry->mod_desc.visit(&omap_visitor); + } + } + + // Apply accumulated omap updates to clone + omap_visitor.apply_to_clone(); + } + if (obc) { auto cobciter = t.obc_map.find(cop.source); ceph_assert(cobciter != t.obc_map.end()); @@ -458,6 +478,193 @@ void alloc_hint(PGTransaction::ObjectOperation& op, } } +void ECTransaction::accumulate_omap_updates( + bool clear_omap, + const std::optional& header, + const std::vector>& updates, + std::optional& out_header, + std::map>& key_updates, + std::list>>& removed_ranges) +{ + // Handle clear_omap flag + if (clear_omap) { + key_updates.clear(); + removed_ranges.clear(); + removed_ranges.emplace_back("", std::nullopt); + out_header.reset(); + } + + // Handle omap header + if (header) { + out_header = header; + } + + // Decode and accumulate updates + for (auto &[type, bl] : updates) { + auto p = bl.cbegin(); + + switch (type) { + case OmapUpdateType::Insert: { + std::map kv_map; + decode(kv_map, p); + for (auto &[key, value] : kv_map) { + key_updates[key] = value; + } + break; + } + case OmapUpdateType::Remove: { + std::set keys_to_remove; + decode(keys_to_remove, p); + for (auto &key : keys_to_remove) { + key_updates[key] = std::nullopt; + } + break; + } + case OmapUpdateType::RemoveRange: { + std::string range_start, range_end; + decode(range_start, p); + decode(range_end, p); + removed_ranges.emplace_back(range_start, range_end); + + // Mark keys within range as removed + auto map_it = key_updates.lower_bound(range_start); + while (map_it != key_updates.end()) { + if (map_it->first >= range_end) break; + map_it->second = std::nullopt; + ++map_it; + } + break; + } + } + } +} + +void ECTransaction::apply_omap_to_transactions( + shard_id_map& transactions, + const pg_t& pgid, + const hobject_t& target_oid, + const ECUtil::stripe_info_t& sinfo, + bool clear_omap, + const std::optional& header, + const std::map>& key_updates, + const std::list>>& removed_ranges, + const DoutPrefixProvider* dpp) +{ + for (auto &&[shard, t] : transactions) { + // Only primary capable-shards store omap + if (sinfo.is_nonprimary_shard(shard)) { + continue; + } + + coll_t coll(spg_t(pgid, shard)); + ghobject_t goid(target_oid, ghobject_t::NO_GEN, shard); + + // Apply clear_omap if needed + if (clear_omap) { + t.omap_clear(coll, goid); + } + + // Apply removed ranges + for (auto &[range_start, range_end] : removed_ranges) { + if (range_end) { + t.omap_rmkeyrange(coll, goid, range_start, *range_end); + } else { + t.omap_rmkeyrange(coll, goid, range_start, ""); + } + } + + // Apply header update if present + if (header) { + ldpp_dout(dpp, 20) << __func__ << ": omap_setheader oid=" + << target_oid << " header_size=" << header->length() << dendl; + t.omap_setheader(coll, goid, *header); + } + + // Apply key updates + if (!key_updates.empty()) { + std::map to_set; + std::set to_remove; + + for (auto &[key, val] : key_updates) { + if (val) { + to_set[key] = *val; + } else { + to_remove.insert(key); + } + } + + if (!to_set.empty()) { + t.omap_setkeys(coll, goid, to_set); + } + if (!to_remove.empty()) { + t.omap_rmkeys(coll, goid, to_remove); + } + } + } +} + + +void ECTransaction::OmapCloneVisitor::ec_omap( + bool clear_omap, + std::optional header, + std::vector> &updates) { + + ldpp_dout(dpp, 20) << __func__ << ": src=" << source_oid + << " dest=" << dest_oid << " clear_omap=" << clear_omap + << " header_size=" << (header ? header->length() : 0) << dendl; + + accumulate_omap_updates( + clear_omap, + header, + updates, + omap_header, + omap_updates, + removed_ranges); + + if (clear_omap) { + has_clear_omap = true; + } +} + +void ECTransaction::OmapCloneVisitor::apply_to_clone() { + apply_omap_to_transactions( + transactions, + pgid, + dest_oid, + sinfo, + has_clear_omap, + omap_header, + omap_updates, + removed_ranges, + dpp); +} + +void ECTransaction::Generate::apply_omap_updates_without_journal() { + std::map> key_updates; + std::list>> removed_ranges; + std::optional header_out; + + accumulate_omap_updates( + op.clear_omap, + op.omap_header, + op.omap_updates, + header_out, + key_updates, + removed_ranges); + + apply_omap_to_transactions( + transactions, + pgid, + oid, + sinfo, + op.clear_omap, + header_out, + key_updates, + removed_ranges, + dpp); +} + + ECTransaction::Generate::Generate(PGTransaction &t, ErasureCodeInterfaceRef &ec_impl, pg_t &pgid, @@ -471,7 +678,9 @@ ECTransaction::Generate::Generate(PGTransaction &t, WritePlanObj &plan, DoutPrefixProvider *dpp, pg_log_entry_t *entry, - bool &first_write_in_interval) + bool &first_write_in_interval, + ECOmapJournal &ec_omap_journal, + const PGLog &pg_log) : t(t), ec_impl(ec_impl), pgid(pgid), @@ -484,7 +693,9 @@ ECTransaction::Generate::Generate(PGTransaction &t, op(op), plan(plan), read_sem(&sinfo), - to_write(&sinfo) { + to_write(&sinfo), + ec_omap_journal(ec_omap_journal), + pg_log(pg_log) { ldpp_dout(dpp, 20) << __func__ << ": " << oid << " partial_extents=" << partial_extents << " written_map=" << *written_map @@ -507,7 +718,7 @@ ECTransaction::Generate::Generate(PGTransaction &t, ceph_assert(oid.is_temp()); } - if (entry && entry->is_modify() && op.updated_snaps) { + if (entry && (entry->is_modify() || entry->is_replace()) && op.updated_snaps) { bufferlist bl(op.updated_snaps->second.size() * 8 + 8); encode(op.updated_snaps->second, bl); entry->snaps.swap(bl); @@ -535,13 +746,13 @@ ECTransaction::Generate::Generate(PGTransaction &t, if (op.is_fresh_object() && entry) { entry->mod_desc.create(); + if (osdmap->get_pg_pool(pgid.pool())->supports_omap()) { + ec_omap_journal.append_create(plan.hoid); + } } process_init(); - // omap not supported (except 0, handled above) - ceph_assert(!(op.clear_omap) && !(op.omap_header) && op.omap_updates.empty()); - if (op.alloc_hint) { all_shards_written(); alloc_hint(op, transactions, pgid, oid, sinfo); @@ -579,15 +790,32 @@ ECTransaction::Generate::Generate(PGTransaction &t, // we want to update OI on all shards bool size_change = plan.orig_size != plan.projected_size; bool clear_whiteout = false; + bool create_whiteout = false; - // If we are updating the OI and we have a cache of the previous OI values - if (op.attr_updates.contains(OI_ATTR) && obc && obc->attr_cache.contains(OI_ATTR)) - { - object_info_t oi_cache((obc->attr_cache[OI_ATTR])); - if (oi_cache.test_flag(object_info_t::FLAG_WHITEOUT)) - { - object_info_t oi_updates(*(op.attr_updates[OI_ATTR])); - clear_whiteout = !oi_updates.test_flag(object_info_t::FLAG_WHITEOUT); + if (op.attr_updates.contains(OI_ATTR)) { + bufferlist &bl = op.attr_updates.find(OI_ATTR)->second.value(); + auto p = bl.cbegin(); + object_info_t new_oi; + decode(new_oi, p); + + if (new_oi.is_whiteout()) { + create_whiteout = true; + } + + if (obc && obc->attr_cache.contains(OI_ATTR)) { + object_info_t oi_cache((obc->attr_cache[OI_ATTR])); + if (oi_cache.test_flag(object_info_t::FLAG_WHITEOUT)) + { + object_info_t oi_updates(*(op.attr_updates[OI_ATTR])); + clear_whiteout = !oi_updates.test_flag(object_info_t::FLAG_WHITEOUT); + } + } + } + + if (create_whiteout) { + ldpp_dout(dpp, 10) << __func__ << " detecting whiteout creation for " << oid << dendl; + if (osdmap->get_pg_pool(pgid.pool())->supports_omap()) { + ec_omap_journal.append_whiteout(plan.hoid); } } @@ -606,6 +834,20 @@ ECTransaction::Generate::Generate(PGTransaction &t, attr_updates(); } + if (!op.omap_updates.empty() || op.clear_omap || op.omap_header) { + ceph_assert(osdmap->get_pg_pool(pgid.pool())->supports_omap()); + if (entry) { + ECOmapJournalEntry new_entry(entry->version, op.clear_omap, op.omap_header, op.omap_updates); + entry->mod_desc.ec_omap( + op.clear_omap, + op.omap_header, + op.omap_updates); + ec_omap_journal.add_entry(plan.hoid, new_entry); + } else { + apply_omap_updates_without_journal(); + } + } + if (!entry) { return; } @@ -1016,7 +1258,9 @@ void ECTransaction::generate_transactions( set *temp_removed, DoutPrefixProvider *dpp, const OSDMapRef &osdmap, - bool &first_write_in_interval) { + bool &first_write_in_interval, + ECOmapJournal &ec_omap_journal, + const PGLog &pg_log) { ceph_assert(written_map); ceph_assert(transactions); ceph_assert(temp_added); @@ -1049,8 +1293,50 @@ void ECTransaction::generate_transactions( ceph_assert(plan.hoid == oid); Generate generate(t, ec_impl, pgid, sinfo, partial_extents, written_map, - *transactions, osdmap, oid, op, plan, dpp, entry, first_write_in_interval); + *transactions, osdmap, oid, op, plan, dpp, entry, + first_write_in_interval, ec_omap_journal, pg_log); plans.plans.pop_front(); }); } + +std::vector ECTransaction::Generate::get_incomplete_ec_omap_log_entries( + const hobject_t &hoid, + eversion_t can_rollback_to) { + + std::vector result; + + // Helper visitor to check if a log entry has EC omap updates + struct OmapChecker : public ObjectModDesc::Visitor { + bool has_omap = false; + + void ec_omap( + bool clear_omap, + std::optional header, + std::vector> &updates) override { + has_omap = true; + } + }; + + // Iterate through the PG log to find incomplete entries for this object + for (const auto &entry : pg_log.get_log().log) { + // Filter by object, version, and rollback capability + if (entry.soid == hoid && + entry.version > can_rollback_to && + entry.mod_desc.can_rollback()) { + + // Check if this entry has EC omap updates + OmapChecker checker; + entry.mod_desc.visit(&checker); + if (checker.has_omap) { + result.push_back(&entry); + } + } + } + + ldpp_dout(dpp, 20) << __func__ << ": found " << result.size() + << " incomplete log entries with EC omap updates for " << hoid + << " after version " << can_rollback_to << dendl; + + return result; +} diff --git a/src/osd/ECTransaction.h b/src/osd/ECTransaction.h index 9e633fde5ee..7a2f24ee80d 100644 --- a/src/osd/ECTransaction.h +++ b/src/osd/ECTransaction.h @@ -22,6 +22,9 @@ #include "os/Transaction.h" #include "OSDMap.h" #include "PGTransaction.h" +#include "osd/ECOmapJournal.h" + +class PGLog; namespace ECTransaction { class WritePlanObj { @@ -77,6 +80,85 @@ struct WritePlan { } }; +/** + * Decode and accumulate omap updates from encoded operation list. + * Handles Insert, Remove, and RemoveRange operations. + */ +void accumulate_omap_updates( + bool clear_omap, + const std::optional& header, + const std::vector>& updates, + std::optional& out_header, + std::map>& key_updates, + std::list>>& removed_ranges); + +/** + * Apply accumulated omap updates to primary-capable shard transactions. + */ +void apply_omap_to_transactions( + shard_id_map& transactions, + const pg_t& pgid, + const hobject_t& target_oid, + const ECUtil::stripe_info_t& sinfo, + bool clear_omap, + const std::optional& header, + const std::map>& key_updates, + const std::list>>& removed_ranges, + const DoutPrefixProvider* dpp); + + +/** + * OmapCloneVisitor - Visitor to extract and apply omap updates to clone transactions + * + * This visitor implements ObjectModDesc::Visitor to traverse PG log entries and + * accumulate omap updates (key-value pairs, range removals, header changes) that + * need to be applied to a cloned object. It ensures that incomplete omap updates + * from the PG log are properly transferred to the clone. + */ +class OmapCloneVisitor : public ObjectModDesc::Visitor { +private: + shard_id_map &transactions; + const pg_t &pgid; + const hobject_t &source_oid; + const hobject_t &dest_oid; + const ECUtil::stripe_info_t &sinfo; + ECOmapJournal &ec_omap_journal; + const DoutPrefixProvider *dpp; + + // Accumulated omap state + bool has_clear_omap = false; + std::optional omap_header; + std::map> omap_updates; + std::list>> removed_ranges; + +public: + OmapCloneVisitor( + shard_id_map &txns, + const pg_t &pg, + const hobject_t &src, + const hobject_t &dst, + const ECUtil::stripe_info_t &stripe_info, + ECOmapJournal &journal, + const DoutPrefixProvider *dpp) + : transactions(txns), pgid(pg), source_oid(src), dest_oid(dst), + sinfo(stripe_info), ec_omap_journal(journal), dpp(dpp) {} + + /** + * Called by ObjectModDesc::visit() when an ec_omap modification is encountered + * Accumulates omap updates from the PG log entry + */ + void ec_omap( + bool clear_omap, + std::optional header, + std::vector> &updates) override; + + /** + * Apply accumulated omap updates to the clone transaction + * This should be called after visiting all relevant log entries + */ + void apply_to_clone(); +}; + class Generate { PGTransaction &t; const ErasureCodeInterfaceRef &ec_impl; @@ -97,6 +179,8 @@ class Generate { std::vector rollback_shards; uint32_t fadvise_flags = 0; bool written_shards_final{false}; + ECOmapJournal &ec_omap_journal; + const PGLog &pg_log; void all_shards_written(); void shard_written(const shard_id_t shard); @@ -110,6 +194,15 @@ class Generate { void appends_and_clone_ranges(); void written_shards(); void attr_updates(); + std::vector get_incomplete_ec_omap_log_entries( + const hobject_t &hoid, + eversion_t can_rollback_to); + /** + * Apply omap updates directly to transactions without journaling. + * Should only be called when entry is null (temporary/non-journaled operations). + * For journaled operations, use entry->mod_desc.ec_omap() instead. + */ + void apply_omap_updates_without_journal(); public: Generate(PGTransaction &t, @@ -123,7 +216,9 @@ class Generate { WritePlanObj &plan, DoutPrefixProvider *dpp, pg_log_entry_t *entry, - bool &first_write_in_interval); + bool &first_write_in_interval, + ECOmapJournal &ec_omap_journal, + const PGLog &pg_log); }; void generate_transactions( @@ -140,6 +235,8 @@ void generate_transactions( std::set *temp_removed, DoutPrefixProvider *dpp, const OSDMapRef &osdmap, - bool &first_write_in_interval + bool &first_write_in_interval, + ECOmapJournal &ec_omap_journal, + const PGLog &pg_log ); } diff --git a/src/osd/ECTransactionL.cc b/src/osd/ECTransactionL.cc index 10f4a5d8827..5e8170d5fa1 100644 --- a/src/osd/ECTransactionL.cc +++ b/src/osd/ECTransactionL.cc @@ -162,12 +162,12 @@ void ECTransactionL::generate_transactions( } if (entry && - entry->is_modify() && - op.updated_snaps) { - bufferlist bl(op.updated_snaps->second.size() * 8 + 8); - encode(op.updated_snaps->second, bl); - entry->snaps.swap(bl); - entry->snaps.reassign_to_mempool(mempool::mempool_osd_pglog); + (entry->is_modify() || entry->is_replace()) && + op.updated_snaps) { + bufferlist bl(op.updated_snaps->second.size() * 8 + 8); + encode(op.updated_snaps->second, bl); + entry->snaps.swap(bl); + entry->snaps.reassign_to_mempool(mempool::mempool_osd_pglog); } ldpp_dout(dpp, 20) << "generate_transactions: " diff --git a/src/osd/PG.h b/src/osd/PG.h index 4432ccaf8fc..df99e42d170 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -1144,6 +1144,9 @@ protected: void trim(const pg_log_entry_t &entry) override { pg->get_pgbackend()->trim(entry, t); } + void trim_after_remove(const pg_log_entry_t &entry) override { + pg->get_pgbackend()->trim_after_remove(entry, t); + } void partial_write(pg_info_t *info, eversion_t previous_version, const pg_log_entry_t &entry ) override { diff --git a/src/osd/PGBackend.cc b/src/osd/PGBackend.cc index 5be44f75db6..053836c7e33 100644 --- a/src/osd/PGBackend.cc +++ b/src/osd/PGBackend.cc @@ -337,12 +337,12 @@ void PGBackend::rollback( t->append(vis.t); } -struct Trimmer : public ObjectModDesc::Visitor { +struct TrimmerPostRemove : public ObjectModDesc::Visitor { const hobject_t &soid; PGBackend *pg; ObjectStore::Transaction *t; const pg_log_entry_t &entry; - Trimmer( + TrimmerPostRemove( PGBackend *pg, ObjectStore::Transaction *t, const pg_log_entry_t &entry) @@ -352,6 +352,11 @@ struct Trimmer : public ObjectModDesc::Visitor { soid, old_version, t); + + if (pg->get_parent()->get_pool().allows_ecoptimizations() + && pg->get_parent()->get_pool().supports_omap()) { + pg->omap_trim_delete_from_journal(soid, old_version); + } } // try_rmobject defaults to rmobject void rollback_extents( @@ -378,9 +383,19 @@ struct Trimmer : public ObjectModDesc::Visitor { } } } +}; - void ec_omap(bool clear_omap, std::optional omap_header, - std::vector> &omap_updates) override { +struct Trimmer : TrimmerPostRemove { + Trimmer( + PGBackend *pg, + ObjectStore::Transaction *t, + const pg_log_entry_t &entry) + : TrimmerPostRemove(pg, t, entry) {} + void ec_omap(bool clear_omap, std::optional omap_header, + std::vector> &omap_updates) override + { + ceph_assert(pg->get_parent()->get_pool().allows_ecoptimizations()); + ceph_assert(pg->get_parent()->get_pool().supports_omap()); auto shard = pg->get_parent()->whoami_shard().shard; spg_t spg = pg->get_parent()->whoami_spg_t(); @@ -476,6 +491,16 @@ void PGBackend::trim( entry.mod_desc.visit(&trimmer); } +void PGBackend::trim_after_remove( + const pg_log_entry_t &entry, + ObjectStore::Transaction *t) +{ + if (!entry.can_rollback()) + return; + TrimmerPostRemove trimmer(this, t, entry); + entry.mod_desc.visit(&trimmer); +} + void PGBackend::try_stash( const hobject_t &hoid, version_t v, diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index 57cc018eaa7..985ddac00a5 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -552,6 +552,10 @@ typedef std::shared_ptr OSDMapRef; const pg_log_entry_t &entry, ObjectStore::Transaction *t); + void trim_after_remove( + const pg_log_entry_t &entry, + ObjectStore::Transaction *t); + void partial_write( pg_info_t *info, eversion_t previous_version, diff --git a/src/osd/PGLog.h b/src/osd/PGLog.h index c31c17f899a..74f182e06b8 100644 --- a/src/osd/PGLog.h +++ b/src/osd/PGLog.h @@ -147,6 +147,8 @@ struct PGLog : DoutPrefixProvider { const pg_log_entry_t &entry) = 0; virtual void trim( const pg_log_entry_t &entry) = 0; + virtual void trim_after_remove( + const pg_log_entry_t &entry) = 0; virtual void remove( const hobject_t &hoid) = 0; virtual void try_stash( @@ -1226,7 +1228,7 @@ protected: rollbacker->remove(hoid); } for (auto &&i: entries) { - rollbacker->trim(i); + rollbacker->trim_after_remove(i); } } return; @@ -1247,7 +1249,7 @@ protected: rollbacker->remove(hoid); } for (auto &&i: entries) { - rollbacker->trim(i); + rollbacker->trim_after_remove(i); } } return; @@ -1278,7 +1280,7 @@ protected: } if (rollbacker) { for (auto &&i: entries) { - rollbacker->trim(i); + rollbacker->trim_after_remove(i); } } return; @@ -1324,7 +1326,7 @@ protected: if (!object_not_in_store) rollbacker->remove(hoid); for (auto &&i: entries) { - rollbacker->trim(i); + rollbacker->trim_after_remove(i); } } missing.add(hoid, prior_version, eversion_t(), false); diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 24deee836d4..58b661ce856 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -7893,8 +7893,8 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector& ops) uint32_t num = 0; bool truncated = false; if (oi.is_omap()) { - const auto result = osd->store->omap_iterate( - ch, ghobject_t(soid), + const auto result = get_pgbackend()->omap_iterate( + ch, ghobject_t(soid, ghobject_t::NO_GEN, whoami_shard().shard), ObjectStore::omap_iter_seek_t{ .seek_position = start_after, .seek_type = ObjectStore::omap_iter_seek_t::UPPER_BOUND @@ -7949,8 +7949,8 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector& ops) bufferlist bl; if (oi.is_omap()) { using omap_iter_seek_t = ObjectStore::omap_iter_seek_t; - const auto result = osd->store->omap_iterate( - ch, ghobject_t(soid), + const auto result = get_pgbackend()->omap_iterate( + ch, ghobject_t(soid, ghobject_t::NO_GEN, whoami_shard().shard), // try to seek as many keys-at-once as possible for the sake of performance. // note complexity should be logarithmic, so seek(n/2) + seek(n/2) is worse // than just seek(n). @@ -7994,7 +7994,9 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector& ops) } ++ctx->num_read; { - osd->store->omap_get_header(ch, ghobject_t(soid), &osd_op.outdata); + get_pgbackend()->omap_get_header(ch, + ghobject_t(soid, ghobject_t::NO_GEN, whoami_shard().shard), + &osd_op.outdata, false); ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10); ctx->delta_stats.num_rd++; } @@ -8015,7 +8017,8 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector& ops) tracepoint(osd, do_osd_op_pre_omapgetvalsbykeys, soid.oid.name.c_str(), soid.snap.val, list_entries(keys_to_get).c_str()); map out; if (oi.is_omap()) { - osd->store->omap_get_values(ch, ghobject_t(soid), keys_to_get, &out); + get_pgbackend()->omap_get_values(ch, + ghobject_t(soid, ghobject_t::NO_GEN, whoami_shard().shard), keys_to_get, &out); } // else return empty omap entries encode(out, osd_op.outdata); ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10); @@ -8050,8 +8053,9 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector& ops) i != assertions.end(); ++i) to_get.insert(i->first); - int r = osd->store->omap_get_values(ch, ghobject_t(soid), - to_get, &out); + int r = get_pgbackend()->omap_get_values(ch, + ghobject_t(soid, ghobject_t::NO_GEN, whoami_shard().shard), + to_get, &out); if (r < 0) { result = r; break; @@ -8148,10 +8152,12 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector& ops) ++ctx->num_write; result = 0; { - maybe_create_new_object(ctx); - t->omap_setheader(soid, osd_op.indata); - ctx->clean_regions.mark_omap_dirty(); - ctx->delta_stats.num_wr++; + maybe_create_new_object(ctx); + dout(20) << __func__ << ": omap_setheader soid=" << soid + << " header_size=" << osd_op.indata.length() << dendl; + t->omap_setheader(soid, osd_op.indata); + ctx->clean_regions.mark_omap_dirty(); + ctx->delta_stats.num_wr++; } obs.oi.set_flag(object_info_t::FLAG_OMAP); obs.oi.clear_omap_digest(); @@ -8171,6 +8177,8 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector& ops) break; } if (oi.is_omap()) { + dout(20) << __func__ << ": omap_clear soid=" << soid + << " (header will be cleared)" << dendl; t->omap_clear(soid); ctx->clean_regions.mark_omap_dirty(); ctx->delta_stats.num_wr++; @@ -8319,6 +8327,9 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector& ops) result = -EINVAL; break; } + dout(20) << __func__ << " COPY_FROM operation starting on OSD " + << osd->whoami << " dest=" << soid << " src=" << src + << " src_version=" << src_version << dendl; CopyFromCallback *cb = new CopyFromCallback(ctx, osd_op); if (have_truncate) cb->set_truncate(truncate_seq, truncate_size); @@ -9181,10 +9192,15 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx) if (soid.snap == CEPH_NOSNAP) make_writeable(ctx); - finish_ctx(ctx, - ctx->new_obs.exists ? pg_log_entry_t::MODIFY : - pg_log_entry_t::DELETE, - result); + int log_op_type; + if (ctx->use_replace_op) { + log_op_type = pg_log_entry_t::REPLACE; + } else { + log_op_type = ctx->new_obs.exists ? pg_log_entry_t::MODIFY : + pg_log_entry_t::DELETE; + } + + finish_ctx(ctx, log_op_type, result); return result; } @@ -9543,47 +9559,79 @@ int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::const_iterator& bp, // omap uint32_t omap_keys = 0; + dout(20) << __func__ << " omap section start - cursor.omap_offset='" << cursor.omap_offset + << "' left=" << left << " cursor.omap_complete=" << cursor.omap_complete + << " supports_omap=" << pool.info.supports_omap() + << " is_omap=" << oi.is_omap() << dendl; if (!pool.info.supports_omap() || !oi.is_omap()) { cursor.omap_complete = true; + dout(20) << __func__ << " omap_complete set to true (pool doesn't support omap or object has no omap)" << dendl; } else { if (left > 0 && !cursor.omap_complete) { ceph_assert(cursor.data_complete); if (cursor.omap_offset.empty()) { - osd->store->omap_get_header(ch, ghobject_t(oi.soid), - &reply_obj.omap_header); + dout(20) << __func__ << " reading omap header (cursor.omap_offset is empty)" << dendl; + int r = get_pgbackend()->omap_get_header(ch, + ghobject_t(oi.soid, ghobject_t::NO_GEN, whoami_shard().shard), + &reply_obj.omap_header, false); + if (r < 0) { + ceph_abort(); + } + dout(20) << __func__ << " omap_get_header result=" << r + << " header_length=" << reply_obj.omap_header.length() << dendl; + } else { + dout(20) << __func__ << " NOT reading omap header (cursor.omap_offset='" + << cursor.omap_offset << "' is not empty)" << dendl; } bufferlist omap_data; - const auto result = osd->store->omap_iterate( - ch, ghobject_t(oi.soid), + dout(20) << __func__ << " calling omap_iterate with cursor.omap_offset='" + << cursor.omap_offset << "'" << dendl; + const auto result = get_pgbackend()->omap_iterate( + ch, ghobject_t(oi.soid, ghobject_t::NO_GEN, whoami_shard().shard), ObjectStore::omap_iter_seek_t{ .seek_position = cursor.omap_offset, .seek_type = ObjectStore::omap_iter_seek_t::UPPER_BOUND }, [&omap_data, &omap_keys, &left, &cursor] (std::string_view key, std::string_view value) mutable { - ++omap_keys; - encode(key, omap_data); - encode(value, omap_data); - left -= key.length() + 4 + value.length() + 4; - if (left <= 0) { - cursor.omap_offset = key; + ++omap_keys; + encode(key, omap_data); + encode(value, omap_data); + left -= key.length() + 4 + value.length() + 4; + if (left <= 0) { + cursor.omap_offset = key; return ObjectStore::omap_iter_ret_t::STOP; - } + } return ObjectStore::omap_iter_ret_t::NEXT; }); + dout(20) << __func__ << " omap_iterate result=" << result << " omap_keys=" << omap_keys << dendl; if (result < 0) { - ceph_abort(); + ceph_abort(); } else if (const auto more = static_cast(result); !more) { - cursor.omap_complete = true; - dout(20) << " got omap" << dendl; + cursor.omap_complete = true; + dout(20) << __func__ << " omap_complete set to true (no more keys, omap_iterate returned !more)" << dendl; + dout(20) << " got omap" << dendl; + } else { + dout(20) << __func__ << " omap NOT complete (more keys available, stopped due to left=" << left << ")" << dendl; } if (omap_keys) { - encode(omap_keys, reply_obj.omap_data); - reply_obj.omap_data.claim_append(omap_data); + encode(omap_keys, reply_obj.omap_data); + reply_obj.omap_data.claim_append(omap_data); + dout(20) << __func__ << " encoded " << omap_keys << " omap keys, total omap_data length=" + << reply_obj.omap_data.length() << dendl; + } else { + dout(20) << __func__ << " no omap keys read in this iteration" << dendl; } + } else { + dout(20) << __func__ << " skipping omap read (left=" << left + << " cursor.omap_complete=" << cursor.omap_complete << ")" << dendl; } } + dout(20) << __func__ << " omap iteration summary: omap_keys_read=" + << omap_keys << " cursor.omap_complete=" << cursor.omap_complete + << " cursor.omap_offset='" << cursor.omap_offset << "'" << dendl; + if (cursor.is_complete()) { // include reqids only in the final step. this is a bit fragile // but it works... @@ -10222,17 +10270,21 @@ void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t) if (pool.info.supports_omap()) { if (!cop->temp_cursor.omap_complete) { if (cop->omap_header.length()) { - t->omap_setheader( - cop->results.temp_oid, - cop->omap_header); - cop->omap_header.clear(); + dout(20) << __func__ << " writing omap_header to transaction, length=" + << cop->omap_header.length() << " temp_oid=" << cop->results.temp_oid << dendl; + t->omap_setheader( + cop->results.temp_oid, + cop->omap_header); + cop->omap_header.clear(); } if (cop->omap_data.length()) { - map omap; - bufferlist::const_iterator p = cop->omap_data.begin(); - decode(omap, p); - t->omap_setkeys(cop->results.temp_oid, omap); - cop->omap_data.clear(); + map omap; + bufferlist::const_iterator p = cop->omap_data.begin(); + decode(omap, p); + dout(20) << __func__ << " writing omap_data to transaction, " + << omap.size() << " keys, temp_oid=" << cop->results.temp_oid << dendl; + t->omap_setkeys(cop->results.temp_oid, omap); + cop->omap_data.clear(); } } } else { @@ -10251,6 +10303,7 @@ void PrimaryLogPG::finish_copyfrom(CopyFromCallback *cb) if (obs.exists) { dout(20) << __func__ << ": exists, removing" << dendl; ctx->op_t->remove(obs.oi.soid); + ctx->use_replace_op = true; } else { ctx->delta_stats.num_objects++; obs.exists = true; diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index d094d4f60ad..bff6b0b372d 100644 --- a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -697,6 +697,7 @@ public: bool ignore_cache; ///< true if IGNORE_CACHE flag is std::set bool ignore_log_op_stats; // don't log op stats bool update_log_only; ///< this is a write that returned an error - just record in pg log for dup detection + bool use_replace_op = false; ///< use REPLACE op type instead of MODIFY/DELETE (set by finish_copyfrom) ObjectCleanRegions clean_regions; // side effects diff --git a/src/osd/SnapMapper.cc b/src/osd/SnapMapper.cc index aa1e557667e..615dc3a19a3 100644 --- a/src/osd/SnapMapper.cc +++ b/src/osd/SnapMapper.cc @@ -855,7 +855,7 @@ void SnapMapper::update_snap_map( i.soid, _snaps, _t); - } else if (i.is_modify()) { + } else if (i.is_modify() || i.is_replace()) { int r = update_snaps( i.soid, _snaps, diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index d0b82e6509c..237645fbd4c 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -4772,6 +4772,16 @@ void ObjectModDesc::visit(Visitor *visitor) const visitor->rollback_extents(gen, extents, object_size, shards); break; } + case EC_OMAP: { + bool clear_omap; + std::optional omap_header; + std::vector> omap_updates; + decode(clear_omap, bp); + decode(omap_header, bp); + decode(omap_updates, bp); + visitor->ec_omap(clear_omap, omap_header, omap_updates); + break; + } default: ceph_abort_msg("Invalid rollback code"); } diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 55d533be8c8..7b30617a84e 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -4533,6 +4533,7 @@ struct pg_log_entry_t { PROMOTE = 8, // promoted object from another tier CLEAN = 9, // mark an object clean ERROR = 10, // write that returned an error + REPLACE = 11, // replace (delete + recreate) operation }; static const char *get_op_name(int op) { switch (op) { @@ -4554,6 +4555,8 @@ struct pg_log_entry_t { return "clean"; case ERROR: return "error"; + case REPLACE: + return "replace"; default: return "unknown"; } @@ -4610,11 +4613,12 @@ struct pg_log_entry_t { bool is_lost_delete() const { return op == LOST_DELETE; } bool is_lost_mark() const { return op == LOST_MARK; } bool is_error() const { return op == ERROR; } + bool is_replace() const { return op == REPLACE; } bool is_update() const { return is_clone() || is_modify() || is_promote() || is_clean() || - is_lost_revert() || is_lost_mark(); + is_lost_revert() || is_lost_mark() || is_replace(); } bool is_delete() const { return op == DELETE || op == LOST_DELETE; @@ -4642,7 +4646,7 @@ struct pg_log_entry_t { bool reqid_is_indexed() const { return reqid != osd_reqid_t() && - (op == MODIFY || op == DELETE || op == ERROR); + (op == MODIFY || op == DELETE || op == ERROR || op == REPLACE); } void set_op_returns(const std::vector& ops) { diff --git a/src/test/osd/MockPGLogEntryHandler.h b/src/test/osd/MockPGLogEntryHandler.h index 79651d9870c..43d161d882a 100644 --- a/src/test/osd/MockPGLogEntryHandler.h +++ b/src/test/osd/MockPGLogEntryHandler.h @@ -54,6 +54,10 @@ class MockPGLogEntryHandler : public PGLog::LogEntryHandler { lgeneric_dout(g_ceph_context, 0) << "MockPGLogEntryHandler::trim " << entry << dendl; backend->trim(entry, t); } + void trim_after_remove(const pg_log_entry_t &entry) override { + lgeneric_dout(g_ceph_context, 0) << "MockPGLogEntryHandler::trim_after_remove " << entry << dendl; + backend->trim_after_remove(entry, t); + } void partial_write(pg_info_t *info, eversion_t previous_version, const pg_log_entry_t &entry ) override { diff --git a/src/test/osd/TestPGLog.cc b/src/test/osd/TestPGLog.cc index 946c3c0de6d..26076c4d303 100644 --- a/src/test/osd/TestPGLog.cc +++ b/src/test/osd/TestPGLog.cc @@ -244,6 +244,8 @@ public: } void trim( const pg_log_entry_t &entry) override {} + void trim_after_remove( + const pg_log_entry_t &entry) override {} void partial_write( pg_info_t *info, eversion_t previous_version, @@ -373,6 +375,8 @@ struct TestHandler : public PGLog::LogEntryHandler { } void trim( const pg_log_entry_t &entry) override {} + void trim_after_remove( + const pg_log_entry_t &entry) override {} void partial_write( pg_info_t *info, eversion_t previous_version,