From ad3a8b51f8287b6f888b533de3f0f62c6b955606 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 24 Sep 2019 14:44:44 -0400 Subject: [PATCH] rgw: async_defer_chain() that detects transition to cls_rgw_gc this replaces the defer_chain() function with optional asynchrony with one that's only async. in the pre-transition case, it arranges for a librados callback to detect the transition and retry against cls_rgw_gc the RGWRados::gc_aio_operate() interface now requires the caller to pass in an AioCompletion to support async_defer_chain()'s custom callback Signed-off-by: Casey Bodley --- src/cls/rgw_gc/cls_rgw_gc_client.cc | 4 +- src/cls/rgw_gc/cls_rgw_gc_client.h | 6 +- src/rgw/rgw_gc.cc | 185 +++++++++++++++------------- src/rgw/rgw_gc.h | 10 +- src/rgw/rgw_multi.cc | 3 +- src/rgw/rgw_rados.cc | 20 ++- src/rgw/rgw_rados.h | 5 +- 7 files changed, 121 insertions(+), 112 deletions(-) diff --git a/src/cls/rgw_gc/cls_rgw_gc_client.cc b/src/cls/rgw_gc/cls_rgw_gc_client.cc index e7f98c232a75b..ac22d8628d190 100644 --- a/src/cls/rgw_gc/cls_rgw_gc_client.cc +++ b/src/cls/rgw_gc/cls_rgw_gc_client.cc @@ -41,7 +41,7 @@ int cls_rgw_gc_queue_get_capacity(IoCtx& io_ctx, const string& oid, uint64_t& si return 0; } -void cls_rgw_gc_queue_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info) +void cls_rgw_gc_queue_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, const cls_rgw_gc_obj_info& info) { bufferlist in; cls_rgw_gc_set_entry_op call; @@ -91,7 +91,7 @@ void cls_rgw_gc_queue_remove_entries(ObjectWriteOperation& op, uint32_t num_entr op.exec(RGW_GC_CLASS, RGW_GC_QUEUE_REMOVE_ENTRIES, in); } -void cls_rgw_gc_queue_defer_entry(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info) +void cls_rgw_gc_queue_defer_entry(ObjectWriteOperation& op, uint32_t expiration_secs, const cls_rgw_gc_obj_info& info) { bufferlist in; cls_rgw_gc_queue_defer_entry_op defer_op; diff --git a/src/cls/rgw_gc/cls_rgw_gc_client.h b/src/cls/rgw_gc/cls_rgw_gc_client.h index 758c09b6aa112..107bed4eb9e1d 100644 --- a/src/cls/rgw_gc/cls_rgw_gc_client.h +++ b/src/cls/rgw_gc/cls_rgw_gc_client.h @@ -8,10 +8,10 @@ void cls_rgw_gc_queue_init(librados::ObjectWriteOperation& op, uint64_t size, uint64_t num_urgent_data_entries); int cls_rgw_gc_queue_get_capacity(librados::IoCtx& io_ctx, const string& oid, uint64_t& size); -void cls_rgw_gc_queue_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info); +void cls_rgw_gc_queue_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, const cls_rgw_gc_obj_info& info); int cls_rgw_gc_queue_list_entries(librados::IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max, bool expired_only, list& entries, bool *truncated, string& next_marker); void cls_rgw_gc_queue_remove_entries(librados::ObjectWriteOperation& op, uint32_t num_entries); -void cls_rgw_gc_queue_defer_entry(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info); +void cls_rgw_gc_queue_defer_entry(librados::ObjectWriteOperation& op, uint32_t expiration_secs, const cls_rgw_gc_obj_info& info); -#endif \ No newline at end of file +#endif diff --git a/src/rgw/rgw_gc.cc b/src/rgw/rgw_gc.cc index e791939a5d2a3..f996c732af1f4 100644 --- a/src/rgw/rgw_gc.cc +++ b/src/rgw/rgw_gc.cc @@ -9,9 +9,9 @@ #include "cls/rgw/cls_rgw_client.h" #include "cls/rgw_gc/cls_rgw_gc_client.h" #include "cls/refcount/cls_refcount_client.h" +#include "cls/version/cls_version_client.h" #include "rgw_perf_counters.h" #include "cls/lock/cls_lock_client.h" -#include "cls/version/cls_version_client.h" #include "include/random.h" #include // XXX @@ -74,7 +74,7 @@ void RGWGC::add_chain(ObjectWriteOperation& op, cls_rgw_gc_obj_info& info) cls_rgw_gc_queue_enqueue(op, cct->_conf->rgw_gc_obj_min_wait, info); } -int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync) +int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag) { ObjectWriteOperation op; cls_rgw_gc_obj_info info; @@ -94,114 +94,121 @@ int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync) return store->gc_operate(obj_names[i], &op); } -int RGWGC::defer_chain(const string& tag, cls_rgw_obj_chain& chain, bool sync) -{ +struct defer_chain_state { + librados::AioCompletion* completion = nullptr; + // TODO: hold a reference on the state in RGWGC to avoid use-after-free if + // RGWGC destructs before this completion fires + RGWGC* gc = nullptr; cls_rgw_gc_obj_info info; - info.chain = chain; - info.tag = tag; - int i = tag_index(tag); + ~defer_chain_state() { + if (completion) { + completion->release(); + } + } +}; + +static void async_defer_callback(librados::completion_t, void* arg) +{ + std::unique_ptr state{static_cast(arg)}; + if (state->completion->get_return_value() == -ECANCELED) { + state->gc->on_defer_canceled(state->info); + } +} + +void RGWGC::on_defer_canceled(const cls_rgw_gc_obj_info& info) +{ + const std::string& tag = info.tag; + const int i = tag_index(tag); + + // ECANCELED from cls_version_check() tells us that we've transitioned + transitioned_objects_cache[i] = true; ObjectWriteOperation op; + cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info); + cls_rgw_gc_remove(op, {tag}); - obj_version objv; - objv.ver = 0; - cls_version_check(op, objv, VER_COND_EQ); - cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag); - if (sync) { - auto ret = store->gc_operate(obj_names[i], &op); - if (ret != -ECANCELED && ret != -EPERM) { - return ret; - } - } else { - AioCompletion *c; - auto ret = store->gc_aio_operate(obj_names[i], &op, &c); - c->wait_for_safe(); - ret = c->get_return_value(); - c->release(); - if (ret != -ECANCELED && ret != -EPERM) { - return ret; - } - } + auto c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); + store->gc_aio_operate(obj_names[i], c, &op); + c->release(); +} - if (! transitioned_objects_cache[i]) { - cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag); - } else { +int RGWGC::async_defer_chain(const string& tag, const cls_rgw_obj_chain& chain) +{ + const int i = tag_index(tag); + + // if we've transitioned this shard object, we can rely on the cls_rgw_gc queue + if (transitioned_objects_cache[i]) { + cls_rgw_gc_obj_info info; + info.chain = chain; + info.tag = tag; + + ObjectWriteOperation op; cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info); - } - if (sync) { - auto ret = store->gc_operate(obj_names[i], &op); - if ((ret < 0 && ret != -ENOENT)) { - return ret; - } - if (! transitioned_objects_cache[i]) { - //Successfully deferred - if (ret == 0) { - //Enqueue in queue - cls_rgw_gc_queue_enqueue(op, cct->_conf->rgw_gc_obj_min_wait, info); - //Remove tag from omap - vector tags; - tags.emplace_back(tag); - cls_rgw_gc_remove(op, tags); - return store->gc_operate(obj_names[i], &op); //Enqueue, remove in one step - } - //If not found in omap, then the tag must be in queue - if (ret == -ENOENT) { - cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info); - return store->gc_operate(obj_names[i], &op); - } - } - if (transitioned_objects_cache[i]) { - return ret; - } - } else { - AioCompletion *c; - auto ret = store->gc_aio_operate(obj_names[i], &op, &c); - c->wait_for_safe(); - ret = c->get_return_value(); + + // this tag may still be present in omap, so remove it once the cls_rgw_gc + // enqueue succeeds + cls_rgw_gc_remove(op, {tag}); + + auto c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); + int ret = store->gc_aio_operate(obj_names[i], c, &op); c->release(); - if ((ret < 0 && ret != -ENOENT)) { - return ret; - } - if (! transitioned_objects_cache[i]) { - //Successfully deferred - if (ret == 0) { - //Enqueue in queue - cls_rgw_gc_queue_enqueue(op, cct->_conf->rgw_gc_obj_min_wait, info); - //Remove tag from omap - vector tags; - tags.emplace_back(tag); - cls_rgw_gc_remove(op, tags); - } - //If not found in omap, then the tag must be in queue - if (ret == -ENOENT) { - cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info); - } - ret = store->gc_aio_operate(obj_names[i], &op, &c); - c->wait_for_safe(); - ret = c->get_return_value(); - c->release(); - return ret; - } - if (transitioned_objects_cache[i]) { - return ret; - } + return ret; } - return 0; + + // if we haven't seen the transition yet, write the defer to omap with cls_rgw + ObjectWriteOperation op; + + // assert that we haven't initialized cls_rgw_gc queue. this prevents us + // from writing new entries to omap after the transition + obj_version objv; // objv.ver = 0 + cls_version_check(op, objv, VER_COND_EQ); + + cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag); + + // prepare a callback to detect the transition via ECANCELED from cls_version_check() + auto state = std::make_unique(); + state->gc = this; + state->info.chain = chain; + state->info.tag = tag; + state->completion = librados::Rados::aio_create_completion( + state.get(), async_defer_callback, nullptr); + + int ret = store->gc_aio_operate(obj_names[i], state->completion, &op); + if (ret == 0) { + state.release(); // release ownership until async_defer_callback() + } + return ret; } int RGWGC::remove(int index, const std::vector& tags, AioCompletion **pc) { ObjectWriteOperation op; cls_rgw_gc_remove(op, tags); - return store->gc_aio_operate(obj_names[index], &op, pc); + + auto c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); + int ret = store->gc_aio_operate(obj_names[index], c, &op); + if (ret < 0) { + c->release(); + } else { + *pc = c; + } + return ret; } int RGWGC::remove(int index, int num_entries, librados::AioCompletion **pc) { ObjectWriteOperation op; cls_rgw_gc_queue_remove_entries(op, num_entries); - return store->gc_aio_operate(obj_names[index], &op, pc); + + auto c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); + int ret = store->gc_aio_operate(obj_names[index], c, &op); + if (ret < 0) { + c->release(); + } else { + *pc = c; + } + return ret; } int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list& result, bool *truncated) diff --git a/src/rgw/rgw_gc.h b/src/rgw/rgw_gc.h index e21914a82830e..fd76f4cd3e684 100644 --- a/src/rgw/rgw_gc.h +++ b/src/rgw/rgw_gc.h @@ -49,8 +49,14 @@ public: } vector transitioned_objects_cache; void add_chain(librados::ObjectWriteOperation& op, cls_rgw_gc_obj_info& info); - int send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync); - int defer_chain(const string& tag, cls_rgw_obj_chain& info, bool sync); + int send_chain(cls_rgw_obj_chain& chain, const string& tag); + + // asynchronously defer garbage collection on an object that's still being read + int async_defer_chain(const string& tag, const cls_rgw_obj_chain& info); + + // callback for when async_defer_chain() fails with ECANCELED + void on_defer_canceled(const cls_rgw_gc_obj_info& info); + int remove(int index, const std::vector& tags, librados::AioCompletion **pc); int remove(int index, int num_entries, librados::AioCompletion **pc); diff --git a/src/rgw/rgw_multi.cc b/src/rgw/rgw_multi.cc index e95e97d1aa765..03138750d8ae8 100644 --- a/src/rgw/rgw_multi.cc +++ b/src/rgw/rgw_multi.cc @@ -253,7 +253,8 @@ int abort_multipart_upload(rgw::sal::RGWRadosStore *store, CephContext *cct, } while (truncated); /* use upload id as tag and do it asynchronously */ - ret = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id(), false); + ret = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id()); + // XXX: should detect ENOSPC and delete inline if (ret < 0) { ldout(cct, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl; return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 8b9073a582c05..d80ee75da8c16 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -4487,7 +4487,7 @@ int RGWRados::Object::complete_atomic_modification() } string tag = (state->tail_tag.length() > 0 ? state->tail_tag.to_str() : state->obj_tag.to_str()); - return store->gc->send_chain(chain, tag, false); // do it async + return store->gc->send_chain(chain, tag); // do it sync } void RGWRados::update_gc_chain(rgw_obj& head_obj, RGWObjManifest& manifest, cls_rgw_obj_chain *chain) @@ -4504,9 +4504,9 @@ void RGWRados::update_gc_chain(rgw_obj& head_obj, RGWObjManifest& manifest, cls_ } } -int RGWRados::send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag, bool sync) +int RGWRados::send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag) { - return gc->send_chain(chain, tag, sync); + return gc->send_chain(chain, tag); } static void accumulate_raw_stats(const rgw_bucket_dir_header& header, @@ -4620,7 +4620,7 @@ int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_ob cls_rgw_obj_chain chain; update_gc_chain(state->obj, *state->manifest, &chain); - return gc->defer_chain(tag, chain, true); + return gc->async_defer_chain(tag, chain); } void RGWRados::remove_rgw_head_obj(ObjectWriteOperation& op) @@ -7775,16 +7775,10 @@ int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op) return rgw_rados_operate(gc_pool_ctx, oid, op, null_yield); } -int RGWRados::gc_aio_operate(string& oid, librados::ObjectWriteOperation *op, AioCompletion **pc) +int RGWRados::gc_aio_operate(const string& oid, librados::AioCompletion *c, + librados::ObjectWriteOperation *op) { - AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); - int r = gc_pool_ctx.aio_operate(oid, c, op); - if (!pc) { - c->release(); - } else { - *pc = c; - } - return r; + return gc_pool_ctx.aio_operate(oid, c, op); } int RGWRados::gc_operate(string& oid, librados::ObjectReadOperation *op, bufferlist *pbl) diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index e3017e785a1f0..e7d3d83d0249b 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1390,9 +1390,10 @@ public: int unlock(const rgw_pool& pool, const string& oid, string& zone_id, string& owner_id); void update_gc_chain(rgw_obj& head_obj, RGWObjManifest& manifest, cls_rgw_obj_chain *chain); - int send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag, bool sync); + int send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag); int gc_operate(string& oid, librados::ObjectWriteOperation *op); - int gc_aio_operate(string& oid, librados::ObjectWriteOperation *op, librados::AioCompletion **pc = nullptr); + int gc_aio_operate(const std::string& oid, librados::AioCompletion *c, + librados::ObjectWriteOperation *op); int gc_operate(string& oid, librados::ObjectReadOperation *op, bufferlist *pbl); int list_gc_objs(int *index, string& marker, uint32_t max, bool expired_only, std::list& result, bool *truncated); -- 2.39.5