int RGWRados::Object::Delete::delete_obj(optional_yield y,
const DoutPrefixProvider* dpp,
bool log_op,
- const bool force)
+ const bool force,
+ const bool skip_olh_obj_update)
{
RGWRados *store = target->get_store();
const rgw_obj& src_obj = target->get_obj();
int r = store->set_olh(dpp, target->get_ctx(), target->get_bucket_info(), marker, true,
&meta, params.olh_epoch, params.unmod_since, params.high_precision_time,
- y, params.zones_trace, add_log);
+ y, params.zones_trace, add_log, skip_olh_obj_update);
if (r < 0) {
return r;
}
r = store->unlink_obj_instance(
dpp, target->get_ctx(), target->get_bucket_info(), obj,
params.olh_epoch, y, params.bilog_flags,
- params.null_verid, params.zones_trace, add_log, force);
+ params.null_verid, params.zones_trace, add_log, force, skip_olh_obj_update);
if (r < 0) {
return r;
}
const real_time& expiration_time,
rgw_zone_set *zones_trace,
bool log_op,
- const bool force) // force removal even if head object is broken
+ const bool force, // force removal even if head object is broken
+ const bool skip_olh_obj_update) // true for all deletes (except the last one) initiated by a multi-object delete op
{
RGWRados::Object del_target(this, bucket_info, obj_ctx, obj);
RGWRados::Object::Delete del_op(&del_target);
del_op.params.zones_trace = zones_trace;
del_op.params.null_verid = null_verid;
- return del_op.delete_obj(y, dpp, log_op, force);
+ return del_op.delete_obj(y, dpp, log_op, force, skip_olh_obj_update);
}
int RGWRados::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, optional_yield y)
rgw_obj obj_instance(bucket, key);
int ret = delete_obj(dpp, obj_ctx, bucket_info, obj_instance, 0, y,
null_verid, RGW_BILOG_FLAG_VERSIONED_OP,
- ceph::real_time(), zones_trace, log_op, force);
+ ceph::real_time(), zones_trace, log_op, force, true /* skip_olh_obj_update */);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(dpp, 0) << "ERROR: delete_obj() returned " << ret << " obj_instance=" << obj_instance << dendl;
return ret;
// it's possible that the pending xattr from this op prevented the olh
// object from being cleaned by another thread that was deleting the last
// existing version. We invoke a best-effort update_olh here to handle this case.
- int r = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, y, zones_trace, log_data_change);
- if (r < 0 && r != -ECANCELED) {
- ldpp_dout(dpp, 20) << "update_olh() target_obj=" << olh_obj << " returned " << r << dendl;
+ if (! skip_olh_obj_update) {
+ int r = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, y, zones_trace, log_data_change);
+ if (r < 0 && r != -ECANCELED) {
+ ldpp_dout(dpp, 20) << "update_olh() target_obj=" << olh_obj << " returned " << r << dendl;
+ }
}
return ret;
}
// exit early if we're skipping the olh update and just updating the index
if (skip_olh_obj_update) {
+ ldpp_dout(dpp, 20) << "skip update_olh() target_obj=" << olh_obj << dendl;
return 0;
}
bool null_verid,
rgw_zone_set* zones_trace,
bool log_op,
- const bool force)
+ const bool force,
+ const bool skip_olh_obj_update)
{
string op_tag;
// it's possible that the pending xattr from this op prevented the olh
// object from being cleaned by another thread that was deleting the last
// existing version. We invoke a best-effort update_olh here to handle this case.
- int r = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, y,
- zones_trace, null_verid, log_op, force);
- if (r < 0 && r != -ECANCELED) {
- ldpp_dout(dpp, 20) << "update_olh() target_obj=" << olh_obj << " returned " << r << dendl;
+ if (! skip_olh_obj_update) {
+ int r = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, y,
+ zones_trace, null_verid, log_op, force);
+ if (r < 0 && r != -ECANCELED) {
+ ldpp_dout(dpp, 20) << "update_olh() target_obj=" << olh_obj << " returned " << r << dendl;
+ }
}
return ret;
} // if error in bucket_index_unlink_instance call
return -EIO;
}
+ if (skip_olh_obj_update) {
+ ldpp_dout(dpp, 20) << "skip update_olh() target_obj=" << olh_obj << dendl;
+ return 0;
+ }
+
ret = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, y,
zones_trace, null_verid, log_op, force);
if (ret == -ECANCELED) { /* already did what we needed, no need to retry, raced with another user */
int delete_obj(optional_yield y,
const DoutPrefixProvider* dpp,
bool log_op,
- const bool force); // if head object missing, do a best effort
+ const bool force, // if head object missing, do a best effort
+ const bool skip_olh_obj_update // true for all deletes (except the last one) initiated by a multi-object delete op
+ );
}; // struct RGWRados::Object::Delete
struct Stat {
const ceph::real_time& expiration_time = ceph::real_time(),
rgw_zone_set *zones_trace = nullptr,
bool log_op = true,
- const bool force = false); // if head object missing, do a best effort
+ const bool force = false, // if head object missing, do a best effort
+ const bool skip_olh_obj_update = false); // true for all deletes (except the last one) initiated by a multi-object delete op
int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, optional_yield y);
uint64_t olh_epoch, optional_yield y,
uint16_t bilog_flags, bool null_verid,
rgw_zone_set *zones_trace = nullptr,
- bool log_op = true, const bool force = false);
+ bool log_op = true, const bool force = false, const bool skip_olh_obj_update = false);
void check_pending_olh_entries(const DoutPrefixProvider *dpp, std::map<std::string, bufferlist>& pending_entries, std::map<std::string, bufferlist> *rm_pending_entries);
int remove_olh_pending_entries(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, std::map<std::string, bufferlist>& pending_attrs, optional_yield y);
parent_op.params.parts_accounted_size = params.parts_accounted_size;
parent_op.params.null_verid = params.null_verid;
- int ret = parent_op.delete_obj(y, dpp, flags & FLAG_LOG_OP, flags & FLAG_FORCE_OP);
+ int ret = parent_op.delete_obj(y, dpp, flags & FLAG_LOG_OP, flags & FLAG_FORCE_OP, flags & FLAG_SKIP_UPDATE_OLH);
if (ret < 0) {
return ret;
}
}
// convert flags to bool params
- return del_op.delete_obj(y, dpp, flags & FLAG_LOG_OP, flags & FLAG_FORCE_OP);
+ return del_op.delete_obj(y, dpp, flags & FLAG_LOG_OP, flags & FLAG_FORCE_OP, flags & FLAG_SKIP_UPDATE_OLH);
} // RadosObject::delete_object
int RadosObject::copy_object(const ACLOwner& owner,
}
void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_yield y,
- boost::asio::deadline_timer *formatter_flush_cond)
+ boost::asio::deadline_timer *formatter_flush_cond,
+ const bool skip_olh_obj_update)
{
std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(o);
if (o.empty()) {
del_op->params.bucket_owner = s->bucket_owner.id;
del_op->params.marker_version_id = version_id;
- op_ret = del_op->delete_obj(this, y, rgw::sal::FLAG_LOG_OP);
+ op_ret = del_op->delete_obj(this, y,
+ rgw::sal::FLAG_LOG_OP | (skip_olh_obj_update ? rgw::sal::FLAG_SKIP_UPDATE_OLH : 0));
if (op_ret == -ENOENT) {
op_ret = 0;
}
+
if (op_ret == 0) {
// send request to notification manager
int ret = res->publish_commit(this, obj_size, ceph::real_clock::now(), etag, version_id);
send_partial_response(o, del_op->result.delete_marker, del_op->result.version_id, op_ret, formatter_flush_cond);
}
+void RGWDeleteMultiObj::handle_versioned_objects(const std::vector<rgw_obj_key>& objects, // keys requested for deletion; several entries may share one name
+ uint32_t max_aio, // spawning waits until fewer than this many per-object coroutines are outstanding
+ boost::asio::yield_context y) // coroutine context used to spawn the per-object coroutines and to wait on them
+{ // two-phase delete: per name, every entry but the last runs with skip_olh_obj_update=true, then the last entry runs without it
+ std::optional<boost::asio::deadline_timer> formatter_flush_cond;
+ auto ex = y.get_executor();
+ formatter_flush_cond = std::make_optional<boost::asio::deadline_timer>(ex);
+ uint32_t aio_count = 0; // per-object coroutines spawned but not yet finished (incremented before spawn, decremented at coroutine end)
+ std::map<std::string, std::vector<rgw_obj_key>> grouped_objects; // object name -> entries with that name, in request order
+
+ // group the requested entries by name (rgw_obj_key::name) so entries that share a name end up in one vector
+ for (const auto& object : objects) {
+ const std::string& key = object.name;
+ grouped_objects[key].push_back(object);
+ }
+
+ // phase 1: for each group, handle all but the last entry with skip_olh_obj_update=true (update_olh is skipped for these)
+ for (const auto& kv : grouped_objects) {
+ const auto& group = kv.second;
+ for (size_t i = 0; i + 1 < group.size(); ++i) { // skip the last element; a single-entry group runs no iterations here
+ wait_flush(y, &*formatter_flush_cond, [&aio_count, max_aio] { // NOTE(review): wait_flush is defined elsewhere; presumably suspends until the predicate holds
+ return aio_count < max_aio;
+ });
+ aio_count++;
+ const rgw_obj_key obj = group[i]; // local copy; the lambda below captures it by value
+ boost::asio::spawn(y, [this, &aio_count, obj, &formatter_flush_cond](boost::asio::yield_context yield) {
+ handle_individual_object(obj, yield, &*formatter_flush_cond, true /* skip_olh_obj_update */);
+ aio_count--;
+ }, [] (std::exception_ptr eptr) { // completion handler: rethrow whatever the coroutine threw
+ if (eptr) std::rethrow_exception(eptr);
+ });
+ }
+ }
+ wait_flush(y, &*formatter_flush_cond, [this, n=objects.size()-grouped_objects.size()] { // barrier: n = number of phase-1 entries (total minus one last entry per group)
+ return n == ops_log_entries.size(); // NOTE(review): relies on each handled entry adding exactly one element to ops_log_entries; confirm
+ });
+
+ // phase 2: handle the last entry of each group with the default skip_olh_obj_update, i.e. without asking to skip update_olh
+ for (const auto& kv : grouped_objects) {
+ const rgw_obj_key obj = kv.second.back(); // every group holds at least one entry, so back() is valid
+
+ wait_flush(y, &*formatter_flush_cond, [&aio_count, max_aio] {
+ return aio_count < max_aio;
+ });
+ aio_count++;
+ boost::asio::spawn(y, [this, &aio_count, obj, &formatter_flush_cond] (boost::asio::yield_context yield) {
+ handle_individual_object(obj, yield, &*formatter_flush_cond);
+ aio_count--;
+ }, [] (std::exception_ptr eptr) { // completion handler: rethrow whatever the coroutine threw
+ if (eptr) std::rethrow_exception(eptr);
+ });
+ }
+ wait_flush(y, &*formatter_flush_cond, [this, n=objects.size()] { // final barrier: one ops_log_entries element expected per requested entry
+ return n == ops_log_entries.size();
+ });
+}
+
+void RGWDeleteMultiObj::handle_non_versioned_objects(const std::vector<rgw_obj_key>& objects, // keys requested for deletion, handled in request order
+ uint32_t max_aio, // spawning waits until fewer than this many per-object coroutines are outstanding
+ boost::asio::yield_context y) // coroutine context used to spawn the per-object coroutines and to wait on them
+{ // single-pass delete: one coroutine per entry, all with the default skip_olh_obj_update
+ std::optional<boost::asio::deadline_timer> formatter_flush_cond;
+ auto ex = y.get_executor();
+ formatter_flush_cond = std::make_optional<boost::asio::deadline_timer>(ex);
+ uint32_t aio_count = 0; // per-object coroutines spawned but not yet finished
+
+ for (const auto& object : objects) {
+ wait_flush(y, &*formatter_flush_cond, [&aio_count, max_aio] { // NOTE(review): wait_flush is defined elsewhere; presumably suspends until the predicate holds
+ return aio_count < max_aio;
+ });
+ aio_count++;
+ boost::asio::spawn(y, [this, &aio_count, object, &formatter_flush_cond] (boost::asio::yield_context yield) { // object is captured by value
+ handle_individual_object(object, yield, &*formatter_flush_cond);
+ aio_count--;
+ }, [] (std::exception_ptr eptr) { // completion handler: rethrow whatever the coroutine threw
+ if (eptr) std::rethrow_exception(eptr);
+ });
+ }
+
+ wait_flush(y, &*formatter_flush_cond, [this, n=objects.size()] { // final barrier: one ops_log_entries element expected per requested entry
+ return n == ops_log_entries.size(); // NOTE(review): relies on each handled entry adding exactly one element to ops_log_entries; confirm
+ });
+}
+
+void RGWDeleteMultiObj::handle_objects(const std::vector<rgw_obj_key>& objects, // keys requested for deletion; passed through unchanged
+ uint32_t max_aio, // concurrency bound passed through to the chosen handler
+ boost::asio::yield_context yield) // coroutine context passed through to the chosen handler
+{ // dispatches on bucket->versioned() to the versioned or non-versioned handler
+ if (bucket->versioned()) {
+ handle_versioned_objects(objects, max_aio, yield);
+ } else {
+ handle_non_versioned_objects(objects, max_aio, yield);
+ }
+}
+
void RGWDeleteMultiObj::execute(optional_yield y)
{
RGWMultiDelDelete *multi_delete;
- vector<rgw_obj_key>::iterator iter;
RGWMultiDelXMLParser parser;
- uint32_t aio_count = 0;
const uint32_t max_aio = std::max<uint32_t>(1, s->cct->_conf->rgw_multi_obj_del_max_aio);
char* buf;
std::optional<boost::asio::deadline_timer> formatter_flush_cond;
goto done;
}
- for (iter = multi_delete->objects.begin();
- iter != multi_delete->objects.end();
- ++iter) {
- rgw_obj_key obj_key = *iter;
- if (y) {
- wait_flush(y, &*formatter_flush_cond, [&aio_count, max_aio] {
- return aio_count < max_aio;
- });
- aio_count++;
- boost::asio::spawn(y.get_yield_context(), [this, &aio_count, obj_key, &formatter_flush_cond] (boost::asio::yield_context yield) {
- handle_individual_object(obj_key, yield, &*formatter_flush_cond);
- aio_count--;
- }, [] (std::exception_ptr eptr) {
- if (eptr) std::rethrow_exception(eptr);
- });
- } else {
- handle_individual_object(obj_key, y, nullptr);
- }
- }
- if (formatter_flush_cond) {
- wait_flush(y, &*formatter_flush_cond, [this, n=multi_delete->objects.size()] {
- return n == ops_log_entries.size();
- });
+ // if we're not already running in a coroutine, spawn one
+ if (!y) {
+ auto& objects = multi_delete->objects;
+
+ boost::asio::io_context context;
+ boost::asio::spawn(context,
+ [this, &objects, max_aio] (boost::asio::yield_context yield) {
+ handle_objects(objects, max_aio, yield);
+ },
+ [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
+ });
+ context.run();
+ } else {
+ // use the existing coroutine's yield context
+ handle_objects(multi_delete->objects, max_aio, y.get_yield_context());
}
/* set the return code to zero, errors at this point will be
* set_partial_response to record the outcome.
*/
void handle_individual_object(const rgw_obj_key& o,
- optional_yield y,
- boost::asio::deadline_timer *formatter_flush_cond);
+ optional_yield y,
+ boost::asio::deadline_timer *formatter_flush_cond,
+ const bool skip_olh_obj_update = false);
+ void handle_versioned_objects(const std::vector<rgw_obj_key>& objects,
+ uint32_t max_aio, boost::asio::yield_context yield);
+ void handle_non_versioned_objects(const std::vector<rgw_obj_key>& objects,
+ uint32_t max_aio, boost::asio::yield_context yield);
+ void handle_objects(const std::vector<rgw_obj_key>& objects,
+ uint32_t max_aio, boost::asio::yield_context yield);
/**
* When the request is being executed in a coroutine, performs
// if cannot do all elements of op, do as much as possible (e.g.,
// delete object where head object is missing)
static constexpr uint32_t FLAG_FORCE_OP = 0x0004;
+static constexpr uint32_t FLAG_SKIP_UPDATE_OLH = 0x0008; // delete_obj flag: request that the update_olh step be skipped (passed on as skip_olh_obj_update)
// a simple streaming data processing abstraction