const DoutPrefixProvider *dpp;
std::unique_ptr<rgw::sal::PlacementTier> tier;
+ const RGWObjTags* cached_tags{nullptr};
lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
boost::optional<std::string> next_key_name,
public:
LCOpRule(op_env& _env) : env(_env) {}
+ bool needs_tags() const {
+ return env.op.obj_tags != boost::none;
+ }
+
+ const lc_op& get_op() const {
+ return env.op;
+ }
+
boost::optional<std::string> get_next_key_name() {
return next_key_name;
}
void build();
void update();
int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
- optional_yield y);
+ optional_yield y, const RGWObjTags* cached_tags = nullptr);
}; /* LCOpRule */
RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct,
if (op.obj_tags != boost::none) {
*skip = true;
+ if (oc.cached_tags) {
+ if (! has_all_tags(op, *oc.cached_tags)) {
+ ldpp_dout(oc.dpp, 20) << __func__ << "() skipping obj " << oc.obj
+ << " as tags do not match in rule: "
+ << op.id << dendl;
+ return 0;
+ }
+
+ *skip = false;
+ return 0;
+ }
+
bufferlist tags_bl;
int ret = read_obj_tags(dpp, oc.obj.get(), tags_bl, y);
if (ret < 0) {
int LCOpRule::process(rgw_bucket_dir_entry& o,
const DoutPrefixProvider *dpp,
- optional_yield y)
+ optional_yield y,
+ const RGWObjTags* cached_tags)
{
lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp);
+ ctx.cached_tags = cached_tags;
shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
real_time exp;
rgw::sal::Zone* zone = driver->get_zone();
auto pf = [&bucket_name](const DoutPrefixProvider* dpp, optional_yield y,
- LCOpRule& op_rule, rgw_bucket_dir_entry& o) {
+ LCOpRule& op_rule, rgw_bucket_dir_entry& o,
+ const RGWObjTags* cached_tags) {
ldpp_dout(dpp, 20)
<< __func__ << "(): key=" << o.key << dendl;
- int ret = op_rule.process(o, dpp, y);
+ int ret = op_rule.process(o, dpp, y, cached_tags);
if (ret < 0) {
ldpp_dout(dpp, 20)
<< "ERROR: orule.process() returned ret=" << ret
}
// Spawn one coroutine per object to process all rules
- workpool.spawn([&pf, dpp=this, rules_copy=rules, obj]
+ workpool.spawn([&pf, dpp=this, rules_copy=rules, obj, bucket=bucket.get()]
(boost::asio::yield_context yield) mutable {
+ // Check if any rule needs tags so we only fetch once per object
+ bool any_rule_needs_tags = std::any_of(rules_copy.begin(), rules_copy.end(),
+ [](const LCOpRule& r) { return r.needs_tags(); });
+
+ boost::optional<RGWObjTags> cached_tags;
+ const RGWObjTags* cached_tags_ptr = nullptr;
+
+ if (any_rule_needs_tags && !obj.is_delete_marker()) {
+ bufferlist tags_bl;
+
+ rgw_obj_key obj_key = obj.key;
+ if (obj_key.instance.empty() && bucket->versioned() && !obj.is_current()) {
+ obj_key.instance = "null";
+ }
+
+ auto temp_obj = bucket->get_object(obj_key);
+ std::unique_ptr<rgw::sal::Object::ReadOp> rop = temp_obj->get_read_op();
+ int ret = rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, yield);
+ if (ret == 0) {
+ try {
+ cached_tags.emplace();
+ auto iter = tags_bl.cbegin();
+ cached_tags->decode(iter);
+ cached_tags_ptr = &*cached_tags;
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 5) << "ERROR: decode tags for " << obj.key << dendl;
+ }
+ }
+ }
+
for (auto& rule : rules_copy) {
- pf(dpp, yield, rule, const_cast<rgw_bucket_dir_entry&>(obj));
+ if (rule.needs_tags() &&
+ !obj.is_delete_marker() &&
+ cached_tags_ptr &&
+ !has_all_tags(rule.get_op(), *cached_tags_ptr)) {
+ continue;
+ }
+
+ if (rule.needs_tags() &&
+ !obj.is_delete_marker() &&
+ !cached_tags_ptr) {
+ continue;
+ }
+
+ pf(dpp, yield, rule, const_cast<rgw_bucket_dir_entry&>(obj),
+ cached_tags_ptr);
}
});