From: Cory Snyder Date: Wed, 26 Oct 2022 20:14:55 +0000 (-0400) Subject: rgw: concurrency for multi object deletes X-Git-Tag: v16.2.13~36^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e1534a5722b5b01a1fc39c07e9fff7a8719b8549;p=ceph.git rgw: concurrency for multi object deletes Enables concurrent deletes of individual objects for multi-object delete calls when executed with asio. Fixes: https://tracker.ceph.com/issues/57947 Signed-off-by: Cory Snyder (cherry picked from commit 6b6592f50b6b81fa13a330bcb91273ba7f25c0c9) Conflicts: src/rgw/rgw_op.cc src/rgw/rgw_op.h src/rgw/rgw_rest_s3.h Cherry-pick notes: - Conflicts due to absence of std namespace prefix on Pacific - Conflicts due to name rgw::sal::RGWObject vs rgw::sal::Object --- diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index cc3ff80845d3..134e785f74e3 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -12,6 +12,7 @@ #include #include #include +#include #include "include/scope_guard.h" #include "common/Clock.h" @@ -7034,6 +7035,158 @@ void RGWDeleteMultiObj::write_ops_log_entry(rgw_log_entry& entry) const { entry.delete_multi_obj_meta.objects = std::move(ops_log_entries); } +void RGWDeleteMultiObj::wait_flush(optional_yield y, size_t n) +{ + if (y) { + if (ops_log_entries.size() == n) { + rgw_flush_formatter(s, s->formatter); + return; + } + auto yc = y.get_yield_context(); + for (;;) { + boost::system::error_code error; + formatter_flush_cond->async_wait(yc[error]); + rgw_flush_formatter(s, s->formatter); + if (ops_log_entries.size() == n) { + break; + } + } + } +} + +void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key *o, optional_yield y) +{ + std::string version_id; + std::unique_ptr obj = bucket->get_object(*o); + if (s->iam_policy || ! s->iam_user_policies.empty() || !s->session_policies.empty()) { + auto identity_policy_res = eval_identity_or_session_policies(s->iam_user_policies, s->env, + boost::none, + o->instance.empty() ? 
+ rgw::IAM::s3DeleteObject : + rgw::IAM::s3DeleteObjectVersion, + ARN(obj->get_obj())); + if (identity_policy_res == Effect::Deny) { + send_partial_response(*o, false, "", -EACCES); + return; + } + + rgw::IAM::Effect e = Effect::Pass; + rgw::IAM::PolicyPrincipal princ_type = rgw::IAM::PolicyPrincipal::Other; + if (s->iam_policy) { + e = s->iam_policy->eval(s->env, + *s->auth.identity, + o->instance.empty() ? + rgw::IAM::s3DeleteObject : + rgw::IAM::s3DeleteObjectVersion, + ARN(obj->get_obj()), + princ_type); + } + if (e == Effect::Deny) { + send_partial_response(*o, false, "", -EACCES); + return; + } + + if (!s->session_policies.empty()) { + auto session_policy_res = eval_identity_or_session_policies(s->session_policies, s->env, + boost::none, + o->instance.empty() ? + rgw::IAM::s3DeleteObject : + rgw::IAM::s3DeleteObjectVersion, + ARN(obj->get_obj())); + if (session_policy_res == Effect::Deny) { + send_partial_response(*o, false, "", -EACCES); + return; + } + if (princ_type == rgw::IAM::PolicyPrincipal::Role) { + //Intersection of session policy and identity policy plus intersection of session policy and bucket policy + if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) && + (session_policy_res != Effect::Allow || e != Effect::Allow)) { + send_partial_response(*o, false, "", -EACCES); + return; + } + } else if (princ_type == rgw::IAM::PolicyPrincipal::Session) { + //Intersection of session policy and identity policy plus bucket policy + if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) && e != Effect::Allow) { + send_partial_response(*o, false, "", -EACCES); + return; + } + } else if (princ_type == rgw::IAM::PolicyPrincipal::Other) {// there was no match in the bucket policy + if (session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) { + send_partial_response(*o, false, "", -EACCES); + return; + } + } + send_partial_response(*o, false, "", -EACCES); + return; + } + + if 
((identity_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) { + send_partial_response(*o, false, "", -EACCES); + return; + } + } + + uint64_t obj_size = 0; + std::string etag; + + if (!rgw::sal::RGWObject::empty(obj.get())) { + RGWObjState* astate = nullptr; + bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled(); + const auto ret = obj->get_obj_state(this, obj_ctx, *bucket, &astate, y, true); + + if (ret < 0) { + if (ret == -ENOENT) { + // object maybe delete_marker, skip check_obj_lock + check_obj_lock = false; + } else { + // Something went wrong. + send_partial_response(*o, false, "", ret); + return; + } + } else { + obj_size = astate->size; + etag = astate->attrset[RGW_ATTR_ETAG].to_str(); + } + + if (check_obj_lock) { + ceph_assert(astate); + int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode); + if (object_lock_response != 0) { + send_partial_response(*o, false, "", object_lock_response); + return; + } + } + } + + // make reservation for notification if needed + const auto versioned_object = s->bucket->versioning_enabled(); + rgw::notify::reservation_t res(this, store, s, obj.get()); + const auto event_type = versioned_object && obj->get_instance().empty() ? 
+ rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete; + op_ret = rgw::notify::publish_reserve(this, event_type, res, nullptr); + if (op_ret < 0) { + send_partial_response(*o, false, "", op_ret); + return; + } + + obj->set_atomic(obj_ctx); + + op_ret = obj->delete_object(this, obj_ctx, s->owner, s->bucket_owner, ceph::real_time(), + false, 0, version_id, y); + if (op_ret == -ENOENT) { + op_ret = 0; + } + + send_partial_response(*o, obj->get_delete_marker(), version_id, op_ret); + + // send request to notification manager + const auto ret = rgw::notify::publish_commit(obj.get(), obj_size, ceph::real_clock::now(), etag, event_type, res, this); + if (ret < 0) { + ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl; + // too late to rollback operation, hence op_ret is not set here + } +} + void RGWDeleteMultiObj::execute(optional_yield y) { RGWMultiDelDelete *multi_delete; @@ -7041,6 +7194,9 @@ void RGWDeleteMultiObj::execute(optional_yield y) RGWMultiDelXMLParser parser; RGWObjectCtx *obj_ctx = static_cast(s->obj_ctx); char* buf; + if (y) { + formatter_flush_cond = std::make_unique(y.get_io_context()); + } buf = data.c_str(); if (!buf) { @@ -7101,137 +7257,18 @@ void RGWDeleteMultiObj::execute(optional_yield y) for (iter = multi_delete->objects.begin(); iter != multi_delete->objects.end(); ++iter) { - std::string version_id; - std::unique_ptr obj = bucket->get_object(*iter); - if (s->iam_policy || ! s->iam_user_policies.empty() || !s->session_policies.empty()) { - auto identity_policy_res = eval_identity_or_session_policies(s->iam_user_policies, s->env, - boost::none, - iter->instance.empty() ? 
- rgw::IAM::s3DeleteObject : - rgw::IAM::s3DeleteObjectVersion, - ARN(obj->get_obj())); - if (identity_policy_res == Effect::Deny) { - send_partial_response(*iter, false, "", -EACCES); - continue; - } - - rgw::IAM::Effect e = Effect::Pass; - rgw::IAM::PolicyPrincipal princ_type = rgw::IAM::PolicyPrincipal::Other; - if (s->iam_policy) { - e = s->iam_policy->eval(s->env, - *s->auth.identity, - iter->instance.empty() ? - rgw::IAM::s3DeleteObject : - rgw::IAM::s3DeleteObjectVersion, - ARN(obj->get_obj()), - princ_type); - } - if (e == Effect::Deny) { - send_partial_response(*iter, false, "", -EACCES); - continue; - } - - if (!s->session_policies.empty()) { - auto session_policy_res = eval_identity_or_session_policies(s->session_policies, s->env, - boost::none, - iter->instance.empty() ? - rgw::IAM::s3DeleteObject : - rgw::IAM::s3DeleteObjectVersion, - ARN(obj->get_obj())); - if (session_policy_res == Effect::Deny) { - send_partial_response(*iter, false, "", -EACCES); - continue; - } - if (princ_type == rgw::IAM::PolicyPrincipal::Role) { - //Intersection of session policy and identity policy plus intersection of session policy and bucket policy - if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) && - (session_policy_res != Effect::Allow || e != Effect::Allow)) { - send_partial_response(*iter, false, "", -EACCES); - continue; - } - } else if (princ_type == rgw::IAM::PolicyPrincipal::Session) { - //Intersection of session policy and identity policy plus bucket policy - if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) && e != Effect::Allow) { - send_partial_response(*iter, false, "", -EACCES); - continue; - } - } else if (princ_type == rgw::IAM::PolicyPrincipal::Other) {// there was no match in the bucket policy - if (session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) { - send_partial_response(*iter, false, "", -EACCES); - continue; - } - } - send_partial_response(*iter, false, "", 
-EACCES); - continue; - } - - if ((identity_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) { - send_partial_response(*iter, false, "", -EACCES); - continue; - } - } - - uint64_t obj_size = 0; - std::string etag; - - if (!rgw::sal::RGWObject::empty(obj.get())) { - RGWObjState* astate = nullptr; - bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled(); - const auto ret = obj->get_obj_state(this, obj_ctx, *bucket, &astate, s->yield, true); - - if (ret < 0) { - if (ret == -ENOENT) { - // object maybe delete_marker, skip check_obj_lock - check_obj_lock = false; - } else { - // Something went wrong. - send_partial_response(*iter, false, "", ret); - continue; - } - } else { - obj_size = astate->size; - etag = astate->attrset[RGW_ATTR_ETAG].to_str(); - } - - if (check_obj_lock) { - ceph_assert(astate); - int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode); - if (object_lock_response != 0) { - send_partial_response(*iter, false, "", object_lock_response); - continue; - } - } - } - - // make reservation for notification if needed - const auto versioned_object = s->bucket->versioning_enabled(); - rgw::notify::reservation_t res(this, store, s, obj.get()); - const auto event_type = versioned_object && obj->get_instance().empty() ? 
- rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete; - op_ret = rgw::notify::publish_reserve(this, event_type, res, nullptr); - if (op_ret < 0) { - send_partial_response(*iter, false, "", op_ret); - continue; - } - - obj->set_atomic(obj_ctx); - - op_ret = obj->delete_object(this, obj_ctx, s->owner, s->bucket_owner, ceph::real_time(), - false, 0, version_id, s->yield); - if (op_ret == -ENOENT) { - op_ret = 0; - } - - send_partial_response(*iter, obj->get_delete_marker(), version_id, op_ret); - - // send request to notification manager - const auto ret = rgw::notify::publish_commit(obj.get(), obj_size, ceph::real_clock::now(), etag, event_type, res, this); - if (ret < 0) { - ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl; - // too late to rollback operation, hence op_ret is not set here + rgw_obj_key* obj_key = &*iter; + if (y) { + spawn::spawn(y.get_yield_context(), [this, &y, obj_key] (yield_context yield) { + handle_individual_object(obj_key, optional_yield { y.get_io_context(), yield }); + }); + } else { + handle_individual_object(obj_key, y); } } + wait_flush(y, multi_delete->objects.size()); + /* set the return code to zero, errors at this point will be dumped to the response */ op_ret = 0; diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 92a4afe7789c..3e49148e437e 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -1907,8 +1907,37 @@ public: class RGWDeleteMultiObj : public RGWOp { + /** + * Handles the deletion of an individual object and uses + * send_partial_response to record the outcome. + */ + void handle_individual_object(const rgw_obj_key *o, optional_yield y); + + /** + * When the request is being executed in a coroutine, performs + * the actual formatter flushing and is responsible for the + * termination condition (when all partial object responses + * have been sent). 
Note that the formatter flushing must be handled + * on the coroutine that invokes the execute method vs. the + * coroutines that are spawned to handle individual objects because + * the flush logic uses a yield context that was captured + * and saved on the req_state vs. one that is passed on the stack. + * This is a no-op in the case where we're not executing as a coroutine. + */ + void wait_flush(optional_yield y, size_t n); + protected: std::vector ops_log_entries; + + /** + * Acts as an async condition variable when the request is being + * executed on a coroutine. Formatter flushing must happen on the main + * request coroutine vs. spawned coroutines, so spawned coroutines use + * the cancellation of this timer to notify the main coroutine when + * data is ready to flush. + */ + std::unique_ptr formatter_flush_cond; + bufferlist data; rgw::sal::RGWBucket* bucket; bool quiet; @@ -1917,7 +1946,6 @@ protected: bool bypass_perm; bool bypass_governance_mode; - public: RGWDeleteMultiObj() { quiet = false; @@ -1925,6 +1953,7 @@ public: bypass_perm = true; bypass_governance_mode = false; } + int verify_permission(optional_yield y) override; void pre_exec() override; void execute(optional_yield y) override; @@ -1932,8 +1961,8 @@ public: virtual int get_params(optional_yield y) = 0; virtual void send_status() = 0; virtual void begin_response() = 0; - virtual void send_partial_response(rgw_obj_key& key, bool delete_marker, - const string& marker_version_id, int ret) = 0; + virtual void send_partial_response(const rgw_obj_key& key, bool delete_marker, + const std::string& marker_version_id, int ret) = 0; virtual void end_response() = 0; const char* name() const override { return "multi_object_delete"; } RGWOpType get_type() override { return RGW_OP_DELETE_MULTI_OBJ; } diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 1000d93adcaa..0d549a47b6a9 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -5185,7 +5185,7 @@ int 
RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi store->remove_rgw_head_obj(op); auto& ioctx = ref.pool.ioctx(); - r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, null_yield); + r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y); /* raced with another operation, object state is indeterminate */ const bool need_invalidate = (r == -ECANCELED); @@ -7601,7 +7601,7 @@ int RGWRados::raw_obj_stat(const DoutPrefixProvider *dpp, op.read(0, cct->_conf->rgw_max_chunk_size, first_chunk, NULL); } bufferlist outbl; - r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, &outbl, null_yield); + r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, &outbl, y); if (epoch) { *epoch = ref.pool.ioctx().get_last_version(); diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 1ed672fa6f99..a0b2d6f36cae 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -3925,9 +3925,10 @@ void RGWDeleteMultiObj_ObjStore_S3::begin_response() rgw_flush_formatter(s, s->formatter); } -void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(rgw_obj_key& key, +void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(const rgw_obj_key& key, bool delete_marker, - const string& marker_version_id, int ret) + const string& marker_version_id, + int ret) { if (!key.empty()) { delete_multi_obj_entry ops_log_entry; @@ -3973,7 +3974,11 @@ void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(rgw_obj_key& key, } ops_log_entries.push_back(std::move(ops_log_entry)); - rgw_flush_formatter(s, s->formatter); + if (formatter_flush_cond) { + formatter_flush_cond->cancel(); + } else { + rgw_flush_formatter(s, s->formatter); + } } } diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h index 14b4e6b61ea7..9b32e4097c95 100644 --- a/src/rgw/rgw_rest_s3.h +++ b/src/rgw/rgw_rest_s3.h @@ -492,8 +492,8 @@ public: int get_params(optional_yield y) override; void send_status() override; void begin_response() override; - void 
send_partial_response(rgw_obj_key& key, bool delete_marker, - const string& marker_version_id, int ret) override; + void send_partial_response(const rgw_obj_key& key, bool delete_marker, + const std::string& marker_version_id, int ret) override; void end_response() override; };