#include <boost/algorithm/string/predicate.hpp>
#include <boost/optional.hpp>
#include <boost/utility/in_place_factory.hpp>
+#include <boost/asio.hpp>
#include "include/scope_guard.h"
#include "common/Clock.h"
entry.delete_multi_obj_meta.objects = std::move(ops_log_entries);
}
+void RGWDeleteMultiObj::wait_flush(optional_yield y, size_t n)
+{
+ if (y) {
+ if (ops_log_entries.size() == n) {
+ rgw_flush_formatter(s, s->formatter);
+ return;
+ }
+ auto yc = y.get_yield_context();
+ for (;;) {
+ boost::system::error_code error;
+ formatter_flush_cond->async_wait(yc[error]);
+ rgw_flush_formatter(s, s->formatter);
+ if (ops_log_entries.size() == n) {
+ break;
+ }
+ }
+ }
+}
+
+void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key *o, optional_yield y)
+{
+ std::string version_id;
+ std::unique_ptr<rgw::sal::RGWObject> obj = bucket->get_object(*o);
+ if (s->iam_policy || ! s->iam_user_policies.empty() || !s->session_policies.empty()) {
+ auto identity_policy_res = eval_identity_or_session_policies(s->iam_user_policies, s->env,
+ boost::none,
+ o->instance.empty() ?
+ rgw::IAM::s3DeleteObject :
+ rgw::IAM::s3DeleteObjectVersion,
+ ARN(obj->get_obj()));
+ if (identity_policy_res == Effect::Deny) {
+ send_partial_response(*o, false, "", -EACCES);
+ return;
+ }
+
+ rgw::IAM::Effect e = Effect::Pass;
+ rgw::IAM::PolicyPrincipal princ_type = rgw::IAM::PolicyPrincipal::Other;
+ if (s->iam_policy) {
+ e = s->iam_policy->eval(s->env,
+ *s->auth.identity,
+ o->instance.empty() ?
+ rgw::IAM::s3DeleteObject :
+ rgw::IAM::s3DeleteObjectVersion,
+ ARN(obj->get_obj()),
+ princ_type);
+ }
+ if (e == Effect::Deny) {
+ send_partial_response(*o, false, "", -EACCES);
+ return;
+ }
+
+ if (!s->session_policies.empty()) {
+ auto session_policy_res = eval_identity_or_session_policies(s->session_policies, s->env,
+ boost::none,
+ o->instance.empty() ?
+ rgw::IAM::s3DeleteObject :
+ rgw::IAM::s3DeleteObjectVersion,
+ ARN(obj->get_obj()));
+ if (session_policy_res == Effect::Deny) {
+ send_partial_response(*o, false, "", -EACCES);
+ return;
+ }
+ if (princ_type == rgw::IAM::PolicyPrincipal::Role) {
+ //Intersection of session policy and identity policy plus intersection of session policy and bucket policy
+ if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) &&
+ (session_policy_res != Effect::Allow || e != Effect::Allow)) {
+ send_partial_response(*o, false, "", -EACCES);
+ return;
+ }
+ } else if (princ_type == rgw::IAM::PolicyPrincipal::Session) {
+ //Intersection of session policy and identity policy plus bucket policy
+ if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) && e != Effect::Allow) {
+ send_partial_response(*o, false, "", -EACCES);
+ return;
+ }
+ } else if (princ_type == rgw::IAM::PolicyPrincipal::Other) {// there was no match in the bucket policy
+ if (session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) {
+ send_partial_response(*o, false, "", -EACCES);
+ return;
+ }
+ }
+ send_partial_response(*o, false, "", -EACCES);
+ return;
+ }
+
+ if ((identity_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) {
+ send_partial_response(*o, false, "", -EACCES);
+ return;
+ }
+ }
+
+ uint64_t obj_size = 0;
+ std::string etag;
+
+ if (!rgw::sal::RGWObject::empty(obj.get())) {
+ RGWObjState* astate = nullptr;
+ bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled();
+ const auto ret = obj->get_obj_state(this, obj_ctx, *bucket, &astate, y, true);
+
+ if (ret < 0) {
+ if (ret == -ENOENT) {
+ // object maybe delete_marker, skip check_obj_lock
+ check_obj_lock = false;
+ } else {
+ // Something went wrong.
+ send_partial_response(*o, false, "", ret);
+ return;
+ }
+ } else {
+ obj_size = astate->size;
+ etag = astate->attrset[RGW_ATTR_ETAG].to_str();
+ }
+
+ if (check_obj_lock) {
+ ceph_assert(astate);
+ int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode);
+ if (object_lock_response != 0) {
+ send_partial_response(*o, false, "", object_lock_response);
+ return;
+ }
+ }
+ }
+
+ // make reservation for notification if needed
+ const auto versioned_object = s->bucket->versioning_enabled();
+ rgw::notify::reservation_t res(this, store, s, obj.get());
+ const auto event_type = versioned_object && obj->get_instance().empty() ?
+ rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete;
+ op_ret = rgw::notify::publish_reserve(this, event_type, res, nullptr);
+ if (op_ret < 0) {
+ send_partial_response(*o, false, "", op_ret);
+ return;
+ }
+
+ obj->set_atomic(obj_ctx);
+
+ op_ret = obj->delete_object(this, obj_ctx, s->owner, s->bucket_owner, ceph::real_time(),
+ false, 0, version_id, y);
+ if (op_ret == -ENOENT) {
+ op_ret = 0;
+ }
+
+ send_partial_response(*o, obj->get_delete_marker(), version_id, op_ret);
+
+ // send request to notification manager
+ const auto ret = rgw::notify::publish_commit(obj.get(), obj_size, ceph::real_clock::now(), etag, event_type, res, this);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
+ // too late to rollback operation, hence op_ret is not set here
+ }
+}
+
void RGWDeleteMultiObj::execute(optional_yield y)
{
RGWMultiDelDelete *multi_delete;
RGWMultiDelXMLParser parser;
RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
char* buf;
+ if (y) {
+ formatter_flush_cond = std::make_unique<boost::asio::deadline_timer>(y.get_io_context());
+ }
buf = data.c_str();
if (!buf) {
for (iter = multi_delete->objects.begin();
iter != multi_delete->objects.end();
++iter) {
- std::string version_id;
- std::unique_ptr<rgw::sal::RGWObject> obj = bucket->get_object(*iter);
- if (s->iam_policy || ! s->iam_user_policies.empty() || !s->session_policies.empty()) {
- auto identity_policy_res = eval_identity_or_session_policies(s->iam_user_policies, s->env,
- boost::none,
- iter->instance.empty() ?
- rgw::IAM::s3DeleteObject :
- rgw::IAM::s3DeleteObjectVersion,
- ARN(obj->get_obj()));
- if (identity_policy_res == Effect::Deny) {
- send_partial_response(*iter, false, "", -EACCES);
- continue;
- }
-
- rgw::IAM::Effect e = Effect::Pass;
- rgw::IAM::PolicyPrincipal princ_type = rgw::IAM::PolicyPrincipal::Other;
- if (s->iam_policy) {
- e = s->iam_policy->eval(s->env,
- *s->auth.identity,
- iter->instance.empty() ?
- rgw::IAM::s3DeleteObject :
- rgw::IAM::s3DeleteObjectVersion,
- ARN(obj->get_obj()),
- princ_type);
- }
- if (e == Effect::Deny) {
- send_partial_response(*iter, false, "", -EACCES);
- continue;
- }
-
- if (!s->session_policies.empty()) {
- auto session_policy_res = eval_identity_or_session_policies(s->session_policies, s->env,
- boost::none,
- iter->instance.empty() ?
- rgw::IAM::s3DeleteObject :
- rgw::IAM::s3DeleteObjectVersion,
- ARN(obj->get_obj()));
- if (session_policy_res == Effect::Deny) {
- send_partial_response(*iter, false, "", -EACCES);
- continue;
- }
- if (princ_type == rgw::IAM::PolicyPrincipal::Role) {
- //Intersection of session policy and identity policy plus intersection of session policy and bucket policy
- if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) &&
- (session_policy_res != Effect::Allow || e != Effect::Allow)) {
- send_partial_response(*iter, false, "", -EACCES);
- continue;
- }
- } else if (princ_type == rgw::IAM::PolicyPrincipal::Session) {
- //Intersection of session policy and identity policy plus bucket policy
- if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) && e != Effect::Allow) {
- send_partial_response(*iter, false, "", -EACCES);
- continue;
- }
- } else if (princ_type == rgw::IAM::PolicyPrincipal::Other) {// there was no match in the bucket policy
- if (session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) {
- send_partial_response(*iter, false, "", -EACCES);
- continue;
- }
- }
- send_partial_response(*iter, false, "", -EACCES);
- continue;
- }
-
- if ((identity_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) {
- send_partial_response(*iter, false, "", -EACCES);
- continue;
- }
- }
-
- uint64_t obj_size = 0;
- std::string etag;
-
- if (!rgw::sal::RGWObject::empty(obj.get())) {
- RGWObjState* astate = nullptr;
- bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled();
- const auto ret = obj->get_obj_state(this, obj_ctx, *bucket, &astate, s->yield, true);
-
- if (ret < 0) {
- if (ret == -ENOENT) {
- // object maybe delete_marker, skip check_obj_lock
- check_obj_lock = false;
- } else {
- // Something went wrong.
- send_partial_response(*iter, false, "", ret);
- continue;
- }
- } else {
- obj_size = astate->size;
- etag = astate->attrset[RGW_ATTR_ETAG].to_str();
- }
-
- if (check_obj_lock) {
- ceph_assert(astate);
- int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode);
- if (object_lock_response != 0) {
- send_partial_response(*iter, false, "", object_lock_response);
- continue;
- }
- }
- }
-
- // make reservation for notification if needed
- const auto versioned_object = s->bucket->versioning_enabled();
- rgw::notify::reservation_t res(this, store, s, obj.get());
- const auto event_type = versioned_object && obj->get_instance().empty() ?
- rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete;
- op_ret = rgw::notify::publish_reserve(this, event_type, res, nullptr);
- if (op_ret < 0) {
- send_partial_response(*iter, false, "", op_ret);
- continue;
- }
-
- obj->set_atomic(obj_ctx);
-
- op_ret = obj->delete_object(this, obj_ctx, s->owner, s->bucket_owner, ceph::real_time(),
- false, 0, version_id, s->yield);
- if (op_ret == -ENOENT) {
- op_ret = 0;
- }
-
- send_partial_response(*iter, obj->get_delete_marker(), version_id, op_ret);
-
- // send request to notification manager
- const auto ret = rgw::notify::publish_commit(obj.get(), obj_size, ceph::real_clock::now(), etag, event_type, res, this);
- if (ret < 0) {
- ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
- // too late to rollback operation, hence op_ret is not set here
+ rgw_obj_key* obj_key = &*iter;
+ if (y) {
+ spawn::spawn(y.get_yield_context(), [this, &y, obj_key] (yield_context yield) {
+ handle_individual_object(obj_key, optional_yield { y.get_io_context(), yield });
+ });
+ } else {
+ handle_individual_object(obj_key, y);
}
}
+ wait_flush(y, multi_delete->objects.size());
+
/* set the return code to zero, errors at this point will be
dumped to the response */
op_ret = 0;