]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: concurrency for multi object deletes
authorCory Snyder <csnyder@iland.com>
Wed, 26 Oct 2022 20:14:55 +0000 (16:14 -0400)
committerCory Snyder <csnyder@iland.com>
Mon, 31 Oct 2022 19:29:48 +0000 (19:29 +0000)
Enables concurrent deletes of individual objects for multi-object
delete calls when executed with asio.

Fixes: https://tracker.ceph.com/issues/57947
Signed-off-by: Cory Snyder <csnyder@iland.com>
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h

index 90c40778f217e699a5ae72e12b2d72bf76437c15..9dba03d0ed0813392f48d13d2be684e96cdc181e 100644 (file)
@@ -12,6 +12,7 @@
 #include <boost/algorithm/string/predicate.hpp>
 #include <boost/optional.hpp>
 #include <boost/utility/in_place_factory.hpp>
+#include <boost/asio.hpp>
 
 #include "include/scope_guard.h"
 #include "common/Clock.h"
@@ -6838,12 +6839,173 @@ void RGWDeleteMultiObj::write_ops_log_entry(rgw_log_entry& entry) const {
   entry.delete_multi_obj_meta.objects = std::move(ops_log_entries);
 }
 
+void RGWDeleteMultiObj::wait_flush(optional_yield y, size_t n)
+{
+  if (y) {
+    if (ops_log_entries.size() == n) {
+      rgw_flush_formatter(s, s->formatter);
+      return;
+    }
+    auto yc = y.get_yield_context();
+    for (;;) {
+      boost::system::error_code error;
+      formatter_flush_cond->async_wait(yc[error]);
+      rgw_flush_formatter(s, s->formatter);
+      if (ops_log_entries.size() == n) {
+        break;
+      }
+    }
+  }
+}
+
+void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key *o, optional_yield y)
+{
+  std::string version_id;
+  std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(*o);
+  if (s->iam_policy || ! s->iam_user_policies.empty() || !s->session_policies.empty()) {
+    auto identity_policy_res = eval_identity_or_session_policies(this, s->iam_user_policies, s->env,
+                                                                 o->instance.empty() ?
+                                                                 rgw::IAM::s3DeleteObject :
+                                                                 rgw::IAM::s3DeleteObjectVersion,
+                                                                 ARN(obj->get_obj()));
+    if (identity_policy_res == Effect::Deny) {
+      send_partial_response(*o, false, "", -EACCES);
+      return;
+    }
+
+    rgw::IAM::Effect e = Effect::Pass;
+    rgw::IAM::PolicyPrincipal princ_type = rgw::IAM::PolicyPrincipal::Other;
+    if (s->iam_policy) {
+      ARN obj_arn(obj->get_obj());
+      e = s->iam_policy->eval(s->env,
+                              *s->auth.identity,
+                              o->instance.empty() ?
+                              rgw::IAM::s3DeleteObject :
+                              rgw::IAM::s3DeleteObjectVersion,
+                              obj_arn,
+                              princ_type);
+    }
+    if (e == Effect::Deny) {
+      send_partial_response(*o, false, "", -EACCES);
+      return;
+    }
+
+    if (!s->session_policies.empty()) {
+      auto session_policy_res = eval_identity_or_session_policies(this, s->session_policies, s->env,
+                                                                  o->instance.empty() ?
+                                                                  rgw::IAM::s3DeleteObject :
+                                                                  rgw::IAM::s3DeleteObjectVersion,
+                                                                  ARN(obj->get_obj()));
+      if (session_policy_res == Effect::Deny) {
+        send_partial_response(*o, false, "", -EACCES);
+        return;
+      }
+      if (princ_type == rgw::IAM::PolicyPrincipal::Role) {
+        //Intersection of session policy and identity policy plus intersection of session policy and bucket policy
+        if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) &&
+            (session_policy_res != Effect::Allow || e != Effect::Allow)) {
+          send_partial_response(*o, false, "", -EACCES);
+          return;
+        }
+      } else if (princ_type == rgw::IAM::PolicyPrincipal::Session) {
+        //Intersection of session policy and identity policy plus bucket policy
+        if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) && e != Effect::Allow) {
+          send_partial_response(*o, false, "", -EACCES);
+          return;
+        }
+      } else if (princ_type == rgw::IAM::PolicyPrincipal::Other) {// there was no match in the bucket policy
+        if (session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) {
+          send_partial_response(*o, false, "", -EACCES);
+          return;
+        }
+      }
+      send_partial_response(*o, false, "", -EACCES);
+      return;
+    }
+
+    if ((identity_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) {
+      send_partial_response(*o, false, "", -EACCES);
+      return;
+    }
+  }
+
+  uint64_t obj_size = 0;
+  std::string etag;
+
+  if (!rgw::sal::Object::empty(obj.get())) {
+    RGWObjState* astate = nullptr;
+    bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled();
+    const auto ret = obj->get_obj_state(this, &astate, y, true);
+
+    if (ret < 0) {
+      if (ret == -ENOENT) {
+        // object maybe delete_marker, skip check_obj_lock
+        check_obj_lock = false;
+      } else {
+        // Something went wrong.
+        send_partial_response(*o, false, "", ret);
+        return;
+      }
+    } else {
+      obj_size = astate->size;
+      etag = astate->attrset[RGW_ATTR_ETAG].to_str();
+    }
+
+    if (check_obj_lock) {
+      ceph_assert(astate);
+      int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode);
+      if (object_lock_response != 0) {
+        send_partial_response(*o, false, "", object_lock_response);
+        return;
+      }
+    }
+  }
+
+  // make reservation for notification if needed
+  const auto versioned_object = s->bucket->versioning_enabled();
+  const auto event_type = versioned_object && obj->get_instance().empty() ?
+                          rgw::notify::ObjectRemovedDeleteMarkerCreated :
+                          rgw::notify::ObjectRemovedDelete;
+  std::unique_ptr<rgw::sal::Notification> res
+          = store->get_notification(obj.get(), s->src_object.get(), s, event_type);
+  op_ret = res->publish_reserve(this);
+  if (op_ret < 0) {
+    send_partial_response(*o, false, "", op_ret);
+    return;
+  }
+
+  obj->set_atomic();
+
+  std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = obj->get_delete_op();
+  del_op->params.versioning_status = obj->get_bucket()->get_info().versioning_status();
+  del_op->params.obj_owner = s->owner;
+  del_op->params.bucket_owner = s->bucket_owner;
+  del_op->params.marker_version_id = version_id;
+
+  op_ret = del_op->delete_obj(this, y);
+  if (op_ret == -ENOENT) {
+    op_ret = 0;
+  }
+
+  send_partial_response(*o, obj->get_delete_marker(), del_op->result.version_id, op_ret);
+
+  // send request to notification manager
+  int ret = res->publish_commit(this, obj_size, ceph::real_clock::now(), etag, version_id);
+  if (ret < 0) {
+    ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
+    // too late to rollback operation, hence op_ret is not set here
+  }
+}
+
 void RGWDeleteMultiObj::execute(optional_yield y)
 {
   RGWMultiDelDelete *multi_delete;
   vector<rgw_obj_key>::iterator iter;
   RGWMultiDelXMLParser parser;
   char* buf;
+  if (y) {
+    formatter_flush_cond = std::make_unique<boost::asio::deadline_timer>(y.get_io_context());  
+  }
 
   buf = data.c_str();
   if (!buf) {
@@ -6904,143 +7066,18 @@ void RGWDeleteMultiObj::execute(optional_yield y)
   for (iter = multi_delete->objects.begin();
         iter != multi_delete->objects.end();
         ++iter) {
-    std::string version_id;
-    std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(*iter);
-    if (s->iam_policy || ! s->iam_user_policies.empty() || !s->session_policies.empty()) {
-      auto identity_policy_res = eval_identity_or_session_policies(this, s->iam_user_policies, s->env,
-                                              iter->instance.empty() ?
-                                              rgw::IAM::s3DeleteObject :
-                                              rgw::IAM::s3DeleteObjectVersion,
-                                              ARN(obj->get_obj()));
-      if (identity_policy_res == Effect::Deny) {
-        send_partial_response(*iter, false, "", -EACCES);
-        continue;
-      }
-
-      rgw::IAM::Effect e = Effect::Pass;
-      rgw::IAM::PolicyPrincipal princ_type = rgw::IAM::PolicyPrincipal::Other;
-      if (s->iam_policy) {
-        ARN obj_arn(obj->get_obj());
-        e = s->iam_policy->eval(s->env,
-                                  *s->auth.identity,
-                                  iter->instance.empty() ?
-                                  rgw::IAM::s3DeleteObject :
-                                  rgw::IAM::s3DeleteObjectVersion,
-                                  obj_arn,
-           princ_type);
-      }
-      if (e == Effect::Deny) {
-        send_partial_response(*iter, false, "", -EACCES);
-             continue;
-      }
-
-      if (!s->session_policies.empty()) {
-        auto session_policy_res = eval_identity_or_session_policies(this, s->session_policies, s->env,
-                                              iter->instance.empty() ?
-                                              rgw::IAM::s3DeleteObject :
-                                              rgw::IAM::s3DeleteObjectVersion,
-                                              ARN(obj->get_obj()));
-        if (session_policy_res == Effect::Deny) {
-          send_partial_response(*iter, false, "", -EACCES);
-               continue;
-        }
-        if (princ_type == rgw::IAM::PolicyPrincipal::Role) {
-          //Intersection of session policy and identity policy plus intersection of session policy and bucket policy
-          if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) &&
-              (session_policy_res != Effect::Allow || e != Effect::Allow)) {
-            send_partial_response(*iter, false, "", -EACCES);
-                 continue;
-          }
-        } else if (princ_type == rgw::IAM::PolicyPrincipal::Session) {
-          //Intersection of session policy and identity policy plus bucket policy
-          if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) && e != Effect::Allow) {
-            send_partial_response(*iter, false, "", -EACCES);
-                 continue;
-          }
-        } else if (princ_type == rgw::IAM::PolicyPrincipal::Other) {// there was no match in the bucket policy
-          if (session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) {
-            send_partial_response(*iter, false, "", -EACCES);
-                 continue;
-          }
-        }
-        send_partial_response(*iter, false, "", -EACCES);
-             continue;
-      }
-
-      if ((identity_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) {
-             send_partial_response(*iter, false, "", -EACCES);
-             continue;
-      }
-    }
-
-    uint64_t obj_size = 0;
-    std::string etag;
-
-    if (!rgw::sal::Object::empty(obj.get())) {
-      RGWObjState* astate = nullptr;
-      bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled();
-      const auto ret = obj->get_obj_state(this, &astate, s->yield, true);
-
-      if (ret < 0) {
-        if (ret == -ENOENT) {
-          // object maybe delete_marker, skip check_obj_lock
-          check_obj_lock = false;
-        } else {
-          // Something went wrong.
-          send_partial_response(*iter, false, "", ret);
-          continue;
-        }
-      } else {
-        obj_size = astate->size;
-        etag = astate->attrset[RGW_ATTR_ETAG].to_str();
-      }
-
-      if (check_obj_lock) {
-        ceph_assert(astate);
-        int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode);
-        if (object_lock_response != 0) {
-          send_partial_response(*iter, false, "", object_lock_response);
-          continue;
-        }
-      }
-    }
-
-    // make reservation for notification if needed
-    const auto versioned_object = s->bucket->versioning_enabled();
-    const auto event_type = versioned_object && obj->get_instance().empty() ?
-      rgw::notify::ObjectRemovedDeleteMarkerCreated :
-      rgw::notify::ObjectRemovedDelete;
-    std::unique_ptr<rgw::sal::Notification> res
-      = store->get_notification(obj.get(), s->src_object.get(), s, event_type);
-    op_ret = res->publish_reserve(this);
-    if (op_ret < 0) {
-      send_partial_response(*iter, false, "", op_ret);
-      continue;
-    }
-
-    obj->set_atomic();
-
-    std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = obj->get_delete_op();
-    del_op->params.versioning_status = obj->get_bucket()->get_info().versioning_status();
-    del_op->params.obj_owner = s->owner;
-    del_op->params.bucket_owner = s->bucket_owner;
-    del_op->params.marker_version_id = version_id;
-
-    op_ret = del_op->delete_obj(this, y);
-    if (op_ret == -ENOENT) {
-      op_ret = 0;
-    }
-
-    send_partial_response(*iter, obj->get_delete_marker(), del_op->result.version_id, op_ret);
-
-    // send request to notification manager
-    int ret = res->publish_commit(this, obj_size, ceph::real_clock::now(), etag, version_id);
-    if (ret < 0) {
-      ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
-      // too late to rollback operation, hence op_ret is not set here
+    rgw_obj_key* obj_key = &*iter;
+    if (y) {
+      spawn::spawn(y.get_yield_context(), [this, &y, obj_key] (yield_context yield) {
+        handle_individual_object(obj_key, optional_yield { y.get_io_context(), yield }); 
+      }); 
+    } else {
+      handle_individual_object(obj_key, y);
     }
   }
 
