]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: RGWDeleteMultiObj uses spawn_throttle for concurrency
authorCasey Bodley <cbodley@redhat.com>
Thu, 9 May 2024 17:50:06 +0000 (13:50 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 31 May 2024 15:23:43 +0000 (11:23 -0400)
adapt RGWDeleteMultiObj to use ceph::async::spawn_throttle for the
handle_individual_object() coroutines it spawns

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h

index e1c7ee484f4c2da600ea9e532ba3c897a35459b8..d93cc506ffca490cd4bff11eb48c5aaa0c7f0662 100644 (file)
@@ -16,6 +16,7 @@
 #include "include/scope_guard.h"
 #include "common/Clock.h"
 #include "common/armor.h"
+#include "common/async/spawn_throttle.h"
 #include "common/errno.h"
 #include "common/mime.h"
 #include "common/utf8.h"
@@ -6671,26 +6672,11 @@ void RGWDeleteMultiObj::write_ops_log_entry(rgw_log_entry& entry) const {
   entry.delete_multi_obj_meta.objects = std::move(ops_log_entries);
 }
 
-void RGWDeleteMultiObj::wait_flush(optional_yield y,
-                                   boost::asio::deadline_timer *formatter_flush_cond,
-                                  std::function<bool()> predicate)
-{
-  if (y && formatter_flush_cond) {
-    auto yc = y.get_yield_context();
-    while (!predicate()) {
-      boost::system::error_code error;
-      formatter_flush_cond->async_wait(yc[error]);
-      rgw_flush_formatter(s, s->formatter);
-    }
-  }
-}
-
-void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_yield y,
-                                                 boost::asio::deadline_timer *formatter_flush_cond)
+void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_yield y)
 {
   std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(o);
   if (o.empty()) {
-    send_partial_response(o, false, "", -EINVAL, formatter_flush_cond);
+    send_partial_response(o, false, "", -EINVAL);
     return;
   }
 
@@ -6702,7 +6688,7 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_
                                 s->bucket_acl, s->iam_policy,
                                 s->iam_identity_policies,
                                 s->session_policies, action)) {
-    send_partial_response(o, false, "", -EACCES, formatter_flush_cond);
+    send_partial_response(o, false, "", -EACCES);
     return;
   }
 
@@ -6720,7 +6706,7 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_
         check_obj_lock = false;
       } else {
         // Something went wrong.
-        send_partial_response(o, false, "", ret, formatter_flush_cond);
+        send_partial_response(o, false, "", ret);
         return;
       }
     } else {
@@ -6732,7 +6718,7 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_
       ceph_assert(state_loaded == 0);
       int object_lock_response = verify_object_lock(this, obj->get_attrs(), bypass_perm, bypass_governance_mode);
       if (object_lock_response != 0) {
-        send_partial_response(o, false, "", object_lock_response, formatter_flush_cond);
+        send_partial_response(o, false, "", object_lock_response);
         return;
       }
     }
@@ -6747,7 +6733,7 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_
           = driver->get_notification(obj.get(), s->src_object.get(), s, event_type, y);
   op_ret = res->publish_reserve(this);
   if (op_ret < 0) {
-    send_partial_response(o, false, "", op_ret, formatter_flush_cond);
+    send_partial_response(o, false, "", op_ret);
     return;
   }
 
@@ -6773,22 +6759,16 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_
     }
   }
   
