From: Matthew N. Heler Date: Fri, 6 Mar 2026 17:46:44 +0000 (-0600) Subject: rgw: group lifecycle versioned deletes to reduce OLH contention X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F67700%2Fhead;p=ceph.git rgw: group lifecycle versioned deletes to reduce OLH contention When multiple versions of the same key expire together, each delete does a read-modify-write of the OLH on the same bucket index shard. Buffer versions of the same key during listing and flush on key change. Groups with multiple versions get pre-evaluated, then hard deletes go through rgw::multi_delete::dispatch() which skips OLH updates on all but the last delete. LCOpRule::process() is split into evaluate() and execute() to support this two-phase pattern. Non-versioned buckets and single-version groups are unchanged. Signed-off-by: Matthew N. Heler --- diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index 1bd8cbf91f27..fdfbbc5abf25 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -32,6 +32,7 @@ #include "rgw_lc.h" #include "rgw_string.h" #include "rgw_sal.h" +#include "rgw_multi_del.h" #include "rgw_multipart_meta_filter.h" #include "fmt/format.h" @@ -607,6 +608,7 @@ struct lc_op_ctx { std::unique_ptr tier; const RGWObjTags* cached_tags{nullptr}; + bool skip_update_olh{false}; lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o, boost::optional next_key_name, @@ -782,6 +784,9 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp, uint32_t flags = (!remove_indeed || !zonegroup_lc_check(dpp, oc.driver->get_zone())) ? rgw::sal::FLAG_LOG_OP : 0; + if (remove_indeed && oc.skip_update_olh) { + flags |= rgw::sal::FLAG_SKIP_UPDATE_OLH; + } ret = del_op->delete_obj(dpp, y, flags); if (ret < 0) { ldpp_dout(dpp, 1) << @@ -824,6 +829,17 @@ public: return 0; } + /* + * True for actions that always hard-delete (remove_indeed=true). + * Used to route deletes through rgw::multi_delete::dispatch() + * so redundant OLH updates can be skipped. CurrentExpiration + * is excluded because on versioned buckets it creates a delete + * marker rather than hard-deleting. + */ + virtual bool is_delete() const { + return false; + } + friend class LCOpRule; }; /* LCOpAction */ @@ -867,6 +883,18 @@ public: void build(); void update(); + + LCOpAction* evaluate(rgw_bucket_dir_entry& o, + const DoutPrefixProvider *dpp, + const RGWObjTags* cached_tags, + optional_yield y); + int execute(LCOpAction& action, + rgw_bucket_dir_entry& o, + const DoutPrefixProvider *dpp, + LCBatchCounters* batch_counters, + const RGWObjTags* cached_tags, + bool skip_update_olh, + optional_yield y); int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp, LCBatchCounters* batch_counters, optional_yield y, const RGWObjTags* cached_tags = nullptr); @@ -1050,6 +1078,36 @@ static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, b return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, y); } +/* + * Fetch and decode object tags. Returns a pointer into buf on success, + * or nullptr if the object is a delete marker, has no tags, or on error. + */ +static const RGWObjTags* fetch_obj_tags( + const DoutPrefixProvider* dpp, rgw::sal::Bucket* bucket, + const rgw_bucket_dir_entry& obj, + boost::optional& buf, optional_yield y) +{ + if (obj.is_delete_marker()) return nullptr; + + rgw_obj_key key = obj.key; + if (key.instance.empty() && bucket->versioned() && !obj.is_current()) + key.instance = "null"; + + bufferlist bl; + auto o = bucket->get_object(key); + if (read_obj_tags(dpp, o.get(), bl, y) != 0) + return nullptr; + + try { + buf.emplace(); + auto it = bl.cbegin(); + buf->decode(it); + return &*buf; + } catch (buffer::error&) { + return nullptr; + } +} + static bool is_valid_op(const lc_op& op) { return (op.status && @@ -1268,6 +1326,8 @@ public: LCOpAction_NonCurrentExpiration(op_env& env) {} + bool is_delete() const override { return true; } + bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override { auto& o = oc.o; if (o.is_current()) { @@ -1320,6 +1380,8 @@ class LCOpAction_DMExpiration : public LCOpAction { public: LCOpAction_DMExpiration(op_env& env) {} + bool is_delete() const override { return true; } + bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override { auto& o = oc.o; if (!o.is_delete_marker()) { @@ -1703,14 +1765,19 @@ void LCOpRule::update() effective_mtime = env.ol.get_prev_obj().meta.mtime; } -int LCOpRule::process(rgw_bucket_dir_entry& o, - const DoutPrefixProvider *dpp, - LCBatchCounters* batch_counters, - optional_yield y, - const RGWObjTags* cached_tags) +/* + * evaluate() and execute() each construct a fresh lc_op_ctx. If a + * check() mutates ctx, those mutations are discarded before execute() + * runs — check() implementations must not stash state in ctx. + */ +LCOpAction* LCOpRule::evaluate(rgw_bucket_dir_entry& o, + const DoutPrefixProvider *dpp, + const RGWObjTags* cached_tags, + optional_yield y) { - lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp, batch_counters); + lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp, nullptr); ctx.cached_tags = cached_tags; + shared_ptr *selected = nullptr; // n.b., req'd by sharing real_time exp; @@ -1725,45 +1792,70 @@ int LCOpRule::process(rgw_bucket_dir_entry& o, } } - if (selected && - (*selected)->should_process()) { - - /* - * Calling filter checks after action checks because - * all action checks (as they are implemented now) do - * not access the objects themselves, but return result - * from info from bucket index listing. The current tags filter - * check does access the objects, so we avoid unnecessary rados calls - * having filters check later in the process. - */ - - bool cont = false; - for (auto& f : filters) { - if (f->check(dpp, ctx, y)) { - cont = true; - break; - } - } + if (!selected || !(*selected)->should_process()) { + return nullptr; + } - if (!cont) { - ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key - << ": no rule match, skipping" << dendl; - return 0; + /* + * Calling filter checks after action checks because + * all action checks (as they are implemented now) do + * not access the objects themselves, but return result + * from info from bucket index listing. The current tags filter + * check does access the objects, so we avoid unnecessary rados calls + * having filters check later in the process. + */ + bool cont = false; + for (auto& f : filters) { + if (f->check(dpp, ctx, y)) { + cont = true; + break; } + } - int r = (*selected)->process(ctx, y); - if (r < 0) { - ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj " - << env.bucket << ":" << o.key - << " " << cpp_strerror(r) << dendl; - return r; - } - ldpp_dout(dpp, 20) << "processed:" << env.bucket << ":" - << o.key << dendl; + if (!cont) { + ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key + << ": no rule match, skipping" << dendl; + return nullptr; } + return selected->get(); +} + +int LCOpRule::execute(LCOpAction& action, + rgw_bucket_dir_entry& o, + const DoutPrefixProvider *dpp, + LCBatchCounters* batch_counters, + const RGWObjTags* cached_tags, + bool skip_update_olh, + optional_yield y) +{ + lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp, batch_counters); + ctx.cached_tags = cached_tags; + ctx.skip_update_olh = skip_update_olh; + + int r = action.process(ctx, y); + if (r < 0) { + ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj " + << env.bucket << ":" << o.key + << " " << cpp_strerror(r) << dendl; + return r; + } + ldpp_dout(dpp, 20) << "processed:" << env.bucket << ":" + << o.key << dendl; return 0; +} +int LCOpRule::process(rgw_bucket_dir_entry& o, + const DoutPrefixProvider *dpp, + LCBatchCounters* batch_counters, + optional_yield y, + const RGWObjTags* cached_tags) +{ + auto* action = evaluate(o, dpp, cached_tags, y); + if (!action) { + return 0; + } + return execute(*action, o, dpp, batch_counters, cached_tags, false, y); } int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, @@ -1962,6 +2054,149 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, rules.back().build(); // why can't ctor do it? } + /* + * Buffer versions of the same key so versioned deletes can + * skip redundant OLH updates via rgw::multi_delete::dispatch(). + */ + struct lc_obj_entry { + rgw_bucket_dir_entry obj; + std::vector rules; + }; + std::vector key_group; + + auto flush_key_group = [&] { + if (key_group.empty()) return; + + if (!bucket->versioned() || key_group.size() == 1) { + /* + * Non-versioned or single version: spawn per-object. + */ + for (auto& entry : key_group) { + // Spawn one coroutine per object to process all rules + workpool.spawn([&pf, &batch_counters, dpp=this, + rules_copy=std::move(entry.rules), + obj=std::move(entry.obj), bucket=bucket.get()] + (boost::asio::yield_context yield) mutable { + // Fetch tags once per object if any rule needs them + boost::optional cached_tags; + const RGWObjTags* cached_tags_ptr = nullptr; + if (std::any_of(rules_copy.begin(), rules_copy.end(), + [](const LCOpRule& r) { return r.needs_tags(); })) + cached_tags_ptr = fetch_obj_tags(dpp, bucket, obj, + cached_tags, yield); + + for (auto& rule : rules_copy) { + if (rule.needs_tags() && + !obj.is_delete_marker() && + cached_tags_ptr && + !has_all_tags(rule.get_op(), *cached_tags_ptr)) { + continue; + } + if (rule.needs_tags() && + !obj.is_delete_marker() && + !cached_tags_ptr) { + continue; + } + pf(dpp, yield, rule, obj, cached_tags_ptr); + } + batch_counters.decrement_pending(); + }); + } + } else { + /* + * Versioned with multiple versions: evaluate all entries, + * then pass hard deletes to rgw::multi_delete::dispatch() + * to skip redundant OLH updates. + */ + workpool.spawn( + [dpp=this, group=std::move(key_group), bucket=bucket.get(), + &batch_counters] + (boost::asio::yield_context yield) mutable { + + struct action_ref { LCOpAction* a; size_t ei, ri; }; + struct tag_entry { + boost::optional buf; + const RGWObjTags* ptr{nullptr}; + }; + + std::vector tags(group.size()); + std::vector dels, non_dels; + + for (size_t ei = 0; ei < group.size(); ++ei) { + auto& obj = group[ei].obj; + if (std::any_of(group[ei].rules.begin(), + group[ei].rules.end(), + [](const LCOpRule& r) { return r.needs_tags(); })) + tags[ei].ptr = fetch_obj_tags(dpp, bucket, obj, + tags[ei].buf, yield); + bool has_del = false; + std::vector entry_non_dels; + entry_non_dels.reserve(group[ei].rules.size()); + for (size_t ri = 0; ri < group[ei].rules.size(); ++ri) { + auto& rule = group[ei].rules[ri]; + if (rule.needs_tags() && + !obj.is_delete_marker() && + tags[ei].ptr && + !has_all_tags(rule.get_op(), *tags[ei].ptr)) + continue; + if (rule.needs_tags() && + !obj.is_delete_marker() && + !tags[ei].ptr) + continue; + auto* act = rule.evaluate( + group[ei].obj, dpp, tags[ei].ptr, yield); + if (!act) continue; + if (act->is_delete()) { + /* + * S3 lifecycle conflict resolution gives hard deletes + * precedence over transitions for the same version. + * Suppress queued non-delete work once one matches. + */ + dels.push_back({act, ei, ri}); + has_del = true; + entry_non_dels.clear(); + break; + } + entry_non_dels.push_back({act, ei, ri}); + } + if (!has_del) { + non_dels.insert(non_dels.end(), + entry_non_dels.begin(), + entry_non_dels.end()); + } + } + + for (auto& r : non_dels) + group[r.ei].rules[r.ri].execute( + *r.a, group[r.ei].obj, dpp, + &batch_counters, tags[r.ei].ptr, false, yield); + + std::vector items; + items.reserve(dels.size()); + for (size_t i = 0; i < dels.size(); ++i) + items.push_back({group[dels[i].ei].obj.key, i}); + + /* + * max_aio=1: all versions share a bucket index shard, + * so parallel deletes would contend on that shard. + */ + rgw::multi_delete::dispatch( + items, true, 1, yield, + [&](const rgw::multi_delete::Item& item, + bool skip_olh, boost::asio::yield_context y) { + auto& r = dels[item.index]; + group[r.ei].rules[r.ri].execute( + *r.a, group[r.ei].obj, dpp, + &batch_counters, tags[r.ei].ptr, skip_olh, y); + }); + + for (size_t i = 0; i < group.size(); ++i) + batch_counters.decrement_pending(); + }); + } + key_group.clear(); + }; + rgw_bucket_dir_entry* o{nullptr}; for (auto offset = 0; ol.get_obj(this, yield, &o /* , fetch_barrier */); ++offset, ol.next()) { const auto obj = *o; @@ -1971,60 +2206,13 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, rule.update(); } - // Spawn one coroutine per object to process all rules - workpool.spawn([&pf, &batch_counters, dpp=this, rules_copy=rules, obj, bucket=bucket.get()] - (boost::asio::yield_context yield) mutable { - // Check if any rule needs tags so we only fetch once per object - bool any_rule_needs_tags = std::any_of(rules_copy.begin(), rules_copy.end(), - [](const LCOpRule& r) { return r.needs_tags(); }); - - boost::optional cached_tags; - const RGWObjTags* cached_tags_ptr = nullptr; - - if (any_rule_needs_tags && !obj.is_delete_marker()) { - bufferlist tags_bl; - - rgw_obj_key obj_key = obj.key; - if (obj_key.instance.empty() && bucket->versioned() && !obj.is_current()) { - obj_key.instance = "null"; - } - - auto temp_obj = bucket->get_object(obj_key); - std::unique_ptr rop = temp_obj->get_read_op(); - int ret = rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, yield); - if (ret == 0) { - try { - cached_tags.emplace(); - auto iter = tags_bl.cbegin(); - cached_tags->decode(iter); - cached_tags_ptr = &*cached_tags; - } catch (buffer::error& err) { - ldpp_dout(dpp, 5) << "ERROR: decode tags for " << obj.key << dendl; - } - } - } - - for (auto& rule : rules_copy) { - if (rule.needs_tags() && - !obj.is_delete_marker() && - cached_tags_ptr && - !has_all_tags(rule.get_op(), *cached_tags_ptr)) { - continue; - } + // flush on key change, or at 1000 versions to bound memory + if (!key_group.empty() + && (obj.key.name != key_group.front().obj.key.name + || key_group.size() >= 1000)) + flush_key_group(); - if (rule.needs_tags() && - !obj.is_delete_marker() && - !cached_tags_ptr) { - continue; - } - - pf(dpp, yield, rule, const_cast(obj), - cached_tags_ptr); - } - - // Decrement pending once per object, after all rules are evaluated - batch_counters.decrement_pending(); - }); + key_group.push_back({obj, rules}); total_objects_scanned++; @@ -2047,10 +2235,12 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker=" << worker->ix << " bucket=" << bucket_name << dendl; + flush_key_group(); return 0; } } } + flush_key_group(); } ret = handle_multipart_expiration(bucket.get(), prefix_map, workpool,