git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: Hook up omap operations in EC pools
author Matty Williams <Matty.Williams@ibm.com>
Mon, 18 May 2026 09:09:32 +0000 (10:09 +0100)
committer Matty Williams <Matty.Williams@ibm.com>
Tue, 9 Jun 2026 08:46:07 +0000 (09:46 +0100)
Add pool flag to determine if omap operations are supported in a pool.
- Currently disabled in EC pools (will later be enabled for Fast EC pools)
Require all osds to have umbrella or later release version to enable pool flag.
Change recovery reads to use journal updates.
Clear the journal for a new epoch.
Set omap_complete accurately before recovery.
Encode omap updates and add entry to journal.
Decode omap updates, apply updates to object store, then remove from journal.
Change omap reads in PrimaryLogPG to use PGBackend functions, including omap updates from journal.

Assisted-by: Bob
Used for debugging and copying patterns (e.g. implementing REPLACE type to match MODIFY).

Fixes: https://tracker.ceph.com/issues/74188
Signed-off-by: Matty Williams <Matty.Williams@ibm.com>
25 files changed:
src/crimson/osd/CMakeLists.txt
src/crimson/osd/ec_backend.cc
src/crimson/osd/ec_backend.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.h
src/crimson/osd/replicated_backend.cc
src/crimson/osd/replicated_backend.h
src/osd/ECBackend.cc
src/osd/ECCommon.cc
src/osd/ECCommon.h
src/osd/ECTransaction.cc
src/osd/ECTransaction.h
src/osd/ECTransactionL.cc
src/osd/PG.h
src/osd/PGBackend.cc
src/osd/PGBackend.h
src/osd/PGLog.h
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h
src/osd/SnapMapper.cc
src/osd/osd_types.cc
src/osd/osd_types.h
src/test/osd/MockPGLogEntryHandler.h
src/test/osd/TestPGLog.cc