-  send_partial_response(o, del_op->result.delete_marker, del_op->result.version_id, op_ret, formatter_flush_cond);
+  send_partial_response(o, del_op->result.delete_marker, del_op->result.version_id, op_ret);
 }
 
 void RGWDeleteMultiObj::execute(optional_yield y)
 {
   RGWMultiDelDelete *multi_delete;
-  vector<rgw_obj_key>::iterator iter;
   RGWMultiDelXMLParser parser;
-  uint32_t aio_count = 0;
   const uint32_t max_aio = std::max<uint32_t>(1, s->cct->_conf->rgw_multi_obj_del_max_aio);
+  auto group = ceph::async::spawn_throttle{y, max_aio};
   char* buf;
-  std::optional<boost::asio::deadline_timer> formatter_flush_cond;
-  if (y) {
-    auto ex = y.get_yield_context().get_executor();
-    formatter_flush_cond = std::make_optional<boost::asio::deadline_timer>(ex);
-  }
 
   buf = data.c_str();
   if (!buf) {
@@ -6842,40 +6822,22 @@ void RGWDeleteMultiObj::execute(optional_yield y)
   }
 
   begin_response();
-  if (multi_delete->objects.empty()) {
-    goto done;
-  }
 
-  for (iter = multi_delete->objects.begin();
-        iter != multi_delete->objects.end();
-        ++iter) {
-    rgw_obj_key obj_key = *iter;
-    if (y) {
-      wait_flush(y, &*formatter_flush_cond, [&aio_count, max_aio] {
-        return aio_count < max_aio;
-      });
-      aio_count++;
-      boost::asio::spawn(y.get_yield_context(), [this, &aio_count, obj_key, &formatter_flush_cond] (boost::asio::yield_context yield) {
-        handle_individual_object(obj_key, yield, &*formatter_flush_cond);
-        aio_count--;
-      }, [] (std::exception_ptr eptr) {
-        if (eptr) std::rethrow_exception(eptr);
-      }); 
-    } else {
-      handle_individual_object(obj_key, y, nullptr);
-    }
-  }
-  if (formatter_flush_cond) {
-    wait_flush(y, &*formatter_flush_cond, [this, n=multi_delete->objects.size()] {
-      return n == ops_log_entries.size();
-    });
+  // process up to max_aio object deletes in parallel
+  for (const auto& key : multi_delete->objects) {
+    boost::asio::spawn(group.get_executor(),
+                       [this, &key] (boost::asio::yield_context yield) {
+                         handle_individual_object(key, yield);
+                       }, group);
+
+    rgw_flush_formatter(s, s->formatter);
   }
+  group.wait();
 
   /*  set the return code to zero, errors at this point will be
   dumped to the response */
   op_ret = 0;
 
-done:
   // will likely segfault if begin_response() has not been called
   end_response();
   return;
index 8fe5540e96d94aa03c0bd77bc100f6434908fa38..ff99f84bd62d17b906b795bbc1c51c4318cf4fea 100644 (file)
@@ -2027,24 +2027,7 @@ class RGWDeleteMultiObj : public RGWOp {
    * Handles the deletion of an individual object and uses
    * set_partial_response to record the outcome.
    */
-  void handle_individual_object(const rgw_obj_key& o,
-                               optional_yield y,
-                                boost::asio::deadline_timer *formatter_flush_cond);
-
-  /**
-   * When the request is being executed in a coroutine, performs
-   * the actual formatter flushing and is responsible for the
-   * termination condition (when when all partial object responses
-   * have been sent). Note that the formatter flushing must be handled
-   * on the coroutine that invokes the execute method vs. the
-   * coroutines that are spawned to handle individual objects because
-   * the flush logic uses a yield context that was captured
-   * and saved on the req_state vs. one that is passed on the stack.
-   * This is a no-op in the case where we're not executing as a coroutine.
-   */
-  void wait_flush(optional_yield y,
-                  boost::asio::deadline_timer *formatter_flush_cond,
-                  std::function<bool()> predicate);
+  void handle_individual_object(const rgw_obj_key& o, optional_yield y);
 
 protected:
   std::vector<delete_multi_obj_entry> ops_log_entries;
@@ -2072,8 +2055,8 @@ public:
   virtual void send_status() = 0;
   virtual void begin_response() = 0;
   virtual void send_partial_response(const rgw_obj_key& key, bool delete_marker,
-                                     const std::string& marker_version_id, int ret,
-                                     boost::asio::deadline_timer *formatter_flush_cond) = 0;
+                                     const std::string& marker_version_id,
+                                     int ret) = 0;
   virtual void end_response() = 0;
   const char* name() const override { return "multi_object_delete"; }
   RGWOpType get_type() override { return RGW_OP_DELETE_MULTI_OBJ; }
index 171ace9162ffcfff88c14bca3d7792fce4e49e31..40dee2e2398a6b74beab61494ab70fb94790d701 100644 (file)
@@ -4228,8 +4228,7 @@ void RGWDeleteMultiObj_ObjStore_S3::begin_response()
 void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(const rgw_obj_key& key,
                                                          bool delete_marker,
                                                          const string& marker_version_id,
-                                                          int ret,
-                                                          boost::asio::deadline_timer *formatter_flush_cond)
+                                                          int ret)
 {
   if (!key.empty()) {
     delete_multi_obj_entry ops_log_entry;
@@ -4275,17 +4274,11 @@ void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(const rgw_obj_key& key
     }
 
     ops_log_entries.push_back(std::move(ops_log_entry));
-    if (formatter_flush_cond) {
-      formatter_flush_cond->cancel();
-    } else {
-      rgw_flush_formatter(s, s->formatter);
-    }
   }
 }
 
 void RGWDeleteMultiObj_ObjStore_S3::end_response()
 {
-
   s->formatter->close_section();
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
index d15ddaba35aebefda48c26ab229c322460ea09c5..dba32471745054c49740cbcb450f0e02bfe2b3d5 100644 (file)
@@ -518,8 +518,8 @@ public:
   void send_status() override;
   void begin_response() override;
   void send_partial_response(const rgw_obj_key& key, bool delete_marker,
-                             const std::string& marker_version_id, int ret,
-                             boost::asio::deadline_timer *formatter_flush_cond) override;
+                             const std::string& marker_version_id,
+                             int ret) override;
   void end_response() override;
 };