}
int DaosObject::delete_object(const DoutPrefixProvider* dpp, optional_yield y,
- uint32_t flags) {
+ uint32_t flags, std::list<rgw_obj_index_key>* remove_objs,
+ RGWObjVersionTracker* objv) {
ldpp_dout(dpp, 20) << "DEBUG: delete_object" << dendl;
DaosObject::DaosDeleteOp del_op(this);
del_op.params.bucket_owner = bucket->get_info().owner;
map<int, string>& part_etags, list<rgw_obj_index_key>& remove_objs,
uint64_t& accounted_size, bool& compressed, RGWCompressionInfo& cs_info,
off_t& off, std::string& tag, ACLOwner& owner, uint64_t olh_epoch,
- rgw::sal::Object* target_obj) {
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes) {
ldpp_dout(dpp, 20) << "DEBUG: complete" << dendl;
char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
return ret;
}
+int DaosMultipartUpload::cleanup_orphaned_parts(const DoutPrefixProvider *dpp,
+ CephContext *cct, optional_yield y,
+ const rgw_obj& obj,
+ std::list<rgw_obj_index_key>& remove_objs,
+ prefix_map_t& processed_prefixes)
+{
+ return -ENOTSUP;
+}
+
int DaosMultipartUpload::get_info(const DoutPrefixProvider* dpp,
optional_yield y, rgw_placement_rule** rule,
rgw::sal::Attrs* attrs) {
virtual ~DaosObject();
virtual int delete_object(const DoutPrefixProvider* dpp, optional_yield y,
- uint32_t flags) override;
+ uint32_t flags, std::list<rgw_obj_index_key>* remove_objs,
+ RGWObjVersionTracker* objv) override;
virtual int copy_object(
const ACLOwner& owner, const rgw_user& remote_user,
req_info* info, const rgw_zone_id& source_zone,
uint64_t& accounted_size, bool& compressed,
RGWCompressionInfo& cs_info, off_t& off,
std::string& tag, ACLOwner& owner, uint64_t olh_epoch,
- rgw::sal::Object* target_obj) override;
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes) override;
+ virtual int cleanup_orphaned_parts(const DoutPrefixProvider *dpp,
+ CephContext *cct, optional_yield y,
+ const rgw_obj& obj,
+ std::list<rgw_obj_index_key>& remove_objs,
+ prefix_map_t& processed_prefixes) override;
virtual int get_info(const DoutPrefixProvider* dpp, optional_yield y,
rgw_placement_rule** rule,
rgw::sal::Attrs* attrs = nullptr) override;
std::unique_ptr<rgw::sal::Object> object = get_object(key);
- ret = object->delete_object(dpp, null_yield, rgw::sal::FLAG_LOG_OP);
+ ret = object->delete_object(dpp, null_yield, rgw::sal::FLAG_LOG_OP, nullptr, nullptr);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(dpp, 0) << "ERROR: remove_bucket rgw_remove_object failed rc=" << ret << dendl;
return ret;
return 0;
}
-int MotrObject::delete_object(const DoutPrefixProvider* dpp, optional_yield y, uint32_t flags)
+int MotrObject::delete_object(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ uint32_t flags,
+ std::list<rgw_obj_index_key>* remove_objs,
+ RGWObjVersionTracker* objv)
{
MotrObject::MotrDeleteOp del_op(this);
del_op.params.bucket_owner = bucket->get_info().owner;
RGWCompressionInfo& cs_info, off_t& off,
std::string& tag, ACLOwner& owner,
uint64_t olh_epoch,
- rgw::sal::Object* target_obj)
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes)
{
char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
M0_IC_DEL, meta_obj->get_key().get_oid(), bl);
}
+int MotrMultipartUpload::cleanup_orphaned_parts(const DoutPrefixProvider *dpp,
+ CephContext *cct, optional_yield y,
+ const rgw_obj& obj,
+ std::list<rgw_obj_index_key>& remove_objs,
+ prefix_map_t& processed_prefixes)
+{
+ return -ENOTSUP;
+}
+
int MotrMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, rgw_placement_rule** rule, rgw::sal::Attrs* attrs)
{
if (!rule && !attrs) {
virtual int delete_object(const DoutPrefixProvider* dpp,
optional_yield y,
- uint32_t flags) override;
+ uint32_t flags,
+ td::list<rgw_obj_index_key>* remove_objs,
+ GWObjVersionTracker* objv) override;
virtual int copy_object(const ACLOwner& owner,
const rgw_user& remote_user,
req_info* info, const rgw_zone_id& source_zone,
RGWCompressionInfo& cs_info, off_t& off,
std::string& tag, ACLOwner& owner,
uint64_t olh_epoch,
- rgw::sal::Object* target_obj) override;
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes) override;
+ virtual int cleanup_orphaned_parts(const DoutPrefixProvider *dpp,
+ CephContext *cct, optional_yield y,
+ const rgw_obj& obj,
+ std::list<rgw_obj_index_key>& remove_objs,
+ prefix_map_t& processed_prefixes) override;
virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) override;
virtual std::unique_ptr<Writer> get_writer(const DoutPrefixProvider *dpp,
optional_yield y,
int POSIXObject::delete_object(const DoutPrefixProvider* dpp,
optional_yield y,
- uint32_t flags)
+ uint32_t flags,
+ std::list<rgw_obj_index_key>* remove_objs,
+ RGWObjVersionTracker* objv)
{
POSIXBucket *b = static_cast<POSIXBucket*>(get_bucket());
if (!b) {
int POSIXObject::POSIXDeleteOp::delete_obj(const DoutPrefixProvider* dpp,
optional_yield y, uint32_t flags)
{
- return source->delete_object(dpp, y, flags);
+ return source->delete_object(dpp, y, flags, nullptr, nullptr);
}
int POSIXObject::copy(const DoutPrefixProvider *dpp, optional_yield y,
RGWCompressionInfo& cs_info, off_t& ofs,
std::string& tag, ACLOwner& owner,
uint64_t olh_epoch,
- rgw::sal::Object* target_obj)
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes)
{
char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
return 0;
}
+int POSIXMultipartUpload::cleanup_orphaned_parts(const DoutPrefixProvider *dpp,
+ CephContext *cct, optional_yield y,
+ const rgw_obj& obj,
+ std::list<rgw_obj_index_key>& remove_objs,
+ prefix_map_t& processed_prefixes)
+{
+ return -ENOTSUP;
+}
+
int POSIXMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y,
rgw_placement_rule** rule, rgw::sal::Attrs* attrs)
{
virtual int delete_object(const DoutPrefixProvider* dpp,
optional_yield y,
- uint32_t flags) override;
+ uint32_t flags,
+ std::list<rgw_obj_index_key>* remove_objs,
+ RGWObjVersionTracker* objv) override;
virtual int copy_object(const ACLOwner& owner,
const rgw_user& remote_user,
req_info* info, const rgw_zone_id& source_zone,
RGWCompressionInfo& cs_info, off_t& ofs,
std::string& tag, ACLOwner& owner,
uint64_t olh_epoch,
- rgw::sal::Object* target_obj) override;
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes) override;
+ virtual int cleanup_orphaned_parts(const DoutPrefixProvider *dpp,
+ CephContext *cct, optional_yield y,
+ const rgw_obj& obj,
+ std::list<rgw_obj_index_key>& remove_objs,
+ prefix_map_t& processed_prefixes) override;
virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y,
rgw_placement_rule** rule, rgw::sal::Attrs* attrs) override;
std::unique_ptr<rgw::sal::Object> object = bucket->get_object(key);
- return object->delete_object(dpp, y, rgw::sal::FLAG_LOG_OP);
+ return object->delete_object(dpp, y, rgw::sal::FLAG_LOG_OP, nullptr, nullptr);
}
static void set_err_msg(std::string *sink, std::string msg)
std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(key);
obj->set_atomic();
- ret = obj->delete_object(dpp, null_yield, rgw::sal::FLAG_LOG_OP);
+ ret = obj->delete_object(dpp, null_yield, rgw::sal::FLAG_LOG_OP, nullptr, nullptr);
return ret;
}
#include "services/svc_zone.h"
#include "rgw_sal_rados.h"
+#include "cls/version/cls_version_client.h"
+
#define dout_subsys ceph_subsys_rgw
using namespace std;
}
librados::ObjectWriteOperation op;
+ op.assert_exists();
cls_rgw_mp_upload_part_info_update(op, p, info);
+ cls_version_inc(op);
r = rgw_rados_operate(rctx.dpp, meta_obj_ref.ioctx, meta_obj_ref.obj.oid, &op, rctx.y);
ldpp_dout(rctx.dpp, 20) << "Update meta: " << meta_obj_ref.obj.oid << " part " << p << " prefix " << info.manifest.get_prefix() << " return " << r << dendl;
op = librados::ObjectWriteOperation{};
op.assert_exists(); // detect races with abort
op.omap_set(m);
+ cls_version_inc(op);
r = rgw_rados_operate(rctx.dpp, meta_obj_ref.ioctx, meta_obj_ref.obj.oid, &op, rctx.y);
}
+
if (r < 0) {
return r == -ENOENT ? -ERR_NO_SUCH_UPLOAD : r;
}
store->remove_rgw_head_obj(op);
+ if (params.check_objv != nullptr) {
+ cls_version_check(op, *params.check_objv, VER_COND_EQ);
+ }
+
auto& ioctx = ref.ioctx;
r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y);
int r = -ENOENT;
if (!assume_noent) {
- r = RGWRados::raw_obj_stat(dpp, raw_obj, &s->size, &s->mtime, &s->epoch, &s->attrset, (s->prefetch_data ? &s->data : NULL), NULL, y);
+ r = RGWRados::raw_obj_stat(dpp, raw_obj, &s->size, &s->mtime, &s->epoch, &s->attrset, (s->prefetch_data ? &s->data : NULL), &s->objv_tracker, y);
}
if (r == -ENOENT) {
return -ENOENT;
}
+ if (params.objv_tracker) {
+ *params.objv_tracker = astate->objv_tracker;
+ }
+
RGWBucketInfo& bucket_info = source->get_bucket_info();
if (params.part_num) {
if (first_chunk) {
op.read(0, cct->_conf->rgw_max_chunk_size, first_chunk, NULL);
}
+
bufferlist outbl;
r = rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, &outbl, y);
uint64_t *epoch;
int* part_num = nullptr;
std::optional<int> parts_count;
+ RGWObjVersionTracker *objv_tracker = nullptr;
Params() : lastmod(nullptr), obj_size(nullptr), attrs(nullptr),
target_obj(nullptr), epoch(nullptr)
rgw_zone_set *zones_trace;
bool abortmp;
uint64_t parts_accounted_size;
+ obj_version *check_objv;
- DeleteParams() : versioning_status(0), null_verid(false), olh_epoch(0), bilog_flags(0), remove_objs(NULL), high_precision_time(false), zones_trace(nullptr), abortmp(false), parts_accounted_size(0) {}
+ DeleteParams() : versioning_status(0), null_verid(false), olh_epoch(0), bilog_flags(0), remove_objs(NULL), high_precision_time(false), zones_trace(nullptr), abortmp(false), parts_accounted_size(0), check_objv(nullptr) {}
} params;
struct DeleteResult {
read_op.params.target_obj = target_obj;
read_op.params.obj_size = &state.size;
read_op.params.lastmod = &state.mtime;
+ read_op.params.objv_tracker = &state.objv_tracker;
return read_op.prepare(y, dpp);
}
parent_op.params.abortmp = params.abortmp;
parent_op.params.parts_accounted_size = params.parts_accounted_size;
parent_op.params.null_verid = params.null_verid;
+ if (params.objv_tracker) {
+ parent_op.params.check_objv = params.objv_tracker->version_for_check();
+ }
int ret = parent_op.delete_obj(y, dpp, flags & FLAG_LOG_OP);
if (ret < 0)
int RadosObject::delete_object(const DoutPrefixProvider* dpp,
optional_yield y,
- uint32_t flags)
+ uint32_t flags,
+ std::list<rgw_obj_index_key>* remove_objs,
+ RGWObjVersionTracker* objv)
{
RGWRados::Object del_target(store->getRados(), bucket->get_info(), *rados_ctx, get_obj());
RGWRados::Object::Delete del_op(&del_target);
del_op.params.bucket_owner = bucket->get_info().owner;
del_op.params.versioning_status = (flags & FLAG_PREVENT_VERSIONING)
? 0 : bucket->get_info().versioning_status();
+ del_op.params.remove_objs = remove_objs;
+ if (objv) {
+ del_op.params.check_objv = objv->version_for_check();
+ }
return del_op.delete_obj(y, dpp, flags & FLAG_LOG_OP);
}
y);
}
+int RadosMultipartUpload::cleanup_orphaned_parts(const DoutPrefixProvider *dpp,
+ CephContext *cct, optional_yield y,
+ const rgw_obj& obj,
+ list<rgw_obj_index_key>& remove_objs,
+ prefix_map_t& processed_prefixes)
+{
+ bool truncated;
+ int ret;
+ int max_parts = 1000;
+ int marker = 0;
+ cls_rgw_obj_chain chain;
+
+ do {
+ ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated, y);
+
+ if (ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << ": RadosMultipartUpload::list_parts returned " << ret << dendl;
+ return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
+ }
+
+ for (auto part_it = parts.begin(); part_it != parts.end(); ++part_it) {
+ RadosMultipartPart* part = dynamic_cast<RadosMultipartPart*>(part_it->second.get());
+
+ auto& part_prefixes = processed_prefixes[part->info.num];
+
+ if (!part->info.manifest.empty()) {
+ auto manifest_prefix = part->info.manifest.get_prefix();
+ if (not manifest_prefix.empty() && part_prefixes.find(manifest_prefix) == part_prefixes.end()) {
+ store->getRados()->update_gc_chain(dpp, obj, part->info.manifest, &chain);
+
+ RGWObjManifest::obj_iterator oiter = part->info.manifest.obj_begin(dpp);
+ if (oiter != part->info.manifest.obj_end(dpp)) {
+ rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store->getRados());
+
+ rgw_obj head_obj;
+ RGWSI_Tier_RADOS::raw_obj_to_obj(bucket->get_key(), raw_head, &head_obj);
+
+ rgw_obj_index_key remove_key;
+ head_obj.key.get_index_key(&remove_key);
+ remove_objs.push_back(remove_key);
+ }
+ }
+ }
+ cleanup_part_history(dpp, y, part, remove_objs, part_prefixes);
+ }
+ } while (truncated);
+
+ if (store->getRados()->get_gc() == nullptr) {
+ //Delete objects inline if gc hasn't been initialised (in case when bypass gc is specified)
+ store->getRados()->delete_objs_inline(dpp, chain, mp_obj.get_upload_id(), y);
+ } else {
+ /* use upload id as tag and do it synchronously */
+ auto [ret, leftover_chain] = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id(), y);
+ if (ret < 0 && leftover_chain) {
+ ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
+ if (ret == -ENOENT) {
+ return -ERR_NO_SUCH_UPLOAD;
+ }
+ //Delete objects inline if send chain to gc fails
+ store->getRados()->delete_objs_inline(dpp, *leftover_chain, mp_obj.get_upload_id(), y);
+ }
+ }
+ return 0;
+}
+
int RadosMultipartUpload::cleanup_part_history(const DoutPrefixProvider* dpp,
optional_yield y,
RadosMultipartPart *part,
- list<rgw_obj_index_key>& remove_objs)
+ list<rgw_obj_index_key>& remove_objs,
+ boost::container::flat_set<std::string>& processed_prefixes)
{
cls_rgw_obj_chain chain;
for (auto& ppfx : part->get_past_prefixes()) {
+ auto [it, inserted] = processed_prefixes.emplace(ppfx);
+ if (!inserted) {
+ continue; // duplicate
+ }
+
rgw_obj past_obj;
past_obj.init_ns(bucket->get_key(), ppfx + "." + std::to_string(part->info.num), mp_ns);
rgw_obj_index_key past_key;
int ret;
uint64_t parts_accounted_size = 0;
- do {
- ret = list_parts(dpp, cct, 1000, marker, &marker, &truncated, y);
+ prefix_map_t processed_prefixes;
+
+ static constexpr auto MAX_DELETE_RETRIES = 15u;
+ for (auto i = 0u; i < MAX_DELETE_RETRIES; i++) {
+ ret = meta_obj->get_obj_attrs(y, dpp);
if (ret < 0) {
- ldpp_dout(dpp, 20) << __func__ << ": RadosMultipartUpload::list_parts returned " <<
- ret << dendl;
+ ldpp_dout(dpp, 0) << __func__ << ": ERROR: failed to get obj attrs, obj=" << meta_obj
+ << " ret=" << ret << dendl;
return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
}
- for (auto part_it = parts.begin();
- part_it != parts.end();
- ++part_it) {
- RadosMultipartPart* obj_part = dynamic_cast<RadosMultipartPart*>(part_it->second.get());
- if (obj_part->info.manifest.empty()) {
- std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(
- rgw_obj_key(obj_part->oid, std::string(), RGW_OBJ_NS_MULTIPART));
- obj->set_hash_source(mp_obj.get_key());
- ret = obj->delete_object(dpp, y, 0);
- if (ret < 0 && ret != -ENOENT)
- return ret;
- } else {
- auto target = meta_obj->get_obj();
- store->getRados()->update_gc_chain(dpp, target, obj_part->info.manifest, &chain);
- RGWObjManifest::obj_iterator oiter = obj_part->info.manifest.obj_begin(dpp);
- if (oiter != obj_part->info.manifest.obj_end(dpp)) {
- std::unique_ptr<rgw::sal::Object> head = bucket->get_object(rgw_obj_key());
- rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store->getRados());
- dynamic_cast<rgw::sal::RadosObject*>(head.get())->raw_obj_to_obj(raw_head);
-
- rgw_obj_index_key key;
- head->get_key().get_index_key(&key);
- remove_objs.push_back(key);
-
- cleanup_part_history(dpp, null_yield, obj_part, remove_objs);
+ RGWObjVersionTracker objv_tracker = meta_obj->get_version_tracker();
+
+ do {
+ ret = list_parts(dpp, cct, 1000, marker, &marker, &truncated, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << ": RadosMultipartUpload::list_parts returned " << ret << dendl;
+ return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
+ }
+
+ for (auto part_it = parts.begin(); part_it != parts.end(); ++part_it) {
+ RadosMultipartPart* obj_part = dynamic_cast<RadosMultipartPart*>(part_it->second.get());
+
+ if (obj_part->info.manifest.empty()) {
+ std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(
+ rgw_obj_key(obj_part->oid, std::string(), RGW_OBJ_NS_MULTIPART));
+ obj->set_hash_source(mp_obj.get_key());
+ ret = obj->delete_object(dpp, y, 0, nullptr, nullptr);
+ if (ret < 0 && ret != -ENOENT)
+ return ret;
+ } else {
+ auto manifest_prefix = obj_part->info.manifest.get_prefix();
+ auto [it, inserted] = processed_prefixes.emplace(obj_part->info.num, boost::container::flat_set<std::string>{});
+ if (not manifest_prefix.empty()) {
+ if (it->second.find(manifest_prefix) != it->second.end()) {
+ continue;
+ }
+ it->second.emplace(manifest_prefix);
+ }
+
+ auto target = meta_obj->get_obj();
+ store->getRados()->update_gc_chain(dpp, target, obj_part->info.manifest, &chain);
+ RGWObjManifest::obj_iterator oiter = obj_part->info.manifest.obj_begin(dpp);
+ if (oiter != obj_part->info.manifest.obj_end(dpp)) {
+ std::unique_ptr<rgw::sal::Object> head = bucket->get_object(rgw_obj_key());
+ rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store->getRados());
+ dynamic_cast<rgw::sal::RadosObject*>(head.get())->raw_obj_to_obj(raw_head);
+
+ rgw_obj_index_key key;
+ head->get_key().get_index_key(&key);
+ remove_objs.push_back(key);
+
+ cleanup_part_history(dpp, null_yield, obj_part, remove_objs, it->second);
+ }
}
+ parts_accounted_size += obj_part->info.accounted_size;
}
- parts_accounted_size += obj_part->info.accounted_size;
- }
- } while (truncated);
+ } while (truncated);
- if (store->getRados()->get_gc() == nullptr) {
- //Delete objects inline if gc hasn't been initialised (in case when bypass gc is specified)
- store->getRados()->delete_objs_inline(dpp, chain, mp_obj.get_upload_id(), y);
- } else {
- /* use upload id as tag and do it synchronously */
- auto [ret, leftover_chain] = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id(), y);
- if (ret < 0 && leftover_chain) {
- ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
- if (ret == -ENOENT) {
- return -ERR_NO_SUCH_UPLOAD;
+ if (store->getRados()->get_gc() == nullptr) {
+ //Delete objects inline if gc hasn't been initialised (in case when bypass gc is specified)
+ store->getRados()->delete_objs_inline(dpp, chain, mp_obj.get_upload_id(), y);
+ } else {
+ /* use upload id as tag and do it synchronously */
+ auto [ret, leftover_chain] = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id(), y);
+ if (ret < 0 && leftover_chain) {
+ ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
+ if (ret == -ENOENT) {
+ return -ERR_NO_SUCH_UPLOAD;
+ }
+ //Delete objects inline if send chain to gc fails
+ store->getRados()->delete_objs_inline(dpp, *leftover_chain, mp_obj.get_upload_id(), y);
}
- //Delete objects inline if send chain to gc fails
- store->getRados()->delete_objs_inline(dpp, *leftover_chain, mp_obj.get_upload_id(), y);
}
- }
- std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = meta_obj->get_delete_op();
- del_op->params.bucket_owner = bucket->get_info().owner;
- del_op->params.versioning_status = 0;
- if (!remove_objs.empty()) {
- del_op->params.remove_objs = &remove_objs;
- }
+ std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = meta_obj->get_delete_op();
+ del_op->params.bucket_owner = bucket->get_info().owner;
+ del_op->params.versioning_status = 0;
+ if (!remove_objs.empty()) {
+ del_op->params.remove_objs = &remove_objs;
+ }
- del_op->params.abortmp = true;
- del_op->params.parts_accounted_size = parts_accounted_size;
+ del_op->params.abortmp = true;
+ del_op->params.parts_accounted_size = parts_accounted_size;
+ del_op->params.objv_tracker = &objv_tracker;
- // and also remove the metadata obj
- ret = del_op->delete_obj(dpp, y, 0);
- if (ret < 0) {
- ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " <<
- ret << dendl;
+ // and also remove the metadata obj
+ ret = del_op->delete_obj(dpp, y, 0);
+ if (ret != -ECANCELED) {
+ if (ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " << ret << dendl;
+ }
+ break;
+ }
+ ldpp_dout(dpp, 20) << "deleting meta_obj is cancelled due to mismatch cls_version: " << objv_tracker << dendl;
+ chain.objs.clear();
+ marker = 0;
}
+
return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
}
RGWCompressionInfo& cs_info, off_t& ofs,
std::string& tag, ACLOwner& owner,
uint64_t olh_epoch,
- rgw::sal::Object* target_obj)
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes)
{
char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
rgw_obj src_obj;
src_obj.init_ns(bucket->get_key(), oid, mp_ns);
+ auto [it, inserted] = processed_prefixes.emplace(part->info.num, boost::container::flat_set<std::string>{});
+
if (obj_part.manifest.empty()) {
ldpp_dout(dpp, 0) << "ERROR: empty manifest for object part: obj="
<< src_obj << dendl;
if (not manifest_prefix.empty()) {
// It has an explicit prefix. Override the default one.
src_obj.init_ns(bucket->get_key(), manifest_prefix + "." + std::to_string(part->info.num), mp_ns);
+ it->second.emplace(manifest_prefix);
}
}
remove_objs.push_back(remove_key);
- cleanup_part_history(dpp, y, part, remove_objs);
+ cleanup_part_history(dpp, y, part, remove_objs, it->second);
ofs += obj_part.size;
accounted_size += obj_part.accounted_size;
rados_ctx->invalidate(get_obj());
}
virtual int delete_object(const DoutPrefixProvider* dpp,
- optional_yield y, uint32_t flags) override;
+ optional_yield y, uint32_t flags,
+ std::list<rgw_obj_index_key>* remove_objs,
+ RGWObjVersionTracker* objv) override;
virtual int copy_object(const ACLOwner& owner,
const rgw_user& remote_user,
req_info* info, const rgw_zone_id& source_zone,
RGWCompressionInfo& cs_info, off_t& ofs,
std::string& tag, ACLOwner& owner,
uint64_t olh_epoch,
- rgw::sal::Object* target_obj) override;
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes) override;
+ virtual int cleanup_orphaned_parts(const DoutPrefixProvider *dpp,
+ CephContext *cct, optional_yield y,
+ const rgw_obj& obj,
+ std::list<rgw_obj_index_key>& remove_objs,
+ prefix_map_t& processed_prefixes) override;
virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) override;
virtual std::unique_ptr<Writer> get_writer(const DoutPrefixProvider *dpp,
optional_yield y,
int cleanup_part_history(const DoutPrefixProvider* dpp,
optional_yield y,
RadosMultipartPart* part,
- std::list<rgw_obj_index_key>& remove_objs);
+ std::list<rgw_obj_index_key>& remove_objs,
+ boost::container::flat_set<std::string>& processed_prefixes);
};
class MPRadosSerializer : public StoreMPSerializer {
return;
}
+ RGWObjVersionTracker& objv_tracker = meta_obj->get_version_tracker();
+
+ using prefix_map_t = rgw::sal::MultipartUpload::prefix_map_t;
+ prefix_map_t processed_prefixes;
+
op_ret =
upload->complete(this, y, s->cct, parts->parts, remove_objs, accounted_size,
- compressed, cs_info, ofs, s->req_id, s->owner, olh_epoch,
- s->object.get());
+ compressed, cs_info, ofs, s->req_id, s->owner, olh_epoch,
+ s->object.get(), processed_prefixes);
if (op_ret < 0) {
ldpp_dout(this, 0) << "ERROR: upload complete failed ret=" << op_ret << dendl;
return;
}
+ remove_objs.clear();
+
+ // use cls_version_check() when deleting the meta object to detect part uploads that raced
+ // with upload->complete(). any parts that finish after that won't be part of the final
+ // upload, so they need to be gc'd and removed from the bucket index before retrying
+ // deletion of the multipart meta object
+ static constexpr auto MAX_DELETE_RETRIES = 15u;
+ for (auto i = 0u; i < MAX_DELETE_RETRIES; i++) {
+ // remove the upload meta object ; the meta object is not versioned
+ // when the bucket is, as that would add an unneeded delete marker
+ int ret = meta_obj->delete_object(this, y, rgw::sal::FLAG_PREVENT_VERSIONING, &remove_objs, &objv_tracker);
+ if (ret != -ECANCELED || i == MAX_DELETE_RETRIES - 1) {
+ if (ret >= 0) {
+ /* serializer's exclusive lock is released */
+ serializer->clear_locked();
+ } else {
+ ldpp_dout(this, 1) << "ERROR: failed to remove object " << meta_obj << ", ret: " << ret << dendl;
+ }
+ break;
+ }
+
+ ldpp_dout(this, 20) << "deleting meta_obj is cancelled due to mismatch cls_version: " << objv_tracker << dendl;
+ objv_tracker.clear();
+
+ ret = meta_obj->get_obj_attrs(s->yield, this);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to get obj attrs, obj=" << meta_obj
+ << " ret=" << ret << dendl;
+
+ if (ret != -ENOENT) {
+ ldpp_dout(this, 0) << "ERROR: failed to remove object " << meta_obj << dendl;
+ }
+ break;
+ }
+
+ ret = upload->cleanup_orphaned_parts(this, s->cct, y, meta_obj->get_obj(), remove_objs, processed_prefixes);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "ERROR: failed to clenup orphaned parts. ret=" << ret << dendl;
+ }
+ }
+
const ceph::real_time upload_time = upload->get_mtime();
etag = s->object->get_attrs()[RGW_ATTR_ETAG].to_str();
ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
// too late to rollback operation, hence op_ret is not set here
}
-
- // remove the upload meta object ; the meta object is not versioned
- // when the bucket is, as that would add an unneeded delete marker
- ret = meta_obj->delete_object(this, y, rgw::sal::FLAG_PREVENT_VERSIONING);
- if (ret >= 0) {
- /* serializer's exclusive lock is released */
- serializer->clear_locked();
- } else {
- ldpp_dout(this, 4) << "WARNING: failed to remove object " << meta_obj << ", ret: " << ret << dendl;
- }
-
} // RGWCompleteMultipart::execute
bool RGWCompleteMultipart::check_previously_completed(const RGWMultiCompleteUpload* parts)
rgw_zone_set* zones_trace{nullptr};
bool abortmp{false};
uint64_t parts_accounted_size{0};
+ RGWObjVersionTracker* objv_tracker = nullptr;
} params;
struct Result {
/** Shortcut synchronous delete call for common deletes */
virtual int delete_object(const DoutPrefixProvider* dpp,
optional_yield y,
- uint32_t flags) = 0;
+ uint32_t flags,
+ std::list<rgw_obj_index_key>* remove_objs,
+ RGWObjVersionTracker* objv) = 0;
/** Copy an this object to another object. */
virtual int copy_object(const ACLOwner& owner, const rgw_user& remote_user,
req_info* info, const rgw_zone_id& source_zone,
virtual int get_torrent_info(const DoutPrefixProvider* dpp,
optional_yield y, bufferlist& bl) = 0;
+ /** Get the version tracker for this object */
+ virtual RGWObjVersionTracker& get_version_tracker() = 0;
+
/** Get the OMAP values matching the given set of keys */
virtual int omap_get_vals_by_keys(const DoutPrefixProvider *dpp, const std::string& oid,
const std::set<std::string>& keys,
*/
class MultipartUpload {
public:
+ using prefix_map_t = boost::container::flat_map<uint32_t, boost::container::flat_set<std::string>>;
+
//object lock
std::optional<RGWObjectRetention> obj_retention = std::nullopt;
std::optional<RGWObjectLegalHold> obj_legal_hold = std::nullopt;
RGWCompressionInfo& cs_info, off_t& ofs,
std::string& tag, ACLOwner& owner,
uint64_t olh_epoch,
- rgw::sal::Object* target_obj) = 0;
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes) = 0;
+ /** Cleanup orphaned parts caused by racing condition involving part upload retry */
+ virtual int cleanup_orphaned_parts(const DoutPrefixProvider *dpp,
+ CephContext *cct, optional_yield y,
+ const rgw_obj& obj,
+ std::list<rgw_obj_index_key>& remove_objs,
+ prefix_map_t& processed_prefixes) = 0;
/** Get placement and/or attribute info for this upload */
virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) = 0;
return ret;
}
- int DBObject::delete_object(const DoutPrefixProvider* dpp, optional_yield y, uint32_t flags)
+ int DBObject::delete_object(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ uint32_t flags,
+ std::list<rgw_obj_index_key>* remove_objs,
+ RGWObjVersionTracker* objv)
{
DB::Object del_target(store->getDB(), bucket->get_info(), get_obj());
DB::Object::Delete del_op(&del_target);
RGWCompressionInfo& cs_info, off_t& ofs,
std::string& tag, ACLOwner& owner,
uint64_t olh_epoch,
- rgw::sal::Object* target_obj)
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes)
{
char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
return ret;
}
+ int DBMultipartUpload::cleanup_orphaned_parts(const DoutPrefixProvider *dpp,
+ CephContext *cct, optional_yield y,
+ const rgw_obj& obj,
+ std::list<rgw_obj_index_key>& remove_objs,
+ prefix_map_t& processed_prefixes)
+ {
+ return -ENOTSUP;
+ }
+
int DBMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, rgw_placement_rule** rule, rgw::sal::Attrs* attrs)
{
if (!rule && !attrs) {
RGWCompressionInfo& cs_info, off_t& ofs,
std::string& tag, ACLOwner& owner,
uint64_t olh_epoch,
- rgw::sal::Object* target_obj) override;
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes) override;
+ virtual int cleanup_orphaned_parts(const DoutPrefixProvider *dpp,
+ CephContext *cct, optional_yield y,
+ const rgw_obj& obj,
+ std::list<rgw_obj_index_key>& remove_objs,
+ prefix_map_t& processed_prefixes) override;
virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) override;
virtual std::unique_ptr<Writer> get_writer(const DoutPrefixProvider *dpp,
optional_yield y,
virtual int delete_object(const DoutPrefixProvider* dpp,
optional_yield y,
- uint32_t flags) override;
+ uint32_t flags,
+ std::list<rgw_obj_index_key>* remove_objs,
+ RGWObjVersionTracker* objv) override;
virtual int copy_object(const ACLOwner& owner,
const rgw_user& remote_user,
req_info* info, const rgw_zone_id& source_zone,
int FilterObject::delete_object(const DoutPrefixProvider* dpp,
optional_yield y,
- uint32_t flags)
+ uint32_t flags,
+ std::list<rgw_obj_index_key>* remove_objs,
+ RGWObjVersionTracker* objv)
{
- return next->delete_object(dpp, y, flags);
+ return next->delete_object(dpp, y, flags, remove_objs, objv);
}
int FilterObject::copy_object(const ACLOwner& owner,
RGWCompressionInfo& cs_info, off_t& ofs,
std::string& tag, ACLOwner& owner,
uint64_t olh_epoch,
- rgw::sal::Object* target_obj)
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes)
{
return next->complete(dpp, y, cct, part_etags, remove_objs, accounted_size,
compressed, cs_info, ofs, tag, owner, olh_epoch,
- nextObject(target_obj));
+ nextObject(target_obj), processed_prefixes);
+}
+
+int FilterMultipartUpload::cleanup_orphaned_parts(const DoutPrefixProvider *dpp,
+ CephContext *cct, optional_yield y,
+ const rgw_obj& obj,
+ std::list<rgw_obj_index_key>& remove_objs,
+ prefix_map_t& processed_prefixes)
+{
+ return next->cleanup_orphaned_parts(dpp, cct, y, obj, remove_objs, processed_prefixes);
}
int FilterMultipartUpload::get_info(const DoutPrefixProvider *dpp,
virtual int delete_object(const DoutPrefixProvider* dpp,
optional_yield y,
- uint32_t flags) override;
+ uint32_t flags,
+ std::list<rgw_obj_index_key>* remove_objs,
+ RGWObjVersionTracker* objv) override;
virtual int copy_object(const ACLOwner& owner,
const rgw_user& remote_user,
req_info* info, const rgw_zone_id& source_zone,
virtual int get_torrent_info(const DoutPrefixProvider* dpp,
optional_yield y, bufferlist& bl) override;
+ virtual RGWObjVersionTracker& get_version_tracker() override { return next->get_version_tracker(); }
+
virtual int omap_get_vals_by_keys(const DoutPrefixProvider *dpp,
const std::string& oid,
const std::set<std::string>& keys,
RGWCompressionInfo& cs_info, off_t& ofs,
std::string& tag, ACLOwner& owner,
uint64_t olh_epoch,
- rgw::sal::Object* target_obj) override;
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes) override;
+ virtual int cleanup_orphaned_parts(const DoutPrefixProvider *dpp,
+ CephContext *cct, optional_yield y,
+ const rgw_obj& obj,
+ std::list<rgw_obj_index_key>& remove_objs,
+ prefix_map_t& processed_prefixes) override;
virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y,
rgw_placement_rule** rule,
return -ENOENT;
}
+ virtual RGWObjVersionTracker& get_version_tracker() override { return state.objv_tracker; }
+
virtual void print(std::ostream& out) const override {
if (bucket)
out << bucket << ":";
off_t ofs{0};
uint64_t accounted_size{0};
std::string tag;
+ rgw::sal::MultipartUpload::prefix_map_t processed_prefixes;
ACLOwner owner;
owner.id = bucket->get_owner();
int ret = upload->complete(env->dpp, null_yield, get_pointer(env->cct), parts,
remove_objs, accounted_size, compressed, cs_info,
- ofs, tag, owner, 0, mp_obj.get());
+ ofs, tag, owner, 0, mp_obj.get(), processed_prefixes);
EXPECT_EQ(ret, 0);
EXPECT_EQ(write_size, ofs);
EXPECT_EQ(write_size, accounted_size);
off_t ofs{0};
uint64_t accounted_size{0};
std::string tag;
+ rgw::sal::MultipartUpload::prefix_map_t processed_prefixes;
ACLOwner owner;
owner.id = bucket->get_owner();
mp_obj->gen_rand_obj_instance_name();
int ret = upload->complete(env->dpp, null_yield, get_pointer(env->cct), parts,
remove_objs, accounted_size, compressed, cs_info,
- ofs, tag, owner, 0, mp_obj.get());
+ ofs, tag, owner, 0, mp_obj.get(), processed_prefixes);
EXPECT_EQ(ret, 0);
EXPECT_EQ(write_size, ofs);
EXPECT_EQ(write_size, accounted_size);