entry.delete_multi_obj_meta.objects = std::move(ops_log_entries);
}
-void RGWDeleteMultiObj::wait_flush(optional_yield y, std::function<bool()> predicate)
+void RGWDeleteMultiObj::wait_flush(optional_yield y,
+ boost::asio::deadline_timer *formatter_flush_cond,
+ std::function<bool()> predicate)
{
- if (y) {
+ if (y && formatter_flush_cond) {
auto yc = y.get_yield_context();
while (!predicate()) {
boost::system::error_code error;
}
}
-void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key *o, optional_yield y)
+void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key *o, optional_yield y,
+ boost::asio::deadline_timer *formatter_flush_cond)
{
std::string version_id;
std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(*o);
rgw::IAM::s3DeleteObjectVersion,
ARN(obj->get_obj()));
if (identity_policy_res == Effect::Deny) {
- send_partial_response(*o, false, "", -EACCES);
+ send_partial_response(*o, false, "", -EACCES, formatter_flush_cond);
return;
}
princ_type);
}
if (e == Effect::Deny) {
- send_partial_response(*o, false, "", -EACCES);
+ send_partial_response(*o, false, "", -EACCES, formatter_flush_cond);
return;
}
rgw::IAM::s3DeleteObjectVersion,
ARN(obj->get_obj()));
if (session_policy_res == Effect::Deny) {
- send_partial_response(*o, false, "", -EACCES);
+ send_partial_response(*o, false, "", -EACCES, formatter_flush_cond);
return;
}
if (princ_type == rgw::IAM::PolicyPrincipal::Role) {
//Intersection of session policy and identity policy plus intersection of session policy and bucket policy
if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) &&
(session_policy_res != Effect::Allow || e != Effect::Allow)) {
- send_partial_response(*o, false, "", -EACCES);
+ send_partial_response(*o, false, "", -EACCES, formatter_flush_cond);
return;
}
} else if (princ_type == rgw::IAM::PolicyPrincipal::Session) {
//Intersection of session policy and identity policy plus bucket policy
if ((session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) && e != Effect::Allow) {
- send_partial_response(*o, false, "", -EACCES);
+ send_partial_response(*o, false, "", -EACCES, formatter_flush_cond);
return;
}
} else if (princ_type == rgw::IAM::PolicyPrincipal::Other) {// there was no match in the bucket policy
if (session_policy_res != Effect::Allow || identity_policy_res != Effect::Allow) {
- send_partial_response(*o, false, "", -EACCES);
+ send_partial_response(*o, false, "", -EACCES, formatter_flush_cond);
return;
}
}
- send_partial_response(*o, false, "", -EACCES);
+ send_partial_response(*o, false, "", -EACCES, formatter_flush_cond);
return;
}
if ((identity_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) {
- send_partial_response(*o, false, "", -EACCES);
+ send_partial_response(*o, false, "", -EACCES, formatter_flush_cond);
return;
}
}
check_obj_lock = false;
} else {
// Something went wrong.
- send_partial_response(*o, false, "", ret);
+ send_partial_response(*o, false, "", ret, formatter_flush_cond);
return;
}
} else {
ceph_assert(astate);
int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode);
if (object_lock_response != 0) {
- send_partial_response(*o, false, "", object_lock_response);
+ send_partial_response(*o, false, "", object_lock_response, formatter_flush_cond);
return;
}
}
= store->get_notification(obj.get(), s->src_object.get(), s, event_type);
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
- send_partial_response(*o, false, "", op_ret);
+ send_partial_response(*o, false, "", op_ret, formatter_flush_cond);
return;
}
op_ret = 0;
}
- send_partial_response(*o, obj->get_delete_marker(), del_op->result.version_id, op_ret);
+ send_partial_response(*o, obj->get_delete_marker(), del_op->result.version_id, op_ret, formatter_flush_cond);
// send request to notification manager
int ret = res->publish_commit(this, obj_size, ceph::real_clock::now(), etag, version_id);
vector<rgw_obj_key>::iterator iter;
RGWMultiDelXMLParser parser;
uint32_t aio_count = 0;
- uint32_t max_aio = s->cct->_conf->rgw_multi_obj_del_max_aio;
+ const uint32_t max_aio = s->cct->_conf->rgw_multi_obj_del_max_aio;
char* buf;
+ std::unique_ptr<boost::asio::deadline_timer> formatter_flush_cond;
if (y) {
formatter_flush_cond = std::make_unique<boost::asio::deadline_timer>(y.get_io_context());
}
++iter) {
rgw_obj_key* obj_key = &*iter;
if (y && max_aio > 1) {
- wait_flush(y, [&aio_count, max_aio] {
+ wait_flush(y, formatter_flush_cond.get(), [&aio_count, max_aio] {
return aio_count < max_aio;
});
aio_count++;
- spawn::spawn(y.get_yield_context(), [this, &y, &aio_count, obj_key] (yield_context yield) {
- handle_individual_object(obj_key, optional_yield { y.get_io_context(), yield });
+ spawn::spawn(y.get_yield_context(), [this, &y, &aio_count, obj_key, &formatter_flush_cond] (yield_context yield) {
+ handle_individual_object(obj_key, optional_yield { y.get_io_context(), yield }, formatter_flush_cond.get());
aio_count--;
});
} else {
- handle_individual_object(obj_key, y);
+ handle_individual_object(obj_key, y, formatter_flush_cond.get());
}
}
- wait_flush(y, [this, n=multi_delete->objects.size()] {
+ wait_flush(y, formatter_flush_cond.get(), [this, n=multi_delete->objects.size()] {
return n == ops_log_entries.size();
});
* Handles the deletion of an individual object and uses
* set_partial_response to record the outcome.
*/
- void handle_individual_object(const rgw_obj_key *o, optional_yield y);
+ void handle_individual_object(const rgw_obj_key *o,
+ optional_yield y,
+ boost::asio::deadline_timer *formatter_flush_cond);
/**
* When the request is being executed in a coroutine, performs
* and saved on the req_state vs. one that is passed on the stack.
* This is a no-op in the case where we're not executing as a coroutine.
*/
- void wait_flush(optional_yield y, std::function<bool()> predicate);
+ void wait_flush(optional_yield y,
+ boost::asio::deadline_timer *formatter_flush_cond,
+ std::function<bool()> predicate);
protected:
std::vector<delete_multi_obj_entry> ops_log_entries;
-
- /**
- * Acts as an async condition variable when the request is being
- * executed on a coroutine. Formatter flushing must happen on the main
- * request coroutine vs. spawned coroutines, so spawned coroutines use
- * the cancellation of this timer to notify the main coroutine when
- * data is ready to flush.
- */
- std::unique_ptr<boost::asio::deadline_timer> formatter_flush_cond;
-
bufferlist data;
rgw::sal::Bucket* bucket;
bool quiet;
virtual void send_status() = 0;
virtual void begin_response() = 0;
virtual void send_partial_response(const rgw_obj_key& key, bool delete_marker,
- const std::string& marker_version_id, int ret) = 0;
+ const std::string& marker_version_id, int ret,
+ boost::asio::deadline_timer *formatter_flush_cond) = 0;
virtual void end_response() = 0;
const char* name() const override { return "multi_object_delete"; }
RGWOpType get_type() override { return RGW_OP_DELETE_MULTI_OBJ; }