#include "rgw_lc.h"
#include "rgw_string.h"
#include "rgw_sal.h"
+#include "rgw_multi_del.h"
#include "rgw_multipart_meta_filter.h"
#include "fmt/format.h"
std::unique_ptr<rgw::sal::PlacementTier> tier;
const RGWObjTags* cached_tags{nullptr};
+ bool skip_update_olh{false};
lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
boost::optional<std::string> next_key_name,
uint32_t flags = (!remove_indeed || !zonegroup_lc_check(dpp, oc.driver->get_zone()))
? rgw::sal::FLAG_LOG_OP : 0;
+ if (remove_indeed && oc.skip_update_olh) {
+ flags |= rgw::sal::FLAG_SKIP_UPDATE_OLH;
+ }
ret = del_op->delete_obj(dpp, y, flags);
if (ret < 0) {
ldpp_dout(dpp, 1) <<
return 0;
}
+ /*
+ * True for actions that always hard-delete (remove_indeed=true).
+ * Used to route deletes through rgw::multi_delete::dispatch()
+ * so redundant OLH updates can be skipped. CurrentExpiration
+ * is excluded because on versioned buckets it creates a delete
+ * marker rather than hard-deleting.
+ */
+ virtual bool is_delete() const {
+ return false;
+ }
+
friend class LCOpRule;
}; /* LCOpAction */
void build();
void update();
+
+ LCOpAction* evaluate(rgw_bucket_dir_entry& o,
+ const DoutPrefixProvider *dpp,
+ const RGWObjTags* cached_tags,
+ optional_yield y);
+ int execute(LCOpAction& action,
+ rgw_bucket_dir_entry& o,
+ const DoutPrefixProvider *dpp,
+ LCBatchCounters* batch_counters,
+ const RGWObjTags* cached_tags,
+ bool skip_update_olh,
+ optional_yield y);
int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
LCBatchCounters* batch_counters, optional_yield y,
const RGWObjTags* cached_tags = nullptr);
return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, y);
}
+/*
+ * Fetch and decode object tags. Returns a pointer into buf on success,
+ * or nullptr if the object is a delete marker, has no tags, or on error.
+ */
+static const RGWObjTags* fetch_obj_tags(
+ const DoutPrefixProvider* dpp, rgw::sal::Bucket* bucket,
+ const rgw_bucket_dir_entry& obj,
+ boost::optional<RGWObjTags>& buf, optional_yield y)
+{
+ if (obj.is_delete_marker()) return nullptr;
+
+ rgw_obj_key key = obj.key;
+ if (key.instance.empty() && bucket->versioned() && !obj.is_current())
+ key.instance = "null";
+
+ bufferlist bl;
+ auto o = bucket->get_object(key);
+ if (read_obj_tags(dpp, o.get(), bl, y) != 0)
+ return nullptr;
+
+ try {
+ buf.emplace();
+ auto it = bl.cbegin();
+ buf->decode(it);
+ return &*buf;
+ } catch (buffer::error&) {
+ return nullptr;
+ }
+}
+
static bool is_valid_op(const lc_op& op)
{
return (op.status &&
LCOpAction_NonCurrentExpiration(op_env& env)
{}
+ bool is_delete() const override { return true; }
+
bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override {
auto& o = oc.o;
if (o.is_current()) {
public:
LCOpAction_DMExpiration(op_env& env) {}
+ bool is_delete() const override { return true; }
+
bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override {
auto& o = oc.o;
if (!o.is_delete_marker()) {
effective_mtime = env.ol.get_prev_obj().meta.mtime;
}
-int LCOpRule::process(rgw_bucket_dir_entry& o,
- const DoutPrefixProvider *dpp,
- LCBatchCounters* batch_counters,
- optional_yield y,
- const RGWObjTags* cached_tags)
+/*
+ * evaluate() and execute() each construct a fresh lc_op_ctx. If a
+ * check() mutates ctx, those mutations are discarded before execute()
+ * runs — check() implementations must not stash state in ctx.
+ */
+LCOpAction* LCOpRule::evaluate(rgw_bucket_dir_entry& o,
+ const DoutPrefixProvider *dpp,
+ const RGWObjTags* cached_tags,
+ optional_yield y)
{
- lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp, batch_counters);
+ lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp, nullptr);
ctx.cached_tags = cached_tags;
+
shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
real_time exp;
}
}
- if (selected &&
- (*selected)->should_process()) {
-
- /*
- * Calling filter checks after action checks because
- * all action checks (as they are implemented now) do
- * not access the objects themselves, but return result
- * from info from bucket index listing. The current tags filter
- * check does access the objects, so we avoid unnecessary rados calls
- * having filters check later in the process.
- */
-
- bool cont = false;
- for (auto& f : filters) {
- if (f->check(dpp, ctx, y)) {
- cont = true;
- break;
- }
- }
+ if (!selected || !(*selected)->should_process()) {
+ return nullptr;
+ }
- if (!cont) {
- ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
- << ": no rule match, skipping" << dendl;
- return 0;
+ /*
+ * Calling filter checks after action checks because
+ * all action checks (as they are implemented now) do
+ * not access the objects themselves, but return result
+ * from info from bucket index listing. The current tags filter
+ * check does access the objects, so we avoid unnecessary rados calls
+ * having filters check later in the process.
+ */
+ bool cont = false;
+ for (auto& f : filters) {
+ if (f->check(dpp, ctx, y)) {
+ cont = true;
+ break;
}
+ }
- int r = (*selected)->process(ctx, y);
- if (r < 0) {
- ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj "
- << env.bucket << ":" << o.key
- << " " << cpp_strerror(r) << dendl;
- return r;
- }
- ldpp_dout(dpp, 20) << "processed:" << env.bucket << ":"
- << o.key << dendl;
+ if (!cont) {
+ ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
+ << ": no rule match, skipping" << dendl;
+ return nullptr;
}
+ return selected->get();
+}
+
+int LCOpRule::execute(LCOpAction& action,
+ rgw_bucket_dir_entry& o,
+ const DoutPrefixProvider *dpp,
+ LCBatchCounters* batch_counters,
+ const RGWObjTags* cached_tags,
+ bool skip_update_olh,
+ optional_yield y)
+{
+ lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp, batch_counters);
+ ctx.cached_tags = cached_tags;
+ ctx.skip_update_olh = skip_update_olh;
+
+ int r = action.process(ctx, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj "
+ << env.bucket << ":" << o.key
+ << " " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ ldpp_dout(dpp, 20) << "processed:" << env.bucket << ":"
+ << o.key << dendl;
return 0;
+}
+int LCOpRule::process(rgw_bucket_dir_entry& o,
+ const DoutPrefixProvider *dpp,
+ LCBatchCounters* batch_counters,
+ optional_yield y,
+ const RGWObjTags* cached_tags)
+{
+ auto* action = evaluate(o, dpp, cached_tags, y);
+ if (!action) {
+ return 0;
+ }
+ return execute(*action, o, dpp, batch_counters, cached_tags, false, y);
}
int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
rules.back().build(); // why can't ctor do it?
}
+ /*
+ * Buffer versions of the same key so versioned deletes can
+ * skip redundant OLH updates via rgw::multi_delete::dispatch().
+ */
+ struct lc_obj_entry {
+ rgw_bucket_dir_entry obj;
+ std::vector<LCOpRule> rules;
+ };
+ std::vector<lc_obj_entry> key_group;
+
+ auto flush_key_group = [&] {
+ if (key_group.empty()) return;
+
+ if (!bucket->versioned() || key_group.size() == 1) {
+ /*
+ * Non-versioned or single version: spawn per-object.
+ */
+ for (auto& entry : key_group) {
+ // Spawn one coroutine per object to process all rules
+ workpool.spawn([&pf, &batch_counters, dpp=this,
+ rules_copy=std::move(entry.rules),
+ obj=std::move(entry.obj), bucket=bucket.get()]
+ (boost::asio::yield_context yield) mutable {
+ // Fetch tags once per object if any rule needs them
+ boost::optional<RGWObjTags> cached_tags;
+ const RGWObjTags* cached_tags_ptr = nullptr;
+ if (std::any_of(rules_copy.begin(), rules_copy.end(),
+ [](const LCOpRule& r) { return r.needs_tags(); }))
+ cached_tags_ptr = fetch_obj_tags(dpp, bucket, obj,
+ cached_tags, yield);
+
+ for (auto& rule : rules_copy) {
+ if (rule.needs_tags() &&
+ !obj.is_delete_marker() &&
+ cached_tags_ptr &&
+ !has_all_tags(rule.get_op(), *cached_tags_ptr)) {
+ continue;
+ }
+ if (rule.needs_tags() &&
+ !obj.is_delete_marker() &&
+ !cached_tags_ptr) {
+ continue;
+ }
+ pf(dpp, yield, rule, obj, cached_tags_ptr);
+ }
+ batch_counters.decrement_pending();
+ });
+ }
+ } else {
+ /*
+ * Versioned with multiple versions: evaluate all entries,
+ * then pass hard deletes to rgw::multi_delete::dispatch()
+ * to skip redundant OLH updates.
+ */
+ workpool.spawn(
+ [dpp=this, group=std::move(key_group), bucket=bucket.get(),
+ &batch_counters]
+ (boost::asio::yield_context yield) mutable {
+
+ struct action_ref { LCOpAction* a; size_t ei, ri; };
+ struct tag_entry {
+ boost::optional<RGWObjTags> buf;
+ const RGWObjTags* ptr{nullptr};
+ };
+
+ std::vector<tag_entry> tags(group.size());
+ std::vector<action_ref> dels, non_dels;
+
+ for (size_t ei = 0; ei < group.size(); ++ei) {
+ auto& obj = group[ei].obj;
+ if (std::any_of(group[ei].rules.begin(),
+ group[ei].rules.end(),
+ [](const LCOpRule& r) { return r.needs_tags(); }))
+ tags[ei].ptr = fetch_obj_tags(dpp, bucket, obj,
+ tags[ei].buf, yield);
+ bool has_del = false;
+ std::vector<action_ref> entry_non_dels;
+ entry_non_dels.reserve(group[ei].rules.size());
+ for (size_t ri = 0; ri < group[ei].rules.size(); ++ri) {
+ auto& rule = group[ei].rules[ri];
+ if (rule.needs_tags() &&
+ !obj.is_delete_marker() &&
+ tags[ei].ptr &&
+ !has_all_tags(rule.get_op(), *tags[ei].ptr))
+ continue;
+ if (rule.needs_tags() &&
+ !obj.is_delete_marker() &&
+ !tags[ei].ptr)
+ continue;
+ auto* act = rule.evaluate(
+ group[ei].obj, dpp, tags[ei].ptr, yield);
+ if (!act) continue;
+ if (act->is_delete()) {
+ /*
+ * S3 lifecycle conflict resolution gives hard deletes
+ * precedence over transitions for the same version.
+ * Suppress queued non-delete work once one matches.
+ */
+ dels.push_back({act, ei, ri});
+ has_del = true;
+ entry_non_dels.clear();
+ break;
+ }
+ entry_non_dels.push_back({act, ei, ri});
+ }
+ if (!has_del) {
+ non_dels.insert(non_dels.end(),
+ entry_non_dels.begin(),
+ entry_non_dels.end());
+ }
+ }
+
+ for (auto& r : non_dels)
+ group[r.ei].rules[r.ri].execute(
+ *r.a, group[r.ei].obj, dpp,
+ &batch_counters, tags[r.ei].ptr, false, yield);
+
+ std::vector<rgw::multi_delete::Item> items;
+ items.reserve(dels.size());
+ for (size_t i = 0; i < dels.size(); ++i)
+ items.push_back({group[dels[i].ei].obj.key, i});
+
+ /*
+ * max_aio=1: all versions share a bucket index shard,
+ * so parallel deletes would contend on that shard.
+ */
+ rgw::multi_delete::dispatch(
+ items, true, 1, yield,
+ [&](const rgw::multi_delete::Item& item,
+ bool skip_olh, boost::asio::yield_context y) {
+ auto& r = dels[item.index];
+ group[r.ei].rules[r.ri].execute(
+ *r.a, group[r.ei].obj, dpp,
+ &batch_counters, tags[r.ei].ptr, skip_olh, y);
+ });
+
+ for (size_t i = 0; i < group.size(); ++i)
+ batch_counters.decrement_pending();
+ });
+ }
+ key_group.clear();
+ };
+
rgw_bucket_dir_entry* o{nullptr};
for (auto offset = 0; ol.get_obj(this, yield, &o /* , fetch_barrier */); ++offset, ol.next()) {
const auto obj = *o;
rule.update();
}
- // Spawn one coroutine per object to process all rules
- workpool.spawn([&pf, &batch_counters, dpp=this, rules_copy=rules, obj, bucket=bucket.get()]
- (boost::asio::yield_context yield) mutable {
- // Check if any rule needs tags so we only fetch once per object
- bool any_rule_needs_tags = std::any_of(rules_copy.begin(), rules_copy.end(),
- [](const LCOpRule& r) { return r.needs_tags(); });
-
- boost::optional<RGWObjTags> cached_tags;
- const RGWObjTags* cached_tags_ptr = nullptr;
-
- if (any_rule_needs_tags && !obj.is_delete_marker()) {
- bufferlist tags_bl;
-
- rgw_obj_key obj_key = obj.key;
- if (obj_key.instance.empty() && bucket->versioned() && !obj.is_current()) {
- obj_key.instance = "null";
- }
-
- auto temp_obj = bucket->get_object(obj_key);
- std::unique_ptr<rgw::sal::Object::ReadOp> rop = temp_obj->get_read_op();
- int ret = rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, yield);
- if (ret == 0) {
- try {
- cached_tags.emplace();
- auto iter = tags_bl.cbegin();
- cached_tags->decode(iter);
- cached_tags_ptr = &*cached_tags;
- } catch (buffer::error& err) {
- ldpp_dout(dpp, 5) << "ERROR: decode tags for " << obj.key << dendl;
- }
- }
- }
-
- for (auto& rule : rules_copy) {
- if (rule.needs_tags() &&
- !obj.is_delete_marker() &&
- cached_tags_ptr &&
- !has_all_tags(rule.get_op(), *cached_tags_ptr)) {
- continue;
- }
+ // flush on key change, or at 1000 versions to bound memory
+ if (!key_group.empty()
+ && (obj.key.name != key_group.front().obj.key.name
+ || key_group.size() >= 1000))
+ flush_key_group();
- if (rule.needs_tags() &&
- !obj.is_delete_marker() &&
- !cached_tags_ptr) {
- continue;
- }
-
- pf(dpp, yield, rule, const_cast<rgw_bucket_dir_entry&>(obj),
- cached_tags_ptr);
- }
-
- // Decrement pending once per object, after all rules are evaluated
- batch_counters.decrement_pending();
- });
+ key_group.push_back({obj, rules});
total_objects_scanned++;
ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker="
<< worker->ix << " bucket=" << bucket_name
<< dendl;
+ flush_key_group();
return 0;
}
}
}
+ flush_key_group();
}
ret = handle_multipart_expiration(bucket.get(), prefix_map, workpool,