+  wait_flush(y, multi_delete->objects.size());
+
   /*  set the return code to zero, errors at this point will be
   dumped to the response */
   op_ret = 0;
index bfcda73b43b72c191c114da3420dc06645aca13a..e7eee104db1a1dcff99e323b57fbfeb4649c1476 100644 (file)
@@ -2029,8 +2029,37 @@ public:
 
 
 class RGWDeleteMultiObj : public RGWOp {
+  /**
+   * Handles the deletion of an individual object and uses
+   * set_partial_response to record the outcome. 
+   */
+  void handle_individual_object(const rgw_obj_key *o, optional_yield y);
+  
+  /**
+   * When the request is being executed in a coroutine, performs
+   * the actual formatter flushing and is responsible for the
+   * termination condition (when when all partial object responses
+   * have been sent). Note that the formatter flushing must be handled
+   * on the coroutine that invokes the execute method vs. the 
+   * coroutines that are spawned to handle individual objects because
+   * the flush logic uses a yield context that was captured
+   * and saved on the req_state vs. one that is passed on the stack.
+   * This is a no-op in the case where we're not executing as a coroutine.
+   */
+  void wait_flush(optional_yield y, size_t n);
+
 protected:
   std::vector<delete_multi_obj_entry> ops_log_entries;
+
+  /**
+   * Acts as an async condition variable when the request is being
+   * executed on a coroutine. Formatter flushing must happen on the main
+   * request coroutine vs. spawned coroutines, so spawned coroutines use
+   * the cancellation of this timer to notify the main coroutine when
+   * data is ready to flush. 
+   */
+  std::unique_ptr<boost::asio::deadline_timer> formatter_flush_cond;
+  
   bufferlist data;
   rgw::sal::Bucket* bucket;
   bool quiet;
@@ -2039,7 +2068,6 @@ protected:
   bool bypass_perm;
   bool bypass_governance_mode;
 
-
 public:
   RGWDeleteMultiObj() {
     quiet = false;
@@ -2047,6 +2075,7 @@ public:
     bypass_perm = true;
     bypass_governance_mode = false;
   }
+
   int verify_permission(optional_yield y) override;
   void pre_exec() override;
   void execute(optional_yield y) override;
@@ -2054,7 +2083,7 @@ public:
   virtual int get_params(optional_yield y) = 0;
   virtual void send_status() = 0;
   virtual void begin_response() = 0;
-  virtual void send_partial_response(rgw_obj_key& key, bool delete_marker,
+  virtual void send_partial_response(const rgw_obj_key& key, bool delete_marker,
                                      const std::string& marker_version_id, int ret) = 0;
   virtual void end_response() = 0;
   const char* name() const override { return "multi_object_delete"; }
index 47162e7dec10ad7fb7ea9a25e8142da53916f5e0..0393ef3df866f6cbe4a5e0455f6fe4fd42aa2b1b 100644 (file)
@@ -5307,7 +5307,7 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
   store->remove_rgw_head_obj(op);
 
   auto& ioctx = ref.pool.ioctx();
-  r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, null_yield);
+  r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y);
 
   /* raced with another operation, object state is indeterminate */
   const bool need_invalidate = (r == -ECANCELED);
