From e571752d458cecfaf0f70b01b4775efb64d959b7 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 9 May 2024 13:50:06 -0400 Subject: [PATCH] rgw: RGWDeleteMultiObj uses spawn_throttle for concurrency adapt RGWDeleteMultiObj to use ceph::async::spawn_throttle for the handle_individual_object() coroutines it spawns Signed-off-by: Casey Bodley --- src/rgw/rgw_op.cc | 74 ++++++++++-------------------------------- src/rgw/rgw_op.h | 23 ++----------- src/rgw/rgw_rest_s3.cc | 9 +---- src/rgw/rgw_rest_s3.h | 4 +-- 4 files changed, 24 insertions(+), 86 deletions(-) diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index e1c7ee484f4c2..d93cc506ffca4 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -16,6 +16,7 @@ #include "include/scope_guard.h" #include "common/Clock.h" #include "common/armor.h" +#include "common/async/spawn_throttle.h" #include "common/errno.h" #include "common/mime.h" #include "common/utf8.h" @@ -6671,26 +6672,11 @@ void RGWDeleteMultiObj::write_ops_log_entry(rgw_log_entry& entry) const { entry.delete_multi_obj_meta.objects = std::move(ops_log_entries); } -void RGWDeleteMultiObj::wait_flush(optional_yield y, - boost::asio::deadline_timer *formatter_flush_cond, - std::function predicate) -{ - if (y && formatter_flush_cond) { - auto yc = y.get_yield_context(); - while (!predicate()) { - boost::system::error_code error; - formatter_flush_cond->async_wait(yc[error]); - rgw_flush_formatter(s, s->formatter); - } - } -} - -void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_yield y, - boost::asio::deadline_timer *formatter_flush_cond) +void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_yield y) { std::unique_ptr obj = bucket->get_object(o); if (o.empty()) { - send_partial_response(o, false, "", -EINVAL, formatter_flush_cond); + send_partial_response(o, false, "", -EINVAL); return; } @@ -6702,7 +6688,7 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_ s->bucket_acl, s->iam_policy, s->iam_identity_policies, s->session_policies, action)) { - send_partial_response(o, false, "", -EACCES, formatter_flush_cond); + send_partial_response(o, false, "", -EACCES); return; } @@ -6720,7 +6706,7 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_ check_obj_lock = false; } else { // Something went wrong. - send_partial_response(o, false, "", ret, formatter_flush_cond); + send_partial_response(o, false, "", ret); return; } } else { @@ -6732,7 +6718,7 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_ ceph_assert(state_loaded == 0); int object_lock_response = verify_object_lock(this, obj->get_attrs(), bypass_perm, bypass_governance_mode); if (object_lock_response != 0) { - send_partial_response(o, false, "", object_lock_response, formatter_flush_cond); + send_partial_response(o, false, "", object_lock_response); return; } } @@ -6747,7 +6733,7 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_ = driver->get_notification(obj.get(), s->src_object.get(), s, event_type, y); op_ret = res->publish_reserve(this); if (op_ret < 0) { - send_partial_response(o, false, "", op_ret, formatter_flush_cond); + send_partial_response(o, false, "", op_ret); return; } @@ -6773,22 +6759,16 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_ } } - send_partial_response(o, del_op->result.delete_marker, del_op->result.version_id, op_ret, formatter_flush_cond); + send_partial_response(o, del_op->result.delete_marker, del_op->result.version_id, op_ret); } void RGWDeleteMultiObj::execute(optional_yield y) { RGWMultiDelDelete *multi_delete; - vector::iterator iter; RGWMultiDelXMLParser parser; - uint32_t aio_count = 0; const uint32_t max_aio = std::max(1, s->cct->_conf->rgw_multi_obj_del_max_aio); + auto group = ceph::async::spawn_throttle{y, max_aio}; char* buf; - std::optional formatter_flush_cond; - if (y) { - auto ex = y.get_yield_context().get_executor(); - formatter_flush_cond = std::make_optional(ex); - } buf = data.c_str(); if (!buf) { @@ -6842,40 +6822,22 @@ void RGWDeleteMultiObj::execute(optional_yield y) } begin_response(); - if (multi_delete->objects.empty()) { - goto done; - } - for (iter = multi_delete->objects.begin(); - iter != multi_delete->objects.end(); - ++iter) { - rgw_obj_key obj_key = *iter; - if (y) { - wait_flush(y, &*formatter_flush_cond, [&aio_count, max_aio] { - return aio_count < max_aio; - }); - aio_count++; - boost::asio::spawn(y.get_yield_context(), [this, &aio_count, obj_key, &formatter_flush_cond] (boost::asio::yield_context yield) { - handle_individual_object(obj_key, yield, &*formatter_flush_cond); - aio_count--; - }, [] (std::exception_ptr eptr) { - if (eptr) std::rethrow_exception(eptr); - }); - } else { - handle_individual_object(obj_key, y, nullptr); - } - } - if (formatter_flush_cond) { - wait_flush(y, &*formatter_flush_cond, [this, n=multi_delete->objects.size()] { - return n == ops_log_entries.size(); - }); + // process up to max_aio object deletes in parallel + for (const auto& key : multi_delete->objects) { + boost::asio::spawn(group.get_executor(), + [this, &key] (boost::asio::yield_context yield) { + handle_individual_object(key, yield); + }, group); + + rgw_flush_formatter(s, s->formatter); } + group.wait(); /* set the return code to zero, errors at this point will be dumped to the response */ op_ret = 0; -done: // will likely segfault if begin_response() has not been called end_response(); return; diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 8fe5540e96d94..ff99f84bd62d1 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -2027,24 +2027,7 @@ class RGWDeleteMultiObj : public RGWOp { * Handles the deletion of an individual object and uses * set_partial_response to record the outcome. */ - void handle_individual_object(const rgw_obj_key& o, - optional_yield y, - boost::asio::deadline_timer *formatter_flush_cond); - - /** - * When the request is being executed in a coroutine, performs - * the actual formatter flushing and is responsible for the - * termination condition (when when all partial object responses - * have been sent). Note that the formatter flushing must be handled - * on the coroutine that invokes the execute method vs. the - * coroutines that are spawned to handle individual objects because - * the flush logic uses a yield context that was captured - * and saved on the req_state vs. one that is passed on the stack. - * This is a no-op in the case where we're not executing as a coroutine. - */ - void wait_flush(optional_yield y, - boost::asio::deadline_timer *formatter_flush_cond, - std::function predicate); + void handle_individual_object(const rgw_obj_key& o, optional_yield y); protected: std::vector ops_log_entries; @@ -2072,8 +2055,8 @@ public: virtual void send_status() = 0; virtual void begin_response() = 0; virtual void send_partial_response(const rgw_obj_key& key, bool delete_marker, - const std::string& marker_version_id, int ret, - boost::asio::deadline_timer *formatter_flush_cond) = 0; + const std::string& marker_version_id, + int ret) = 0; virtual void end_response() = 0; const char* name() const override { return "multi_object_delete"; } RGWOpType get_type() override { return RGW_OP_DELETE_MULTI_OBJ; } diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 171ace9162ffc..40dee2e2398a6 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -4228,8 +4228,7 @@ void RGWDeleteMultiObj_ObjStore_S3::begin_response() void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(const rgw_obj_key& key, bool delete_marker, const string& marker_version_id, - int ret, - boost::asio::deadline_timer *formatter_flush_cond) + int ret) { if (!key.empty()) { delete_multi_obj_entry ops_log_entry; @@ -4275,17 +4274,11 @@ void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(const rgw_obj_key& key } ops_log_entries.push_back(std::move(ops_log_entry)); - if (formatter_flush_cond) { - formatter_flush_cond->cancel(); - } else { - rgw_flush_formatter(s, s->formatter); - } } } void RGWDeleteMultiObj_ObjStore_S3::end_response() { - s->formatter->close_section(); rgw_flush_formatter_and_reset(s, s->formatter); } diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h index d15ddaba35aeb..dba3247174505 100644 --- a/src/rgw/rgw_rest_s3.h +++ b/src/rgw/rgw_rest_s3.h @@ -518,8 +518,8 @@ public: void send_status() override; void begin_response() override; void send_partial_response(const rgw_obj_key& key, bool delete_marker, - const std::string& marker_version_id, int ret, - boost::asio::deadline_timer *formatter_flush_cond) override; + const std::string& marker_version_id, + int ret) override; void end_response() override; }; -- 2.39.5