git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: group lifecycle versioned deletes to reduce OLH contention (ref: 67700/head)
author: Matthew N. Heler <matthew.heler@hotmail.com>
Fri, 6 Mar 2026 17:46:44 +0000 (11:46 -0600)
committer: Matthew N. Heler <matthew.heler@hotmail.com>
Thu, 14 May 2026 21:25:39 +0000 (16:25 -0500)
When multiple versions of the same key expire together, each delete
does a read-modify-write of the OLH on the same bucket index shard.

Buffer versions of the same key during listing and flush on key change.
Groups with multiple versions get pre-evaluated, then hard deletes go
through rgw::multi_delete::dispatch() which skips OLH updates on all
but the last delete. LCOpRule::process() is split into evaluate() and
execute() to support this two-phase pattern.

Non-versioned buckets and single-version groups are unchanged.

Signed-off-by: Matthew N. Heler <matthew.heler@hotmail.com>
src/rgw/rgw_lc.cc

index 1bd8cbf91f27e784b17132fce7431ca3ecd335da..fdfbbc5abf25defff22e057acce7d7df187024a6 100644 (file)
@@ -32,6 +32,7 @@
 #include "rgw_lc.h"
 #include "rgw_string.h"
 #include "rgw_sal.h"
+#include "rgw_multi_del.h"
 #include "rgw_multipart_meta_filter.h"
 
 #include "fmt/format.h"
@@ -607,6 +608,7 @@ struct lc_op_ctx {
 
   std::unique_ptr<rgw::sal::PlacementTier> tier;
   const RGWObjTags* cached_tags{nullptr};
+  bool skip_update_olh{false};
 
   lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
            boost::optional<std::string> next_key_name,
@@ -782,6 +784,9 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp,
 
   uint32_t flags = (!remove_indeed || !zonegroup_lc_check(dpp, oc.driver->get_zone()))
                    ? rgw::sal::FLAG_LOG_OP : 0;
+  if (remove_indeed && oc.skip_update_olh) {
+    flags |= rgw::sal::FLAG_SKIP_UPDATE_OLH;
+  }
   ret =  del_op->delete_obj(dpp, y, flags);
   if (ret < 0) {
     ldpp_dout(dpp, 1) <<
@@ -824,6 +829,17 @@ public:
     return 0;
   }
 
+  /*
+   * True for actions that always hard-delete (remove_indeed=true).
+   * Used to route deletes through rgw::multi_delete::dispatch()
+   * so redundant OLH updates can be skipped. CurrentExpiration
+   * is excluded because on versioned buckets it creates a delete
+   * marker rather than hard-deleting.
+   */
+  virtual bool is_delete() const {
+    return false;
+  }
+
   friend class LCOpRule;
 }; /* LCOpAction */
 
@@ -867,6 +883,18 @@ public:
 
   void build();
   void update();
+
+  LCOpAction* evaluate(rgw_bucket_dir_entry& o,
+                      const DoutPrefixProvider *dpp,
+                      const RGWObjTags* cached_tags,
+                      optional_yield y);
+  int execute(LCOpAction& action,
+             rgw_bucket_dir_entry& o,
+             const DoutPrefixProvider *dpp,
+             LCBatchCounters* batch_counters,
+             const RGWObjTags* cached_tags,
+             bool skip_update_olh,
+             optional_yield y);
   int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
               LCBatchCounters* batch_counters, optional_yield y,
               const RGWObjTags* cached_tags = nullptr);
@@ -1050,6 +1078,36 @@ static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, b
   return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, y);
 }
 