@@ -7765,7 +7765,7 @@ int RGWRados::raw_obj_stat(const DoutPrefixProvider *dpp,
     op.read(0, cct->_conf->rgw_max_chunk_size, first_chunk, NULL);
   }
   bufferlist outbl;
-  r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, &outbl, null_yield);
+  r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, &outbl, y);
 
   if (epoch) {
     *epoch = ref.pool.ioctx().get_last_version();
index 4de7f604e14a0b385fef7f56ce6d2d1ce137d219..4038d3add04b425d2f3029abe0d0be5f8c086aca 100644 (file)
@@ -4138,9 +4138,10 @@ void RGWDeleteMultiObj_ObjStore_S3::begin_response()
   rgw_flush_formatter(s, s->formatter);
 }
 
-void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(rgw_obj_key& key,
+void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(const rgw_obj_key& key,
                                                          bool delete_marker,
-                                                         const string& marker_version_id, int ret)
+                                                         const string& marker_version_id,
+                                                          int ret)
 {
   if (!key.empty()) {
     delete_multi_obj_entry ops_log_entry;
@@ -4186,7 +4187,11 @@ void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(rgw_obj_key& key,
     }
 
     ops_log_entries.push_back(std::move(ops_log_entry));
-    rgw_flush_formatter(s, s->formatter);
+    if (formatter_flush_cond) {
+      formatter_flush_cond->cancel();
+    } else {
+      rgw_flush_formatter(s, s->formatter);
+    }
   }
 }
 
index adff0e12ecc9bb12e092cdc6696b620b13e65a2a..8f304ce941d112753758c2823a183a343ab3bdb3 100644 (file)
@@ -516,7 +516,7 @@ public:
   int get_params(optional_yield y) override;
   void send_status() override;
   void begin_response() override;
-  void send_partial_response(rgw_obj_key& key, bool delete_marker,
+  void send_partial_response(const rgw_obj_key& key, bool delete_marker,
                              const std::string& marker_version_id, int ret) override;
   void end_response() override;
 };