#include "cls/rgw/cls_rgw_client.h"
#include "cls/rgw_gc/cls_rgw_gc_client.h"
#include "cls/refcount/cls_refcount_client.h"
+#include "cls/version/cls_version_client.h"
#include "rgw_perf_counters.h"
#include "cls/lock/cls_lock_client.h"
-#include "cls/version/cls_version_client.h"
#include "include/random.h"
#include <list> // XXX
cls_rgw_gc_queue_enqueue(op, cct->_conf->rgw_gc_obj_min_wait, info);
}
-int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync)
+int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag)
{
ObjectWriteOperation op;
cls_rgw_gc_obj_info info;
return store->gc_operate(obj_names[i], &op);
}
-int RGWGC::defer_chain(const string& tag, cls_rgw_obj_chain& chain, bool sync)
-{
+struct defer_chain_state { // per-request state passed as the void* argument to async_defer_callback()
+ librados::AioCompletion* completion = nullptr; // completion for the in-flight defer op; released by the destructor when non-null
+ // TODO: hold a reference on the state in RGWGC to avoid use-after-free if
+ // RGWGC destructs before this completion fires
+ RGWGC* gc = nullptr; // raw back-pointer used by the callback to call on_defer_canceled()
cls_rgw_gc_obj_info info;
- info.chain = chain;
- info.tag = tag;
- int i = tag_index(tag);
+ ~defer_chain_state() { // releases the completion so every exit path (submit failure or callback) drops it exactly once
+ if (completion) {
+ completion->release();
+ }
+ }
+};
+
+static void async_defer_callback(librados::completion_t, void* arg) // completion callback registered by RGWGC::async_defer_chain(); arg is the defer_chain_state it released
+{
+ std::unique_ptr<defer_chain_state> state{static_cast<defer_chain_state*>(arg)}; // retake ownership so the state (and its completion) is freed when this callback returns
+ if (state->completion->get_return_value() == -ECANCELED) { // any other result, success included, needs no follow-up here
+ state->gc->on_defer_canceled(state->info); // redo the defer against the cls_rgw_gc queue
+ }
+}
+
+void RGWGC::on_defer_canceled(const cls_rgw_gc_obj_info& info) // called from async_defer_callback() when the omap defer op returned -ECANCELED
+{
+ const std::string& tag = info.tag;
+ const int i = tag_index(tag); // index into obj_names / transitioned_objects_cache for this tag
+
+ // ECANCELED from cls_version_check() tells us that we've transitioned; cache that so later async_defer_chain() calls for this index go straight to the cls_rgw_gc queue
+ transitioned_objects_cache[i] = true;
ObjectWriteOperation op;
+ cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info); // defer the entry via the cls_rgw_gc queue instead of omap
+ cls_rgw_gc_remove(op, {tag}); // same op also removes the tag via cls_rgw_gc_remove (the omap copy, per the comment in async_defer_chain())
- obj_version objv;
- objv.ver = 0;
- cls_version_check(op, objv, VER_COND_EQ);
- cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag);
- if (sync) {
- auto ret = store->gc_operate(obj_names[i], &op);
- if (ret != -ECANCELED && ret != -EPERM) {
- return ret;
- }
- } else {
- AioCompletion *c;
- auto ret = store->gc_aio_operate(obj_names[i], &op, &c);
- c->wait_for_safe();
- ret = c->get_return_value();
- c->release();
- if (ret != -ECANCELED && ret != -EPERM) {
- return ret;
- }
- }
+ auto c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); // no callback: this retry is fire-and-forget
+ store->gc_aio_operate(obj_names[i], c, &op); // NOTE(review): the return value is ignored, so a failed submit goes unreported — confirm best-effort is intended
+ c->release(); // nothing waits on this completion, so release it immediately
+}
- if (! transitioned_objects_cache[i]) {
- cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag);
- } else {
+int RGWGC::async_defer_chain(const string& tag, const cls_rgw_obj_chain& chain) // submits an async defer for tag's gc entry without waiting; returns what gc_aio_operate() returned
+{
+ const int i = tag_index(tag); // index into obj_names / transitioned_objects_cache for this tag
+
+ // if we've transitioned this shard object, we can rely on the cls_rgw_gc queue
+ if (transitioned_objects_cache[i]) {
+ cls_rgw_gc_obj_info info;
+ info.chain = chain;
+ info.tag = tag;
+
+ ObjectWriteOperation op;
cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
- }
- if (sync) {
- auto ret = store->gc_operate(obj_names[i], &op);
- if ((ret < 0 && ret != -ENOENT)) {
- return ret;
- }
- if (! transitioned_objects_cache[i]) {
- //Successfully deferred
- if (ret == 0) {
- //Enqueue in queue
- cls_rgw_gc_queue_enqueue(op, cct->_conf->rgw_gc_obj_min_wait, info);
- //Remove tag from omap
- vector<string> tags;
- tags.emplace_back(tag);
- cls_rgw_gc_remove(op, tags);
- return store->gc_operate(obj_names[i], &op); //Enqueue, remove in one step
- }
- //If not found in omap, then the tag must be in queue
- if (ret == -ENOENT) {
- cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
- return store->gc_operate(obj_names[i], &op);
- }
- }
- if (transitioned_objects_cache[i]) {
- return ret;
- }
- } else {
- AioCompletion *c;
- auto ret = store->gc_aio_operate(obj_names[i], &op, &c);
- c->wait_for_safe();
- ret = c->get_return_value();
+
+ // this tag may still be present in omap, so remove it once the cls_rgw_gc
+ // enqueue succeeds
+ cls_rgw_gc_remove(op, {tag});
+
+ auto c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); // no callback: nothing inspects the op's eventual result on this path
+ int ret = store->gc_aio_operate(obj_names[i], c, &op); // presumably ret only reflects submission, not completion — confirm against librados
c->release();
- if ((ret < 0 && ret != -ENOENT)) {
- return ret;
- }
- if (! transitioned_objects_cache[i]) {
- //Successfully deferred
- if (ret == 0) {
- //Enqueue in queue
- cls_rgw_gc_queue_enqueue(op, cct->_conf->rgw_gc_obj_min_wait, info);
- //Remove tag from omap
- vector<string> tags;
- tags.emplace_back(tag);
- cls_rgw_gc_remove(op, tags);
- }
- //If not found in omap, then the tag must be in queue
- if (ret == -ENOENT) {
- cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
- }
- ret = store->gc_aio_operate(obj_names[i], &op, &c);
- c->wait_for_safe();
- ret = c->get_return_value();
- c->release();
- return ret;
- }
- if (transitioned_objects_cache[i]) {
- return ret;
- }
+ return ret;
}
- return 0;
+
+ // if we haven't seen the transition yet, write the defer to omap with cls_rgw
+ ObjectWriteOperation op;
+
+ // assert that we haven't initialized cls_rgw_gc queue. this prevents us
+ // from writing new entries to omap after the transition
+ obj_version objv; // objv.ver = 0
+ cls_version_check(op, objv, VER_COND_EQ); // a failed check surfaces as -ECANCELED, which async_defer_callback() looks for
+
+ cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag);
+
+ // prepare a callback to detect the transition via ECANCELED from cls_version_check()
+ auto state = std::make_unique<defer_chain_state>();
+ state->gc = this;
+ state->info.chain = chain; // copy chain and tag so the callback can rebuild the queue defer after this call returns
+ state->info.tag = tag;
+ state->completion = librados::Rados::aio_create_completion( // the state is both the callback argument and the owner of this completion
+ state.get(), async_defer_callback, nullptr);
+
+ int ret = store->gc_aio_operate(obj_names[i], state->completion, &op);
+ if (ret == 0) { // otherwise the unique_ptr destroys the state here, and its destructor releases the completion
+ state.release(); // release ownership until async_defer_callback()
+ }
+ return ret;
}
int RGWGC::remove(int index, const std::vector<string>& tags, AioCompletion **pc)
{
ObjectWriteOperation op;
cls_rgw_gc_remove(op, tags);
- return store->gc_aio_operate(obj_names[index], &op, pc);
+
+ auto c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); // completion is now created here rather than inside gc_aio_operate()
+ int ret = store->gc_aio_operate(obj_names[index], c, &op);
+ if (ret < 0) { // on failure the caller never receives c, so release it here
+ c->release();
+ } else {
+ *pc = c; // hand the completion to the caller. NOTE(review): pc is dereferenced unconditionally; the removed gc_aio_operate() checked for a null pc — confirm no caller passes nullptr
+ }
+ return ret;
}
int RGWGC::remove(int index, int num_entries, librados::AioCompletion **pc)
{
ObjectWriteOperation op;
cls_rgw_gc_queue_remove_entries(op, num_entries);
- return store->gc_aio_operate(obj_names[index], &op, pc);
+
+ auto c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); // completion is now created here rather than inside gc_aio_operate()
+ int ret = store->gc_aio_operate(obj_names[index], c, &op);
+ if (ret < 0) { // on failure the caller never receives c, so release it here
+ c->release();
+ } else {
+ *pc = c; // hand the completion to the caller. NOTE(review): pc is dereferenced unconditionally — confirm no caller passes nullptr
+ }
+ return ret;
}
int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated)
}
string tag = (state->tail_tag.length() > 0 ? state->tail_tag.to_str() : state->obj_tag.to_str());
- return store->gc->send_chain(chain, tag, false); // do it async
+ return store->gc->send_chain(chain, tag); // do it sync
}
void RGWRados::update_gc_chain(rgw_obj& head_obj, RGWObjManifest& manifest, cls_rgw_obj_chain *chain)
}
}
-int RGWRados::send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag, bool sync)
+int RGWRados::send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag) // the sync flag is gone from this signature
{
- return gc->send_chain(chain, tag, sync);
+ return gc->send_chain(chain, tag); // forwards to gc->send_chain() and returns its result unchanged
}
static void accumulate_raw_stats(const rgw_bucket_dir_header& header,
cls_rgw_obj_chain chain;
update_gc_chain(state->obj, *state->manifest, &chain);
- return gc->defer_chain(tag, chain, true);
+ return gc->async_defer_chain(tag, chain);
}
void RGWRados::remove_rgw_head_obj(ObjectWriteOperation& op)
return rgw_rados_operate(gc_pool_ctx, oid, op, null_yield);
}
-int RGWRados::gc_aio_operate(string& oid, librados::ObjectWriteOperation *op, AioCompletion **pc)
+int RGWRados::gc_aio_operate(const string& oid, librados::AioCompletion *c, // c is supplied by the caller, which also releases it
+ librados::ObjectWriteOperation *op)
{
- AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- int r = gc_pool_ctx.aio_operate(oid, c, op);
- if (!pc) {
- c->release();
- } else {
- *pc = c;
- }
- return r;
+ return gc_pool_ctx.aio_operate(oid, c, op); // submits op on oid through gc_pool_ctx and returns aio_operate()'s result as-is
}
int RGWRados::gc_operate(string& oid, librados::ObjectReadOperation *op, bufferlist *pbl)