RGWFormatterFlusher& flusher,
const DoutPrefixProvider *dpp, optional_yield y)
{
- auto process_f = [dpp](const bucket_instance_ls& lst,
+ auto process_f = [dpp, y](const bucket_instance_ls& lst,
Formatter *formatter,
rgw::sal::Driver* driver){
for (const auto &binfo: lst) {
std::unique_ptr<rgw::sal::Bucket> bucket;
driver->get_bucket(nullptr, binfo, &bucket);
- int ret = bucket->purge_instance(dpp, null_yield);
+ int ret = bucket->purge_instance(dpp, y);
if (ret == 0){
auto md_key = "bucket.instance:" + binfo.bucket.get_key();
- ret = driver->meta_remove(dpp, md_key, null_yield);
+ ret = driver->meta_remove(dpp, md_key, y);
}
formatter->open_object_section("delete_status");
formatter->dump_string("bucket_instance", binfo.bucket.get_key());
Formatter *formatter,
const std::string& tenant_name,
const std::string& bucket_name,
- const DoutPrefixProvider *dpp)
+ const DoutPrefixProvider *dpp, optional_yield y)
{
- int ret = fix_single_bucket_lc(driver, tenant_name, bucket_name, dpp, null_yield);
+ int ret = fix_single_bucket_lc(driver, tenant_name, bucket_name, dpp, y);
format_lc_status(formatter, tenant_name, bucket_name, -ret);
}
int RGWBucketAdminOp::fix_lc_shards(rgw::sal::Driver* driver,
RGWBucketAdminOpState& op_state,
RGWFormatterFlusher& flusher,
- const DoutPrefixProvider *dpp)
+ const DoutPrefixProvider *dpp, optional_yield y)
{
std::string marker;
void *handle;
if (const std::string& bucket_name = op_state.get_bucket_name();
! bucket_name.empty()) {
const rgw_user user_id = op_state.get_user_id();
- process_single_lc_entry(driver, formatter, user_id.tenant, bucket_name, dpp);
+ process_single_lc_entry(driver, formatter, user_id.tenant, bucket_name, dpp, y);
formatter->flush(cout);
} else {
int ret = driver->meta_list_keys_init(dpp, "bucket", marker, &handle);
} if (ret != -ENOENT) {
for (const auto &key:keys) {
auto [tenant_name, bucket_name] = split_tenant(key);
- process_single_lc_entry(driver, formatter, tenant_name, bucket_name, dpp);
+ process_single_lc_entry(driver, formatter, tenant_name, bucket_name, dpp, y);
}
}
formatter->flush(cout); // regularly flush every 1k entries
static int clear_stale_instances(rgw::sal::Driver* driver, RGWBucketAdminOpState& op_state,
RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp, optional_yield y);
static int fix_lc_shards(rgw::sal::Driver* driver, RGWBucketAdminOpState& op_state,
- RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp);
+ RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp, optional_yield y);
static int fix_obj_expiry(rgw::sal::Driver* driver, RGWBucketAdminOpState& op_state,
RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp, optional_yield y, bool dry_run = false);
op.create(false);
const uint64_t queue_size = cct->_conf->rgw_gc_max_queue_size, num_deferred_entries = cct->_conf->rgw_gc_max_deferred;
gc_log_init2(op, queue_size, num_deferred_entries);
- store->gc_operate(this, obj_names[i], &op, null_yield);
+ store->gc_operate(this, obj_names[i], &op, y);
}
}
broken_chain.objs.pop_back();
--it;
ldpp_dout(this, 20) << "RGWGC::send_split_chain - more than, dont add to broken chain and send chain" << dendl;
- auto ret = send_chain(broken_chain, tag, null_yield);
+ auto ret = send_chain(broken_chain, tag, y);
if (ret < 0) {
broken_chain.objs.insert(broken_chain.objs.end(), it, chain.objs.end()); // add all the remainder objs to the list to be deleted inline
ldpp_dout(this, 0) << "RGWGC::send_split_chain - send chain returned error: " << ret << dendl;
}
if (!broken_chain.objs.empty()) { //when the chain is smaller than or equal to rgw_max_chunk_size
ldpp_dout(this, 20) << "RGWGC::send_split_chain - sending leftover objects" << dendl;
- auto ret = send_chain(broken_chain, tag, null_yield);
+ auto ret = send_chain(broken_chain, tag, y);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWGC::send_split_chain - send chain returned error: " << ret << dendl;
return {ret, {broken_chain}};
}
}
} else {
- auto ret = send_chain(chain, tag, null_yield);
+ auto ret = send_chain(chain, tag, y);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWGC::send_split_chain - send chain returned error: " << ret << dendl;
return {ret, {std::move(chain)}};
ldpp_dout(this, 20) << "RGWGC::send_chain - on object name: " << obj_names[i] << "tag is: " << tag << dendl;
- auto ret = store->gc_operate(this, obj_names[i], &op, null_yield);
+ auto ret = store->gc_operate(this, obj_names[i], &op, y);
if (ret != -ECANCELED && ret != -EPERM) {
return ret;
}
ObjectWriteOperation set_entry_op;
cls_rgw_gc_set_entry(set_entry_op, cct->_conf->rgw_gc_obj_min_wait, info);
- return store->gc_operate(this, obj_names[i], &set_entry_op, null_yield);
+ return store->gc_operate(this, obj_names[i], &set_entry_op, y);
}
struct defer_chain_state {
ObjectWriteOperation op;
cls_rgw_gc_queue_remove_entries(op, num_entries);
- return store->gc_operate(this, obj_names[index], &op, null_yield);
+ return store->gc_operate(this, obj_names[index], &op, y);
}
int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated, bool& processing_queue)
for (int i = 0; i < max_objs; i++) {
int index = (i + start) % max_objs;
- int ret = process(index, max_secs, expired_only, io_manager, null_yield);
+ int ret = process(index, max_secs, expired_only, io_manager, y);
if (ret < 0)
return ret;
}
const RGWPubSub ps(res.store, res.user_tenant);
const RGWPubSub::Bucket ps_bucket(ps, res.bucket);
rgw_pubsub_bucket_topics bucket_topics;
- auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.s->yield);
+ auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield);
if (rc < 0) {
// failed to fetch bucket topics
return rc;
}
auto& ref = obj.get_ref();
int ret = cls_timeindex_trim_repeat(dpp, ref, oid, utime_t(start_time), utime_t(end_time),
- from_marker, to_marker, null_yield);
+ from_marker, to_marker, y);
if ((ret < 0 ) && (ret != -ENOENT)) {
return ret;
}
real_time rt_to = to.to_real_time();
int ret = exp_store.objexp_hint_trim(dpp, shard, rt_from, rt_to,
- from_marker, to_marker, null_yield);
+ from_marker, to_marker, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR during trim: " << ret << dendl;
}
garbage_chunk(dpp, entries, need_trim);
if (need_trim) {
- trim_chunk(dpp, shard, last_run, round_start, marker, out_marker, null_yield);
+ trim_chunk(dpp, shard, last_run, round_start, marker, out_marker, y);
}
utime_t now = ceph_clock_now();
ldpp_dout(dpp, 20) << "processing shard = " << shard << dendl;
- if (! process_single_shard(dpp, shard, last_run, round_start, null_yield)) {
+ if (! process_single_shard(dpp, shard, last_run, round_start, y)) {
all_done = false;
}
}
continue;
}
- int r = store->delete_raw_obj(dpp, obj, null_yield);
+ int r = store->delete_raw_obj(dpp, obj, y);
if (r < 0 && r != -ENOENT) {
ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
}
if (need_to_remove_head) {
std::string version_id;
ldpp_dout(dpp, 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
- int r = store->delete_obj(dpp, obj_ctx, bucket_info, head_obj, 0, 0);
+ int r = store->delete_obj(dpp, obj_ctx, bucket_info, head_obj, 0, y, 0);
if (r < 0 && r != -ENOENT) {
ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
}
if (copy_if_newer) {
/* need to get mtime for destination */
- ret = get_obj_state(dpp, &obj_ctx, dest_bucket_info, stat_dest_obj, &dest_state, &manifest, y, stat_follow_olh, null_yield);
+ ret = get_obj_state(dpp, &obj_ctx, dest_bucket_info, stat_dest_obj, &dest_state, &manifest, stat_follow_olh, y);
if (ret < 0)
goto set_err_state;
if (copy_if_newer && canceled) {
ldpp_dout(dpp, 20) << "raced with another write of obj: " << dest_obj << dendl;
obj_ctx.invalidate(dest_obj); /* object was overwritten */
- ret = get_obj_state(dpp, &obj_ctx, dest_bucket_info, stat_dest_obj, &dest_state, &manifest, y, stat_follow_olh, null_yield);
+ ret = get_obj_state(dpp, &obj_ctx, dest_bucket_info, stat_dest_obj, &dest_state, &manifest, stat_follow_olh, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: " << __func__ << ": get_err_state() returned ret=" << ret << dendl;
goto set_err_state;
store->getRados()->delete_objs_inline(dpp, chain, mp_obj.get_upload_id());
} else {
// use upload id as tag and do it synchronously
- auto [ret, leftover_chain] = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id());
+ auto [ret, leftover_chain] = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id(), y);
if (ret < 0 && leftover_chain) {
ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
if (ret == -ENOENT) {
}
RGWRados* store = rados_store->getRados();
- int ret = store->process_gc(!include_all);
+ int ret = store->process_gc(!include_all, null_yield);
if (ret < 0) {
cerr << "ERROR: gc processing returned error: " << cpp_strerror(-ret) << std::endl;
return 1;
}
if (opt_cmd == OPT::LC_RESHARD_FIX) {
- ret = RGWBucketAdminOp::fix_lc_shards(driver, bucket_op, stream_flusher, dpp());
+ ret = RGWBucketAdminOp::fix_lc_shards(driver, bucket_op, stream_flusher, dpp(), null_yield);
if (ret < 0) {
cerr << "ERROR: fixing lc shards: " << cpp_strerror(-ret) << std::endl;
}
if (res != -ECANCELED) {
break;
}
- res = s->bucket->try_refresh_info(s, nullptr, null_yield);
+ res = s->bucket->try_refresh_info(s, nullptr, s->yield);
if (res != 0) {
break;
}
return;
}
- op_ret = retry_raced_bucket_write(this, s->bucket.get(), [this] {
+ op_ret = retry_raced_bucket_write(this, s->bucket.get(), [this, y] {
auto sync_policy = (s->bucket->get_info().sync_policy ? *s->bucket->get_info().sync_policy : rgw_sync_policy_info());
for (auto& group : sync_policy_groups) {
s->bucket->get_info().set_sync_policy(std::move(sync_policy));
- int ret = s->bucket->put_info(this, false, real_time(), null_yield);
+ int ret = s->bucket->put_info(this, false, real_time(), y);
if (ret < 0) {
ldpp_dout(this, 0) << "ERROR: put_bucket_instance_info (bucket=" << s->bucket << ") returned ret=" << ret << dendl;
return ret;
return;
}
- op_ret = retry_raced_bucket_write(this, s->bucket.get(), [this] {
+ op_ret = retry_raced_bucket_write(this, s->bucket.get(), [this, y] {
if (!s->bucket->get_info().sync_policy) {
return 0;
}
s->bucket->get_info().set_sync_policy(std::move(sync_policy));
- int ret = s->bucket->put_info(this, false, real_time(), null_yield);
+ int ret = s->bucket->put_info(this, false, real_time(), y);
if (ret < 0) {
ldpp_dout(this, 0) << "ERROR: put_bucket_instance_info (bucket=" << s->bucket << ") returned ret=" << ret << dendl;
return ret;
return;
}
- op_ret = retry_raced_bucket_write(this, s->bucket.get(), [this] {
+ op_ret = retry_raced_bucket_write(this, s->bucket.get(), [this, y] {
s->bucket->get_info().has_website = true;
s->bucket->get_info().website_conf = website_conf;
- op_ret = s->bucket->put_info(this, false, real_time(), null_yield);
+ op_ret = s->bucket->put_info(this, false, real_time(), y);
return op_ret;
}, y);
<< "returned err=" << op_ret << dendl;
return;
}
- op_ret = retry_raced_bucket_write(this, s->bucket.get(), [this] {
+ op_ret = retry_raced_bucket_write(this, s->bucket.get(), [this, y] {
s->bucket->get_info().has_website = false;
s->bucket->get_info().website_conf = RGWBucketWebsiteConf();
- op_ret = s->bucket->put_info(this, false, real_time(), null_yield);
+ op_ret = s->bucket->put_info(this, false, real_time(), y);
return op_ret;
}, y);
if (op_ret < 0) {
return;
}
- op_ret = retry_raced_bucket_write(this, s->bucket.get(), [this] {
+ op_ret = retry_raced_bucket_write(this, s->bucket.get(), [this, y] {
s->bucket->get_info().obj_lock = obj_lock;
- op_ret = s->bucket->put_info(this, false, real_time(), null_yield);
+ op_ret = s->bucket->put_info(this, false, real_time(), y);
return op_ret;
}, y);
return;
false,
false,
false,
- false, null_yield,
- true,
+ false,
+ true, null_yield,
false));
if (!store) {
std::cerr << "couldn't init storage provider" << std::endl;
false,
false,
false,
- false,
+ false, null_yield,
false);
ASSERT_NE(driver, nullptr);
return 0;
}
- virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) override {
+ virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) override {
return 0;
}