]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/lc: reuse prefetched object tags across lifecycle filter 66367/head
authorMatthew N. Heler <matthew.heler@hotmail.com>
Thu, 20 Nov 2025 18:14:14 +0000 (12:14 -0600)
committerMatthew N. Heler <matthew.heler@hotmail.com>
Fri, 12 Dec 2025 12:38:28 +0000 (06:38 -0600)
Prefetch tags once per object and pass them into LCOpRule, letting
the tag filter reuse the cached set and skip tag-based rules early
when tags are absent or mismatched.

Signed-off-by: Matthew N. Heler <matthew.heler@hotmail.com>
src/rgw/rgw_lc.cc

index 7d1339b85c2d4b23385fe9098688c0f2a9c5339d..1fd2b9d1480300c7e3ec1e3da6305afbb49b1cde 100644 (file)
@@ -509,6 +509,7 @@ struct lc_op_ctx {
   const DoutPrefixProvider *dpp;
 
   std::unique_ptr<rgw::sal::PlacementTier> tier;
+  const RGWObjTags* cached_tags{nullptr};
 
   lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
            boost::optional<std::string> next_key_name,
@@ -750,6 +751,14 @@ class LCOpRule {
 public:
   LCOpRule(op_env& _env) : env(_env) {}
 
+  bool needs_tags() const {
+    return env.op.obj_tags != boost::none;
+  }
+
+  const lc_op& get_op() const {
+    return env.op;
+  }
+
   boost::optional<std::string> get_next_key_name() {
     return next_key_name;
   }
@@ -761,7 +770,7 @@ public:
   void build();
   void update();
   int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
-             optional_yield y);
+             optional_yield y, const RGWObjTags* cached_tags = nullptr);
 }; /* LCOpRule */
 
 RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct,
@@ -953,6 +962,18 @@ static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip,
   if (op.obj_tags != boost::none) {
     *skip = true;
 
+    if (oc.cached_tags) {
+      if (! has_all_tags(op, *oc.cached_tags)) {
+        ldpp_dout(oc.dpp, 20) << __func__ << "() skipping obj " << oc.obj
+                       << " as tags do not match in rule: "
+                       << op.id << dendl;
+        return 0;
+      }
+
+      *skip = false;
+      return 0;
+    }
+
     bufferlist tags_bl;
     int ret = read_obj_tags(dpp, oc.obj.get(), tags_bl, y);
     if (ret < 0) {
@@ -1531,9 +1552,11 @@ void LCOpRule::update()
 
 int LCOpRule::process(rgw_bucket_dir_entry& o,
                      const DoutPrefixProvider *dpp,
-                     optional_yield y)
+                     optional_yield y,
+                     const RGWObjTags* cached_tags)
 {
   lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp);
+  ctx.cached_tags = cached_tags;
   shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
   real_time exp;
 
@@ -1656,10 +1679,11 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
   rgw::sal::Zone* zone = driver->get_zone();
 
   auto pf = [&bucket_name](const DoutPrefixProvider* dpp, optional_yield y,
-                           LCOpRule& op_rule, rgw_bucket_dir_entry& o) {
+                           LCOpRule& op_rule, rgw_bucket_dir_entry& o,
+                           const RGWObjTags* cached_tags) {
     ldpp_dout(dpp, 20)
       << __func__ << "(): key=" << o.key << dendl;
-    int ret = op_rule.process(o, dpp, y);
+    int ret = op_rule.process(o, dpp, y, cached_tags);
     if (ret < 0) {
       ldpp_dout(dpp, 20)
        << "ERROR: orule.process() returned ret=" << ret
@@ -1747,10 +1771,54 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
       }
 
       // Spawn one coroutine per object to process all rules
-      workpool.spawn([&pf, dpp=this, rules_copy=rules, obj]
+      workpool.spawn([&pf, dpp=this, rules_copy=rules, obj, bucket=bucket.get()]
                      (boost::asio::yield_context yield) mutable {
+        // Check if any rule needs tags so we only fetch once per object
+        bool any_rule_needs_tags = std::any_of(rules_copy.begin(), rules_copy.end(),
+          [](const LCOpRule& r) { return r.needs_tags(); });
+
+        boost::optional<RGWObjTags> cached_tags;
+        const RGWObjTags* cached_tags_ptr = nullptr;
+
+        if (any_rule_needs_tags && !obj.is_delete_marker()) {
+          bufferlist tags_bl;
+
+          rgw_obj_key obj_key = obj.key;
+          if (obj_key.instance.empty() && bucket->versioned() && !obj.is_current()) {
+            obj_key.instance = "null";
+          }
+
+          auto temp_obj = bucket->get_object(obj_key);
+          std::unique_ptr<rgw::sal::Object::ReadOp> rop = temp_obj->get_read_op();
+          int ret = rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, yield);
+          if (ret == 0) {
+            try {
+              cached_tags.emplace();
+              auto iter = tags_bl.cbegin();
+              cached_tags->decode(iter);
+              cached_tags_ptr = &*cached_tags;
+            } catch (buffer::error& err) {
+              ldpp_dout(dpp, 5) << "ERROR: decode tags for " << obj.key << dendl;
+            }
+          }
+        }
+
         for (auto& rule : rules_copy) {
-          pf(dpp, yield, rule, const_cast<rgw_bucket_dir_entry&>(obj));
+          if (rule.needs_tags() &&
+              !obj.is_delete_marker() &&
+              cached_tags_ptr &&
+              !has_all_tags(rule.get_op(), *cached_tags_ptr)) {
+            continue;
+          }
+
+          if (rule.needs_tags() &&
+              !obj.is_delete_marker() &&
+              !cached_tags_ptr) {
+            continue;
+          }
+
+          pf(dpp, yield, rule, const_cast<rgw_bucket_dir_entry&>(obj),
+             cached_tags_ptr);
         }
       });