static string gc_oid_prefix = "gc";
static string gc_index_lock_name = "gc_process";
-void RGWGC::initialize(CephContext *_cct, RGWRados *_store) {
+void RGWGC::initialize(CephContext *_cct, RGWRados *_store, optional_yield y) {
cct = _cct;
store = _store;
op.create(false);
const uint64_t queue_size = cct->_conf->rgw_gc_max_queue_size, num_deferred_entries = cct->_conf->rgw_gc_max_deferred;
gc_log_init2(op, queue_size, num_deferred_entries);
- store->gc_operate(this, obj_names[i], &op);
+ store->gc_operate(this, obj_names[i], &op, null_yield);
}
}
return rgw_shards_mod(XXH64(tag.c_str(), tag.size(), seed), max_objs);
}
-std::tuple<int, std::optional<cls_rgw_obj_chain>> RGWGC::send_split_chain(const cls_rgw_obj_chain& chain, const std::string& tag)
+std::tuple<int, std::optional<cls_rgw_obj_chain>> RGWGC::send_split_chain(const cls_rgw_obj_chain& chain, const std::string& tag, optional_yield y)
{
ldpp_dout(this, 20) << "RGWGC::send_split_chain - tag is: " << tag << dendl;
broken_chain.objs.pop_back();
--it;
ldpp_dout(this, 20) << "RGWGC::send_split_chain - more than, dont add to broken chain and send chain" << dendl;
- auto ret = send_chain(broken_chain, tag);
+ auto ret = send_chain(broken_chain, tag, null_yield);
if (ret < 0) {
broken_chain.objs.insert(broken_chain.objs.end(), it, chain.objs.end()); // add all the remainder objs to the list to be deleted inline
ldpp_dout(this, 0) << "RGWGC::send_split_chain - send chain returned error: " << ret << dendl;
}
if (!broken_chain.objs.empty()) { //when the chain is smaller than or equal to rgw_max_chunk_size
ldpp_dout(this, 20) << "RGWGC::send_split_chain - sending leftover objects" << dendl;
- auto ret = send_chain(broken_chain, tag);
+ auto ret = send_chain(broken_chain, tag, null_yield);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWGC::send_split_chain - send chain returned error: " << ret << dendl;
return {ret, {broken_chain}};
}
}
} else {
- auto ret = send_chain(chain, tag);
+ auto ret = send_chain(chain, tag, null_yield);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWGC::send_split_chain - send chain returned error: " << ret << dendl;
return {ret, {std::move(chain)}};
return {0, {}};
}
-int RGWGC::send_chain(const cls_rgw_obj_chain& chain, const string& tag)
+int RGWGC::send_chain(const cls_rgw_obj_chain& chain, const string& tag, optional_yield y)
{
ObjectWriteOperation op;
cls_rgw_gc_obj_info info;
ldpp_dout(this, 20) << "RGWGC::send_chain - on object name: " << obj_names[i] << "tag is: " << tag << dendl;
- auto ret = store->gc_operate(this, obj_names[i], &op);
+ auto ret = store->gc_operate(this, obj_names[i], &op, null_yield);
if (ret != -ECANCELED && ret != -EPERM) {
return ret;
}
ObjectWriteOperation set_entry_op;
cls_rgw_gc_set_entry(set_entry_op, cct->_conf->rgw_gc_obj_min_wait, info);
- return store->gc_operate(this, obj_names[i], &set_entry_op);
+ return store->gc_operate(this, obj_names[i], &set_entry_op, null_yield);
}
struct defer_chain_state {
return ret;
}
-int RGWGC::remove(int index, const std::vector<string>& tags, AioCompletion **pc)
+int RGWGC::remove(int index, const std::vector<string>& tags, AioCompletion **pc, optional_yield y)
{
ObjectWriteOperation op;
cls_rgw_gc_remove(op, tags);
return ret;
}
-int RGWGC::remove(int index, int num_entries)
+int RGWGC::remove(int index, int num_entries, optional_yield y)
{
ObjectWriteOperation op;
cls_rgw_gc_queue_remove_entries(op, num_entries);
- return store->gc_operate(this, obj_names[index], &op);
+ return store->gc_operate(this, obj_names[index], &op, null_yield);
}
int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated, bool& processing_queue)
}
);
- int ret = gc->remove(index, rt, &index_io.c);
+ int ret = gc->remove(index, rt, &index_io.c, null_yield);
if (ret < 0) {
/* we already cleared list of tags, this prevents us from
* ballooning in case of a persistent problem
}
}
- int remove_queue_entries(int index, int num_entries) {
- int ret = gc->remove(index, num_entries);
+ int remove_queue_entries(int index, int num_entries, optional_yield y) {
+ int ret = gc->remove(index, num_entries, null_yield);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to remove queue entries on index=" <<
index << " ret=" << ret << dendl;
}; // class RGWGCIOManger
int RGWGC::process(int index, int max_secs, bool expired_only,
- RGWGCIOManager& io_manager)
+ RGWGCIOManager& io_manager, optional_yield y)
{
ldpp_dout(this, 20) << "RGWGC::process entered with GC index_shard=" <<
index << ", max_secs=" << max_secs << ", expired_only=" <<
}
//Remove the entries from the queue
ldpp_dout(this, 5) << "RGWGC::process removing entries, marker: " << marker << dendl;
- ret = io_manager.remove_queue_entries(index, entries.size());
+ ret = io_manager.remove_queue_entries(index, entries.size(), null_yield);
if (ret < 0) {
ldpp_dout(this, 0) <<
"WARNING: failed to remove queue entries" << dendl;
return 0;
}
-int RGWGC::process(bool expired_only)
+int RGWGC::process(bool expired_only, optional_yield y)
{
int max_secs = cct->_conf->rgw_gc_processor_max_time;
for (int i = 0; i < max_objs; i++) {
int index = (i + start) % max_objs;
- int ret = process(index, max_secs, expired_only, io_manager);
+ int ret = process(index, max_secs, expired_only, io_manager, null_yield);
if (ret < 0)
return ret;
}
do {
utime_t start = ceph_clock_now();
ldpp_dout(dpp, 2) << "garbage collection: start" << dendl;
- int r = gc->process(true);
+ int r = gc->process(true, null_yield);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: garbage collection process() returned error r=" << r << dendl;
}
static constexpr uint64_t seed = 8675309;
int tag_index(const std::string& tag);
- int send_chain(const cls_rgw_obj_chain& chain, const std::string& tag);
+ int send_chain(const cls_rgw_obj_chain& chain, const std::string& tag, optional_yield y);
class GCWorker : public Thread {
const DoutPrefixProvider *dpp;
finalize();
}
std::vector<bool> transitioned_objects_cache;
- std::tuple<int, std::optional<cls_rgw_obj_chain>> send_split_chain(const cls_rgw_obj_chain& chain, const std::string& tag);
+ std::tuple<int, std::optional<cls_rgw_obj_chain>> send_split_chain(const cls_rgw_obj_chain& chain, const std::string& tag, optional_yield y);
// asynchronously defer garbage collection on an object that's still being read
int async_defer_chain(const std::string& tag, const cls_rgw_obj_chain& info);
// callback for when async_defer_chain() fails with ECANCELED
void on_defer_canceled(const cls_rgw_gc_obj_info& info);
- int remove(int index, const std::vector<std::string>& tags, librados::AioCompletion **pc);
- int remove(int index, int num_entries);
+ int remove(int index, const std::vector<std::string>& tags, librados::AioCompletion **pc, optional_yield y);
+ int remove(int index, int num_entries, optional_yield y);
- void initialize(CephContext *_cct, RGWRados *_store);
+ void initialize(CephContext *_cct, RGWRados *_store, optional_yield y);
void finalize();
int list(int *index, std::string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated, bool& processing_queue);
void list_init(int *index) { *index = 0; }
int process(int index, int process_max_secs, bool expired_only,
- RGWGCIOManager& io_manager);
- int process(bool expired_only);
+ RGWGCIOManager& io_manager, optional_yield y);
+ int process(bool expired_only, optional_yield y);
bool going_down();
void start_processor();
librados::ObjectWriteOperation op;
op.remove();
auto part_oid = info.part_oid(j);
- auto subr = rgw_rados_operate(dpp, ioctx, part_oid, &op, null_yield);
+ auto subr = rgw_rados_operate(dpp, ioctx, part_oid, &op, y);
if (subr < 0 && subr != -ENOENT) {
if (!ec)
ec = bs::error_code(-subr, bs::system_category());
} else {
op.remove();
}
- r = rgw_rados_operate(dpp, ioctx, oid, &op, null_yield);
+ r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
if (r < 0 && r != -ENOENT) {
if (!ec)
ec = bs::error_code(-r, bs::system_category());
const utime_t& from_time,
const utime_t& to_time,
const string& from_marker,
- const string& to_marker)
+ const string& to_marker, optional_yield y)
{
bool done = false;
do {
const ceph::real_time& start_time,
const ceph::real_time& end_time,
const string& from_marker,
- const string& to_marker)
+ const string& to_marker, optional_yield y)
{
auto obj = rados_svc->obj(rgw_raw_obj(driver->svc()->zone->get_zone_params().log_pool, oid));
int r = obj.open(dpp);
}
auto& ref = obj.get_ref();
int ret = cls_timeindex_trim_repeat(dpp, ref, oid, utime_t(start_time), utime_t(end_time),
- from_marker, to_marker);
+ from_marker, to_marker, null_yield);
if ((ret < 0 ) && (ret != -ENOENT)) {
return ret;
}
const utime_t& from,
const utime_t& to,
const string& from_marker,
- const string& to_marker)
+ const string& to_marker, optional_yield y)
{
ldpp_dout(dpp, 20) << "trying to trim removal hints to=" << to
<< ", to_marker=" << to_marker << dendl;
real_time rt_to = to.to_real_time();
int ret = exp_store.objexp_hint_trim(dpp, shard, rt_from, rt_to,
- from_marker, to_marker);
+ from_marker, to_marker, null_yield);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR during trim: " << ret << dendl;
}
bool RGWObjectExpirer::process_single_shard(const DoutPrefixProvider *dpp,
const string& shard,
const utime_t& last_run,
- const utime_t& round_start)
+ const utime_t& round_start, optional_yield y)
{
string marker;
string out_marker;
garbage_chunk(dpp, entries, need_trim);
if (need_trim) {
- trim_chunk(dpp, shard, last_run, round_start, marker, out_marker);
+ trim_chunk(dpp, shard, last_run, round_start, marker, out_marker, null_yield);
}
utime_t now = ceph_clock_now();
/* Returns true if all shards have been processed successfully. */
bool RGWObjectExpirer::inspect_all_shards(const DoutPrefixProvider *dpp,
const utime_t& last_run,
- const utime_t& round_start)
+ const utime_t& round_start, optional_yield y)
{
CephContext * const cct = driver->ctx();
int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
ldpp_dout(dpp, 20) << "processing shard = " << shard << dendl;
- if (! process_single_shard(dpp, shard, last_run, round_start)) {
+ if (! process_single_shard(dpp, shard, last_run, round_start, null_yield)) {
all_done = false;
}
}
do {
utime_t start = ceph_clock_now();
ldpp_dout(this, 2) << "object expiration: start" << dendl;
- if (oe->inspect_all_shards(this, last_run, start)) {
+ if (oe->inspect_all_shards(this, last_run, start, null_yield)) {
/* All shards have been processed properly. Next time we can start
* from this moment. */
last_run = start;
const ceph::real_time& start_time,
const ceph::real_time& end_time,
const std::string& from_marker,
- const std::string& to_marker);
+ const std::string& to_marker, optional_yield y);
};
class RGWObjectExpirer {
const utime_t& from,
const utime_t& to,
const std::string& from_marker,
- const std::string& to_marker);
+ const std::string& to_marker, optional_yield y);
bool process_single_shard(const DoutPrefixProvider *dpp,
const std::string& shard,
const utime_t& last_run,
- const utime_t& round_start);
+ const utime_t& round_start, optional_yield y);
bool inspect_all_shards(const DoutPrefixProvider *dpp,
const utime_t& last_run,
- const utime_t& round_start);
+ const utime_t& round_start, optional_yield y);
bool going_down();
void start_processor();
continue;
}
- int r = store->delete_raw_obj(dpp, obj);
+ int r = store->delete_raw_obj(dpp, obj, null_yield);
if (r < 0 && r != -ENOENT) {
ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
}
* Initialize the RADOS instance and prepare to do other ops
* Returns 0 on success, -ERR# on failure.
*/
-int RGWRados::init_complete(const DoutPrefixProvider *dpp)
+int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
{
int ret;
if (use_gc) {
gc = new RGWGC();
- gc->initialize(cct, this);
+ gc->initialize(cct, this, y);
} else {
ldpp_dout(dpp, 5) << "note: GC not initialized" << dendl;
}
hash = buf;
}
-int RGWRados::log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info)
+int RGWRados::log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y)
{
uint32_t index = 0;
map<string, rgw_usage_log_info>::iterator liter;
for (liter = log_objs.begin(); liter != log_objs.end(); ++liter) {
- int r = cls_obj_usage_log_add(dpp, liter->first, liter->second);
+ int r = cls_obj_usage_log_add(dpp, liter->first, liter->second, y);
if (r < 0)
return r;
}
return 0;
}
-int RGWRados::trim_usage(const DoutPrefixProvider *dpp, const rgw_user& user, const string& bucket_name, uint64_t start_epoch, uint64_t end_epoch)
+int RGWRados::trim_usage(const DoutPrefixProvider *dpp, const rgw_user& user, const string& bucket_name, uint64_t start_epoch, uint64_t end_epoch, optional_yield y)
{
uint32_t index = 0;
string hash, first_hash;
hash = first_hash;
do {
- int ret = cls_obj_usage_log_trim(dpp, hash, user_str, bucket_name, start_epoch, end_epoch);
+ int ret = cls_obj_usage_log_trim(dpp, hash, user_str, bucket_name, start_epoch, end_epoch, y);
if (ret < 0 && ret != -ENOENT)
return ret;
}
-int RGWRados::clear_usage(const DoutPrefixProvider *dpp)
+int RGWRados::clear_usage(const DoutPrefixProvider *dpp, optional_yield y)
{
auto max_shards = cct->_conf->rgw_usage_max_shards;
int ret=0;
for (unsigned i=0; i < max_shards; i++){
string oid = RGW_USAGE_OBJ_PREFIX + to_string(i);
- ret = cls_obj_usage_log_clear(dpp, oid);
+ ret = cls_obj_usage_log_clear(dpp, oid, y);
if (ret < 0){
ldpp_dout(dpp,0) << "usage clear on oid="<< oid << "failed with ret=" << ret << dendl;
return ret;
* fixes an issue where head objects were supposed to have a locator created, but ended
* up without one
*/
-int RGWRados::fix_head_obj_locator(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, bool copy_obj, bool remove_bad, rgw_obj_key& key)
+int RGWRados::fix_head_obj_locator(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, bool copy_obj, bool remove_bad, rgw_obj_key& key, optional_yield y)
{
const rgw_bucket& bucket = bucket_info.bucket;
string oid;
#define HEAD_SIZE 512 * 1024
op.read(0, HEAD_SIZE, &data, NULL);
- ret = rgw_rados_operate(dpp, ioctx, oid, &op, &data, null_yield);
+ ret = rgw_rados_operate(dpp, ioctx, oid, &op, &data, y);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: rgw_rados_operate(oid=" << oid << ") returned ret=" << ret << dendl;
return ret;
wop.write(0, data);
ioctx.locator_set_key(locator);
- rgw_rados_operate(dpp, ioctx, oid, &wop, null_yield);
+ rgw_rados_operate(dpp, ioctx, oid, &wop, y);
}
if (remove_bad) {
librados::IoCtx& src_ioctx,
const string& src_oid, const string& src_locator,
librados::IoCtx& dst_ioctx,
- const string& dst_oid, const string& dst_locator)
+ const string& dst_oid, const string& dst_locator, optional_yield y)
{
#define COPY_BUF_SIZE (4 * 1024 * 1024)
mtime = real_clock::from_timespec(mtime_ts);
}
rop.read(ofs, chunk_size, &data, NULL);
- ret = rgw_rados_operate(dpp, src_ioctx, src_oid, &rop, &data, null_yield);
+ ret = rgw_rados_operate(dpp, src_ioctx, src_oid, &rop, &data, y);
if (ret < 0) {
goto done_err;
}
mtime = real_clock::from_timespec(mtime_ts);
}
wop.write(ofs, data);
- ret = rgw_rados_operate(dpp, dst_ioctx, dst_oid, &wop, null_yield);
+ ret = rgw_rados_operate(dpp, dst_ioctx, dst_oid, &wop, y);
if (ret < 0) {
goto done_err;
}
*need_fix = true;
}
if (fix) {
- r = move_rados_obj(dpp, src_ioctx, oid, bad_loc, ioctx, oid, locator);
+ r = move_rados_obj(dpp, src_ioctx, oid, bad_loc, ioctx, oid, locator, y);
if (r < 0) {
ldpp_dout(dpp, -1) << "ERROR: copy_rados_obj() on oid=" << oid << " returned r=" << r << dendl;
}
epoch = ioctx.get_last_version();
poolid = ioctx.get_id();
- r = target->complete_atomic_modification(dpp);
+ r = target->complete_atomic_modification(dpp, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: complete_atomic_modification returned r=" << r << dendl;
}
return 0;
}
-int RGWRados::Object::complete_atomic_modification(const DoutPrefixProvider *dpp)
+int RGWRados::Object::complete_atomic_modification(const DoutPrefixProvider *dpp, optional_yield y)
{
if ((!manifest)|| state->keep_tail)
return 0;
//Delete objects inline just in case gc hasn't been initialised, prevents crashes
store->delete_objs_inline(dpp, chain, tag);
} else {
- auto [ret, leftover_chain] = store->gc->send_split_chain(chain, tag); // do it synchronously
+ auto [ret, leftover_chain] = store->gc->send_split_chain(chain, tag, y); // do it synchronously
if (ret < 0 && leftover_chain) {
//Delete objects inline if send chain to gc fails
store->delete_objs_inline(dpp, *leftover_chain, tag);
}
}
-std::tuple<int, std::optional<cls_rgw_obj_chain>> RGWRados::send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag)
+std::tuple<int, std::optional<cls_rgw_obj_chain>> RGWRados::send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag, optional_yield y)
{
if (chain.empty()) {
return {0, std::nullopt};
}
- return gc->send_split_chain(chain, tag);
+ return gc->send_split_chain(chain, tag, y);
}
void RGWRados::delete_objs_inline(const DoutPrefixProvider *dpp, cls_rgw_obj_chain& chain, const string& tag)
}
r = index_op.complete_del(dpp, poolid, ioctx.get_last_version(), state->mtime, params.remove_objs, y);
- int ret = target->complete_atomic_modification(dpp);
+ int ret = target->complete_atomic_modification(dpp, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: complete_atomic_modification returned ret=" << ret << dendl;
}
return del_op.delete_obj(null_yield, dpp);
}
-int RGWRados::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj)
+int RGWRados::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, optional_yield y)
{
rgw_rados_ref ref;
int r = get_raw_obj_ref(dpp, obj, &ref);
ObjectWriteOperation op;
op.remove();
- r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield);
+ r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, y);
if (r < 0)
return r;
ceph_assert(olh_state->is_olh);
rgw_obj target;
- int r = RGWRados::follow_olh(dpp, bucket_info, obj_ctx, olh_state, obj, &target); /* might return -EAGAIN */
+ int r = RGWRados::follow_olh(dpp, bucket_info, obj_ctx, olh_state, obj, &target, y); /* might return -EAGAIN */
if (r < 0) {
return r;
}
struct timespec mtime_ts = real_clock::to_timespec(mtime);
op.mtime2(&mtime_ts);
auto& ioctx = ref.pool.ioctx();
- r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, null_yield);
+ r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y);
if (state) {
if (r >= 0) {
bufferlist acl_bl;
return 0;
}
-int RGWRados::obj_operate(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, ObjectWriteOperation *op)
+int RGWRados::obj_operate(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, ObjectWriteOperation *op, optional_yield y)
{
rgw_rados_ref ref;
int r = get_obj_head_ref(dpp, bucket_info, obj, &ref);
return r;
}
- return rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, op, null_yield);
+ return rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, op, y);
}
-int RGWRados::obj_operate(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, ObjectReadOperation *op)
+int RGWRados::obj_operate(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, ObjectReadOperation *op, optional_yield y)
{
rgw_rados_ref ref;
int r = get_obj_head_ref(dpp, bucket_info, obj, &ref);
bufferlist outbl;
- return rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, op, &outbl, null_yield);
+ return rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, op, &outbl, y);
}
-int RGWRados::olh_init_modification_impl(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag)
+int RGWRados::olh_init_modification_impl(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag, optional_yield y)
{
ObjectWriteOperation op;
op.setxattr(attr_name.c_str(), bl);
- int ret = obj_operate(dpp, bucket_info, olh_obj, &op);
+ int ret = obj_operate(dpp, bucket_info, olh_obj, &op, y);
if (ret < 0) {
return ret;
}
return 0;
}
-int RGWRados::olh_init_modification(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& obj, string *op_tag)
+int RGWRados::olh_init_modification(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& obj, string *op_tag, optional_yield y)
{
int ret;
- ret = olh_init_modification_impl(dpp, bucket_info, state, obj, op_tag);
+ ret = olh_init_modification_impl(dpp, bucket_info, state, obj, op_tag, y);
if (ret == -EEXIST) {
ret = -ECANCELED;
}
delete_marker, op_tag, meta, olh_epoch,
unmod_since, high_precision_time,
svc.zone->need_to_log_data(), zones_trace);
- return rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield);
+ return rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, y);
});
if (r < 0) {
ldpp_dout(dpp, 20) << "rgw_rados_operate() after cls_rgw_bucket_link_olh() returned r=" << r << dendl;
RGWBucketInfo& bucket_info,
const rgw_obj& obj_instance,
const string& op_tag, const string& olh_tag,
- uint64_t olh_epoch, rgw_zone_set *_zones_trace)
+ uint64_t olh_epoch, optional_yield y, rgw_zone_set *_zones_trace)
{
rgw_rados_ref ref;
int r = get_obj_head_ref(dpp, bucket_info, obj_instance, &ref);
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
cls_rgw_bucket_unlink_instance(op, key, op_tag,
olh_tag, olh_epoch, svc.zone->need_to_log_data(), zones_trace);
- return rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield);
+ return rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, y);
});
if (r < 0) {
ldpp_dout(dpp, 20) << "rgw_rados_operate() after cls_rgw_bucket_link_instance() returned r=" << r << dendl;
RGWBucketInfo& bucket_info, RGWObjState& state,
const rgw_obj& obj_instance, uint64_t ver_marker,
std::map<uint64_t, std::vector<rgw_bucket_olh_log_entry> > *log,
- bool *is_truncated)
+ bool *is_truncated, optional_yield y)
{
rgw_rados_ref ref;
int r = get_obj_head_ref(dpp, bucket_info, obj_instance, &ref);
int op_ret = 0;
cls_rgw_get_olh_log(op, key, ver_marker, olh_tag, log_ret, op_ret);
bufferlist outbl;
- r = rgw_rados_operate(dpp, shard_ref.pool.ioctx(), shard_ref.obj.oid, &op, &outbl, null_yield);
+ r = rgw_rados_operate(dpp, shard_ref.pool.ioctx(), shard_ref.obj.oid, &op, &outbl, y);
if (r < 0) {
return r;
}
// olh_tag mismatch. this attempts to detect this case and reconstruct the OLH
// attributes from the bucket index. see http://tracker.ceph.com/issues/37792
int RGWRados::repair_olh(const DoutPrefixProvider *dpp, RGWObjState* state, const RGWBucketInfo& bucket_info,
- const rgw_obj& obj)
+ const rgw_obj& obj, optional_yield y)
{
// fetch the current olh entry from the bucket index
rgw_bucket_olh_entry olh;
if (r < 0) {
return r;
}
- r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield);
+ r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "repair_olh failed to write olh attributes with "
<< cpp_strerror(r) << dendl;
int RGWRados::bucket_index_clear_olh(const DoutPrefixProvider *dpp,
RGWBucketInfo& bucket_info,
RGWObjState& state,
- const rgw_obj& obj_instance)
+ const rgw_obj& obj_instance, optional_yield y)
{
rgw_rados_ref ref;
int r = get_obj_head_ref(dpp, bucket_info, obj_instance, &ref);
auto& ref = pbs->bucket_obj.get_ref();
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
cls_rgw_clear_olh(op, key, olh_tag);
- return rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield);
+ return rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, y);
});
if (ret < 0) {
ldpp_dout(dpp, 5) << "rgw_rados_operate() after cls_rgw_clear_olh() returned ret=" << ret << dendl;
bufferlist& olh_tag,
std::map<uint64_t, std::vector<rgw_bucket_olh_log_entry> >& log,
uint64_t *plast_ver,
- rgw_zone_set* zones_trace)
+ optional_yield y, rgw_zone_set* zones_trace)
{
if (log.empty()) {
return 0;
}
/* update olh object */
- r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield);
+ r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: could not apply olh update, r=" << r << dendl;
return r;
cls_obj_check_prefix_exist(rm_op, RGW_ATTR_OLH_PENDING_PREFIX, true); /* fail if found one of these, pending modification */
rm_op.remove();
- r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &rm_op, null_yield);
+ r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &rm_op, y);
if (r == -ECANCELED) {
return 0; /* someone else won this race */
} else {
/*
* only clear if was successful, otherwise we might clobber pending operations on this object
*/
- r = bucket_index_clear_olh(dpp, bucket_info, state, obj);
+ r = bucket_index_clear_olh(dpp, bucket_info, state, obj, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: could not clear bucket index olh entries r=" << r << dendl;
return r;
/*
* read olh log and apply it
*/
-int RGWRados::update_olh(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWObjState *state, RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace)
+int RGWRados::update_olh(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWObjState *state, RGWBucketInfo& bucket_info, const rgw_obj& obj, optional_yield y, rgw_zone_set *zones_trace)
{
map<uint64_t, vector<rgw_bucket_olh_log_entry> > log;
bool is_truncated;
uint64_t ver_marker = 0;
do {
- int ret = bucket_index_read_olh_log(dpp, bucket_info, *state, obj, ver_marker, &log, &is_truncated);
+ int ret = bucket_index_read_olh_log(dpp, bucket_info, *state, obj, ver_marker, &log, &is_truncated, y);
if (ret < 0) {
return ret;
}
- ret = apply_olh_log(dpp, obj_ctx, *state, bucket_info, obj, state->olh_tag, log, &ver_marker, zones_trace);
+ ret = apply_olh_log(dpp, obj_ctx, *state, bucket_info, obj, state->olh_tag, log, &ver_marker, y, zones_trace);
if (ret < 0) {
return ret;
}
return ret;
}
- ret = olh_init_modification(dpp, bucket_info, *state, olh_obj, &op_tag);
+ ret = olh_init_modification(dpp, bucket_info, *state, olh_obj, &op_tag, y);
if (ret < 0) {
ldpp_dout(dpp, 20) << "olh_init_modification() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
if (ret == -ECANCELED) {
if (ret == -ECANCELED) {
// the bucket index rejected the link_olh() due to olh tag mismatch;
// attempt to reconstruct olh head attributes based on the bucket index
- int r2 = repair_olh(dpp, state, bucket_info, olh_obj);
+ int r2 = repair_olh(dpp, state, bucket_info, olh_obj, y);
if (r2 < 0 && r2 != -ECANCELED) {
return r2;
}
return -EIO;
}
- ret = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj);
+ ret = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, y);
if (ret == -ECANCELED) { /* already did what we needed, no need to retry, raced with another user */
ret = 0;
}
if (ret < 0)
return ret;
- ret = olh_init_modification(dpp, bucket_info, *state, olh_obj, &op_tag);
+ ret = olh_init_modification(dpp, bucket_info, *state, olh_obj, &op_tag, y);
if (ret < 0) {
ldpp_dout(dpp, 20) << "olh_init_modification() target_obj=" << target_obj << " returned " << ret << dendl;
if (ret == -ECANCELED) {
string olh_tag(state->olh_tag.c_str(), state->olh_tag.length());
- ret = bucket_index_unlink_instance(dpp, bucket_info, target_obj, op_tag, olh_tag, olh_epoch, zones_trace);
+ ret = bucket_index_unlink_instance(dpp, bucket_info, target_obj, op_tag, olh_tag, olh_epoch, y, zones_trace);
if (ret < 0) {
ldpp_dout(dpp, 20) << "bucket_index_unlink_instance() target_obj=" << target_obj << " returned " << ret << dendl;
if (ret == -ECANCELED) {
return -EIO;
}
- ret = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, zones_trace);
+ ret = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, y, zones_trace);
if (ret == -ECANCELED) { /* already did what we needed, no need to retry, raced with another user */
return 0;
}
gen_rand_obj_instance_name(&target_obj->key);
}
-int RGWRados::get_olh(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWOLHInfo *olh)
+int RGWRados::get_olh(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWOLHInfo *olh, optional_yield y)
{
map<string, bufferlist> attrset;
ObjectReadOperation op;
op.getxattrs(&attrset, NULL);
- int r = obj_operate(dpp, bucket_info, obj, &op);
+ int r = obj_operate(dpp, bucket_info, obj, &op, y);
if (r < 0) {
return r;
}
}
}
-int RGWRados::remove_olh_pending_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, map<string, bufferlist>& pending_attrs)
+int RGWRados::remove_olh_pending_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, map<string, bufferlist>& pending_attrs, optional_yield y)
{
rgw_rados_ref ref;
int r = get_obj_head_ref(dpp, bucket_info, olh_obj, &ref);
op.rmxattr(i->first.c_str());
}
- r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield);
+ r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, y);
if (r == -ENOENT || r == -ECANCELED) {
/* raced with some other change, shouldn't sweat about it */
return 0;
return 0;
}
-int RGWRados::follow_olh(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, RGWObjectCtx& obj_ctx, RGWObjState *state, const rgw_obj& olh_obj, rgw_obj *target)
+int RGWRados::follow_olh(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, RGWObjectCtx& obj_ctx, RGWObjState *state, const rgw_obj& olh_obj, rgw_obj *target, optional_yield y)
{
map<string, bufferlist> pending_entries;
rgw_filter_attrset(state->attrset, RGW_ATTR_OLH_PENDING_PREFIX, &pending_entries);
check_pending_olh_entries(dpp, pending_entries, &rm_pending_entries);
if (!rm_pending_entries.empty()) {
- int ret = remove_olh_pending_entries(dpp, bucket_info, *state, olh_obj, rm_pending_entries);
+ int ret = remove_olh_pending_entries(dpp, bucket_info, *state, olh_obj, rm_pending_entries, y);
if (ret < 0) {
ldpp_dout(dpp, 20) << "ERROR: rm_pending_entries returned ret=" << ret << dendl;
return ret;
if (!pending_entries.empty()) {
ldpp_dout(dpp, 20) << __func__ << "(): found pending entries, need to update_olh() on bucket=" << olh_obj.bucket << dendl;
- int ret = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj);
+ int ret = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, y);
if (ret < 0) {
if (ret == -ECANCELED) {
// In this context, ECANCELED means that the OLH tag changed in either the bucket index entry or the OLH object.
return 0;
}
-int RGWRados::gc_operate(const DoutPrefixProvider *dpp, string& oid, librados::ObjectWriteOperation *op)
+int RGWRados::gc_operate(const DoutPrefixProvider *dpp, string& oid, librados::ObjectWriteOperation *op, optional_yield y)
{
- return rgw_rados_operate(dpp, gc_pool_ctx, oid, op, null_yield);
+ return rgw_rados_operate(dpp, gc_pool_ctx, oid, op, y);
}
int RGWRados::gc_aio_operate(const string& oid, librados::AioCompletion *c,
return gc_pool_ctx.aio_operate(oid, c, op);
}
-int RGWRados::gc_operate(const DoutPrefixProvider *dpp, string& oid, librados::ObjectReadOperation *op, bufferlist *pbl)
+int RGWRados::gc_operate(const DoutPrefixProvider *dpp, string& oid, librados::ObjectReadOperation *op, bufferlist *pbl, optional_yield y)
{
- return rgw_rados_operate(dpp, gc_pool_ctx, oid, op, pbl, null_yield);
+ return rgw_rados_operate(dpp, gc_pool_ctx, oid, op, pbl, y);
}
int RGWRados::list_gc_objs(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated, bool& processing_queue)
return gc->list(index, marker, max, expired_only, result, truncated, processing_queue);
}
-int RGWRados::process_gc(bool expired_only)
+int RGWRados::process_gc(bool expired_only, optional_yield y)
{
- return gc->process(expired_only);
+ return gc->process(expired_only, y);
}
int RGWRados::list_lc_progress(string& marker, uint32_t max_entries,
return ret;
}
-bool RGWRados::process_expire_objects(const DoutPrefixProvider *dpp)
+bool RGWRados::process_expire_objects(const DoutPrefixProvider *dpp, optional_yield y)
{
- return obj_expirer->inspect_all_shards(dpp, utime_t(), ceph_clock_now());
+ return obj_expirer->inspect_all_shards(dpp, utime_t(), ceph_clock_now(), y);
}
int RGWRados::cls_obj_prepare_op(const DoutPrefixProvider *dpp, BucketShard& bs, RGWModifyOp op, string& tag,
cls_rgw_bucket_list_op(op, marker, prefix, empty_delimiter,
num_entries,
list_versions, &result);
- r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, null_yield);
+ r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
": error in rgw_rados_operate (bucket list op), r=" << r << dendl;
int RGWRados::cls_obj_usage_log_add(const DoutPrefixProvider *dpp, const string& oid,
- rgw_usage_log_info& info)
+ rgw_usage_log_info& info, optional_yield y)
{
rgw_raw_obj obj(svc.zone->get_zone_params().usage_log_pool, oid);
ObjectWriteOperation op;
cls_rgw_usage_log_add(op, info);
- r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield);
+ r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, y);
return r;
}
return r;
}
-static int cls_rgw_usage_log_trim_repeat(const DoutPrefixProvider *dpp, rgw_rados_ref ref, const string& user, const string& bucket, uint64_t start_epoch, uint64_t end_epoch)
+static int cls_rgw_usage_log_trim_repeat(const DoutPrefixProvider *dpp, rgw_rados_ref ref, const string& user, const string& bucket, uint64_t start_epoch, uint64_t end_epoch, optional_yield y)
{
bool done = false;
do {
librados::ObjectWriteOperation op;
cls_rgw_usage_log_trim(op, user, bucket, start_epoch, end_epoch);
- int r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield);
+ int r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, y);
if (r == -ENODATA)
done = true;
else if (r < 0)
}
int RGWRados::cls_obj_usage_log_trim(const DoutPrefixProvider *dpp, const string& oid, const string& user, const string& bucket,
- uint64_t start_epoch, uint64_t end_epoch)
+ uint64_t start_epoch, uint64_t end_epoch, optional_yield y)
{
rgw_raw_obj obj(svc.zone->get_zone_params().usage_log_pool, oid);
return r;
}
- r = cls_rgw_usage_log_trim_repeat(dpp, ref, user, bucket, start_epoch, end_epoch);
+ r = cls_rgw_usage_log_trim_repeat(dpp, ref, user, bucket, start_epoch, end_epoch, y);
return r;
}
-int RGWRados::cls_obj_usage_log_clear(const DoutPrefixProvider *dpp, string& oid)
+int RGWRados::cls_obj_usage_log_clear(const DoutPrefixProvider *dpp, string& oid, optional_yield y)
{
rgw_raw_obj obj(svc.zone->get_zone_params().usage_log_pool, oid);
}
librados::ObjectWriteOperation op;
cls_rgw_usage_log_clear(op);
- r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield);
+ r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, y);
return r;
}
int RGWRados::check_bucket_shards(const RGWBucketInfo& bucket_info,
const rgw_bucket& bucket,
uint64_t num_objs,
- const DoutPrefixProvider *dpp)
+ const DoutPrefixProvider *dpp, optional_yield y)
{
if (! cct->_conf.get_val<bool>("rgw_dynamic_resharding")) {
return 0;
"; new num shards " << final_num_shards << " (suggested " <<
suggested_num_shards << ")" << dendl;
- return add_bucket_to_reshard(dpp, bucket_info, final_num_shards);
+ return add_bucket_to_reshard(dpp, bucket_info, final_num_shards, y);
}
-int RGWRados::add_bucket_to_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, uint32_t new_num_shards)
+int RGWRados::add_bucket_to_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, uint32_t new_num_shards, optional_yield y)
{
RGWReshard reshard(this->driver, dpp);
entry.old_num_shards = num_source_shards;
entry.new_num_shards = new_num_shards;
- return reshard.add(dpp, entry);
+ return reshard.add(dpp, entry, y);
}
int RGWRados::check_quota(const DoutPrefixProvider *dpp, const rgw_user& bucket_owner, rgw_bucket& bucket,
int init_ctl(const DoutPrefixProvider *dpp);
virtual int init_rados();
int init_begin(const DoutPrefixProvider *dpp);
- int init_complete(const DoutPrefixProvider *dpp);
+ int init_complete(const DoutPrefixProvider *dpp, optional_yield y);
void finalize();
int register_to_service_map(const DoutPrefixProvider *dpp, const std::string& daemon_type, const std::map<std::string, std::string>& meta);
int log_show_next(const DoutPrefixProvider *dpp, RGWAccessHandle handle, rgw_log_entry *entry);
// log bandwidth info
- int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info);
+ int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y);
int read_usage(const DoutPrefixProvider *dpp, const rgw_user& user, const std::string& bucket_name, uint64_t start_epoch, uint64_t end_epoch,
uint32_t max_entries, bool *is_truncated, RGWUsageIter& read_iter, std::map<rgw_user_bucket,
rgw_usage_log_entry>& usage);
- int trim_usage(const DoutPrefixProvider *dpp, const rgw_user& user, const std::string& bucket_name, uint64_t start_epoch, uint64_t end_epoch);
- int clear_usage(const DoutPrefixProvider *dpp);
+ int trim_usage(const DoutPrefixProvider *dpp, const rgw_user& user, const std::string& bucket_name, uint64_t start_epoch, uint64_t end_epoch, optional_yield y);
+ int clear_usage(const DoutPrefixProvider *dpp, optional_yield y);
int create_pool(const DoutPrefixProvider *dpp, const rgw_pool& pool);
int prepare_atomic_modification(const DoutPrefixProvider *dpp, librados::ObjectWriteOperation& op, bool reset_obj, const std::string *ptag,
const char *ifmatch, const char *ifnomatch, bool removal_op, bool modify_tail, optional_yield y);
- int complete_atomic_modification(const DoutPrefixProvider *dpp);
+ int complete_atomic_modification(const DoutPrefixProvider *dpp, optional_yield y);
public:
Object(RGWRados *_store, const RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, const rgw_obj& _obj) : store(_store), bucket_info(_bucket_info),
const ceph::real_time& expiration_time = ceph::real_time(),
rgw_zone_set *zones_trace = nullptr);
- int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj);
+ int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, optional_yield y);
/** Remove an object from the bucket index */
int delete_obj_index(const rgw_obj& obj, ceph::real_time mtime,
std::map<std::string, bufferlist> *attrs, bufferlist *first_chunk,
RGWObjVersionTracker *objv_tracker, optional_yield y);
- int obj_operate(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectWriteOperation *op);
- int obj_operate(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectReadOperation *op);
+ int obj_operate(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectWriteOperation *op, optional_yield y);
+ int obj_operate(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectReadOperation *op, optional_yield y);
int guard_reshard(const DoutPrefixProvider *dpp,
BucketShard *bs,
const DoutPrefixProvider *dpp);
void bucket_index_guard_olh_op(const DoutPrefixProvider *dpp, RGWObjState& olh_state, librados::ObjectOperation& op);
- int olh_init_modification(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, std::string *op_tag);
- int olh_init_modification_impl(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, std::string *op_tag);
+ int olh_init_modification(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, std::string *op_tag, optional_yield y);
+ int olh_init_modification_impl(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, std::string *op_tag, optional_yield y);
int bucket_index_link_olh(const DoutPrefixProvider *dpp,
RGWBucketInfo& bucket_info, RGWObjState& olh_state,
const rgw_obj& obj_instance, bool delete_marker,
RGWBucketInfo& bucket_info,
const rgw_obj& obj_instance,
const std::string& op_tag, const std::string& olh_tag,
- uint64_t olh_epoch, rgw_zone_set *zones_trace = nullptr);
+ uint64_t olh_epoch, optional_yield y, rgw_zone_set *zones_trace = nullptr);
int bucket_index_read_olh_log(const DoutPrefixProvider *dpp,
RGWBucketInfo& bucket_info, RGWObjState& state,
const rgw_obj& obj_instance, uint64_t ver_marker,
- std::map<uint64_t, std::vector<rgw_bucket_olh_log_entry> > *log, bool *is_truncated);
+ std::map<uint64_t, std::vector<rgw_bucket_olh_log_entry> > *log, bool *is_truncated, optional_yield y);
int bucket_index_trim_olh_log(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, RGWObjState& obj_state, const rgw_obj& obj_instance, uint64_t ver);
- int bucket_index_clear_olh(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& obj_instance);
+ int bucket_index_clear_olh(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& obj_instance, optional_yield y);
int apply_olh_log(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWObjState& obj_state, RGWBucketInfo& bucket_info, const rgw_obj& obj,
bufferlist& obj_tag, std::map<uint64_t, std::vector<rgw_bucket_olh_log_entry> >& log,
- uint64_t *plast_ver, rgw_zone_set *zones_trace = nullptr);
- int update_olh(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWObjState *state, RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace = nullptr);
+ uint64_t *plast_ver, optional_yield y, rgw_zone_set *zones_trace = nullptr);
+ int update_olh(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWObjState *state, RGWBucketInfo& bucket_info, const rgw_obj& obj, optional_yield y, rgw_zone_set *zones_trace = nullptr);
int set_olh(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time,
optional_yield y, rgw_zone_set *zones_trace = nullptr, bool log_data_change = false);
int repair_olh(const DoutPrefixProvider *dpp, RGWObjState* state, const RGWBucketInfo& bucket_info,
- const rgw_obj& obj);
+ const rgw_obj& obj, optional_yield y);
int unlink_obj_instance(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj,
uint64_t olh_epoch, optional_yield y, rgw_zone_set *zones_trace = nullptr);
void check_pending_olh_entries(const DoutPrefixProvider *dpp, std::map<std::string, bufferlist>& pending_entries, std::map<std::string, bufferlist> *rm_pending_entries);
- int remove_olh_pending_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, std::map<std::string, bufferlist>& pending_attrs);
- int follow_olh(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, RGWObjectCtx& ctx, RGWObjState *state, const rgw_obj& olh_obj, rgw_obj *target);
- int get_olh(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWOLHInfo *olh);
+ int remove_olh_pending_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, std::map<std::string, bufferlist>& pending_attrs, optional_yield y);
+ int follow_olh(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, RGWObjectCtx& ctx, RGWObjState *state, const rgw_obj& olh_obj, rgw_obj *target, optional_yield y);
+ int get_olh(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWOLHInfo *olh, optional_yield y);
void gen_rand_obj_instance_name(rgw_obj_key *target_key);
void gen_rand_obj_instance_name(rgw_obj *target);
std::list<rgw_cls_bi_entry> *entries, bool *is_truncated);
int bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs);
- int cls_obj_usage_log_add(const DoutPrefixProvider *dpp, const std::string& oid, rgw_usage_log_info& info);
+ int cls_obj_usage_log_add(const DoutPrefixProvider *dpp, const std::string& oid, rgw_usage_log_info& info, optional_yield y);
int cls_obj_usage_log_read(const DoutPrefixProvider *dpp, const std::string& oid, const std::string& user, const std::string& bucket, uint64_t start_epoch,
uint64_t end_epoch, uint32_t max_entries, std::string& read_iter,
std::map<rgw_user_bucket, rgw_usage_log_entry>& usage, bool *is_truncated);
int cls_obj_usage_log_trim(const DoutPrefixProvider *dpp, const std::string& oid, const std::string& user, const std::string& bucket, uint64_t start_epoch,
- uint64_t end_epoch);
- int cls_obj_usage_log_clear(const DoutPrefixProvider *dpp, std::string& oid);
+ uint64_t end_epoch, optional_yield y);
+ int cls_obj_usage_log_clear(const DoutPrefixProvider *dpp, std::string& oid, optional_yield y);
int get_target_shard_id(const rgw::bucket_index_normal_layout& layout, const std::string& obj_key, int *shard_id);
int unlock(const rgw_pool& pool, const std::string& oid, rgw_zone_id& zone_id, std::string& owner_id);
void update_gc_chain(const DoutPrefixProvider *dpp, rgw_obj head_obj, RGWObjManifest& manifest, cls_rgw_obj_chain *chain);
- std::tuple<int, std::optional<cls_rgw_obj_chain>> send_chain_to_gc(cls_rgw_obj_chain& chain, const std::string& tag);
+ std::tuple<int, std::optional<cls_rgw_obj_chain>> send_chain_to_gc(cls_rgw_obj_chain& chain, const std::string& tag, optional_yield y);
void delete_objs_inline(const DoutPrefixProvider *dpp, cls_rgw_obj_chain& chain, const std::string& tag);
- int gc_operate(const DoutPrefixProvider *dpp, std::string& oid, librados::ObjectWriteOperation *op);
+ int gc_operate(const DoutPrefixProvider *dpp, std::string& oid, librados::ObjectWriteOperation *op, optional_yield y);
int gc_aio_operate(const std::string& oid, librados::AioCompletion *c,
librados::ObjectWriteOperation *op);
- int gc_operate(const DoutPrefixProvider *dpp, std::string& oid, librados::ObjectReadOperation *op, bufferlist *pbl);
+ int gc_operate(const DoutPrefixProvider *dpp, std::string& oid, librados::ObjectReadOperation *op, bufferlist *pbl, optional_yield y);
int list_gc_objs(int *index, std::string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated, bool& processing_queue);
- int process_gc(bool expired_only);
- bool process_expire_objects(const DoutPrefixProvider *dpp);
+ int process_gc(bool expired_only, optional_yield y);
+ bool process_expire_objects(const DoutPrefixProvider *dpp, optional_yield y);
int defer_gc(const DoutPrefixProvider *dpp, RGWObjectCtx* ctx, RGWBucketInfo& bucket_info, const rgw_obj& obj, optional_yield y);
int process_lc(const std::unique_ptr<rgw::sal::Bucket>& optional_bucket);
librados::IoCtx& src_ioctx,
const std::string& src_oid, const std::string& src_locator,
librados::IoCtx& dst_ioctx,
- const std::string& dst_oid, const std::string& dst_locator);
- int fix_head_obj_locator(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, bool copy_obj, bool remove_bad, rgw_obj_key& key);
+ const std::string& dst_oid, const std::string& dst_locator, optional_yield y);
+ int fix_head_obj_locator(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, bool copy_obj, bool remove_bad, rgw_obj_key& key, optional_yield y);
int fix_tail_obj_locator(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info,
rgw_obj_key& key, bool fix, bool *need_fix, optional_yield y);
optional_yield y, bool check_size_only = false);
int check_bucket_shards(const RGWBucketInfo& bucket_info, const rgw_bucket& bucket,
- uint64_t num_objs, const DoutPrefixProvider *dpp);
+ uint64_t num_objs, const DoutPrefixProvider *dpp, optional_yield y);
- int add_bucket_to_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, uint32_t new_num_shards);
+ int add_bucket_to_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, uint32_t new_num_shards, optional_yield y);
uint64_t instance_id();
int RGWBucketReshard::execute(int num_shards,
ReshardFaultInjector& fault,
int max_op_entries,
- const DoutPrefixProvider *dpp,
+ const DoutPrefixProvider *dpp, optional_yield y,
bool verbose, ostream *out,
Formatter *formatter,
RGWReshard* reshard_log)
auto unlock = make_scope_guard([this] { reshard_lock.unlock(); });
if (reshard_log) {
- ret = reshard_log->update(dpp, bucket_info);
+ ret = reshard_log->update(dpp, bucket_info, y);
if (ret < 0) {
return ret;
}
get_logshard_oid(int(sid), oid);
}
-int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry)
+int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry, optional_yield y)
{
if (!store->svc()->zone->can_reshard()) {
ldpp_dout(dpp, 20) << __func__ << " Resharding is disabled" << dendl;
librados::ObjectWriteOperation op;
cls_rgw_reshard_add(op, entry);
- int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
+ int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, y);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
return ret;
return 0;
}
-int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info)
+int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, optional_yield y)
{
cls_rgw_reshard_entry entry;
entry.bucket_name = bucket_info.bucket.name;
return ret;
}
- ret = add(dpp, entry);
+ ret = add(dpp, entry, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
cpp_strerror(-ret) << dendl;
return 0;
}
-int RGWReshard::remove(const DoutPrefixProvider *dpp, const cls_rgw_reshard_entry& entry)
+int RGWReshard::remove(const DoutPrefixProvider *dpp, const cls_rgw_reshard_entry& entry, optional_yield y)
{
string logshard_oid;
librados::ObjectWriteOperation op;
cls_rgw_reshard_remove(op, entry);
- int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
+ int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, y);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
return ret;
}
int RGWReshard::process_entry(const cls_rgw_reshard_entry& entry,
- int max_entries, const DoutPrefixProvider *dpp)
+ int max_entries, const DoutPrefixProvider *dpp, optional_yield y)
{
ldpp_dout(dpp, 20) << __func__ << " resharding " <<
entry.bucket_name << dendl;
": removing reshard queue entry for a resharded or non-existent bucket" <<
entry.bucket_name << dendl;
- ret = remove(dpp, entry);
+ ret = remove(dpp, entry, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ <<
": Error removing non-existent bucket " <<
ldpp_dout(dpp, 1) << "Bucket " << bucket_info.bucket << " is not "
"eligible for resharding until peer zones finish syncing one "
"or more of its old log generations" << dendl;
- return remove(dpp, entry);
+ return remove(dpp, entry, y);
}
RGWBucketReshard br(store, bucket_info, bucket_attrs, nullptr);
ReshardFaultInjector f; // no fault injected
- ret = br.execute(entry.new_num_shards, f, max_entries, dpp,
+ ret = br.execute(entry.new_num_shards, f, max_entries, dpp, y,
false, nullptr, nullptr, this);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ <<
" removing reshard queue entry for bucket " << entry.bucket_name <<
dendl;
- ret = remove(dpp, entry);
+ ret = remove(dpp, entry, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << ": Error removing bucket " <<
entry.bucket_name << " from resharding queue: " <<
return 0;
}
-int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp)
+int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp, optional_yield y)
{
string marker;
bool truncated = true;
}
for(auto& entry: entries) { // logshard entries
- process_entry(entry, max_entries, dpp);
+ process_entry(entry, max_entries, dpp, y);
if (ret < 0) {
return ret;
}
*logshard = objname + buf;
}
-int RGWReshard::process_all_logshards(const DoutPrefixProvider *dpp)
+int RGWReshard::process_all_logshards(const DoutPrefixProvider *dpp, optional_yield y)
{
int ret = 0;
ldpp_dout(dpp, 20) << "processing logshard = " << logshard << dendl;
- ret = process_single_logshard(i, dpp);
+ ret = process_single_logshard(i, dpp, y);
ldpp_dout(dpp, 20) << "finish processing logshard = " << logshard << " , ret = " << ret << dendl;
}
void *RGWReshard::ReshardWorker::entry() {
do {
utime_t start = ceph_clock_now();
- reshard->process_all_logshards(this);
+ reshard->process_all_logshards(this, null_yield);
if (reshard->going_down())
break;
const std::map<std::string, bufferlist>& _bucket_attrs,
RGWBucketReshardLock* _outer_reshard_lock);
int execute(int num_shards, ReshardFaultInjector& f,
- int max_op_entries, const DoutPrefixProvider *dpp,
+ int max_op_entries, const DoutPrefixProvider *dpp, optional_yield y,
bool verbose = false, std::ostream *out = nullptr,
ceph::Formatter *formatter = nullptr,
RGWReshard *reshard_log = nullptr);
public:
RGWReshard(rgw::sal::RadosStore* _store, bool _verbose = false, std::ostream *_out = nullptr, Formatter *_formatter = nullptr);
- int add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
- int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info);
+ int add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry, optional_yield y);
+ int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, optional_yield y);
int get(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
- int remove(const DoutPrefixProvider *dpp, const cls_rgw_reshard_entry& entry);
+ int remove(const DoutPrefixProvider *dpp, const cls_rgw_reshard_entry& entry, optional_yield y);
int list(const DoutPrefixProvider *dpp, int logshard_num, std::string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
int clear_bucket_resharding(const DoutPrefixProvider *dpp, const std::string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
/* reshard thread */
int process_entry(const cls_rgw_reshard_entry& entry, int max_entries,
- const DoutPrefixProvider *dpp);
- int process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp);
- int process_all_logshards(const DoutPrefixProvider *dpp);
+ const DoutPrefixProvider *dpp, optional_yield y);
+ int process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp, optional_yield y);
+ int process_all_logshards(const DoutPrefixProvider *dpp, optional_yield y);
bool going_down();
void start_processor();
void stop_processor();
usage_iter, usage);
}
-int RadosUser::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch)
+int RadosUser::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y)
{
std::string bucket_name;
- return store->getRados()->trim_usage(dpp, get_id(), bucket_name, start_epoch, end_epoch);
+ return store->getRados()->trim_usage(dpp, get_id(), bucket_name, start_epoch, end_epoch, y);
}
int RadosUser::load_user(const DoutPrefixProvider* dpp, optional_yield y)
}
} while(results.is_truncated);
- ret = abort_multiparts(dpp, store->ctx());
+ ret = abort_multiparts(dpp, store->ctx(), y);
if (ret < 0) {
return ret;
}
if (ret < 0)
return ret;
- ret = abort_multiparts(dpp, cct);
+ ret = abort_multiparts(dpp, cct, y);
if (ret < 0) {
return ret;
}
return 0;
}
-int RadosBucket::check_bucket_shards(const DoutPrefixProvider* dpp)
+int RadosBucket::check_bucket_shards(const DoutPrefixProvider* dpp, optional_yield y)
{
- return store->getRados()->check_bucket_shards(info, info.bucket, get_count(), dpp);
+ return store->getRados()->check_bucket_shards(info, info.bucket, get_count(), dpp, y);
}
int RadosBucket::link(const DoutPrefixProvider* dpp, User* new_user, optional_yield y, bool update_entrypoint, RGWObjVersionTracker* objv)
usage_iter, usage);
}
-int RadosBucket::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch)
+int RadosBucket::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y)
{
- return store->getRados()->trim_usage(dpp, owner->get_id(), get_name(), start_epoch, end_epoch);
+ return store->getRados()->trim_usage(dpp, owner->get_id(), get_name(), start_epoch, end_epoch, y);
}
int RadosBucket::remove_objs_from_index(const DoutPrefixProvider *dpp, std::list<rgw_obj_index_key>& objs_to_unlink)
}
int RadosBucket::abort_multiparts(const DoutPrefixProvider* dpp,
- CephContext* cct)
+ CephContext* cct, optional_yield y)
{
constexpr int max = 1000;
int ret, num_deleted = 0;
if (!uploads.empty()) {
for (const auto& upload : uploads) {
- ret = upload->abort(dpp, cct);
+ ret = upload->abort(dpp, cct, y);
if (ret < 0) {
// we're doing a best-effort; if something cannot be found,
// log it and keep moving forward
objv_tracker, y);
}
-int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj)
+int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, optional_yield y)
{
- return rados->delete_raw_obj(dpp, obj);
+ return rados->delete_raw_obj(dpp, obj, y);
}
void RadosStore::get_raw_obj(const rgw_placement_rule& placement_rule, const rgw_obj& obj, rgw_raw_obj* raw_obj)
return 0;
}
-int RadosStore::log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info)
+int RadosStore::log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y)
{
- return rados->log_usage(dpp, usage_info);
+ return rados->log_usage(dpp, usage_info, y);
}
int RadosStore::log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl)
is_truncated, usage_iter, usage);
}
-int RadosStore::trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch)
+int RadosStore::trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y)
{
rgw_user uid;
std::string bucket_name;
- return rados->trim_usage(dpp, uid, bucket_name, start_epoch, end_epoch);
+ return rados->trim_usage(dpp, uid, bucket_name, start_epoch, end_epoch, y);
}
int RadosStore::get_config_key_val(std::string name, bufferlist* bl)
}
-int RadosMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct)
+int RadosMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct, optional_yield y)
{
std::unique_ptr<rgw::sal::Object> meta_obj = get_meta_obj();
meta_obj->set_in_extra_data(true);
store->getRados()->delete_objs_inline(dpp, chain, mp_obj.get_upload_id());
} else {
/* use upload id as tag and do it synchronously */
- auto [ret, leftover_chain] = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id());
+ auto [ret, leftover_chain] = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id(), y);
if (ret < 0 && leftover_chain) {
ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
if (ret == -ENOENT) {
virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
- virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info) override;
+ virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) override;
virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override;
virtual int register_to_service_map(const DoutPrefixProvider *dpp, const std::string& daemon_type,
const std::map<std::string, std::string>& meta) override;
virtual RGWDataSyncStatusManager* get_data_sync_manager(const rgw_zone_id& source_zone) override;
virtual void wakeup_meta_sync_shards(std::set<int>& shard_ids) override { rados->wakeup_meta_sync_shards(shard_ids); }
virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, boost::container::flat_map<int, boost::container::flat_set<rgw_data_notify_entry>>& shard_ids) override { rados->wakeup_data_sync_shards(dpp, source_zone, shard_ids); }
- virtual int clear_usage(const DoutPrefixProvider *dpp) override { return rados->clear_usage(dpp); }
+ virtual int clear_usage(const DoutPrefixProvider *dpp, optional_yield y) override { return rados->clear_usage(dpp, y); }
virtual int read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
uint32_t max_entries, bool* is_truncated,
RGWUsageIter& usage_iter,
std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) override;
- virtual int trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) override;
+ virtual int trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) override;
virtual int get_config_key_val(std::string name, bufferlist* bl) override;
virtual int meta_list_keys_init(const DoutPrefixProvider *dpp, const std::string& section, const std::string& marker, void** phandle) override;
virtual int meta_list_keys_next(const DoutPrefixProvider *dpp, void* handle, int max, std::list<std::string>& keys, bool* truncated) override;
/* Unique to RadosStore */
int get_obj_head_ioctx(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
librados::IoCtx* ioctx);
- int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj);
+ int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, optional_yield y);
void get_raw_obj(const rgw_placement_rule& placement_rule, const rgw_obj& obj, rgw_raw_obj* raw_obj);
int get_raw_chunk_size(const DoutPrefixProvider* dpp, const rgw_raw_obj& obj, uint64_t* chunk_size);
virtual int read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
bool* is_truncated, RGWUsageIter& usage_iter,
std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) override;
- virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) override;
+ virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) override;
virtual int load_user(const DoutPrefixProvider* dpp, optional_yield y) override;
virtual int store_user(const DoutPrefixProvider* dpp, optional_yield y, bool exclusive, RGWUserInfo* old_info = nullptr) override;
int shard_id, RGWGetBucketStats_CB* ctx) override;
virtual int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual int update_container_stats(const DoutPrefixProvider* dpp) override;
- virtual int check_bucket_shards(const DoutPrefixProvider* dpp) override;
+ virtual int check_bucket_shards(const DoutPrefixProvider* dpp, optional_yield y) override;
virtual int chown(const DoutPrefixProvider* dpp, User& new_user, optional_yield y) override;
virtual int put_info(const DoutPrefixProvider* dpp, bool exclusive, ceph::real_time mtime) override;
virtual bool is_owner(User* user) override;
virtual int read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
bool* is_truncated, RGWUsageIter& usage_iter,
std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) override;
- virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) override;
+ virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) override;
virtual int remove_objs_from_index(const DoutPrefixProvider *dpp, std::list<rgw_obj_index_key>& objs_to_unlink) override;
virtual int check_index(const DoutPrefixProvider *dpp, std::map<RGWObjCategory, RGWStorageStats>& existing_stats, std::map<RGWObjCategory, RGWStorageStats>& calculated_stats) override;
virtual int rebuild_index(const DoutPrefixProvider *dpp) override;
std::map<std::string, bool> *common_prefixes,
bool *is_truncated) override;
virtual int abort_multiparts(const DoutPrefixProvider* dpp,
- CephContext* cct) override;
+ CephContext* cct, optional_yield y) override;
int read_topics(rgw_pubsub_bucket_topics& notifications, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) override;
int write_topics(const rgw_pubsub_bucket_topics& notifications, RGWObjVersionTracker* objv_tracker,
int num_parts, int marker,
int* next_marker, bool* truncated,
bool assume_unsorted = false) override;
- virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct) override;
+ virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct, optional_yield y) override;
virtual int complete(const DoutPrefixProvider* dpp,
optional_yield y, CephContext* cct,
std::map<int, std::string>& part_etags,
string status = (needs_fixing ? "needs_fixing" : "ok");
if ((needs_fixing || remove_bad) && fix) {
- ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->fix_head_obj_locator(dpp(), obj->get_bucket()->get_info(), needs_fixing, remove_bad, obj->get_key());
+ ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->fix_head_obj_locator(dpp(), obj->get_bucket()->get_info(), needs_fixing, remove_bad, obj->get_key(), null_yield);
if (ret < 0) {
cerr << "ERROR: fix_head_object_locator() returned ret=" << ret << std::endl;
goto done;
false,
false,
false,
+ null_yield,
need_cache && g_conf()->rgw_cache_enabled,
need_gc);
}
return -ret;
}
}
- ret = RGWUsage::trim(dpp(), driver, user.get(), bucket.get(), start_epoch, end_epoch);
+ ret = RGWUsage::trim(dpp(), driver, user.get(), bucket.get(), start_epoch, end_epoch, null_yield);
if (ret < 0) {
cerr << "ERROR: read_usage() returned ret=" << ret << std::endl;
return 1;
return 1;
}
- ret = RGWUsage::clear(dpp(), driver);
+ ret = RGWUsage::clear(dpp(), driver, null_yield);
if (ret < 0) {
return ret;
}
}
RGWOLHInfo olh;
rgw_obj obj(bucket->get_key(), object);
- ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_olh(dpp(), bucket->get_info(), obj, &olh);
+ ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_olh(dpp(), bucket->get_info(), obj, &olh, null_yield);
if (ret < 0) {
cerr << "ERROR: failed reading olh: " << cpp_strerror(-ret) << std::endl;
return -ret;
return -ret;
}
- ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->bucket_index_read_olh_log(dpp(), bucket->get_info(), *state, obj->get_obj(), 0, &log, &is_truncated);
+ ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->bucket_index_read_olh_log(dpp(), bucket->get_info(), *state, obj->get_obj(), 0, &log, &is_truncated, null_yield);
if (ret < 0) {
cerr << "ERROR: failed reading olh: " << cpp_strerror(-ret) << std::endl;
return -ret;
} // OPT::OBJECT_REINDEX
if (opt_cmd == OPT::OBJECTS_EXPIRE) {
- if (!static_cast<rgw::sal::RadosStore*>(driver)->getRados()->process_expire_objects(dpp())) {
+ if (!static_cast<rgw::sal::RadosStore*>(driver)->getRados()->process_expire_objects(dpp(), null_yield)) {
cerr << "ERROR: process_expire_objects() processing returned error." << std::endl;
return 1;
}
} else if (inject_abort_at) {
fault.inject(*inject_abort_at, InjectAbort{});
}
- ret = br.execute(num_shards, fault, max_entries, dpp(),
+ ret = br.execute(num_shards, fault, max_entries, dpp(), null_yield,
verbose, &cout, formatter.get());
return -ret;
}
entry.old_num_shards = num_source_shards;
entry.new_num_shards = num_shards;
- return reshard.add(dpp(), entry);
+ return reshard.add(dpp(), entry, null_yield);
}
if (opt_cmd == OPT::RESHARD_LIST) {
if (opt_cmd == OPT::RESHARD_PROCESS) {
RGWReshard reshard(static_cast<rgw::sal::RadosStore*>(driver), true, &cout);
- int ret = reshard.process_all_logshards(dpp());
+ int ret = reshard.process_all_logshards(dpp(), null_yield);
if (ret < 0) {
cerr << "ERROR: failed to process reshard logs, error=" << cpp_strerror(-ret) << std::endl;
return -ret;
entry.tenant = tenant;
entry.bucket_name = bucket_name;
- ret = reshard.remove(dpp(), entry);
+ ret = reshard.remove(dpp(), entry, null_yield);
if (ret == -ENOENT) {
if (!resharding_underway) {
cerr << "Error, bucket \"" << bucket_name <<
run_quota,
run_sync,
g_conf().get_val<bool>("rgw_dynamic_resharding"),
- true, // run notification thread
+ true, null_yield, // run notification thread
g_conf()->rgw_cache_enabled);
if (!env.driver) {
return -EIO;
if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) {
rgw_obj_key key(obj.key);
std::unique_ptr<rgw::sal::MultipartUpload> mpu = target->get_multipart_upload(key.name);
- int ret = mpu->abort(this, cct);
+ int ret = mpu->abort(this, cct, null_yield);
if (ret == 0) {
if (perfcounter) {
perfcounter->inc(l_rgw_lc_abort_mpu, 1);
num_entries = 0;
lock.unlock();
- driver->log_usage(this, old_map);
+ driver->log_usage(this, old_map, null_yield);
}
CephContext *get_cct() const override { return cct; }
DriverManager::Config cfg;
cfg.store_name = "rados";
cfg.filter_name = "none";
- driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, false, false, false, false, false, false);
+ driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, false, false, false, false, false, false, null_yield);
if (!driver) {
std::cerr << "couldn't init storage provider" << std::endl;
return EIO;
}
multipart_trace = tracing::rgw::tracer.add_span(name(), trace_ctx);
- op_ret = upload->abort(this, s->cct);
+ op_ret = upload->abort(this, s->cct, y);
}
int RGWListMultipart::verify_permission(optional_yield y)
return r;
}
- return bucket->check_bucket_shards(dpp);
+ return bucket->check_bucket_shards(dpp, y);
}
int RGWUserStatsCache::sync_user(const DoutPrefixProvider *dpp, const rgw_user& _u, optional_yield y)
cct->_conf->rgw_enable_quota_threads,
cct->_conf->rgw_run_sync_thread,
cct->_conf.get_val<bool>("rgw_dynamic_resharding"),
- true, // run notification thread
+ true, null_yield, // run notification thread
cct->_conf->rgw_cache_enabled);
}
}
}
- op_ret = RGWUsage::trim(this, driver, user.get(), bucket.get(), start, end);
+ op_ret = RGWUsage::trim(this, driver, user.get(), bucket.get(), start, end, y);
}
RGWOp *RGWHandler_Usage::op_get()
bool run_reshard_thread,
bool run_notification_thread,
bool use_cache,
- bool use_gc)
+ bool use_gc, optional_yield y)
{
rgw::sal::Driver* driver{nullptr};
delete driver;
return nullptr;
}
- if (rados->init_complete(dpp) < 0) {
+ if (rados->init_complete(dpp, y) < 0) {
delete driver;
return nullptr;
}
delete driver;
return nullptr;
}
- if (rados->init_complete(dpp) < 0) {
+ if (rados->init_complete(dpp, y) < 0) {
delete driver;
return nullptr;
}
/** Log usage data to the driver. Usage data is things like bytes sent/received and
* op count */
- virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info) = 0;
+ virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) = 0;
/** Log OP data to the driver. Data is opaque to SAL */
virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) = 0;
/** Register this driver to the service map. Somewhat Rados specific; may be removed*/
/** Wake up sync threads for bucket data sync */
virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, boost::container::flat_map<int, boost::container::flat_set<rgw_data_notify_entry>>& shard_ids) = 0;
/** Clear all usage statistics globally */
- virtual int clear_usage(const DoutPrefixProvider *dpp) = 0;
+ virtual int clear_usage(const DoutPrefixProvider *dpp, optional_yield y) = 0;
/** Get usage statistics for all users and buckets */
virtual int read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
uint32_t max_entries, bool* is_truncated,
RGWUsageIter& usage_iter,
std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) = 0;
/** Trim usage log for all users and buckets */
- virtual int trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) = 0;
+ virtual int trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) = 0;
/** Get a configuration value for the given name */
virtual int get_config_key_val(std::string name, bufferlist* bl) = 0;
/** Start a metadata listing of the given section */
bool* is_truncated, RGWUsageIter& usage_iter,
std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) = 0;
/** Trim User usage stats to the given epoch range */
- virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) = 0;
+ virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) = 0;
/** Load this User from the backing store. requires ID to be set, fills all other fields. */
virtual int load_user(const DoutPrefixProvider* dpp, optional_yield y) = 0;
/** Refresh the metadata stats (size, count, and so on) from the backing store */
virtual int update_container_stats(const DoutPrefixProvider* dpp) = 0;
/** Check if this bucket needs resharding, and schedule it if it does */
- virtual int check_bucket_shards(const DoutPrefixProvider* dpp) = 0;
+ virtual int check_bucket_shards(const DoutPrefixProvider* dpp, optional_yield y) = 0;
/** Change the owner of this bucket in the backing store. Current owner must be set. Does not
* change ownership of the objects in the bucket. */
virtual int chown(const DoutPrefixProvider* dpp, User& new_user, optional_yield y) = 0;
bool* is_truncated, RGWUsageIter& usage_iter,
std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) = 0;
/** Trim the usage information to the given epoch range */
- virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) = 0;
+ virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) = 0;
/** Remove objects from the bucket index of this bucket. May be removed from API */
virtual int remove_objs_from_index(const DoutPrefixProvider *dpp, std::list<rgw_obj_index_key>& objs_to_unlink) = 0;
/** Check the state of the bucket index, and get stats from it. May be removed from API */
bool *is_truncated) = 0;
/** Abort multipart uploads in a bucket */
virtual int abort_multiparts(const DoutPrefixProvider* dpp,
- CephContext* cct) = 0;
+ CephContext* cct, optional_yield y) = 0;
/** Read the bucket notification config into @a notifications with and (optionally) @a objv_tracker */
virtual int read_topics(rgw_pubsub_bucket_topics& notifications,
int* next_marker, bool* truncated,
bool assume_unsorted = false) = 0;
/** Abort this upload */
- virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct) = 0;
+ virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct, optional_yield y) = 0;
/** Complete this upload, making it available as a normal object */
virtual int complete(const DoutPrefixProvider* dpp,
optional_yield y, CephContext* cct,
bool quota_threads,
bool run_sync_thread,
bool run_reshard_thread,
- bool run_notification_thread,
+ bool run_notification_thread, optional_yield y,
bool use_cache = true,
bool use_gc = true) {
rgw::sal::Driver* driver = init_storage_provider(dpp, cct, cfg, use_gc_thread,
run_sync_thread,
run_reshard_thread,
run_notification_thread,
- use_cache, use_gc);
+ use_cache, use_gc, y);
return driver;
}
/** Get a stripped down driver by service name */
bool run_reshard_thread,
bool run_notification_thread,
bool use_metadata_cache,
- bool use_gc);
+ bool use_gc, optional_yield y);
/** Initialize a new raw Driver */
static rgw::sal::Driver* init_raw_storage_provider(const DoutPrefixProvider* dpp,
CephContext* cct,
return 0;
}
- int DBUser::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch)
+ int DBUser::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y)
{
return 0;
}
return 0;
}
- int DBBucket::check_bucket_shards(const DoutPrefixProvider *dpp)
+ int DBBucket::check_bucket_shards(const DoutPrefixProvider *dpp, optional_yield y)
{
return 0;
}
return 0;
}
- int DBBucket::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch)
+ int DBBucket::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y)
{
return 0;
}
}
int DBBucket::abort_multiparts(const DoutPrefixProvider* dpp,
- CephContext* cct) {
+ CephContext* cct, optional_yield y) {
return 0;
}
return 0;
}
- int DBMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct)
+ int DBMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct, optional_yield y)
{
std::unique_ptr<rgw::sal::Object> meta_obj = get_meta_obj();
meta_obj->set_in_extra_data(true);
return lc;
}
- int DBStore::log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info)
+ int DBStore::log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y)
{
return 0;
}
return 0;
}
- int DBStore::trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch)
+ int DBStore::trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y)
{
return 0;
}
virtual int read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
bool* is_truncated, RGWUsageIter& usage_iter,
std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) override;
- virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) override;
+ virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) override;
/* Placeholders */
virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) override;
virtual int read_stats_async(const DoutPrefixProvider *dpp, const bucket_index_layout_generation& idx_layout, int shard_id, RGWGetBucketStats_CB* ctx) override;
virtual int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual int update_container_stats(const DoutPrefixProvider *dpp) override;
- virtual int check_bucket_shards(const DoutPrefixProvider *dpp) override;
+ virtual int check_bucket_shards(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual int chown(const DoutPrefixProvider *dpp, User& new_user, optional_yield y) override;
virtual int put_info(const DoutPrefixProvider *dpp, bool exclusive, ceph::real_time mtime) override;
virtual bool is_owner(User* user) override;
virtual int read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
bool *is_truncated, RGWUsageIter& usage_iter,
std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) override;
- virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) override;
+ virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) override;
virtual int remove_objs_from_index(const DoutPrefixProvider *dpp, std::list<rgw_obj_index_key>& objs_to_unlink) override;
virtual int check_index(const DoutPrefixProvider *dpp, std::map<RGWObjCategory, RGWStorageStats>& existing_stats, std::map<RGWObjCategory, RGWStorageStats>& calculated_stats) override;
virtual int rebuild_index(const DoutPrefixProvider *dpp) override;
std::map<std::string, bool> *common_prefixes,
bool *is_truncated) override;
virtual int abort_multiparts(const DoutPrefixProvider* dpp,
- CephContext* cct) override;
+ CephContext* cct, optional_yield y) override;
friend class DBStore;
};
int num_parts, int marker,
int* next_marker, bool* truncated,
bool assume_unsorted = false) override;
- virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct) override;
+ virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct, optional_yield y) override;
virtual int complete(const DoutPrefixProvider* dpp,
optional_yield y, CephContext* cct,
std::map<int, std::string>& part_etags,
virtual RGWLC* get_rgwlc(void) override;
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
- virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info) override;
+ virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) override;
virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override;
virtual int register_to_service_map(const DoutPrefixProvider *dpp, const std::string& daemon_type,
const std::map<std::string, std::string>& meta) override;
boost::container::flat_map<
int,
boost::container::flat_set<rgw_data_notify_entry>>& shard_ids) override { return; }
- virtual int clear_usage(const DoutPrefixProvider *dpp) override { return 0; }
+ virtual int clear_usage(const DoutPrefixProvider *dpp, optional_yield y) override { return 0; }
virtual int read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
uint32_t max_entries, bool *is_truncated,
RGWUsageIter& usage_iter,
std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) override;
- virtual int trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) override;
+ virtual int trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) override;
virtual int get_config_key_val(std::string name, bufferlist* bl) override;
virtual int meta_list_keys_init(const DoutPrefixProvider *dpp, const std::string& section, const std::string& marker, void** phandle) override;
virtual int meta_list_keys_next(const DoutPrefixProvider *dpp, void* handle, int max, std::list<std::string>& keys, bool* truncated) override;
return next->get_cr_registry();
}
-int FilterDriver::log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info)
+int FilterDriver::log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y)
{
- return next->log_usage(dpp, usage_info);
+ return next->log_usage(dpp, usage_info, y);
}
int FilterDriver::log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl)
return next->wakeup_data_sync_shards(dpp, source_zone, shard_ids);
}
-int FilterDriver::clear_usage(const DoutPrefixProvider *dpp)
+int FilterDriver::clear_usage(const DoutPrefixProvider *dpp, optional_yield y)
{
- return next->clear_usage(dpp);
+ return next->clear_usage(dpp, y);
}
int FilterDriver::read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch,
}
int FilterDriver::trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch,
- uint64_t end_epoch)
+ uint64_t end_epoch, optional_yield y)
{
- return next->trim_all_usage(dpp, start_epoch, end_epoch);
+ return next->trim_all_usage(dpp, start_epoch, end_epoch, y);
}
int FilterDriver::get_config_key_val(std::string name, bufferlist* bl)
}
int FilterUser::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch,
- uint64_t end_epoch)
+ uint64_t end_epoch, optional_yield y)
{
- return next->trim_usage(dpp, start_epoch, end_epoch);
+ return next->trim_usage(dpp, start_epoch, end_epoch, y);
}
int FilterUser::load_user(const DoutPrefixProvider* dpp, optional_yield y)
return next->update_container_stats(dpp);
}
-int FilterBucket::check_bucket_shards(const DoutPrefixProvider* dpp)
+int FilterBucket::check_bucket_shards(const DoutPrefixProvider* dpp, optional_yield y)
{
- return next->check_bucket_shards(dpp);
+ return next->check_bucket_shards(dpp, y);
}
int FilterBucket::chown(const DoutPrefixProvider* dpp, User& new_user, optional_yield y)
}
int FilterBucket::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch,
- uint64_t end_epoch)
+ uint64_t end_epoch, optional_yield y)
{
- return next->trim_usage(dpp, start_epoch, end_epoch);
+ return next->trim_usage(dpp, start_epoch, end_epoch, y);
}
int FilterBucket::remove_objs_from_index(const DoutPrefixProvider *dpp,
return 0;
}
-int FilterBucket::abort_multiparts(const DoutPrefixProvider* dpp, CephContext* cct)
+int FilterBucket::abort_multiparts(const DoutPrefixProvider* dpp, CephContext* cct, optional_yield y)
{
- return next->abort_multiparts(dpp, cct);
+ return next->abort_multiparts(dpp, cct, y);
}
int FilterObject::delete_object(const DoutPrefixProvider* dpp,
return 0;
}
-int FilterMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct)
+int FilterMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct, optional_yield y)
{
- return next->abort(dpp, cct);
+ return next->abort(dpp, cct, y);
}
int FilterMultipartUpload::complete(const DoutPrefixProvider *dpp,
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override;
virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket,
- RGWUsageBatch>& usage_info) override;
+ RGWUsageBatch>& usage_info, optional_yield y) override;
virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid,
bufferlist& bl) override;
virtual int register_to_service_map(const DoutPrefixProvider *dpp, const
virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp,
const rgw_zone_id& source_zone,
boost::container::flat_map<int, boost::container::flat_set<rgw_data_notify_entry>>& shard_ids) override;
- virtual int clear_usage(const DoutPrefixProvider *dpp) override;
+ virtual int clear_usage(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual int read_all_usage(const DoutPrefixProvider *dpp,
uint64_t start_epoch, uint64_t end_epoch,
uint32_t max_entries, bool* is_truncated,
RGWUsageIter& usage_iter,
std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) override;
virtual int trim_all_usage(const DoutPrefixProvider *dpp,
- uint64_t start_epoch, uint64_t end_epoch) override;
+ uint64_t start_epoch, uint64_t end_epoch, optional_yield y) override;
virtual int get_config_key_val(std::string name, bufferlist* bl) override;
virtual int meta_list_keys_init(const DoutPrefixProvider *dpp,
const std::string& section,
bool* is_truncated, RGWUsageIter& usage_iter,
std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) override;
virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch,
- uint64_t end_epoch) override;
+ uint64_t end_epoch, optional_yield y) override;
virtual int load_user(const DoutPrefixProvider* dpp, optional_yield y) override;
virtual int store_user(const DoutPrefixProvider* dpp, optional_yield y, bool
int shard_id, RGWGetBucketStats_CB* ctx) override;
virtual int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual int update_container_stats(const DoutPrefixProvider* dpp) override;
- virtual int check_bucket_shards(const DoutPrefixProvider* dpp) override;
+ virtual int check_bucket_shards(const DoutPrefixProvider* dpp, optional_yield y) override;
virtual int chown(const DoutPrefixProvider* dpp, User& new_user,
optional_yield y) override;
virtual int put_info(const DoutPrefixProvider* dpp, bool exclusive,
std::map<rgw_user_bucket,
rgw_usage_log_entry>& usage) override;
virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch,
- uint64_t end_epoch) override;
+ uint64_t end_epoch, optional_yield y) override;
virtual int remove_objs_from_index(const DoutPrefixProvider *dpp,
std::list<rgw_obj_index_key>&
objs_to_unlink) override;
std::map<std::string, bool> *common_prefixes,
bool *is_truncated) override;
virtual int abort_multiparts(const DoutPrefixProvider* dpp,
- CephContext* cct) override;
+ CephContext* cct, optional_yield y) override;
int read_topics(rgw_pubsub_bucket_topics& notifications, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) override {
int num_parts, int marker,
int* next_marker, bool* truncated,
bool assume_unsorted = false) override;
- virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct) override;
+ virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct, optional_yield y) override;
virtual int complete(const DoutPrefixProvider* dpp,
optional_yield y, CephContext* cct,
std::map<int, std::string>& part_etags,
int RGWUsage::trim(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver,
rgw::sal::User* user , rgw::sal::Bucket* bucket,
- uint64_t start_epoch, uint64_t end_epoch)
+ uint64_t start_epoch, uint64_t end_epoch, optional_yield y)
{
if (bucket) {
- return bucket->trim_usage(dpp, start_epoch, end_epoch);
+ return bucket->trim_usage(dpp, start_epoch, end_epoch, y);
} else if (user) {
- return user->trim_usage(dpp, start_epoch, end_epoch);
+ return user->trim_usage(dpp, start_epoch, end_epoch, y);
} else {
- return driver->trim_all_usage(dpp, start_epoch, end_epoch);
+ return driver->trim_all_usage(dpp, start_epoch, end_epoch, y);
}
}
-int RGWUsage::clear(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver)
+int RGWUsage::clear(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, optional_yield y)
{
- return driver->clear_usage(dpp);
+ return driver->clear_usage(dpp, y);
}
static int trim(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver,
rgw::sal::User* user , rgw::sal::Bucket* bucket,
- uint64_t start_epoch, uint64_t end_epoch);
+ uint64_t start_epoch, uint64_t end_epoch, optional_yield y);
- static int clear(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver);
+ static int clear(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, optional_yield y);
};
ldout(cct, 0) << "ERROR: could not sync bucket stats: ret=" << ret << dendl;
return ret;
}
- ret = bucket->check_bucket_shards(dpp);
+ ret = bucket->check_bucket_shards(dpp, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR in check_bucket_shards: " << cpp_strerror(-ret)<< dendl;
}
#include "include/rados/librados.hpp"
+#include "rgw_tools.h"
+
#include "common/common_init.h"
#include "common/config.h"
#include "common/ceph_argparse.h"
false,
false,
false,
- false,
+ false, null_yield,
true,
false));
if (!store) {