#include "include/scope_guard.h"
#include "common/Clock.h"
#include "common/armor.h"
+#include "common/async/spawn_throttle.h"
#include "common/errno.h"
#include "common/mime.h"
#include "common/utf8.h"
entry.delete_multi_obj_meta.objects = std::move(ops_log_entries);
}
-void RGWDeleteMultiObj::wait_flush(optional_yield y,
- boost::asio::deadline_timer *formatter_flush_cond,
- std::function<bool()> predicate)
-{
- if (y && formatter_flush_cond) {
- auto yc = y.get_yield_context();
- while (!predicate()) {
- boost::system::error_code error;
- formatter_flush_cond->async_wait(yc[error]);
- rgw_flush_formatter(s, s->formatter);
- }
- }
-}
-
-void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_yield y,
- boost::asio::deadline_timer *formatter_flush_cond)
+void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_yield y)
{
std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(o);
if (o.empty()) {
- send_partial_response(o, false, "", -EINVAL, formatter_flush_cond);
+ send_partial_response(o, false, "", -EINVAL);
return;
}
s->bucket_acl, s->iam_policy,
s->iam_identity_policies,
s->session_policies, action)) {
- send_partial_response(o, false, "", -EACCES, formatter_flush_cond);
+ send_partial_response(o, false, "", -EACCES);
return;
}
check_obj_lock = false;
} else {
// Something went wrong.
- send_partial_response(o, false, "", ret, formatter_flush_cond);
+ send_partial_response(o, false, "", ret);
return;
}
} else {
ceph_assert(state_loaded == 0);
int object_lock_response = verify_object_lock(this, obj->get_attrs(), bypass_perm, bypass_governance_mode);
if (object_lock_response != 0) {
- send_partial_response(o, false, "", object_lock_response, formatter_flush_cond);
+ send_partial_response(o, false, "", object_lock_response);
return;
}
}
= driver->get_notification(obj.get(), s->src_object.get(), s, event_type, y);
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
- send_partial_response(o, false, "", op_ret, formatter_flush_cond);
+ send_partial_response(o, false, "", op_ret);
return;
}
}
}
- send_partial_response(o, del_op->result.delete_marker, del_op->result.version_id, op_ret, formatter_flush_cond);
+ send_partial_response(o, del_op->result.delete_marker, del_op->result.version_id, op_ret);
}
void RGWDeleteMultiObj::execute(optional_yield y)
{
RGWMultiDelDelete *multi_delete;
- vector<rgw_obj_key>::iterator iter;
RGWMultiDelXMLParser parser;
- uint32_t aio_count = 0;
const uint32_t max_aio = std::max<uint32_t>(1, s->cct->_conf->rgw_multi_obj_del_max_aio);
+ auto group = ceph::async::spawn_throttle{y, max_aio};
char* buf;
- std::optional<boost::asio::deadline_timer> formatter_flush_cond;
- if (y) {
- auto ex = y.get_yield_context().get_executor();
- formatter_flush_cond = std::make_optional<boost::asio::deadline_timer>(ex);
- }
buf = data.c_str();
if (!buf) {
}
begin_response();
- if (multi_delete->objects.empty()) {
- goto done;
- }
- for (iter = multi_delete->objects.begin();
- iter != multi_delete->objects.end();
- ++iter) {
- rgw_obj_key obj_key = *iter;
- if (y) {
- wait_flush(y, &*formatter_flush_cond, [&aio_count, max_aio] {
- return aio_count < max_aio;
- });
- aio_count++;
- boost::asio::spawn(y.get_yield_context(), [this, &aio_count, obj_key, &formatter_flush_cond] (boost::asio::yield_context yield) {
- handle_individual_object(obj_key, yield, &*formatter_flush_cond);
- aio_count--;
- }, [] (std::exception_ptr eptr) {
- if (eptr) std::rethrow_exception(eptr);
- });
- } else {
- handle_individual_object(obj_key, y, nullptr);
- }
- }
- if (formatter_flush_cond) {
- wait_flush(y, &*formatter_flush_cond, [this, n=multi_delete->objects.size()] {
- return n == ops_log_entries.size();
- });
+ // process up to max_aio object deletes in parallel
+ for (const auto& key : multi_delete->objects) {
+ boost::asio::spawn(group.get_executor(),
+ [this, &key] (boost::asio::yield_context yield) {
+ handle_individual_object(key, yield);
+ }, group);
+
+ rgw_flush_formatter(s, s->formatter);
}
+ group.wait();
/* set the return code to zero, errors at this point will be
dumped to the response */
op_ret = 0;
-done:
// will likely segfault if begin_response() has not been called
end_response();
return;
* Handles the deletion of an individual object and uses
* set_partial_response to record the outcome.
*/
- void handle_individual_object(const rgw_obj_key& o,
- optional_yield y,
- boost::asio::deadline_timer *formatter_flush_cond);
-
- /**
- * When the request is being executed in a coroutine, performs
- * the actual formatter flushing and is responsible for the
- * termination condition (when when all partial object responses
- * have been sent). Note that the formatter flushing must be handled
- * on the coroutine that invokes the execute method vs. the
- * coroutines that are spawned to handle individual objects because
- * the flush logic uses a yield context that was captured
- * and saved on the req_state vs. one that is passed on the stack.
- * This is a no-op in the case where we're not executing as a coroutine.
- */
- void wait_flush(optional_yield y,
- boost::asio::deadline_timer *formatter_flush_cond,
- std::function<bool()> predicate);
+ void handle_individual_object(const rgw_obj_key& o, optional_yield y);
protected:
std::vector<delete_multi_obj_entry> ops_log_entries;
virtual void send_status() = 0;
virtual void begin_response() = 0;
virtual void send_partial_response(const rgw_obj_key& key, bool delete_marker,
- const std::string& marker_version_id, int ret,
- boost::asio::deadline_timer *formatter_flush_cond) = 0;
+ const std::string& marker_version_id,
+ int ret) = 0;
virtual void end_response() = 0;
const char* name() const override { return "multi_object_delete"; }
RGWOpType get_type() override { return RGW_OP_DELETE_MULTI_OBJ; }