index c0ffa4053bcc3518a8dbdf7857e5bc80da90cf63..7e06215a6eb8a33ea6ee30444e1542039b72dffe 100644 (file)
@@ -53,6 +53,7 @@ add_executable(crimson-osd
   ${PROJECT_SOURCE_DIR}/src/osd/ECUtil.cc
   ${PROJECT_SOURCE_DIR}/src/osd/ECUtilL.cc
   ${PROJECT_SOURCE_DIR}/src/osd/ECTransaction.cc
+  ${PROJECT_SOURCE_DIR}/src/osd/ECOmapJournal.cc
   ${PROJECT_SOURCE_DIR}/src/osd/osd_op_util.cc
   ${PROJECT_SOURCE_DIR}/src/osd/OSDCap.cc
   ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc
index 532ecd446a4da0837d40dee97883bbf577c5f07e..368d29c71479ce21f143bc3fc1c88f87f4e78958 100644 (file)
@@ -105,6 +105,7 @@ ECBackend::_read(const hobject_t& hoid,
 
 struct ECCrimsonOp : ECCommon::RMWPipeline::Op {
   PGTransactionUPtr t;
+  const PGLog &pg_log;
 
   static PGTransactionUPtr transate_transaction(
     ceph::os::Transaction&& t,
@@ -236,9 +237,11 @@ struct ECCrimsonOp : ECCommon::RMWPipeline::Op {
 
   ECCrimsonOp(ceph::os::Transaction&& t,
               crimson::osd::ObjectContextRef &&obc,
-             ECCommon::RMWPipeline& rmw_pipeline)
+              const PGLog &pg_log,
+       ECCommon::RMWPipeline& rmw_pipeline)
     : Op(rmw_pipeline),
-      t(transate_transaction(std::move(t), std::move(obc))) {
+      t(transate_transaction(std::move(t), std::move(obc))),
+      pg_log(pg_log) {
   }
 
   void generate_transactions(
@@ -249,7 +252,8 @@ struct ECCrimsonOp : ECCommon::RMWPipeline::Op {
       shard_id_map<ceph::os::Transaction> *transactions,
       DoutPrefixProvider *dpp,
       const OSDMapRef &osdmap,
-      bool &first_write_in_interval) final
+      bool &first_write_in_interval,
+      ECOmapJournal &ec_omap_journal) final
   {
     assert(t);
     ECTransaction::generate_transactions(
@@ -266,7 +270,9 @@ struct ECCrimsonOp : ECCommon::RMWPipeline::Op {
       &temp_cleared,
       dpp,
       osdmap,
-      first_write_in_interval);
+      first_write_in_interval,
+      ec_omap_journal,
+      pg_log);
   }
 
   bool skip_transaction(
@@ -302,12 +308,14 @@ ECBackend::submit_transaction(const std::set<pg_shard_t> &pg_shards,
                               ceph::os::Transaction&& txn,
                               osd_op_params_t&& osd_op_p,
                               epoch_t min_epoch, epoch_t max_epoch,
-                             std::vector<pg_log_entry_t>&& log_entries)
+                             std::vector<pg_log_entry_t>&& log_entries,
+                              const PGLog &pg_log)
 {
   const hobject_t& hoid = obc->obs.oi.soid;
   logger().debug("{} hoid={} obc->attr_cache={}", __func__, hoid, obc->attr_cache);
   auto op =
-    std::make_unique<ECCrimsonOp>(std::move(txn), std::move(obc), rmw_pipeline);
+    std::make_unique<ECCrimsonOp>(
+      std::move(txn), std::move(obc), pg_log, rmw_pipeline);
   op->hoid = hoid;
   //op->delta_stats = delta_stats;
   op->version = osd_op_p.at_version;
index 8a13cff2ae5f06f440a6b3716263fdfd6cdefa58..cb76e4413c02d53785a52d9a05592f19b66dce05 100644 (file)
@@ -67,12 +67,13 @@ private:
         uint32_t flags) final;
   rep_op_fut_t
   submit_transaction(const std::set<pg_shard_t> &pg_shards,
-                    crimson::osd::ObjectContextRef&& obc,
-                    crimson::osd::ObjectContextRef&& new_clone,
-                    ceph::os::Transaction&& txn,
-                    osd_op_params_t&& req,
-                    epoch_t min_epoch, epoch_t max_epoch,
-                    std::vector<pg_log_entry_t>&& log_entries) final;
+       crimson::osd::ObjectContextRef&& obc,
+       crimson::osd::ObjectContextRef&& new_clone,
+       ceph::os::Transaction&& txn,
+       osd_op_params_t&& req,
+       epoch_t min_epoch, epoch_t max_epoch,
+       std::vector<pg_log_entry_t>&& log_entries,
+       const PGLog &pg_log) final;
   seastar::future<> request_committed(const osd_reqid_t& reqid,
                                       const eversion_t& version) final {
     return seastar::now();
index 09770190f80d8c43cb6a8348312816ea735c825c..d54cee09ae2143125ff9b81db3c68821212bd52d 100644 (file)
@@ -1052,7 +1052,8 @@ PG::submit_transaction(
       std::move(osd_op_p),
       peering_state.get_last_peering_reset(),
       map_epoch,
-      std::move(log_entries));
+      std::move(log_entries),
+      get_log());
   co_return std::make_tuple(
     std::move(submitted),
     std::move(all_completed)
index 1c847b03b8c077dc4e1cc95138c4919995eef9df..e24427a6a0de762360d537d44d0e71703086627e 100644 (file)
@@ -623,6 +623,9 @@ public:
     void trim(const pg_log_entry_t &entry) override {
       // TODO
     }
+    void trim_after_remove(const pg_log_entry_t &entry) override {
+      // TODO: currently a no-op; `entry` is ignored
+    }
     void partial_write(pg_info_t *info,
                        eversion_t previous_version,
                        const pg_log_entry_t &entry
index f49e182fca5857275204c6059f3c404d7ac2459d..ceecc354dce2fdd3531cdcf3fce6b564f7377591 100644 (file)
@@ -26,6 +26,7 @@
 
 struct hobject_t;
 struct ECListener;
+class PGLog;
 
 namespace ceph::os {
   class Transaction;
@@ -431,12 +432,13 @@ public:
 
   virtual rep_op_fut_t
   submit_transaction(const std::set<pg_shard_t> &pg_shards,
-                    crimson::osd::ObjectContextRef&& obc,
-                    crimson::osd::ObjectContextRef&& new_clone,
-                    ceph::os::Transaction&& txn,
-                    osd_op_params_t&& osd_op_p,
-                    epoch_t min_epoch, epoch_t max_epoch,
-                    std::vector<pg_log_entry_t>&& log_entries) = 0;
+       crimson::osd::ObjectContextRef&& obc,
+       crimson::osd::ObjectContextRef&& new_clone,
+       ceph::os::Transaction&& txn,
+       osd_op_params_t&& osd_op_p,
+       epoch_t min_epoch, epoch_t max_epoch,
+       std::vector<pg_log_entry_t>&& log_entries,
+       const PGLog &pg_log) = 0;
 
   virtual void got_rep_op_reply(const MOSDRepOpReply&) {}
   virtual seastar::future<> stop() = 0;
index 89feb1e9136c821d133d3b87ace4d97830a41d0d..66bb3034e16f5d6335b98811bf24d048e211c13f 100644 (file)
@@ -101,7 +101,8 @@ ReplicatedBackend::submit_transaction(
   ceph::os::Transaction&& t,
   osd_op_params_t&& opp,
   epoch_t min_epoch, epoch_t map_epoch,
-  std::vector<pg_log_entry_t>&& logv)
+  std::vector<pg_log_entry_t>&& logv,
+  const PGLog &pg_log)
 {
   LOG_PREFIX(ReplicatedBackend::submit_transaction);
   const hobject_t& hoid = obc->obs.oi.soid;
index a726699ac00499d2ea71e9deb439610be5b23f76..c23860f1b10978dedbf23e92a604be454402a64b 100644 (file)
@@ -53,7 +53,8 @@ private:
     ceph::os::Transaction&& txn,
     osd_op_params_t&& osd_op_p,
     epoch_t min_epoch, epoch_t max_epoch,
-    std::vector<pg_log_entry_t>&& log_entries) final;
+    std::vector<pg_log_entry_t>&& log_entries,
+    const PGLog &pg_log) final;
   const pg_t pgid;
   class pending_on_t : public seastar::weakly_referencable<pending_on_t> {
   public:
index 3d9a054af8286fca12e43f0e8cb143195b059fb5..d12ab9d8fc5656c36493cbeb9a0632ddc2b92896 100644 (file)
@@ -422,6 +422,28 @@ void ECBackend::handle_sub_write(
   switcher->clear_temp_objs(op.temp_removed);
   dout(30) << __func__ << " missing before " <<
     get_parent()->get_log().get_missing().get_items() << dendl;
+
+  // Update EC omap journal on non-primary shards from log entries
+  // This ensures the journal has the correct generation info when transactions are applied
+  if (get_parent()->get_pool().supports_omap()) {
+    for (auto &&e: op.log_entries) {
+      if (e.is_delete() || e.is_lost_delete() || e.is_replace() || (e.is_clone() && !e.soid.is_snap())) {
+        if (!op.backfill_or_async_recovery) {
+          ec_omap_journal.append_delete(e.soid, e.version.version, e.is_lost_delete());
+          dout(20) << __func__ << " appending delete to journal: "
+                   << (e.is_clone() ? "clone" : "delete")
+                   << " " << e.soid << " version=" << e.version.version
+                   << " lost_delete=" << e.is_lost_delete() << dendl;
+        } else {
+          dout(20) << __func__ << " skipping journal append_delete during backfill/recovery: "
+                   << (e.is_clone() ? "clone" : "delete")
+                   << " " << e.soid << " version=" << e.version.version
+                   << " lost_delete=" << e.is_lost_delete() << dendl;
+        }
+      }
+    }
+  }
+
   // flag set to true during async recovery
   bool async = false;
   pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
@@ -435,6 +457,16 @@ void ECBackend::handle_sub_write(
       dout(30) << " entry is_delete " << e.is_delete() << dendl;
     }
   }
+
+  dout(20) << __func__ << " log_operation: "
+           << "log_entries.size=" << op.log_entries.size()
+           << " updated_hit_set_history=" << (op.updated_hit_set_history ? "present" : "none")
+           << " trim_to=" << op.trim_to
+           << " roll_forward_to=" << op.pg_committed_to
+           << " pg_committed_to=" << op.pg_committed_to
+           << " transaction_applied=" << !op.backfill_or_async_recovery
+           << " async=" << async
+           << dendl;
   get_parent()->log_operation(
     std::move(op.log_entries),
     op.updated_hit_set_history,
@@ -630,10 +662,11 @@ void ECBackend::handle_sub_read(
         continue;
       }
 
-      int r = switcher->store->omap_get_header(
+      int r = omap_get_header(
         switcher->ch,
         ghobject_t(*i, ghobject_t::NO_GEN, shard),
-        &reply->omap_headers_read[*i], false);
+        &reply->omap_headers_read[*i], false,
+        switcher->store);
       if (r < 0) {
         reply->attrs_read.erase(*i);
         reply->omap_headers_read.erase(*i);
@@ -661,7 +694,7 @@ void ECBackend::handle_sub_read(
       reply->omaps_complete[hoid] = false;
 
       uint64_t available = max_bytes;
-      const auto result = switcher->store->omap_iterate(
+      const auto result = omap_iterate(
         switcher->ch,
         ghobject_t(hoid, ghobject_t::NO_GEN, shard),
         ObjectStore::omap_iter_seek_t{
@@ -684,7 +717,7 @@ void ECBackend::handle_sub_read(
           current_batch.insert(make_pair(key, val_bl));
           available -= std::min(available, num_new_bytes);
           return ObjectStore::omap_iter_ret_t::NEXT;
-        });
+        }, switcher->store);
 
       if (result < 0) {
         reply->attrs_read.erase(hoid);
@@ -1032,6 +1065,7 @@ void ECBackend::check_recovery_sources(const OSDMapRef &osdmap) {
 }
 
 void ECBackend::on_change() {
+  ec_omap_journal.clear_all();
   rmw_pipeline.on_change();
   read_pipeline.on_change();
   rmw_pipeline.on_change2();
@@ -1076,7 +1110,8 @@ struct ECClassicalOp : ECCommon::RMWPipeline::Op {
     shard_id_map<ObjectStore::Transaction> *transactions,
     DoutPrefixProvider *dpp,
     const OSDMapRef &osdmap,
-    bool& first_write_in_interval) final {
+    bool& first_write_in_interval,
+    ECOmapJournal &ec_omap_journal) final {
     ceph_assert(t);
     ECTransaction::generate_transactions(
       t.get(),
@@ -1092,7 +1127,9 @@ struct ECClassicalOp : ECCommon::RMWPipeline::Op {
       &temp_cleared,
       dpp,
       osdmap,
-      first_write_in_interval);
+      first_write_in_interval,
+      ec_omap_journal,
+      pipeline->get_parent()->get_log());
   }
 
   bool skip_transaction(
@@ -1605,6 +1642,10 @@ int ECBackend::omap_iterate (
   const OmapIterFunction &f, ///< [in] function to call for each key/value pair
   ObjectStore *store
 ) {
+  if (!get_parent()->get_pool().supports_omap()) {
+    return -EOPNOTSUPP;
+  }
+
   // Updates in update_map take priority over removed_ranges
   auto [update_map, removed_ranges] = ec_omap_journal.get_value_updates(oid.hobj);
 
@@ -1674,6 +1715,10 @@ int ECBackend::omap_get_values(
   std::map<std::string, ceph::buffer::list> *out, ///< [out] returned key/values
   ObjectStore *store
 ) {
+  if (!get_parent()->get_pool().supports_omap()) {
+    return -EOPNOTSUPP;
+  }
+
   auto [update_map, removed_ranges] = ec_omap_journal.get_value_updates(oid.hobj);
   
   set<string> keys_still_to_get;
@@ -1702,11 +1747,20 @@ int ECBackend::omap_get_header(
   const bool allow_eio, ///< [in] don't assert on eio
   ObjectStore *store
 ) {
+  if (!get_parent()->get_pool().supports_omap()) {
+    return -EOPNOTSUPP;
+  }
+
   std::optional<ceph::buffer::list> header_from_journal = ec_omap_journal.get_updated_header(oid.hobj);
   if (header_from_journal) {
     *header = *header_from_journal;
+    dout(20) << __func__ << ": oid=" << oid
+            << " from_journal=true header_size=" << header->length() << dendl;
   } else {
+    header->clear();
     store->omap_get_header(c_, oid, header, allow_eio);
+    dout(20) << __func__ << ": oid=" << oid
+            << " from_journal=false header_size=" << header->length() << dendl;
   }
   return 0;
 }
@@ -1718,6 +1772,10 @@ int ECBackend::omap_get(
   std::map<std::string, ceph::buffer::list> *out, /// < [out] Key to value map
   ObjectStore *store
 ) {
+  if (!get_parent()->get_pool().supports_omap()) {
+    return -EOPNOTSUPP;
+  }
+
   // Update map takes priority over removed_ranges
   auto [update_map, removed_ranges] = ec_omap_journal.get_value_updates(oid.hobj);
   const auto updated_header = ec_omap_journal.get_updated_header(oid.hobj);
@@ -1730,6 +1788,11 @@ int ECBackend::omap_get(
   // Update header if present
   if (updated_header) {
     *header = *updated_header;
+    dout(20) << __func__ << ": oid=" << oid
+            << " updated_header=true header_size=" << header->length() << dendl;
+  } else {
+    dout(20) << __func__ << ": oid=" << oid
+            << " updated_header=false header_size=" << header->length() << dendl;
   }
 
   // Remove keys in removed_ranges
@@ -1758,6 +1821,10 @@ int ECBackend::omap_check_keys(
   std::set<std::string> *out,         ///< [out] Subset of keys defined on oid
   ObjectStore *store
 ) {
+  if (!get_parent()->get_pool().supports_omap()) {
+    return -EOPNOTSUPP;
+  }
+
   // Update map takes priority over removed_ranges
   auto [update_map, removed_ranges] = ec_omap_journal.get_value_updates(oid.hobj);
   auto updated_header = ec_omap_journal.get_updated_header(oid.hobj);
index bbbbe6e41cd0355ed4d48ab5398139299edfeb28..9982b0ffc0591cb0d0ae722f2409e9a372812a9b 100644 (file)
@@ -887,7 +887,8 @@ void ECCommon::RMWPipeline::cache_ready(Op &op) {
     &trans,
     get_parent()->get_dpp(),
     get_osdmap(),
-    first_write_in_interval);
+    first_write_in_interval,
+    ec_backend.ec_omap_journal);
 
   dout(20) << __func__ << ": written: " << written << ", op: " << op << dendl;
 
@@ -1025,7 +1026,8 @@ struct ECDummyOp final : ECCommon::RMWPipeline::Op {
       shard_id_map<ObjectStore::Transaction> *transactions,
       DoutPrefixProvider *dpp,
       const OSDMapRef &osdmap,
-      bool &first_write_in_interval
+      bool &first_write_in_interval,
+      ECOmapJournal &ec_omap_journal
     ) override {
     // NOP, as -- in contrast to ECClassicalOp -- there is no
     // transaction involved
index 68b45d89a30603be5921e06834b17e4eed4b31bc..1da3390d205a83533460c1b146517e0c56887e6d 100644 (file)
@@ -590,7 +590,8 @@ struct ECCommon {
           shard_id_map<ceph::os::Transaction> *transactions,
           DoutPrefixProvider *dpp,
           const OSDMapRef &osdmap,
-          bool &first_write_in_interval) = 0;
+          bool &first_write_in_interval,
+          ECOmapJournal &ec_omap_journal) = 0;
 
       virtual bool skip_transaction(
           std::set<shard_id_t> &pending_roll_forward,
index fedda8ca49328dd75b75d2ec382388f288ae3488..8368a219040465dfd9ff1d9c56b6ae294011203f 100644 (file)
@@ -22,6 +22,7 @@
 #include "ECUtil.h"
 #include "os/ObjectStore.h"
 #include "common/inline_variant.h"
+#include "PGLog.h"
 
 #ifndef WITH_CRIMSON
 #include "osd/osd_internal_types.h"
@@ -407,6 +408,25 @@ void ECTransaction::Generate::process_init() {
           ghobject_t(oid, ghobject_t::NO_GEN, shard));
       }
 
+      // Check ECOmapJournal first to see if there are pending omap updates
+      // This avoids costly PG log traversal when not necessary
+      if (osdmap->get_pg_pool(pgid.pool())->supports_omap() &&
+          ec_omap_journal.has_omap_updates(cop.source)) {
+        // There are incomplete omap updates which need to be applied to the clone
+        eversion_t can_rollback_to = pg_log.get_can_rollback_to();
+        OmapCloneVisitor omap_visitor(transactions, pgid, cop.source, oid, sinfo, ec_omap_journal, dpp);
+
+        for (auto &log_entry : get_incomplete_ec_omap_log_entries(cop.source, can_rollback_to)) {
+          // Only apply updates after can_rollback_to version
+          if (log_entry->version > can_rollback_to && log_entry->mod_desc.can_rollback()) {
+            log_entry->mod_desc.visit(&omap_visitor);
+          }
+        }
+        
+        // Apply accumulated omap updates to clone
+        omap_visitor.apply_to_clone();
+      }
+
       if (obc) {
         auto cobciter = t.obc_map.find(cop.source);
         ceph_assert(cobciter != t.obc_map.end());
@@ -458,6 +478,193 @@ void alloc_hint(PGTransaction::ObjectOperation& op,
   }
 }
 
+void ECTransaction::accumulate_omap_updates(
+  bool clear_omap, ///< [in] discard everything accumulated so far before applying this op
+  const std::optional<ceph::buffer::list>& header, ///< [in] replacement omap header, if any
+  const std::vector<std::pair<OmapUpdateType, ceph::buffer::list>>& updates, ///< [in] encoded updates, processed in order
+  std::optional<ceph::buffer::list>& out_header, ///< [in,out] accumulated header
+  std::map<std::string, std::optional<ceph::buffer::list>>& key_updates, ///< [in,out] per-key result; nullopt marks a removed key
+  std::list<std::pair<std::string, std::optional<std::string>>>& removed_ranges) ///< [in,out] accumulated (start, end) removals
+{ // Folds one operation's omap changes into the caller's running accumulator state.
+  // clear_omap: forget all earlier state and record one range that starts at "" and has no end
+  if (clear_omap) {
+    key_updates.clear();
+    removed_ranges.clear();
+    removed_ranges.emplace_back("", std::nullopt);
+    out_header.reset();
+  }
+  
+  // A header supplied by this op replaces whatever header was accumulated before
+  if (header) {
+    out_header = header;
+  }
+  
+  // Decode each update from its bufferlist and fold it in; later updates override earlier ones
+  for (auto &[type, bl] : updates) {
+    auto p = bl.cbegin();
+    
+    switch (type) {
+      case OmapUpdateType::Insert: { // payload: map of key -> value
+        std::map<std::string, ceph::buffer::list> kv_map;
+        decode(kv_map, p);
+        for (auto &[key, value] : kv_map) {
+          key_updates[key] = value;
+        }
+        break;
+      }
+      case OmapUpdateType::Remove: { // payload: set of keys
+        std::set<std::string> keys_to_remove;
+        decode(keys_to_remove, p);
+        for (auto &key : keys_to_remove) {
+          key_updates[key] = std::nullopt;
+        }
+        break;
+      }
+      case OmapUpdateType::RemoveRange: { // payload: start key followed by end key
+        std::string range_start, range_end;
+        decode(range_start, p);
+        decode(range_end, p);
+        removed_ranges.emplace_back(range_start, range_end);
+        
+        // Mark already-accumulated keys in [range_start, range_end) as removed
+        auto map_it = key_updates.lower_bound(range_start);
+        while (map_it != key_updates.end()) {
+          if (map_it->first >= range_end) break; // end bound is exclusive
+          map_it->second = std::nullopt;
+          ++map_it;
+        }
+        break;
+      }
+    }
+  }
+}
+
+void ECTransaction::apply_omap_to_transactions(
+  shard_id_map<ceph::os::Transaction>& transactions, ///< [in,out] per-shard transactions to append omap ops to
+  const pg_t& pgid, ///< [in] used to build each shard's collection id
+  const hobject_t& target_oid, ///< [in] object whose omap is modified
+  const ECUtil::stripe_info_t& sinfo, ///< [in] consulted to skip non-primary shards
+  bool clear_omap, ///< [in] emit omap_clear before anything else
+  const std::optional<ceph::buffer::list>& header, ///< [in] header to set, if any
+  const std::map<std::string, std::optional<ceph::buffer::list>>& key_updates, ///< [in] value => set key, nullopt => remove key
+  const std::list<std::pair<std::string, std::optional<std::string>>>& removed_ranges, ///< [in] (start, optional end) ranges to remove
+  const DoutPrefixProvider* dpp) ///< [in] logging context
+{ // Per shard, ops are appended in this order: clear, range removals, header, key sets, key removals.
+  for (auto &&[shard, t] : transactions) {
+    // Only primary-capable shards store omap
+    if (sinfo.is_nonprimary_shard(shard)) {
+      continue;
+    }
+    
+    coll_t coll(spg_t(pgid, shard));
+    ghobject_t goid(target_oid, ghobject_t::NO_GEN, shard);
+    
+    // Apply clear_omap if needed
+    if (clear_omap) {
+      t.omap_clear(coll, goid);
+    }
+    
+    // Apply removed ranges
+    for (auto &[range_start, range_end] : removed_ranges) {
+      if (range_end) {
+        t.omap_rmkeyrange(coll, goid, range_start, *range_end);
+      } else { // NOTE(review): a range without an end is emitted with "" as its end key - confirm omap_rmkeyrange handles that as intended
+        t.omap_rmkeyrange(coll, goid, range_start, "");
+      }
+    }
+    
+    // Apply header update if present
+    if (header) {
+      ldpp_dout(dpp, 20) << __func__ << ": omap_setheader oid="
+                << target_oid << " header_size=" << header->length() << dendl;
+      t.omap_setheader(coll, goid, *header);
+    }
+    
+    // Apply key updates: split them into keys to set and keys to remove
+    if (!key_updates.empty()) {
+      std::map<std::string, ceph::buffer::list> to_set;
+      std::set<std::string> to_remove;
+      
+      for (auto &[key, val] : key_updates) {
+        if (val) {
+          to_set[key] = *val;
+        } else {
+          to_remove.insert(key);
+        }
+      }
+      
+      if (!to_set.empty()) { // each op is only emitted when it has something to do
+        t.omap_setkeys(coll, goid, to_set);
+      }
+      if (!to_remove.empty()) {
+        t.omap_rmkeys(coll, goid, to_remove);
+      }
+    }
+  }
+}
+
+
+void ECTransaction::OmapCloneVisitor::ec_omap( // Visitor callback: folds one ec_omap modification into this visitor's accumulated state.
+  bool clear_omap, ///< [in] whether this modification clears the omap
+  std::optional<ceph::buffer::list> header, ///< [in] header carried by this modification, if any
+  std::vector<std::pair<OmapUpdateType, ceph::buffer::list>> &updates) { ///< [in] encoded updates
+  
+  ldpp_dout(dpp, 20) << __func__ << ": src=" << source_oid
+          << " dest=" << dest_oid << " clear_omap=" << clear_omap
+          << " header_size=" << (header ? header->length() : 0) << dendl;
+  
+  accumulate_omap_updates( // the last three arguments are this visitor's own accumulators
+    clear_omap,
+    header,
+    updates,
+    omap_header,
+    omap_updates,
+    removed_ranges);
+  
+  if (clear_omap) { // latched: stays true once any visited modification cleared the omap
+    has_clear_omap = true;
+  }
+}
+
+void ECTransaction::OmapCloneVisitor::apply_to_clone() { // Emits the accumulated omap state as ops on dest_oid.
+  apply_omap_to_transactions(
+    transactions,
+    pgid,
+    dest_oid, // ops target the destination object, not source_oid
+    sinfo,
+    has_clear_omap,
+    omap_header,
+    omap_updates,
+    removed_ranges,
+    dpp);
+}
+
+void ECTransaction::Generate::apply_omap_updates_without_journal() { // Decodes this op's omap changes and writes them straight into the shard transactions.
+  std::map<std::string, std::optional<ceph::buffer::list>> key_updates; // start from empty accumulators: only this op is applied
+  std::list<std::pair<std::string, std::optional<std::string>>> removed_ranges;
+  std::optional<ceph::buffer::list> header_out;
+  
+  accumulate_omap_updates(
+    op.clear_omap,
+    op.omap_header,
+    op.omap_updates,
+    header_out,
+    key_updates,
+    removed_ranges);
+  
+  apply_omap_to_transactions(
+    transactions,
+    pgid,
+    oid,
+    sinfo,
+    op.clear_omap,
+    header_out,
+    key_updates,
+    removed_ranges,
+    dpp);
+}
+
+
 ECTransaction::Generate::Generate(PGTransaction &t,
     ErasureCodeInterfaceRef &ec_impl,
     pg_t &pgid,
@@ -471,7 +678,9 @@ ECTransaction::Generate::Generate(PGTransaction &t,
     WritePlanObj &plan,
     DoutPrefixProvider *dpp,
     pg_log_entry_t *entry,
-    bool &first_write_in_interval)
+    bool &first_write_in_interval,
+    ECOmapJournal &ec_omap_journal,
+    const PGLog &pg_log)
   : t(t),
     ec_impl(ec_impl),
     pgid(pgid),
@@ -484,7 +693,9 @@ ECTransaction::Generate::Generate(PGTransaction &t,
     op(op),
     plan(plan),
     read_sem(&sinfo),
-    to_write(&sinfo) {
+    to_write(&sinfo),
+    ec_omap_journal(ec_omap_journal),
+    pg_log(pg_log) {
   ldpp_dout(dpp, 20) << __func__ << ": " << oid
                     << " partial_extents=" << partial_extents
                     << " written_map=" << *written_map
@@ -507,7 +718,7 @@ ECTransaction::Generate::Generate(PGTransaction &t,
     ceph_assert(oid.is_temp());
   }
 
-  if (entry && entry->is_modify() && op.updated_snaps) {
+  if (entry && (entry->is_modify() || entry->is_replace()) && op.updated_snaps) {
     bufferlist bl(op.updated_snaps->second.size() * 8 + 8);
     encode(op.updated_snaps->second, bl);
     entry->snaps.swap(bl);
@@ -535,13 +746,13 @@ ECTransaction::Generate::Generate(PGTransaction &t,
 
   if (op.is_fresh_object() && entry) {
     entry->mod_desc.create();
+    if (osdmap->get_pg_pool(pgid.pool())->supports_omap()) {
+      ec_omap_journal.append_create(plan.hoid);
+    }
   }
 
   process_init();
 
-  // omap not supported (except 0, handled above)
-  ceph_assert(!(op.clear_omap) && !(op.omap_header) && op.omap_updates.empty());
-
   if (op.alloc_hint) {
     all_shards_written();
     alloc_hint(op, transactions, pgid, oid, sinfo);
@@ -579,15 +790,32 @@ ECTransaction::Generate::Generate(PGTransaction &t,
   // we want to update OI on all shards
   bool size_change = plan.orig_size != plan.projected_size;
   bool clear_whiteout = false;
+  bool create_whiteout = false;
 
-  // If we are updating the OI and we have a cache of the previous OI values
-  if (op.attr_updates.contains(OI_ATTR) && obc && obc->attr_cache.contains(OI_ATTR))
-  {
-    object_info_t oi_cache((obc->attr_cache[OI_ATTR]));
-    if (oi_cache.test_flag(object_info_t::FLAG_WHITEOUT))
-    {
-      object_info_t oi_updates(*(op.attr_updates[OI_ATTR]));
-      clear_whiteout = !oi_updates.test_flag(object_info_t::FLAG_WHITEOUT);
+  if (op.attr_updates.contains(OI_ATTR)) {
+    bufferlist &bl = op.attr_updates.find(OI_ATTR)->second.value();
+    auto p = bl.cbegin();
+    object_info_t new_oi;
+    decode(new_oi, p);
+
+    if (new_oi.is_whiteout()) {
+      create_whiteout = true;
+    }
+
+    if (obc && obc->attr_cache.contains(OI_ATTR)) {
+      object_info_t oi_cache((obc->attr_cache[OI_ATTR]));
+      if (oi_cache.test_flag(object_info_t::FLAG_WHITEOUT))
+      {
+        object_info_t oi_updates(*(op.attr_updates[OI_ATTR]));
+        clear_whiteout = !oi_updates.test_flag(object_info_t::FLAG_WHITEOUT);
+      }
+    }
+  }
+
+  if (create_whiteout) {
+    ldpp_dout(dpp, 10) << __func__ << " detecting whiteout creation for " << oid << dendl;
+    if (osdmap->get_pg_pool(pgid.pool())->supports_omap()) {
+      ec_omap_journal.append_whiteout(plan.hoid);
     }
   }
 
@@ -606,6 +834,20 @@ ECTransaction::Generate::Generate(PGTransaction &t,
     attr_updates();
   }
 
+  if (!op.omap_updates.empty() || op.clear_omap || op.omap_header) {
+    ceph_assert(osdmap->get_pg_pool(pgid.pool())->supports_omap());
+    if (entry) {
+      ECOmapJournalEntry new_entry(entry->version, op.clear_omap, op.omap_header, op.omap_updates);
+      entry->mod_desc.ec_omap(
+        op.clear_omap,
+        op.omap_header,
+        op.omap_updates);
+      ec_omap_journal.add_entry(plan.hoid, new_entry);
+    } else {
+      apply_omap_updates_without_journal();
+    }
+  }
+
   if (!entry) {
     return;
   }
@@ -1016,7 +1258,9 @@ void ECTransaction::generate_transactions(
     set<hobject_t> *temp_removed,
     DoutPrefixProvider *dpp,
     const OSDMapRef &osdmap,
-    bool &first_write_in_interval) {
+    bool &first_write_in_interval,
+    ECOmapJournal &ec_omap_journal,
+    const PGLog &pg_log) {
   ceph_assert(written_map);
   ceph_assert(transactions);
   ceph_assert(temp_added);
@@ -1049,8 +1293,50 @@ void ECTransaction::generate_transactions(
       ceph_assert(plan.hoid == oid);
 
       Generate generate(t, ec_impl, pgid, sinfo, partial_extents, written_map,
-        *transactions, osdmap, oid, op, plan, dpp, entry, first_write_in_interval);
+        *transactions, osdmap, oid, op, plan, dpp, entry,
+        first_write_in_interval, ec_omap_journal, pg_log);
 
       plans.plans.pop_front();
   });
 }
+
+std::vector<const pg_log_entry_t*> ECTransaction::Generate::get_incomplete_ec_omap_log_entries( // Collects log entries for hoid, newer than can_rollback_to, that carry EC omap updates.
+  const hobject_t &hoid, ///< [in] object to look for in the PG log
+  eversion_t can_rollback_to) { ///< [in] only entries with a strictly greater version are returned
+  
+  std::vector<const pg_log_entry_t*> result; // pointers alias entries inside pg_log; no copies are made
+  
+  // Helper visitor to check if a log entry has EC omap updates
+  struct OmapChecker : public ObjectModDesc::Visitor {
+    bool has_omap = false;
+    
+    void ec_omap( // arguments are ignored; being called at all is the signal
+      bool clear_omap,
+      std::optional<ceph::buffer::list> header,
+      std::vector<std::pair<OmapUpdateType, ceph::buffer::list>> &updates) override {
+      has_omap = true;
+    }
+  };
+  
+  // Scan every entry in the PG log, in log order, for matching entries
+  for (const auto &entry : pg_log.get_log().log) {
+    // Filter by object, version, and rollback capability
+    if (entry.soid == hoid &&
+        entry.version > can_rollback_to &&
+        entry.mod_desc.can_rollback()) {
+      
+      // Check if this entry has EC omap updates
+      OmapChecker checker;
+      entry.mod_desc.visit(&checker);
+      if (checker.has_omap) {
+        result.push_back(&entry);
+      }
+    }
+  }
+  
+  ldpp_dout(dpp, 20) << __func__ << ": found " << result.size()
+                     << " incomplete log entries with EC omap updates for " << hoid
+                     << " after version " << can_rollback_to << dendl;
+  
+  return result;
+}
index 9e633fde5eecfea0ae41395f794fdbbc4d450c77..7a2f24ee80dce64519c16c56965e82dff1d9b2f9 100644 (file)
@@ -22,6 +22,9 @@
 #include "os/Transaction.h"
 #include "OSDMap.h"
 #include "PGTransaction.h"
+#include "osd/ECOmapJournal.h"
+
+class PGLog;
 
 namespace ECTransaction {
 class WritePlanObj {
@@ -77,6 +80,85 @@ struct WritePlan {
   }
 };
 
+/**
+ * Decode the encoded omap updates of one operation and fold them into the caller's accumulators.
+ * Handles Insert, Remove, and RemoveRange operations, plus the clear_omap flag and header.
+ */
+void accumulate_omap_updates(
+  bool clear_omap, ///< [in] reset the accumulators before applying this operation
+  const std::optional<ceph::buffer::list>& header, ///< [in] replacement header, if any
+  const std::vector<std::pair<OmapUpdateType, ceph::buffer::list>>& updates, ///< [in] encoded updates
+  std::optional<ceph::buffer::list>& out_header, ///< [in,out] accumulated header
+  std::map<std::string, std::optional<ceph::buffer::list>>& key_updates, ///< [in,out] nullopt marks a removed key
+  std::list<std::pair<std::string, std::optional<std::string>>>& removed_ranges); ///< [in,out] accumulated range removals
+
+/**
+ * Apply accumulated omap updates to primary-capable shard transactions; non-primary shards are skipped.
+ */
+void apply_omap_to_transactions(
+  shard_id_map<ceph::os::Transaction>& transactions, ///< [in,out] per-shard transactions to append to
+  const pg_t& pgid, ///< [in] PG the shards belong to
+  const hobject_t& target_oid, ///< [in] object whose omap is modified
+  const ECUtil::stripe_info_t& sinfo, ///< [in] used to identify non-primary shards
+  bool clear_omap, ///< [in] emit an omap clear first
+  const std::optional<ceph::buffer::list>& header, ///< [in] header to set, if any
+  const std::map<std::string, std::optional<ceph::buffer::list>>& key_updates, ///< [in] value => set, nullopt => remove
+  const std::list<std::pair<std::string, std::optional<std::string>>>& removed_ranges, ///< [in] ranges to remove
+  const DoutPrefixProvider* dpp); ///< [in] logging context
+
+
+/**
+ * OmapCloneVisitor - Visitor to extract and apply omap updates to clone transactions
+ *
+ * This visitor implements ObjectModDesc::Visitor to traverse PG log entries and
+ * accumulate omap updates (key-value pairs, range removals, header changes) that
+ * need to be applied to a cloned object. Usage: visit each relevant log entry's
+ * mod_desc with this visitor, then call apply_to_clone() once at the end.
+ */
+class OmapCloneVisitor : public ObjectModDesc::Visitor {
+private:
+  shard_id_map<ceph::os::Transaction> &transactions; // all reference members alias caller-owned objects
+  const pg_t &pgid;
+  const hobject_t &source_oid; // object the log entries belong to
+  const hobject_t &dest_oid; // clone that receives the accumulated updates
+  const ECUtil::stripe_info_t &sinfo;
+  ECOmapJournal &ec_omap_journal; // NOTE(review): stored by the constructor; no use is visible in this declaration
+  const DoutPrefixProvider *dpp;
+  
+  // Accumulated omap state
+  bool has_clear_omap = false; // set once any visited modification cleared the omap
+  std::optional<ceph::buffer::list> omap_header;
+  std::map<std::string, std::optional<ceph::buffer::list>> omap_updates; // nullopt marks a removed key
+  std::list<std::pair<std::string, std::optional<std::string>>> removed_ranges;
+
+public:
+  OmapCloneVisitor(
+    shard_id_map<ceph::os::Transaction> &txns,
+    const pg_t &pg,
+    const hobject_t &src,
+    const hobject_t &dst,
+    const ECUtil::stripe_info_t &stripe_info,
+    ECOmapJournal &journal,
+    const DoutPrefixProvider *dpp)
+    : transactions(txns), pgid(pg), source_oid(src), dest_oid(dst),
+      sinfo(stripe_info), ec_omap_journal(journal), dpp(dpp) {}
+
+  /**
+   * Called by ObjectModDesc::visit() when an ec_omap modification is encountered
+   * Accumulates omap updates from the PG log entry
+   */
+  void ec_omap(
+    bool clear_omap,
+    std::optional<ceph::buffer::list> header,
+    std::vector<std::pair<OmapUpdateType, ceph::buffer::list>> &updates) override;
+  
+  /**
+   * Apply accumulated omap updates to the clone transaction
+   * This should be called after visiting all relevant log entries
+   */
+  void apply_to_clone();
+};
+
 class Generate {
   PGTransaction &t;
   const ErasureCodeInterfaceRef &ec_impl;
@@ -97,6 +179,8 @@ class Generate {
   std::vector<shard_id_set> rollback_shards;
   uint32_t fadvise_flags = 0;
   bool written_shards_final{false};
+  ECOmapJournal &ec_omap_journal;
+  const PGLog &pg_log;
 
   void all_shards_written();
   void shard_written(const shard_id_t shard);
@@ -110,6 +194,15 @@ class Generate {
   void appends_and_clone_ranges();
   void written_shards();
   void attr_updates();
+  std::vector<const pg_log_entry_t*> get_incomplete_ec_omap_log_entries(
+    const hobject_t &hoid,
+    eversion_t can_rollback_to);
+  /**
+   * Apply omap updates directly to transactions without journaling.
+   * Should only be called when entry is null (temporary/non-journaled operations).
+   * For journaled operations, use entry->mod_desc.ec_omap() instead.
+   */
+  void apply_omap_updates_without_journal();
 
  public:
   Generate(PGTransaction &t,
@@ -123,7 +216,9 @@ class Generate {
     WritePlanObj &plan,
     DoutPrefixProvider *dpp,
     pg_log_entry_t *entry,
-    bool &first_write_in_interval);
+    bool &first_write_in_interval,
+    ECOmapJournal &ec_omap_journal,
+    const PGLog &pg_log);
 };
 
 void generate_transactions(
@@ -140,6 +235,8 @@ void generate_transactions(
     std::set<hobject_t> *temp_removed,
     DoutPrefixProvider *dpp,
     const OSDMapRef &osdmap,
-    bool &first_write_in_interval
+    bool &first_write_in_interval,
+    ECOmapJournal &ec_omap_journal,
+    const PGLog &pg_log
   );
 }
index 10f4a5d8827bbf68eeed6be4854686fe904be2cc..5e8170d5fa125ba23f58ba6fc5dd349bbc44d065 100644 (file)
@@ -162,12 +162,12 @@ void ECTransactionL::generate_transactions(
       }
 
       if (entry &&
-         entry->is_modify() &&
-         op.updated_snaps) {
-       bufferlist bl(op.updated_snaps->second.size() * 8 + 8);
-       encode(op.updated_snaps->second, bl);
-       entry->snaps.swap(bl);
-       entry->snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
+          (entry->is_modify() || entry->is_replace()) &&
+          op.updated_snaps) {
+        bufferlist bl(op.updated_snaps->second.size() * 8 + 8);
+        encode(op.updated_snaps->second, bl);
+        entry->snaps.swap(bl);
+        entry->snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
       }
 
       ldpp_dout(dpp, 20) << "generate_transactions: "
index 4432ccaf8fcde127038dea0b9fea8ade21482dd9..df99e42d1707bd3a0d7a0c9f175e7c07dcaf3d70 100644 (file)
@@ -1144,6 +1144,9 @@ protected:
     void trim(const pg_log_entry_t &entry) override {
       pg->get_pgbackend()->trim(entry, t);
     }
+    void trim_after_remove(const pg_log_entry_t &entry) override {
+      pg->get_pgbackend()->trim_after_remove(entry, t);  // forward to the PG's backend, handing it t
+    }
     void partial_write(pg_info_t *info, eversion_t previous_version,
                        const pg_log_entry_t &entry
       ) override {
index 5be44f75db619f17c1576b03748632cd2f2c90c7..053836c7e3365bb0dd01d1a8c14edd2ed98289e7 100644 (file)
@@ -337,12 +337,12 @@ void PGBackend::rollback(
   t->append(vis.t);
 }
 
-struct Trimmer : public ObjectModDesc::Visitor {
+struct TrimmerPostRemove : public ObjectModDesc::Visitor {
   const hobject_t &soid;
   PGBackend *pg;
   ObjectStore::Transaction *t;
   const pg_log_entry_t &entry;
-  Trimmer(
+  TrimmerPostRemove(
     PGBackend *pg,
     ObjectStore::Transaction *t,
     const pg_log_entry_t &entry)
@@ -352,6 +352,11 @@ struct Trimmer : public ObjectModDesc::Visitor {
       soid,
       old_version,
       t);
+
+    if (pg->get_parent()->get_pool().allows_ecoptimizations()
+        && pg->get_parent()->get_pool().supports_omap()) {
+      pg->omap_trim_delete_from_journal(soid, old_version);
+    }
   }
   // try_rmobject defaults to rmobject
   void rollback_extents(
@@ -378,9 +383,19 @@ struct Trimmer : public ObjectModDesc::Visitor {
       }
     }
   }
+};
 
-  void ec_omap(bool clear_omap, std::optional<ceph::buffer::list> omap_header, 
-    std::vector<std::pair<OmapUpdateType, ceph::buffer::list>> &omap_updates) override {
+struct Trimmer : TrimmerPostRemove {
+  Trimmer(
+    PGBackend *pg,
+    ObjectStore::Transaction *t,
+    const pg_log_entry_t &entry)
+    : TrimmerPostRemove(pg, t, entry) {}
+  void ec_omap(bool clear_omap, std::optional<ceph::buffer::list> omap_header,
+    std::vector<std::pair<OmapUpdateType, ceph::buffer::list>> &omap_updates) override
+  {
+    ceph_assert(pg->get_parent()->get_pool().allows_ecoptimizations());
+    ceph_assert(pg->get_parent()->get_pool().supports_omap());
 
     auto shard = pg->get_parent()->whoami_shard().shard;
     spg_t spg = pg->get_parent()->whoami_spg_t();
@@ -476,6 +491,16 @@ void PGBackend::trim(
   entry.mod_desc.visit(&trimmer);
 }
 
+/**
+ * Trim a log entry using TrimmerPostRemove as the mod_desc visitor.
+ *
+ * Entries that cannot be rolled back are ignored. For every other entry a
+ * TrimmerPostRemove bound to this backend, t and the entry is run over
+ * entry.mod_desc.
+ */
+void PGBackend::trim_after_remove(
+  const pg_log_entry_t &entry,
+  ObjectStore::Transaction *t)
+{
+  if (entry.can_rollback()) {
+    TrimmerPostRemove post_remove_trimmer(this, t, entry);
+    entry.mod_desc.visit(&post_remove_trimmer);
+  }
+}
+
 void PGBackend::try_stash(
   const hobject_t &hoid,
   version_t v,
index 57cc018eaa7b80fdd20802e428012b11a1496b5e..985ddac00a587de866b6dd636e292c0ca980a2f7 100644 (file)
@@ -552,6 +552,10 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
      const pg_log_entry_t &entry,
      ObjectStore::Transaction *t);
 
+   void trim_after_remove(
+     const pg_log_entry_t &entry,
+     ObjectStore::Transaction *t);  // no-op unless entry.can_rollback(); visits entry.mod_desc with TrimmerPostRemove
+
    void partial_write(
      pg_info_t *info,
      eversion_t previous_version,
index c31c17f899a12c544c93d21eb0dd7e679bd3f4b8..74f182e06b8447f601bdd23a1064bc71c4980257 100644 (file)
@@ -147,6 +147,8 @@ struct PGLog : DoutPrefixProvider {
       const pg_log_entry_t &entry) = 0;
     virtual void trim(
       const pg_log_entry_t &entry) = 0;
+    virtual void trim_after_remove(
+      const pg_log_entry_t &entry) = 0;  // per-entry hook PGLog calls through rollbacker, e.g. following rollbacker->remove(hoid)
     virtual void remove(
       const hobject_t &hoid) = 0;
     virtual void try_stash(
@@ -1226,7 +1228,7 @@ protected:
           rollbacker->remove(hoid);
         }
         for (auto &&i: entries) {
-          rollbacker->trim(i);
+          rollbacker->trim_after_remove(i);
         }
       }
       return;
@@ -1247,7 +1249,7 @@ protected:
           rollbacker->remove(hoid);
         }
         for (auto &&i: entries) {
-          rollbacker->trim(i);
+          rollbacker->trim_after_remove(i);
         }
       }
       return;
@@ -1278,7 +1280,7 @@ protected:
       }
       if (rollbacker) {
         for (auto &&i: entries) {
-          rollbacker->trim(i);
+          rollbacker->trim_after_remove(i);
         }
       }
       return;
@@ -1324,7 +1326,7 @@ protected:
         if (!object_not_in_store)
           rollbacker->remove(hoid);
         for (auto &&i: entries) {
-          rollbacker->trim(i);
+          rollbacker->trim_after_remove(i);
         }
       }
       missing.add(hoid, prior_version, eversion_t(), false);
index 24deee836d42c64ba4ec4ec48bde7665d952194d..58b661ce856850ba3c5d3163954900e1d2ace6a1 100644 (file)
@@ -7893,8 +7893,8 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        uint32_t num = 0;
        bool truncated = false;
        if (oi.is_omap()) {
-          const auto result = osd->store->omap_iterate(
-            ch, ghobject_t(soid),
+          const auto result = get_pgbackend()->omap_iterate(
+            ch, ghobject_t(soid, ghobject_t::NO_GEN, whoami_shard().shard),
             ObjectStore::omap_iter_seek_t{
               .seek_position = start_after,
               .seek_type = ObjectStore::omap_iter_seek_t::UPPER_BOUND
@@ -7949,8 +7949,8 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        bufferlist bl;
        if (oi.is_omap()) {
          using omap_iter_seek_t = ObjectStore::omap_iter_seek_t;
-         const auto result = osd->store->omap_iterate(
-           ch, ghobject_t(soid),
+         const auto result = get_pgbackend()->omap_iterate(
+           ch, ghobject_t(soid, ghobject_t::NO_GEN, whoami_shard().shard),
            // try to seek as many keys-at-once as possible for the sake of performance.
            // note complexity should be logarithmic, so seek(n/2) + seek(n/2) is worse
            // than just seek(n).
@@ -7994,7 +7994,9 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       ++ctx->num_read;
       {
-       osd->store->omap_get_header(ch, ghobject_t(soid), &osd_op.outdata);
+       get_pgbackend()->omap_get_header(ch,
+         ghobject_t(soid, ghobject_t::NO_GEN, whoami_shard().shard),
+         &osd_op.outdata, false);
        ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
        ctx->delta_stats.num_rd++;
       }
@@ -8015,7 +8017,8 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        tracepoint(osd, do_osd_op_pre_omapgetvalsbykeys, soid.oid.name.c_str(), soid.snap.val, list_entries(keys_to_get).c_str());
        map<string, bufferlist> out;
        if (oi.is_omap()) {
-         osd->store->omap_get_values(ch, ghobject_t(soid), keys_to_get, &out);
+         get_pgbackend()->omap_get_values(ch,
+           ghobject_t(soid, ghobject_t::NO_GEN, whoami_shard().shard), keys_to_get, &out);
        } // else return empty omap entries
        encode(out, osd_op.outdata);
        ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
@@ -8050,8 +8053,9 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
               i != assertions.end();
               ++i)
            to_get.insert(i->first);
-         int r = osd->store->omap_get_values(ch, ghobject_t(soid),
-                                             to_get, &out);
+         int r = get_pgbackend()->omap_get_values(ch,
+           ghobject_t(soid, ghobject_t::NO_GEN, whoami_shard().shard),
+           to_get, &out);
          if (r < 0) {
            result = r;
            break;
@@ -8148,10 +8152,12 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       ++ctx->num_write;
       result = 0;
       {
-       maybe_create_new_object(ctx);
-       t->omap_setheader(soid, osd_op.indata);
-       ctx->clean_regions.mark_omap_dirty();
-       ctx->delta_stats.num_wr++;
+         maybe_create_new_object(ctx);
+         dout(20) << __func__ << ": omap_setheader soid=" << soid
+                 << " header_size=" << osd_op.indata.length() << dendl;
+         t->omap_setheader(soid, osd_op.indata);
+         ctx->clean_regions.mark_omap_dirty();
+         ctx->delta_stats.num_wr++;
       }
       obs.oi.set_flag(object_info_t::FLAG_OMAP);
       obs.oi.clear_omap_digest();
@@ -8171,6 +8177,8 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          break;
        }
        if (oi.is_omap()) {
+         dout(20) << __func__ << ": omap_clear soid=" << soid
+                 << " (header will be cleared)" << dendl;
          t->omap_clear(soid);
          ctx->clean_regions.mark_omap_dirty();
          ctx->delta_stats.num_wr++;
@@ -8319,6 +8327,9 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            result = -EINVAL;
            break;
          }
+         dout(20) << __func__ << " COPY_FROM operation starting on OSD "
+                  << osd->whoami << " dest=" << soid << " src=" << src
+                  << " src_version=" << src_version << dendl;
          CopyFromCallback *cb = new CopyFromCallback(ctx, osd_op);
          if (have_truncate)
            cb->set_truncate(truncate_seq, truncate_size);
@@ -9181,10 +9192,15 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx)
   if (soid.snap == CEPH_NOSNAP)
     make_writeable(ctx);
 
-  finish_ctx(ctx,
-            ctx->new_obs.exists ? pg_log_entry_t::MODIFY :
-            pg_log_entry_t::DELETE,
-            result);
+  int log_op_type;
+  if (ctx->use_replace_op) {
+    log_op_type = pg_log_entry_t::REPLACE;
+  } else {
+    log_op_type = ctx->new_obs.exists ? pg_log_entry_t::MODIFY :
+                  pg_log_entry_t::DELETE;
+  }
+
+  finish_ctx(ctx, log_op_type, result);
 
   return result;
 }
@@ -9543,47 +9559,79 @@ int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::const_iterator& bp,
 
   // omap
   uint32_t omap_keys = 0;
+  dout(20) << __func__ << " omap section start - cursor.omap_offset='" << cursor.omap_offset
+           << "' left=" << left << " cursor.omap_complete=" << cursor.omap_complete
+           << " supports_omap=" << pool.info.supports_omap()
+           << " is_omap=" << oi.is_omap() << dendl;
   if (!pool.info.supports_omap() || !oi.is_omap()) {
     cursor.omap_complete = true;
+    dout(20) << __func__ << " omap_complete set to true (pool doesn't support omap or object has no omap)" << dendl;
   } else {
     if (left > 0 && !cursor.omap_complete) {
       ceph_assert(cursor.data_complete);
       if (cursor.omap_offset.empty()) {
-       osd->store->omap_get_header(ch, ghobject_t(oi.soid),
-                                   &reply_obj.omap_header);
+        dout(20) << __func__ << " reading omap header (cursor.omap_offset is empty)" << dendl;
+        int r = get_pgbackend()->omap_get_header(ch,
+          ghobject_t(oi.soid, ghobject_t::NO_GEN, whoami_shard().shard),
+          &reply_obj.omap_header, false);
+        if (r < 0) {
+          ceph_abort();
+        }
+        dout(20) << __func__ << " omap_get_header result=" << r
+                 << " header_length=" << reply_obj.omap_header.length() << dendl;
+      } else {
+        dout(20) << __func__ << " NOT reading omap header (cursor.omap_offset='"
+                 << cursor.omap_offset << "' is not empty)" << dendl;
       }
       bufferlist omap_data;
-      const auto result = osd->store->omap_iterate(
-        ch, ghobject_t(oi.soid),
+      dout(20) << __func__ << " calling omap_iterate with cursor.omap_offset='"
+               << cursor.omap_offset << "'" << dendl;
+      const auto result = get_pgbackend()->omap_iterate(
+        ch, ghobject_t(oi.soid, ghobject_t::NO_GEN, whoami_shard().shard),
         ObjectStore::omap_iter_seek_t{
           .seek_position = cursor.omap_offset,
           .seek_type = ObjectStore::omap_iter_seek_t::UPPER_BOUND
         },
         [&omap_data, &omap_keys, &left, &cursor]
         (std::string_view key, std::string_view value) mutable {
-         ++omap_keys;
-         encode(key, omap_data);
-         encode(value, omap_data);
-         left -= key.length() + 4 + value.length() + 4;
-         if (left <= 0) {
-           cursor.omap_offset = key;
+          ++omap_keys;
+          encode(key, omap_data);
+          encode(value, omap_data);
+          left -= key.length() + 4 + value.length() + 4;
+          if (left <= 0) {
+            cursor.omap_offset = key;
             return ObjectStore::omap_iter_ret_t::STOP;
-         }
+          }
           return ObjectStore::omap_iter_ret_t::NEXT;
         });
+      dout(20) << __func__ << " omap_iterate result=" << result << " omap_keys=" << omap_keys << dendl;
       if (result < 0) {
-       ceph_abort();
+        ceph_abort();
       } else if (const auto more = static_cast<bool>(result); !more) {
-       cursor.omap_complete = true;
-       dout(20) << " got omap" << dendl;
+        cursor.omap_complete = true;
+        dout(20) << __func__ << " omap_complete set to true (no more keys, omap_iterate returned !more)" << dendl;
+        dout(20) << " got omap" << dendl;
+      } else {
+        dout(20) << __func__ << " omap NOT complete (more keys available, stopped due to left=" << left << ")" << dendl;
       }
       if (omap_keys) {
-       encode(omap_keys, reply_obj.omap_data);
-       reply_obj.omap_data.claim_append(omap_data);
+        encode(omap_keys, reply_obj.omap_data);
+        reply_obj.omap_data.claim_append(omap_data);
+        dout(20) << __func__ << " encoded " << omap_keys << " omap keys, total omap_data length="
+                 << reply_obj.omap_data.length() << dendl;
+      } else {
+        dout(20) << __func__ << " no omap keys read in this iteration" << dendl;
       }
+    } else {
+      dout(20) << __func__ << " skipping omap read (left=" << left
+               << " cursor.omap_complete=" << cursor.omap_complete << ")" << dendl;
     }
   }
 
+  dout(20) << __func__ << " omap iteration summary: omap_keys_read="
+           << omap_keys << " cursor.omap_complete=" << cursor.omap_complete
+           << " cursor.omap_offset='" << cursor.omap_offset << "'" << dendl;
+
   if (cursor.is_complete()) {
     // include reqids only in the final step.  this is a bit fragile
     // but it works...
@@ -10222,17 +10270,21 @@ void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t)
   if (pool.info.supports_omap()) {
     if (!cop->temp_cursor.omap_complete) {
       if (cop->omap_header.length()) {
-       t->omap_setheader(
-         cop->results.temp_oid,
-         cop->omap_header);
-       cop->omap_header.clear();
+        dout(20) << __func__ << " writing omap_header to transaction, length="
+                 << cop->omap_header.length() << " temp_oid=" << cop->results.temp_oid << dendl;
+        t->omap_setheader(
+          cop->results.temp_oid,
+          cop->omap_header);
+        cop->omap_header.clear();
       }
       if (cop->omap_data.length()) {
-       map<string,bufferlist> omap;
-       bufferlist::const_iterator p = cop->omap_data.begin();
-       decode(omap, p);
-       t->omap_setkeys(cop->results.temp_oid, omap);
-       cop->omap_data.clear();
+        map<string,bufferlist> omap;
+        bufferlist::const_iterator p = cop->omap_data.begin();
+        decode(omap, p);
+        dout(20) << __func__ << " writing omap_data to transaction, "
+                 << omap.size() << " keys, temp_oid=" << cop->results.temp_oid << dendl;
+        t->omap_setkeys(cop->results.temp_oid, omap);
+        cop->omap_data.clear();
       }
     }
   } else {
@@ -10251,6 +10303,7 @@ void PrimaryLogPG::finish_copyfrom(CopyFromCallback *cb)
   if (obs.exists) {
     dout(20) << __func__ << ": exists, removing" << dendl;
     ctx->op_t->remove(obs.oi.soid);
+    ctx->use_replace_op = true;
   } else {
     ctx->delta_stats.num_objects++;
     obs.exists = true;
index d094d4f60ad99b5411471484913b21cc1ed86aab..bff6b0b372de9c97cb3c8463536360c526cd24ef 100644 (file)
@@ -697,6 +697,7 @@ public:
     bool ignore_cache;    ///< true if IGNORE_CACHE flag is std::set
     bool ignore_log_op_stats;  // don't log op stats
     bool update_log_only; ///< this is a write that returned an error - just record in pg log for dup detection
+    bool use_replace_op = false;  ///< use REPLACE op type instead of MODIFY/DELETE (set by finish_copyfrom)
     ObjectCleanRegions clean_regions;
 
     // side effects
index aa1e557667e39e3b7e68f18cda75263b1cfcb111..615dc3a19a3b35b12879821fcb8fe524fa5a30f0 100644 (file)
@@ -855,7 +855,7 @@ void SnapMapper::update_snap_map(
         i.soid,
         _snaps,
         _t);
-    } else if (i.is_modify()) {
+    } else if (i.is_modify() || i.is_replace()) {
       int r = update_snaps(
         i.soid,
         _snaps,
index d0b82e6509cfe55b78a5713d4be26bb44d4b9005..237645fbd4c833993067504c53961df42549a8a3 100644 (file)
@@ -4772,6 +4772,16 @@ void ObjectModDesc::visit(Visitor *visitor) const
        visitor->rollback_extents(gen, extents, object_size, shards);
        break;
       }
+      case EC_OMAP: {
+        bool clear_omap;
+        std::optional<ceph::buffer::list> omap_header;
+        std::vector<std::pair<OmapUpdateType, ceph::buffer::list>> omap_updates;
+        decode(clear_omap, bp);
+        decode(omap_header, bp);
+        decode(omap_updates, bp);
+        visitor->ec_omap(clear_omap, omap_header, omap_updates);
+        break;
+      }
       default:
        ceph_abort_msg("Invalid rollback code");
       }
index 55d533be8c8db0a935a194c4aeced91a0cf4855e..7b30617a84e6e4288c3461095504432660bd4fe9 100644 (file)
@@ -4533,6 +4533,7 @@ struct pg_log_entry_t {
     PROMOTE = 8,     // promoted object from another tier
     CLEAN = 9,       // mark an object clean
     ERROR = 10,      // write that returned an error
+    REPLACE = 11,    // replace (delete + recreate) operation
   };
   static const char *get_op_name(int op) {
     switch (op) {
@@ -4554,6 +4555,8 @@ struct pg_log_entry_t {
       return "clean";
     case ERROR:
       return "error";
+    case REPLACE:
+      return "replace";
     default:
       return "unknown";
     }
@@ -4610,11 +4613,12 @@ struct pg_log_entry_t {
   bool is_lost_delete() const { return op == LOST_DELETE; }
   bool is_lost_mark() const { return op == LOST_MARK; }
   bool is_error() const { return op == ERROR; }
+  bool is_replace() const { return op == REPLACE; }
 
   bool is_update() const {
     return
       is_clone() || is_modify() || is_promote() || is_clean() ||
-      is_lost_revert() || is_lost_mark();
+      is_lost_revert() || is_lost_mark() || is_replace();
   }
   bool is_delete() const {
     return op == DELETE || op == LOST_DELETE;
@@ -4642,7 +4646,7 @@ struct pg_log_entry_t {
 
   bool reqid_is_indexed() const {
     return reqid != osd_reqid_t() &&
-      (op == MODIFY || op == DELETE || op == ERROR);
+      (op == MODIFY || op == DELETE || op == ERROR || op == REPLACE);
   }
 
   void set_op_returns(const std::vector<OSDOp>& ops) {
index 79651d9870cffadc2bb313da47ef910ed5698853..43d161d882accd07fb782fbaa730fe1563458792 100644 (file)
@@ -54,6 +54,10 @@ class MockPGLogEntryHandler : public PGLog::LogEntryHandler {
     lgeneric_dout(g_ceph_context, 0) << "MockPGLogEntryHandler::trim " << entry << dendl;
     backend->trim(entry, t);
   }
+  void trim_after_remove(const pg_log_entry_t &entry) override {
+    lgeneric_dout(g_ceph_context, 0) << "MockPGLogEntryHandler::trim_after_remove " << entry << dendl;
+    backend->trim_after_remove(entry, t);  // after logging the entry, forward it to backend together with t
+  }
   void partial_write(pg_info_t *info, eversion_t previous_version,
                       const pg_log_entry_t &entry
     ) override {
index 946c3c0de6d4c0f2507ef091384fca5d6475c554..26076c4d303e0f9fb58afc791d89bc8a9a0239d1 100644 (file)
@@ -244,6 +244,8 @@ public:
     }
     void trim(
       const pg_log_entry_t &entry) override {}
+    void trim_after_remove(
+      const pg_log_entry_t &entry) override {}
     void partial_write(
         pg_info_t *info,
         eversion_t previous_version,
@@ -373,6 +375,8 @@ struct TestHandler : public PGLog::LogEntryHandler {
   }
   void trim(
     const pg_log_entry_t &entry) override {}
+  void trim_after_remove(
+    const pg_log_entry_t &entry) override {}
   void partial_write(
       pg_info_t *info,
       eversion_t previous_version,