list_params.prefix = prefix;
}
- int init(const DoutPrefixProvider *dpp) {
- return fetch(dpp);
+ int init(const DoutPrefixProvider *dpp, optional_yield y) {
+ return fetch(dpp, y);
}
- int fetch(const DoutPrefixProvider *dpp) {
+ int fetch(const DoutPrefixProvider *dpp, optional_yield y) {
CephContext* cct = dpp->get_cct();
int cnt = cct->_conf.get_val<uint64_t>("rgw_lc_list_cnt");
- int ret = bucket->list(dpp, list_params, cnt, list_results, null_yield);
+ int ret = bucket->list(dpp, list_params, cnt, list_results, y);
if (ret < 0) {
return ret;
}
return 0;
}
- void delay() {
- std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+ void delay(const DoutPrefixProvider* dpp) {
+ if (delay_ms) {
+ maybe_warn_about_blocking(dpp);
+ std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+ }
}
- bool get_obj(const DoutPrefixProvider *dpp, rgw_bucket_dir_entry **obj,
+ bool get_obj(const DoutPrefixProvider *dpp, optional_yield y,
+ rgw_bucket_dir_entry **obj,
std::function<void(void)> fetch_barrier
= []() { /* nada */}) {
if (obj_iter == list_results.objs.end()) {
if (!list_results.is_truncated) {
- delay();
+ delay(dpp);
return false;
} else {
fetch_barrier();
list_params.marker = pre_obj.key;
- int ret = fetch(dpp);
+ int ret = fetch(dpp, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: list_op returned ret=" << ret
<< dendl;
return false;
}
}
- delay();
+ delay(dpp);
}
if (obj_iter->key.name == pre_obj.key.name) {
int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
const multimap<string, lc_op>& prefix_map,
ceph::async::spawn_throttle& workpool,
+ boost::asio::yield_context yield,
LCWorker* worker, time_t stop_at, bool once)
{
int ret;
rgw::sal::Bucket::ListParams params;
rgw::sal::Bucket::ListResults results;
- auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
params.list_versions = false;
/* lifecycle processing does not depend on total order, so can
* take advantage of unordered listing optimizations--such as
do {
auto offset = 0;
results.objs.clear();
- ret = target->list(this, params, 1000, results, null_yield);
+ ret = target->list(this, params, 1000, results, yield);
if (ret < 0) {
if (ret == (-ENOENT))
return 0;
return 0;
}
}
-
- std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
} while(results.is_truncated);
} /* for prefix_map */
continue;
}
- ret = ol.init(this);
+ ret = ol.init(this, yield);
if (ret < 0) {
if (ret == (-ENOENT))
return 0;
LCOpRule orule(oenv);
orule.build(); // why can't ctor do it?
rgw_bucket_dir_entry* o{nullptr};
- for (auto offset = 0; ol.get_obj(this, &o /* , fetch_barrier */); ++offset, ol.next()) {
+ for (auto offset = 0; ol.get_obj(this, yield, &o /* , fetch_barrier */); ++offset, ol.next()) {
orule.update();
workpool.spawn([&pf, dpp=this, orule, o=*o]
(boost::asio::yield_context yield) mutable {
}
ret = handle_multipart_expiration(bucket.get(), prefix_map, workpool,
- worker, stop_at, once);
+ yield, worker, stop_at, once);
return ret;
}