From 2ac63de15bcd59cd45a4f05c4af6297995aee944 Mon Sep 17 00:00:00 2001 From: Yixin Jin Date: Mon, 9 Jan 2023 19:42:25 +0000 Subject: [PATCH] rgw: fix multipart upload object leaks due to re-upload This commit fixes the object leaks when an mp part object is re-uploaded. Details of the fix are: 1. Upon re-upload, remember the prefix used in previous part upload in a new field "past_prefixes" in RGWUploadPartInfo. 2. Create a new CLS function for updating part info in the metadata object. 3. Utilize the new CLS function during mp upload. 4. At the upload conclusion (compete/abort), clean up the part objects that are not used for the final assembly, thus preventing the object leak. Fixes: https://tracker.ceph.com/issues/16767 Signed-off-by: Yixin Jin --- src/cls/rgw/cls_rgw.cc | 51 +++++++++++++++++++- src/cls/rgw/cls_rgw_client.cc | 20 ++++++++ src/cls/rgw/cls_rgw_client.h | 3 ++ src/cls/rgw/cls_rgw_const.h | 3 ++ src/cls/rgw/cls_rgw_ops.cc | 15 ++++++ src/cls/rgw/cls_rgw_ops.h | 25 ++++++++++ src/rgw/driver/rados/rgw_putobj_processor.cc | 16 ++++-- src/rgw/driver/rados/rgw_sal_rados.cc | 51 ++++++++++++++++++++ src/rgw/driver/rados/rgw_sal_rados.h | 6 +++ src/rgw/rgw_basic_types.h | 11 ++++- src/rgw/rgw_multi.cc | 1 + 11 files changed, 195 insertions(+), 7 deletions(-) diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index a4c531915514..cda3e35d8612 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -83,7 +83,7 @@ static bool bi_is_plain_entry(const std::string& s) { return (s.empty() || (unsigned char)s[0] != BI_PREFIX_CHAR); } -int bi_entry_type(const string& s) +static int bi_entry_type(const string& s) { if (bi_is_plain_entry(s)) { return BI_BUCKET_OBJS_INDEX; @@ -3538,7 +3538,7 @@ static int usage_record_decode(bufferlist& record_bl, rgw_usage_log_entry& e) return 0; } -int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +static int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { CLS_LOG(10, "entered %s", __func__); @@ -4335,6 +4335,49 @@ static int rgw_cls_lc_get_head(cls_method_context_t hctx, bufferlist *in, buffe return 0; } +static int rgw_mp_upload_part_info_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + CLS_LOG(10, "entered %s", __func__); + cls_rgw_mp_upload_part_info_update_op op; + auto in_iter = in->cbegin(); + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_mp_upload_part_info_update(): failed to decode op\n"); + return -EINVAL; + } + + RGWUploadPartInfo stored_info; + + int ret = read_omap_entry(hctx, op.part_key, &stored_info); + if (ret < 0 and ret != -ENOENT) { + return ret; + } + + /* merge all the prior (stored) manifest prefixes to carry forward */ + if (not stored_info.manifest.empty()) { + op.info.past_prefixes.insert(stored_info.manifest.get_prefix()); + } + op.info.past_prefixes.merge(stored_info.past_prefixes); + + if (op.info.past_prefixes.contains(op.info.manifest.get_prefix())) { + // Somehow the current chosen prefix collides with one of previous ones. + // Better fail this part upload so it can pick a different one in the next. + const object_info_t& oi = cls_get_object_info(hctx); + CLS_LOG(1, "ERROR: oid [%s]: Current prefix %s is also a past prefix for part %s", + oi.soid.oid.name.c_str(), + op.info.manifest.get_prefix().c_str(), + op.part_key.c_str()); + return -EINVAL; + } + + bufferlist bl; + encode(op.info, bl); + ret = cls_cxx_map_set_val(hctx, op.part_key, &bl); + CLS_LOG(10, "part info update on key [%s]: %zu past prefixes, ret %d", op.part_key.c_str(), op.info.past_prefixes.size(), ret); + return ret; +} + static int rgw_reshard_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { CLS_LOG(10, "entered %s", __func__); @@ -4608,6 +4651,7 @@ CLS_INIT(rgw) cls_method_handle_t h_rgw_lc_put_head; cls_method_handle_t h_rgw_lc_get_head; cls_method_handle_t h_rgw_lc_list_entries; + cls_method_handle_t h_rgw_mp_upload_part_info_update; cls_method_handle_t h_rgw_reshard_add; cls_method_handle_t h_rgw_reshard_list; cls_method_handle_t h_rgw_reshard_get; @@ -4671,6 +4715,9 @@ CLS_INIT(rgw) cls_register_cxx_method(h_class, RGW_LC_GET_HEAD, CLS_METHOD_RD, rgw_cls_lc_get_head, &h_rgw_lc_get_head); cls_register_cxx_method(h_class, RGW_LC_LIST_ENTRIES, CLS_METHOD_RD, rgw_cls_lc_list_entries, &h_rgw_lc_list_entries); + /* multipart */ + cls_register_cxx_method(h_class, RGW_MP_UPLOAD_PART_INFO_UPDATE, CLS_METHOD_RD | CLS_METHOD_WR, rgw_mp_upload_part_info_update, &h_rgw_mp_upload_part_info_update); + /* resharding */ cls_register_cxx_method(h_class, RGW_RESHARD_ADD, CLS_METHOD_RD | CLS_METHOD_WR, rgw_reshard_add, &h_rgw_reshard_add); cls_register_cxx_method(h_class, RGW_RESHARD_LIST, CLS_METHOD_RD, rgw_reshard_list, &h_rgw_reshard_list); diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 4b544b3b20af..08374b361391 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -1068,6 +1068,26 @@ int cls_rgw_lc_list(IoCtx& io_ctx, const string& oid, return r; } +int cls_rgw_mp_upload_part_info_update(librados::IoCtx& io_ctx, + const std::string& oid, + const std::string& part_key, + const RGWUploadPartInfo& info) +{ + buffer::list in, out; + cls_rgw_mp_upload_part_info_update_op op; + + // For now, there doesn't appear to be a need for an encoded + // result -- we might in future want to return a copy of the final + // RGWUploadPartInfo + + op.part_key = part_key; + op.info = info; + encode(op, in); + + int r = io_ctx.exec(oid, RGW_CLASS, RGW_MP_UPLOAD_PART_INFO_UPDATE, in, out); + return r; +} + void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry) { bufferlist in; diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 57d55548b1ee..674533ae3dd2 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -618,6 +618,9 @@ int cls_rgw_lc_list(librados::IoCtx& io_ctx, const std::string& oid, std::vector& entries); #endif +/* multipart */ +int cls_rgw_mp_upload_part_info_update(librados::IoCtx& io_ctx, const std::string& oid, const std::string& part_key, const RGWUploadPartInfo& info); + /* resharding */ void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry); void cls_rgw_reshard_remove(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry); diff --git a/src/cls/rgw/cls_rgw_const.h b/src/cls/rgw/cls_rgw_const.h index 6015872f8be8..8595db3c9e8b 100644 --- a/src/cls/rgw/cls_rgw_const.h +++ b/src/cls/rgw/cls_rgw_const.h @@ -64,6 +64,9 @@ constexpr int RGWBIAdvanceAndRetryError = -EFBIG; #define RGW_LC_GET_HEAD "lc_get_head" #define RGW_LC_LIST_ENTRIES "lc_list_entries" +/* multipart */ +#define RGW_MP_UPLOAD_PART_INFO_UPDATE "mp_upload_part_info_update" + /* resharding */ #define RGW_RESHARD_ADD "reshard_add" #define RGW_RESHARD_LIST "reshard_list" diff --git a/src/cls/rgw/cls_rgw_ops.cc b/src/cls/rgw/cls_rgw_ops.cc index c3df98411167..15bcba33330d 100644 --- a/src/cls/rgw/cls_rgw_ops.cc +++ b/src/cls/rgw/cls_rgw_ops.cc @@ -430,6 +430,21 @@ void cls_rgw_bi_log_list_ret::generate_test_instances(listtruncated = true; } +void cls_rgw_mp_upload_part_info_update_op::generate_test_instances(std::list& ls) +{ + ls.push_back(new cls_rgw_mp_upload_part_info_update_op); + ls.back()->part_key = "part1"; + ls.push_back(new cls_rgw_mp_upload_part_info_update_op); + ls.back()->part_key = "part2"; +} + +void cls_rgw_mp_upload_part_info_update_op::dump(Formatter* f) const +{ + encode_json("part_key", part_key, f); + encode_json("part_num", info.num, f); + encode_json("part_prefix", info.manifest.get_prefix(), f); +} + void cls_rgw_reshard_add_op::generate_test_instances(list& ls) { ls.push_back(new cls_rgw_reshard_add_op); diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h index f6015eacea02..2891a3b61049 100644 --- a/src/cls/rgw/cls_rgw_ops.h +++ b/src/cls/rgw/cls_rgw_ops.h @@ -1287,6 +1287,31 @@ cls_rgw_lc_list_entries_ret(uint8_t compat_v = 3) }; WRITE_CLASS_ENCODER(cls_rgw_lc_list_entries_ret) +struct cls_rgw_mp_upload_part_info_update_op { + std::string part_key; + RGWUploadPartInfo info; + + cls_rgw_mp_upload_part_info_update_op() {} + + void encode(buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(part_key, bl); + encode(info, bl); + ENCODE_FINISH(bl); + } + + void decode(buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(part_key, bl); + decode(info, bl); + DECODE_FINISH(bl); + } + + static void generate_test_instances(std::list& ls); + void dump(Formatter* f) const; +}; +WRITE_CLASS_ENCODER(cls_rgw_mp_upload_part_info_update_op) + struct cls_rgw_reshard_add_op { cls_rgw_reshard_entry entry; diff --git a/src/rgw/driver/rados/rgw_putobj_processor.cc b/src/rgw/driver/rados/rgw_putobj_processor.cc index 8a6a157018ef..fae8d557bba5 100644 --- a/src/rgw/driver/rados/rgw_putobj_processor.cc +++ b/src/rgw/driver/rados/rgw_putobj_processor.cc @@ -497,13 +497,23 @@ int MultipartObjectProcessor::complete(size_t accounted_size, return r; } - encode(info, bl); - std::unique_ptr meta_obj = head_obj->get_bucket()->get_object(rgw_obj_key(mp.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART)); meta_obj->set_in_extra_data(true); - r = meta_obj->omap_set_val_by_key(dpp, p, bl, true, null_yield); + rgw_raw_obj meta_raw_obj; + dynamic_cast(meta_obj.get())->get_raw_obj(&meta_raw_obj); + + rgw_rados_ref meta_obj_ref; + r = store->getRados()->get_raw_obj_ref(dpp, meta_raw_obj, &meta_obj_ref); + if (r < 0) { + ldpp_dout(dpp, -1) << "ERROR: failed to get obj ref of meta obj with ret=" << r << dendl; + return r; + } + + r = cls_rgw_mp_upload_part_info_update(meta_obj_ref.pool.ioctx(), meta_obj_ref.obj.oid, p, info); + ldpp_dout(dpp, 20) << "Update meta: " << meta_obj_ref.obj.oid << " part " << p << " prefix " << info.manifest.get_prefix() << " return " << r << dendl; + if (r < 0) { return r == -ENOENT ? -ERR_NO_SUCH_UPLOAD : r; } diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 53d5d2464009..949125b7df82 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -2283,6 +2283,48 @@ int RadosObject::swift_versioning_copy(const DoutPrefixProvider* dpp, optional_y y); } +int RadosMultipartUpload::cleanup_part_history(const DoutPrefixProvider* dpp, + optional_yield y, + RadosMultipartPart *part, + list& remove_objs) +{ + cls_rgw_obj_chain chain; + for (auto& ppfx : part->get_past_prefixes()) { + rgw_obj past_obj; + past_obj.init_ns(bucket->get_key(), ppfx + "." + std::to_string(part->info.num), mp_ns); + rgw_obj_index_key past_key; + past_obj.key.get_index_key(&past_key); + // Remove past upload part objects from index, too. + remove_objs.push_back(past_key); + + RGWObjManifest manifest = part->get_manifest(); + manifest.set_prefix(ppfx); + RGWObjManifest::obj_iterator miter = manifest.obj_begin(dpp); + for (; miter != manifest.obj_end(dpp); ++miter) { + rgw_raw_obj raw_part_obj = miter.get_location().get_raw_obj(store); + cls_rgw_obj_key part_key(raw_part_obj.oid); + chain.push_obj(raw_part_obj.pool.to_str(), part_key, raw_part_obj.loc); + } + } + if (store->getRados()->get_gc() == nullptr) { + // Delete objects inline if gc hasn't been initialised (in case when bypass gc is specified) + store->getRados()->delete_objs_inline(dpp, chain, mp_obj.get_upload_id()); + } else { + // use upload id as tag and do it synchronously + auto [ret, leftover_chain] = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id()); + if (ret < 0 && leftover_chain) { + ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl; + if (ret == -ENOENT) { + return -ERR_NO_SUCH_UPLOAD; + } + // Delete objects inline if send chain to gc fails + store->getRados()->delete_objs_inline(dpp, *leftover_chain, mp_obj.get_upload_id()); + } + } + return 0; +} + + int RadosMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct) { std::unique_ptr meta_obj = get_meta_obj(); @@ -2326,6 +2368,8 @@ int RadosMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct) rgw_obj_index_key key; head->get_key().get_index_key(&key); remove_objs.push_back(key); + + cleanup_part_history(dpp, null_yield, obj_part, remove_objs); } } parts_accounted_size += obj_part->info.accounted_size; @@ -2605,6 +2649,11 @@ int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp, return ret; } else { manifest.append(dpp, obj_part.manifest, store->svc()->zone->get_zonegroup(), store->svc()->zone->get_zone_params()); + auto manifest_prefix = part->info.manifest.get_prefix(); + if (not manifest_prefix.empty()) { + // It has an explicit prefix. Override the default one. + src_obj.init_ns(bucket->get_key(), manifest_prefix + "." + std::to_string(part->info.num), mp_ns); + } } bool part_compressed = (obj_part.cs_info.compression_type != "none"); @@ -2642,6 +2691,8 @@ int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp, remove_objs.push_back(remove_key); + cleanup_part_history(dpp, y, part, remove_objs); + ofs += obj_part.size; accounted_size += obj_part.accounted_size; } diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index da1cea5608fa..15d8022710b0 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -630,6 +630,7 @@ public: /* For RadosStore code */ RGWObjManifest& get_manifest() { return info.manifest; } + const std::set& get_past_prefixes() const { return info.past_prefixes; } friend class RadosMultipartUpload; }; @@ -679,6 +680,11 @@ public: const rgw_placement_rule *ptail_placement_rule, uint64_t part_num, const std::string& part_num_str) override; +protected: + int cleanup_part_history(const DoutPrefixProvider* dpp, + optional_yield y, + RadosMultipartPart* part, + std::list& remove_objs); }; class MPRadosSerializer : public StoreMPSerializer { diff --git a/src/rgw/rgw_basic_types.h b/src/rgw/rgw_basic_types.h index 4e473a0a9de7..25d70bdbf1b8 100644 --- a/src/rgw/rgw_basic_types.h +++ b/src/rgw/rgw_basic_types.h @@ -249,10 +249,13 @@ struct RGWUploadPartInfo { RGWObjManifest manifest; RGWCompressionInfo cs_info; + // Previous part obj prefixes. Recorded here for later cleanup. + std::set past_prefixes; + RGWUploadPartInfo() : num(0), size(0) {} void encode(bufferlist& bl) const { - ENCODE_START(4, 2, bl); + ENCODE_START(5, 2, bl); encode(num, bl); encode(size, bl); encode(etag, bl); @@ -260,10 +263,11 @@ struct RGWUploadPartInfo { encode(manifest, bl); encode(cs_info, bl); encode(accounted_size, bl); + encode(past_prefixes, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START_LEGACY_COMPAT_LEN(4, 2, 2, bl); + DECODE_START_LEGACY_COMPAT_LEN(5, 2, 2, bl); decode(num, bl); decode(size, bl); decode(etag, bl); @@ -276,6 +280,9 @@ struct RGWUploadPartInfo { } else { accounted_size = size; } + if (struct_v >= 5) { + decode(past_prefixes, bl); + } DECODE_FINISH(bl); } void dump(Formatter *f) const; diff --git a/src/rgw/rgw_multi.cc b/src/rgw/rgw_multi.cc index e559bb5573b8..6e090d6b545f 100644 --- a/src/rgw/rgw_multi.cc +++ b/src/rgw/rgw_multi.cc @@ -98,5 +98,6 @@ void RGWUploadPartInfo::dump(Formatter *f) const encode_json("etag", etag, f); utime_t ut(modified); encode_json("modified", ut, f); + encode_json("past_prefixes", past_prefixes, f); } -- 2.47.3