+/*
+ * Fetch and decode object tags. Returns a pointer into buf on success,
+ * or nullptr if the object is a delete marker, has no tags, or on error.
+ */
+static const RGWObjTags* fetch_obj_tags(
+    const DoutPrefixProvider* dpp, rgw::sal::Bucket* bucket,
+    const rgw_bucket_dir_entry& obj,
+    boost::optional<RGWObjTags>& buf, optional_yield y)
+{
+  if (obj.is_delete_marker()) return nullptr;
+
+  rgw_obj_key key = obj.key;
+  if (key.instance.empty() && bucket->versioned() && !obj.is_current())
+    key.instance = "null";
+
+  bufferlist bl;
+  auto o = bucket->get_object(key);
+  if (read_obj_tags(dpp, o.get(), bl, y) != 0)
+    return nullptr;
+
+  try {
+    buf.emplace();
+    auto it = bl.cbegin();
+    buf->decode(it);
+    return &*buf;
+  } catch (buffer::error&) {
+    return nullptr;
+  }
+}
+
 static bool is_valid_op(const lc_op& op)
 {
       return (op.status &&
@@ -1268,6 +1326,8 @@ public:
   LCOpAction_NonCurrentExpiration(op_env& env)
     {}
 
+  bool is_delete() const override { return true; }
+
   bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override {
     auto& o = oc.o;
     if (o.is_current()) {
@@ -1320,6 +1380,8 @@ class LCOpAction_DMExpiration : public LCOpAction {
 public:
   LCOpAction_DMExpiration(op_env& env) {}
 
+  bool is_delete() const override { return true; }
+
   bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override {
     auto& o = oc.o;
     if (!o.is_delete_marker()) {
@@ -1703,14 +1765,19 @@ void LCOpRule::update()
   effective_mtime = env.ol.get_prev_obj().meta.mtime;
 }
 
-int LCOpRule::process(rgw_bucket_dir_entry& o,
-                     const DoutPrefixProvider *dpp,
-                     LCBatchCounters* batch_counters,
-                      optional_yield y,
-                     const RGWObjTags* cached_tags)
+/*
+ * evaluate() and execute() each construct a fresh lc_op_ctx. If a
+ * check() mutates ctx, those mutations are discarded before execute()
+ * runs — check() implementations must not stash state in ctx.
+ */
+LCOpAction* LCOpRule::evaluate(rgw_bucket_dir_entry& o,
+                              const DoutPrefixProvider *dpp,
+                              const RGWObjTags* cached_tags,
+                              optional_yield y)
 {
-  lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp, batch_counters);
+  lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp, nullptr);
   ctx.cached_tags = cached_tags;
+
   shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
   real_time exp;
 
@@ -1725,45 +1792,70 @@ int LCOpRule::process(rgw_bucket_dir_entry& o,
     }
   }
 
-  if (selected &&
-      (*selected)->should_process()) {
-
-    /*
-     * Calling filter checks after action checks because
-     * all action checks (as they are implemented now) do
-     * not access the objects themselves, but return result
-     * from info from bucket index listing. The current tags filter
-     * check does access the objects, so we avoid unnecessary rados calls
-     * having filters check later in the process.
-     */
-
-    bool cont = false;
-    for (auto& f : filters) {
-      if (f->check(dpp, ctx, y)) {
-        cont = true;
-        break;
-      }
-    }
+  if (!selected || !(*selected)->should_process()) {
+    return nullptr;
+  }
 
-    if (!cont) {
-      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
-                        << ": no rule match, skipping" << dendl;
-      return 0;
+  /*
+   * Calling filter checks after action checks because
+   * all action checks (as they are implemented now) do
+   * not access the objects themselves, but return result
+   * from info from bucket index listing. The current tags filter
+   * check does access the objects, so we avoid unnecessary rados calls
+   * having filters check later in the process.
+   */
+  bool cont = false;
+  for (auto& f : filters) {
+    if (f->check(dpp, ctx, y)) {
+      cont = true;
+      break;
     }
+  }
 
-    int r = (*selected)->process(ctx, y);
-    if (r < 0) {
-      ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj " 
-                       << env.bucket << ":" << o.key
-                       << " " << cpp_strerror(r) << dendl;
-      return r;
-    }
-    ldpp_dout(dpp, 20) << "processed:" << env.bucket << ":"
-                      << o.key << dendl;
+  if (!cont) {
+    ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
+                      << ": no rule match, skipping" << dendl;
+    return nullptr;
   }
 
+  return selected->get();
+}
+
+int LCOpRule::execute(LCOpAction& action,
+                     rgw_bucket_dir_entry& o,
+                     const DoutPrefixProvider *dpp,
+                     LCBatchCounters* batch_counters,
+                     const RGWObjTags* cached_tags,
+                     bool skip_update_olh,
+                     optional_yield y)
+{
+  lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp, batch_counters);
+  ctx.cached_tags = cached_tags;
+  ctx.skip_update_olh = skip_update_olh;
+
+  int r = action.process(ctx, y);
+  if (r < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj "
+                     << env.bucket << ":" << o.key
+                     << " " << cpp_strerror(r) << dendl;
+    return r;
+  }
+  ldpp_dout(dpp, 20) << "processed:" << env.bucket << ":"
+                    << o.key << dendl;
   return 0;
+}
 
+int LCOpRule::process(rgw_bucket_dir_entry& o,
+                     const DoutPrefixProvider *dpp,
+                     LCBatchCounters* batch_counters,
+                     optional_yield y,
+                     const RGWObjTags* cached_tags)
+{
+  auto* action = evaluate(o, dpp, cached_tags, y);
+  if (!action) {
+    return 0;
+  }
+  return execute(*action, o, dpp, batch_counters, cached_tags, false, y);
 }
 
 int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
@@ -1962,6 +2054,149 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
       rules.back().build(); // why can't ctor do it?
     }
 
+    /*
+     * Buffer versions of the same key so versioned deletes can
+     * skip redundant OLH updates via rgw::multi_delete::dispatch().
+     */
+    struct lc_obj_entry {
+      rgw_bucket_dir_entry obj;
+      std::vector<LCOpRule> rules;
+    };
+    std::vector<lc_obj_entry> key_group;
+
+    auto flush_key_group = [&] {
+      if (key_group.empty()) return;
+
+      if (!bucket->versioned() || key_group.size() == 1) {
+        /*
+         * Non-versioned or single version: spawn per-object.
+         */
+        for (auto& entry : key_group) {
+          // Spawn one coroutine per object to process all rules
+          workpool.spawn([&pf, &batch_counters, dpp=this,
+                          rules_copy=std::move(entry.rules),
+                          obj=std::move(entry.obj), bucket=bucket.get()]
+                         (boost::asio::yield_context yield) mutable {
+            // Fetch tags once per object if any rule needs them
+            boost::optional<RGWObjTags> cached_tags;
+            const RGWObjTags* cached_tags_ptr = nullptr;
+            if (std::any_of(rules_copy.begin(), rules_copy.end(),
+                    [](const LCOpRule& r) { return r.needs_tags(); }))
+              cached_tags_ptr = fetch_obj_tags(dpp, bucket, obj,
+                                               cached_tags, yield);
+
+            for (auto& rule : rules_copy) {
+              if (rule.needs_tags() &&
+                  !obj.is_delete_marker() &&
+                  cached_tags_ptr &&
+                  !has_all_tags(rule.get_op(), *cached_tags_ptr)) {
+                continue;
+              }
+              if (rule.needs_tags() &&
+                  !obj.is_delete_marker() &&
+                  !cached_tags_ptr) {
+                continue;
+              }
+              pf(dpp, yield, rule, obj, cached_tags_ptr);
+            }
+            batch_counters.decrement_pending();
+          });
+        }
+      } else {
+        /*
+         * Versioned with multiple versions: evaluate all entries,
+         * then pass hard deletes to rgw::multi_delete::dispatch()
+         * to skip redundant OLH updates.
+         */
+        workpool.spawn(
+            [dpp=this, group=std::move(key_group), bucket=bucket.get(),
+             &batch_counters]
+            (boost::asio::yield_context yield) mutable {
+
+              struct action_ref { LCOpAction* a; size_t ei, ri; };
+              struct tag_entry {
+                boost::optional<RGWObjTags> buf;
+                const RGWObjTags* ptr{nullptr};
+              };
+
+              std::vector<tag_entry> tags(group.size());
+              std::vector<action_ref> dels, non_dels;
+
+              for (size_t ei = 0; ei < group.size(); ++ei) {
+                auto& obj = group[ei].obj;
+                if (std::any_of(group[ei].rules.begin(),
+                        group[ei].rules.end(),
+                        [](const LCOpRule& r) { return r.needs_tags(); }))
+                  tags[ei].ptr = fetch_obj_tags(dpp, bucket, obj,
+                                                tags[ei].buf, yield);
+                bool has_del = false;
+                std::vector<action_ref> entry_non_dels;
+                entry_non_dels.reserve(group[ei].rules.size());
+                for (size_t ri = 0; ri < group[ei].rules.size(); ++ri) {
+                  auto& rule = group[ei].rules[ri];
+                  if (rule.needs_tags() &&
+                      !obj.is_delete_marker() &&
+                      tags[ei].ptr &&
+                      !has_all_tags(rule.get_op(), *tags[ei].ptr))
+                    continue;
+                  if (rule.needs_tags() &&
+                      !obj.is_delete_marker() &&
+                      !tags[ei].ptr)
+                    continue;
+                  auto* act = rule.evaluate(
+                      group[ei].obj, dpp, tags[ei].ptr, yield);
+                  if (!act) continue;
+                  if (act->is_delete()) {
+                    /*
+                     * S3 lifecycle conflict resolution gives hard deletes
+                     * precedence over transitions for the same version.
+                     * Suppress queued non-delete work once one matches.
+                     */
+                    dels.push_back({act, ei, ri});
+                    has_del = true;
+                    entry_non_dels.clear();
+                    break;
+                  }
+                  entry_non_dels.push_back({act, ei, ri});
+                }
+                if (!has_del) {
+                  non_dels.insert(non_dels.end(),
+                                  entry_non_dels.begin(),
+                                  entry_non_dels.end());
+                }
+              }
+
+              for (auto& r : non_dels)
+                group[r.ei].rules[r.ri].execute(
+                    *r.a, group[r.ei].obj, dpp,
+                    &batch_counters, tags[r.ei].ptr, false, yield);
+
+              std::vector<rgw::multi_delete::Item> items;
+              items.reserve(dels.size());
+              for (size_t i = 0; i < dels.size(); ++i)
+                items.push_back({group[dels[i].ei].obj.key, i});
+
+              /*
+               * max_aio=1: all versions share a bucket index shard,
+               * so parallel deletes would contend on that shard.
+               */
+              rgw::multi_delete::dispatch(
+                  items, true, 1, yield,
+                  [&](const rgw::multi_delete::Item& item,
+                      bool skip_olh, boost::asio::yield_context y) {
+                    auto& r = dels[item.index];
+                    group[r.ei].rules[r.ri].execute(
+                        *r.a, group[r.ei].obj, dpp,
+                        &batch_counters, tags[r.ei].ptr, skip_olh, y);
+                  });
+
+              for (size_t i = 0; i < group.size(); ++i)
+                batch_counters.decrement_pending();
+            });
+      }
+      key_group.clear();
+    };
+
     rgw_bucket_dir_entry* o{nullptr};
     for (auto offset = 0; ol.get_obj(this, yield, &o /* , fetch_barrier */); ++offset, ol.next()) {
       const auto obj = *o;
@@ -1971,60 +2206,13 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
         rule.update();
       }
 
-      // Spawn one coroutine per object to process all rules
-      workpool.spawn([&pf, &batch_counters, dpp=this, rules_copy=rules, obj, bucket=bucket.get()]
-                     (boost::asio::yield_context yield) mutable {
-        // Check if any rule needs tags so we only fetch once per object
-        bool any_rule_needs_tags = std::any_of(rules_copy.begin(), rules_copy.end(),
-          [](const LCOpRule& r) { return r.needs_tags(); });
-
-        boost::optional<RGWObjTags> cached_tags;
-        const RGWObjTags* cached_tags_ptr = nullptr;
-
-        if (any_rule_needs_tags && !obj.is_delete_marker()) {
-          bufferlist tags_bl;
-
-          rgw_obj_key obj_key = obj.key;
-          if (obj_key.instance.empty() && bucket->versioned() && !obj.is_current()) {
-            obj_key.instance = "null";
-          }
-
-          auto temp_obj = bucket->get_object(obj_key);
-          std::unique_ptr<rgw::sal::Object::ReadOp> rop = temp_obj->get_read_op();
-          int ret = rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, yield);
-          if (ret == 0) {
-            try {
-              cached_tags.emplace();
-              auto iter = tags_bl.cbegin();
-              cached_tags->decode(iter);
-              cached_tags_ptr = &*cached_tags;
-            } catch (buffer::error& err) {
-              ldpp_dout(dpp, 5) << "ERROR: decode tags for " << obj.key << dendl;
-            }
-          }
-        }
-
-        for (auto& rule : rules_copy) {
-          if (rule.needs_tags() &&
-              !obj.is_delete_marker() &&
-              cached_tags_ptr &&
-              !has_all_tags(rule.get_op(), *cached_tags_ptr)) {
-            continue;
-          }
+      // flush on key change, or at 1000 versions to bound memory
+      if (!key_group.empty()
+          && (obj.key.name != key_group.front().obj.key.name
+              || key_group.size() >= 1000))
+        flush_key_group();
 
-          if (rule.needs_tags() &&
-              !obj.is_delete_marker() &&
-              !cached_tags_ptr) {
-            continue;
-          }
-
-          pf(dpp, yield, rule, const_cast<rgw_bucket_dir_entry&>(obj),
-             cached_tags_ptr);
-        }
-
-        // Decrement pending once per object, after all rules are evaluated
-        batch_counters.decrement_pending();
-      });
+      key_group.push_back({obj, rules});
 
       total_objects_scanned++;
 
@@ -2047,10 +2235,12 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
          ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker="
                             << worker->ix << " bucket=" << bucket_name
                             << dendl;
+         flush_key_group();
          return 0;
        }
       }
     }
+    flush_key_group();
   }
 
   ret = handle_multipart_expiration(bucket.get(), prefix_map, workpool,