LCWorker* worker, time_t stop_at, bool once)
{
int ret;
- rgw::sal::Bucket::ListParams params;
+ rgw::sal::Bucket::ListParams params_base;
rgw::sal::Bucket::ListResults results;
- params.list_versions = false;
+ params_base.list_versions = false;
/* lifecycle processing does not depend on total order, so can
* take advantage of unordered listing optimizations--such as
* operating on one shard at a time */
uint64_t threshold = cct->_conf.get_val<uint64_t>("rgw_lc_ordered_list_threshold");
const auto& current_index = target->get_info().layout.current_index;
- params.allow_unordered = should_list_unordered(current_index, threshold);
+ params_base.allow_unordered = should_list_unordered(current_index, threshold);
- params.ns = RGW_OBJ_NS_MULTIPART;
- params.access_list_filter = MultipartMetaFilter;
+ params_base.ns = RGW_OBJ_NS_MULTIPART;
+ params_base.access_list_filter = MultipartMetaFilter;
auto pf = [this, target] (optional_yield y, const lc_op& rule,
const rgw_bucket_dir_entry& obj) {
return ret;
};
- for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
+ std::map<std::string, std::vector<const lc_op*>> grouped_mp_ops;
+ for (auto& prefix_entry : prefix_map) {
+ if (!prefix_entry.second.status || prefix_entry.second.mp_expiration <= 0) {
+ continue;
+ }
+ grouped_mp_ops[prefix_entry.first].push_back(&prefix_entry.second);
+ }
+
+ for (auto prefix_iter = grouped_mp_ops.begin(); prefix_iter != grouped_mp_ops.end();
++prefix_iter) {
if (worker_should_stop(stop_at, once)) {
return 0;
}
- if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
- continue;
- }
+ rgw::sal::Bucket::ListParams params = params_base;
params.prefix = prefix_iter->first;
do {
auto offset = 0;
}
for (auto obj_iter = results.objs.begin(); obj_iter != results.objs.end(); ++obj_iter, ++offset) {
- workpool.spawn([pf, op=prefix_iter->second, obj=*obj_iter]
- (boost::asio::yield_context yield) mutable {
- pf(yield, op, obj);
- });
+ const auto obj = *obj_iter;
+ for (auto* op : prefix_iter->second) {
+ workpool.spawn([pf, op, obj]
+ (boost::asio::yield_context yield) mutable {
+ pf(yield, *op, obj);
+ });
+ }
if (going_down()) {
return 0;
}
}
}
} while(results.is_truncated);
- } /* for prefix_map */
+ } /* for grouped_mp_ops */
return 0;
} /* RGWLC::handle_multipart_expiration */
<< prefix_map.size()
<< dendl;
+ std::map<std::string, std::vector<lc_op*>> grouped_ops;
+ for (auto& prefix_entry : prefix_map) {
+ grouped_ops[prefix_entry.first].push_back(&prefix_entry.second);
+ }
+
rgw_obj_key pre_marker;
rgw_obj_key next_marker;
- for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
- ++prefix_iter) {
+ for (auto prefix_iter = grouped_ops.begin(); prefix_iter != grouped_ops.end();
+ ++prefix_iter) {
if (worker_should_stop(stop_at, once)) {
ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker="
return 0;
}
- auto& op = prefix_iter->second;
- if (!is_valid_op(op)) {
- continue;
- }
ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first
<< dendl;
- if (prefix_iter != prefix_map.begin() &&
+ if (prefix_iter != grouped_ops.begin() &&
(prefix_iter->first.compare(0, prev(prefix_iter)->first.length(),
- prev(prefix_iter)->first) == 0)) {
+ prev(prefix_iter)->first) == 0)) {
next_marker = pre_marker;
} else {
pre_marker = next_marker;
LCObjsLister ol(driver, bucket.get());
ol.set_prefix(prefix_iter->first);
- if (! zone_check(op, zone)) {
- ldpp_dout(this, 7) << "LC rule not executable in " << zone->get_tier_type()
- << " zone, skipping" << dendl;
+ std::vector<lc_op*> active_ops;
+ active_ops.reserve(prefix_iter->second.size());
+
+ for (auto* op : prefix_iter->second) {
+ if (!is_valid_op(*op)) {
+ continue;
+ }
+ if (!zone_check(*op, zone)) {
+ ldpp_dout(this, 7) << "LC rule not executable in " << zone->get_tier_type()
+ << " zone, skipping" << dendl;
+ continue;
+ }
+ active_ops.push_back(op);
+ }
+ if (active_ops.empty()) {
continue;
}
return ret;
}
- op_env oenv(op, driver, worker, bucket.get(), ol);
- LCOpRule orule(oenv);
- orule.build(); // why can't ctor do it?
+ std::vector<LCOpRule> rules;
+ rules.reserve(active_ops.size());
+ for (auto* op : active_ops) {
+ op_env oenv(*op, driver, worker, bucket.get(), ol);
+ rules.emplace_back(oenv);
+ rules.back().build(); // why can't ctor do it?
+ }
+
rgw_bucket_dir_entry* o{nullptr};
for (auto offset = 0; ol.get_obj(this, yield, &o /* , fetch_barrier */); ++offset, ol.next()) {
- orule.update();
- workpool.spawn([&pf, dpp=this, orule, o=*o]
- (boost::asio::yield_context yield) mutable {
- pf(dpp, yield, orule, o);
+ const auto obj = *o;
+ for (auto& rule : rules) {
+ rule.update();
+ workpool.spawn([&pf, dpp=this, rule, obj]
+ (boost::asio::yield_context yield) mutable {
+ pf(dpp, yield, rule, obj);
});
+ }
if ((offset % 100) == 0) {
if (worker_should_stop(stop_at, once)) {
ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker="