]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: async_defer_chain() that detects transition to cls_rgw_gc
authorCasey Bodley <cbodley@redhat.com>
Tue, 24 Sep 2019 18:44:44 +0000 (14:44 -0400)
committerPritha Srivastava <prsrivas@redhat.com>
Wed, 16 Oct 2019 03:53:38 +0000 (09:23 +0530)
this replaces the defer_chain() function with optional asynchrony with
one that's only async. in the pre-transition case, it arranges for a
librados callback to detect the transition and retry against cls_rgw_gc

the RGWRados::gc_aio_operate() interface now requires the caller to pass
in an AioCompletion to support async_defer_chain()'s custom callback

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/cls/rgw_gc/cls_rgw_gc_client.cc
src/cls/rgw_gc/cls_rgw_gc_client.h
src/rgw/rgw_gc.cc
src/rgw/rgw_gc.h
src/rgw/rgw_multi.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index e7f98c232a75b647b00f30c9706cd915c4128fdf..ac22d8628d19080d736de4a78044f07ba77e8032 100644 (file)
@@ -41,7 +41,7 @@ int cls_rgw_gc_queue_get_capacity(IoCtx& io_ctx, const string& oid, uint64_t& si
   return 0;
 }
 
-void cls_rgw_gc_queue_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info)
+void cls_rgw_gc_queue_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, const cls_rgw_gc_obj_info& info)
 {
   bufferlist in;
   cls_rgw_gc_set_entry_op call;
@@ -91,7 +91,7 @@ void cls_rgw_gc_queue_remove_entries(ObjectWriteOperation& op, uint32_t num_entr
   op.exec(RGW_GC_CLASS, RGW_GC_QUEUE_REMOVE_ENTRIES, in);
 }
 
-void cls_rgw_gc_queue_defer_entry(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info)
+void cls_rgw_gc_queue_defer_entry(ObjectWriteOperation& op, uint32_t expiration_secs, const cls_rgw_gc_obj_info& info)
 {
   bufferlist in;
   cls_rgw_gc_queue_defer_entry_op defer_op;
index 758c09b6aa112ce4ecd215b32d0a9303de3c7089..107bed4eb9e1df20e03233cb8c7ab5fd8cc98733 100644 (file)
@@ -8,10 +8,10 @@
 
 void cls_rgw_gc_queue_init(librados::ObjectWriteOperation& op, uint64_t size, uint64_t num_urgent_data_entries);
 int cls_rgw_gc_queue_get_capacity(librados::IoCtx& io_ctx, const string& oid, uint64_t& size);
-void cls_rgw_gc_queue_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info);
+void cls_rgw_gc_queue_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, const cls_rgw_gc_obj_info& info);
 int cls_rgw_gc_queue_list_entries(librados::IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max, bool expired_only,
                     list<cls_rgw_gc_obj_info>& entries, bool *truncated, string& next_marker);
 void cls_rgw_gc_queue_remove_entries(librados::ObjectWriteOperation& op, uint32_t num_entries);
-void cls_rgw_gc_queue_defer_entry(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info);
+void cls_rgw_gc_queue_defer_entry(librados::ObjectWriteOperation& op, uint32_t expiration_secs, const cls_rgw_gc_obj_info& info);
 
-#endif
\ No newline at end of file
+#endif
index e791939a5d2a3333f41b53d39f1ff157bf2f8975..f996c732af1f49b63c28895cafba6235a5428ad6 100644 (file)
@@ -9,9 +9,9 @@
 #include "cls/rgw/cls_rgw_client.h"
 #include "cls/rgw_gc/cls_rgw_gc_client.h"
 #include "cls/refcount/cls_refcount_client.h"
+#include "cls/version/cls_version_client.h"
 #include "rgw_perf_counters.h"
 #include "cls/lock/cls_lock_client.h"
-#include "cls/version/cls_version_client.h"
 #include "include/random.h"
 
 #include <list> // XXX
@@ -74,7 +74,7 @@ void RGWGC::add_chain(ObjectWriteOperation& op, cls_rgw_gc_obj_info& info)
   cls_rgw_gc_queue_enqueue(op, cct->_conf->rgw_gc_obj_min_wait, info);
 }
 
-int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync)
+int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag)
 {
   ObjectWriteOperation op;
   cls_rgw_gc_obj_info info;
@@ -94,114 +94,121 @@ int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync)
   return store->gc_operate(obj_names[i], &op);
 }
 
-int RGWGC::defer_chain(const string& tag, cls_rgw_obj_chain& chain, bool sync)
-{
+struct defer_chain_state {
+  librados::AioCompletion* completion = nullptr;
+  // TODO: hold a reference on the state in RGWGC to avoid use-after-free if
+  // RGWGC destructs before this completion fires
+  RGWGC* gc = nullptr;
   cls_rgw_gc_obj_info info;
-  info.chain = chain;
-  info.tag = tag;
 
-  int i = tag_index(tag);
+  ~defer_chain_state() {
+    if (completion) {
+      completion->release();
+    }
+  }
+};
+
+static void async_defer_callback(librados::completion_t, void* arg)
+{
+  std::unique_ptr<defer_chain_state> state{static_cast<defer_chain_state*>(arg)};
+  if (state->completion->get_return_value() == -ECANCELED) {
+    state->gc->on_defer_canceled(state->info);
+  }
+}
+
+void RGWGC::on_defer_canceled(const cls_rgw_gc_obj_info& info)
+{
+  const std::string& tag = info.tag;
+  const int i = tag_index(tag);
+
+  // ECANCELED from cls_version_check() tells us that we've transitioned
+  transitioned_objects_cache[i] = true;
 
   ObjectWriteOperation op;
+  cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
+  cls_rgw_gc_remove(op, {tag});
 
-  obj_version objv;
-  objv.ver = 0;
-  cls_version_check(op, objv, VER_COND_EQ);
-  cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag);
-  if (sync) {
-    auto ret = store->gc_operate(obj_names[i], &op);
-    if (ret != -ECANCELED && ret != -EPERM) {
-      return ret;
-    }
-  } else {
-    AioCompletion *c;
-    auto ret = store->gc_aio_operate(obj_names[i], &op, &c);
-    c->wait_for_safe();
-    ret = c->get_return_value();
-    c->release();
-    if (ret != -ECANCELED && ret != -EPERM) {
-      return ret;
-    }
-  }
+  auto c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+  store->gc_aio_operate(obj_names[i], c, &op);
+  c->release();
+}
 
-  if (! transitioned_objects_cache[i]) {
-    cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag);
-  } else {
+int RGWGC::async_defer_chain(const string& tag, const cls_rgw_obj_chain& chain)
+{
+  const int i = tag_index(tag);
+
+  // if we've transitioned this shard object, we can rely on the cls_rgw_gc queue
+  if (transitioned_objects_cache[i]) {
+    cls_rgw_gc_obj_info info;
+    info.chain = chain;
+    info.tag = tag;
+
+    ObjectWriteOperation op;
     cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
-  }
-  if (sync) {
-    auto ret = store->gc_operate(obj_names[i], &op);
-    if ((ret < 0 && ret != -ENOENT)) {
-      return ret;
-    }
-    if (! transitioned_objects_cache[i]) {
-      //Successfully deferred
-      if (ret == 0) {
-        //Enqueue in queue
-        cls_rgw_gc_queue_enqueue(op, cct->_conf->rgw_gc_obj_min_wait, info);
-        //Remove tag from omap
-        vector<string> tags;
-        tags.emplace_back(tag);
-        cls_rgw_gc_remove(op, tags);
-        return store->gc_operate(obj_names[i], &op); //Enqueue, remove in one step
-      }
-      //If not found in omap, then the tag must be in queue
-      if (ret == -ENOENT) {
-        cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
-        return store->gc_operate(obj_names[i], &op);
-      }
-    }
-    if (transitioned_objects_cache[i]) {
-      return ret;
-    }
-  } else {
-    AioCompletion *c;
-    auto ret = store->gc_aio_operate(obj_names[i], &op, &c);
-    c->wait_for_safe();
-    ret = c->get_return_value();
+
+    // this tag may still be present in omap, so remove it once the cls_rgw_gc
+    // enqueue succeeds
+    cls_rgw_gc_remove(op, {tag});
+
+    auto c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+    int ret = store->gc_aio_operate(obj_names[i], c, &op);
     c->release();
-    if ((ret < 0 && ret != -ENOENT)) {
-      return ret;
-    }
-    if (! transitioned_objects_cache[i]) {
-      //Successfully deferred
-      if (ret == 0) {
-        //Enqueue in queue
-        cls_rgw_gc_queue_enqueue(op, cct->_conf->rgw_gc_obj_min_wait, info);
-        //Remove tag from omap
-        vector<string> tags;
-        tags.emplace_back(tag);
-        cls_rgw_gc_remove(op, tags);
-      }
-      //If not found in omap, then the tag must be in queue
-      if (ret == -ENOENT) {
-        cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
-      }
-      ret = store->gc_aio_operate(obj_names[i], &op, &c);
-      c->wait_for_safe();
-      ret = c->get_return_value();
-      c->release();
-      return ret;
-    }
-    if (transitioned_objects_cache[i]) {
-      return ret;
-    }
+    return ret;
   }
-  return 0;
+
+  // if we haven't seen the transition yet, write the defer to omap with cls_rgw
+  ObjectWriteOperation op;
+
+  // assert that we haven't initialized cls_rgw_gc queue. this prevents us
+  // from writing new entries to omap after the transition
+  obj_version objv; // objv.ver = 0
+  cls_version_check(op, objv, VER_COND_EQ);
+
+  cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag);
+
+  // prepare a callback to detect the transition via ECANCELED from cls_version_check()
+  auto state = std::make_unique<defer_chain_state>();
+  state->gc = this;
+  state->info.chain = chain;
+  state->info.tag = tag;
+  state->completion = librados::Rados::aio_create_completion(
+      state.get(), async_defer_callback, nullptr);
+
+  int ret = store->gc_aio_operate(obj_names[i], state->completion, &op);
+  if (ret == 0) {
+    state.release(); // release ownership until async_defer_callback()
+  }
+  return ret;
 }
 
 int RGWGC::remove(int index, const std::vector<string>& tags, AioCompletion **pc)
 {
   ObjectWriteOperation op;
   cls_rgw_gc_remove(op, tags);
-  return store->gc_aio_operate(obj_names[index], &op, pc);
+
+  auto c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+  int ret = store->gc_aio_operate(obj_names[index], c, &op);
+  if (ret < 0) {
+    c->release();
+  } else {
+    *pc = c;
+  }
+  return ret;
 }
 
 int RGWGC::remove(int index, int num_entries, librados::AioCompletion **pc)
 {
   ObjectWriteOperation op;
   cls_rgw_gc_queue_remove_entries(op, num_entries);
-  return store->gc_aio_operate(obj_names[index], &op, pc);
+
+  auto c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+  int ret = store->gc_aio_operate(obj_names[index], c, &op);
+  if (ret < 0) {
+    c->release();
+  } else {
+    *pc = c;
+  }
+  return ret;
 }
 
 int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated)
index e21914a82830ea7c5a4517f8ba56e032fc134cca..fd76f4cd3e684f7f472c9b67ba987562f129ff25 100644 (file)
@@ -49,8 +49,14 @@ public:
   }
   vector<bool> transitioned_objects_cache;
   void add_chain(librados::ObjectWriteOperation& op, cls_rgw_gc_obj_info& info);
-  int send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync);
-  int defer_chain(const string& tag, cls_rgw_obj_chain& info, bool sync);
+  int send_chain(cls_rgw_obj_chain& chain, const string& tag);
+
+  // asynchronously defer garbage collection on an object that's still being read
+  int async_defer_chain(const string& tag, const cls_rgw_obj_chain& info);
+
+  // callback for when async_defer_chain() fails with ECANCELED
+  void on_defer_canceled(const cls_rgw_gc_obj_info& info);
+
   int remove(int index, const std::vector<string>& tags, librados::AioCompletion **pc);
   int remove(int index, int num_entries, librados::AioCompletion **pc);
 
index e95e97d1aa765735a970f8c1ddd59d40f42116b7..03138750d8ae8831eefde0d30a98a07c3dd576ab 100644 (file)
@@ -253,7 +253,8 @@ int abort_multipart_upload(rgw::sal::RGWRadosStore *store, CephContext *cct,
   } while (truncated);
 
   /* use upload id as tag and do it asynchronously */
-  ret = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id(), false);
+  ret = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id());
+  // XXX: should detect ENOSPC and delete inline
   if (ret < 0) {
     ldout(cct, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
     return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
index 8b9073a582c05233221be5d42ddaf66e720bfc96..d80ee75da8c16a4da252f45956ed81ceb5c24a40 100644 (file)
@@ -4487,7 +4487,7 @@ int RGWRados::Object::complete_atomic_modification()
   }
 
   string tag = (state->tail_tag.length() > 0 ? state->tail_tag.to_str() : state->obj_tag.to_str());
-  return store->gc->send_chain(chain, tag, false);  // do it async
+  return store->gc->send_chain(chain, tag);  // do it sync
 }
 
 void RGWRados::update_gc_chain(rgw_obj& head_obj, RGWObjManifest& manifest, cls_rgw_obj_chain *chain)
@@ -4504,9 +4504,9 @@ void RGWRados::update_gc_chain(rgw_obj& head_obj, RGWObjManifest& manifest, cls_
   }
 }
 
-int RGWRados::send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag, bool sync)
+int RGWRados::send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag)
 {
-  return gc->send_chain(chain, tag, sync);
+  return gc->send_chain(chain, tag);
 }
 
 static void accumulate_raw_stats(const rgw_bucket_dir_header& header,
@@ -4620,7 +4620,7 @@ int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_ob
 
   cls_rgw_obj_chain chain;
   update_gc_chain(state->obj, *state->manifest, &chain);
-  return gc->defer_chain(tag, chain, true);
+  return gc->async_defer_chain(tag, chain);
 }
 
 void RGWRados::remove_rgw_head_obj(ObjectWriteOperation& op)
@@ -7775,16 +7775,10 @@ int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op)
   return rgw_rados_operate(gc_pool_ctx, oid, op, null_yield);
 }
 
-int RGWRados::gc_aio_operate(string& oid, librados::ObjectWriteOperation *op, AioCompletion **pc)
+int RGWRados::gc_aio_operate(const string& oid, librados::AioCompletion *c,
+                             librados::ObjectWriteOperation *op)
 {
-  AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
-  int r = gc_pool_ctx.aio_operate(oid, c, op);
-  if (!pc) {
-    c->release();
-  } else {
-    *pc = c;
-  }
-  return r;
+  return gc_pool_ctx.aio_operate(oid, c, op);
 }
 
 int RGWRados::gc_operate(string& oid, librados::ObjectReadOperation *op, bufferlist *pbl)
index e3017e785a1f0c67e63f02cd430b6f857e381219..e7d3d83d0249bd40d0c43891de8d72e6b7dcdd47 100644 (file)
@@ -1390,9 +1390,10 @@ public:
   int unlock(const rgw_pool& pool, const string& oid, string& zone_id, string& owner_id);
 
   void update_gc_chain(rgw_obj& head_obj, RGWObjManifest& manifest, cls_rgw_obj_chain *chain);
-  int send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag, bool sync);
+  int send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag);
   int gc_operate(string& oid, librados::ObjectWriteOperation *op);
-  int gc_aio_operate(string& oid, librados::ObjectWriteOperation *op, librados::AioCompletion **pc = nullptr);
+  int gc_aio_operate(const std::string& oid, librados::AioCompletion *c,
+                     librados::ObjectWriteOperation *op);
   int gc_operate(string& oid, librados::ObjectReadOperation *op, bufferlist *pbl);
 
   int list_gc_objs(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated);