]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: fix multipart upload object leaks due to re-upload
author: Yixin Jin <yjin77@yahoo.ca>
Mon, 9 Jan 2023 19:42:25 +0000 (19:42 +0000)
committer: Yixin Jin <yjin77@yahoo.ca>
Thu, 26 Jan 2023 19:42:23 +0000 (19:42 +0000)
This commit fixes the object leaks when an mp part object
is re-uploaded. Details of the fix are:

1. Upon re-upload, remember the prefix used by the previous
upload of the part in a new field "past_prefixes" in RGWUploadPartInfo.
2. Create a new CLS function for updating part info
in the metadata object.
3. Utilize the new CLS function during mp upload.
4. At the upload conclusion (complete/abort), clean up
the part objects that are not used for the final
assembly, thus preventing the object leak.

Fixes: https://tracker.ceph.com/issues/16767
Signed-off-by: Yixin Jin <yjin77@yahoo.ca>
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/cls/rgw/cls_rgw_const.h
src/cls/rgw/cls_rgw_ops.cc
src/cls/rgw/cls_rgw_ops.h
src/rgw/driver/rados/rgw_putobj_processor.cc
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_sal_rados.h
src/rgw/rgw_basic_types.h
src/rgw/rgw_multi.cc

index a4c531915514f11d62a3c6b30d807c405db04a9a..cda3e35d861245f38b503c15b4f7a69a0f12b09c 100644 (file)
@@ -83,7 +83,7 @@ static bool bi_is_plain_entry(const std::string& s) {
   return (s.empty() || (unsigned char)s[0] != BI_PREFIX_CHAR);
 }
 
-int bi_entry_type(const string& s)
+static int bi_entry_type(const string& s)
 {
   if (bi_is_plain_entry(s)) {
     return BI_BUCKET_OBJS_INDEX;
@@ -3538,7 +3538,7 @@ static int usage_record_decode(bufferlist& record_bl, rgw_usage_log_entry& e)
   return 0;
 }
 
-int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+static int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   CLS_LOG(10, "entered %s", __func__);
 
@@ -4335,6 +4335,49 @@ static int rgw_cls_lc_get_head(cls_method_context_t hctx, bufferlist *in,  buffe
   return 0;
 }
 
+static int rgw_mp_upload_part_info_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(10, "entered %s", __func__);
+  cls_rgw_mp_upload_part_info_update_op op;
+  auto in_iter = in->cbegin();
+  try {
+    decode(op, in_iter);
+  } catch (ceph::buffer::error& err) {
+    CLS_LOG(1, "ERROR: rgw_cls_mp_upload_part_info_update(): failed to decode op\n");
+    return -EINVAL;
+  }
+
+  RGWUploadPartInfo stored_info;
+
+  int ret = read_omap_entry(hctx, op.part_key, &stored_info);
+  if (ret < 0 and ret != -ENOENT) {
+    return ret;
+  }
+
+  /* merge all the prior (stored) manifest prefixes to carry forward */
+  if (not stored_info.manifest.empty()) {
+    op.info.past_prefixes.insert(stored_info.manifest.get_prefix());
+  }
+  op.info.past_prefixes.merge(stored_info.past_prefixes);
+
+  if (op.info.past_prefixes.contains(op.info.manifest.get_prefix())) {
+    // Somehow the current chosen prefix collides with one of previous ones.
+    // Better fail this part upload so it can pick a different one in the next.
+    const object_info_t& oi = cls_get_object_info(hctx);
+    CLS_LOG(1, "ERROR: oid [%s]: Current prefix %s is also a past prefix for part %s", 
+            oi.soid.oid.name.c_str(),
+            op.info.manifest.get_prefix().c_str(),
+            op.part_key.c_str());
+    return -EINVAL;
+  }
+
+  bufferlist bl;
+  encode(op.info, bl);
+  ret = cls_cxx_map_set_val(hctx, op.part_key, &bl);
+  CLS_LOG(10, "part info update on key [%s]: %zu past prefixes, ret %d", op.part_key.c_str(), op.info.past_prefixes.size(), ret);
+  return ret;
+}
+
 static int rgw_reshard_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   CLS_LOG(10, "entered %s", __func__);
@@ -4608,6 +4651,7 @@ CLS_INIT(rgw)
   cls_method_handle_t h_rgw_lc_put_head;
   cls_method_handle_t h_rgw_lc_get_head;
   cls_method_handle_t h_rgw_lc_list_entries;
+  cls_method_handle_t h_rgw_mp_upload_part_info_update;
   cls_method_handle_t h_rgw_reshard_add;
   cls_method_handle_t h_rgw_reshard_list;
   cls_method_handle_t h_rgw_reshard_get;
@@ -4671,6 +4715,9 @@ CLS_INIT(rgw)
   cls_register_cxx_method(h_class, RGW_LC_GET_HEAD, CLS_METHOD_RD, rgw_cls_lc_get_head, &h_rgw_lc_get_head);
   cls_register_cxx_method(h_class, RGW_LC_LIST_ENTRIES, CLS_METHOD_RD, rgw_cls_lc_list_entries, &h_rgw_lc_list_entries);
 
+  /* multipart */
+  cls_register_cxx_method(h_class, RGW_MP_UPLOAD_PART_INFO_UPDATE, CLS_METHOD_RD | CLS_METHOD_WR, rgw_mp_upload_part_info_update, &h_rgw_mp_upload_part_info_update);
+
   /* resharding */
   cls_register_cxx_method(h_class, RGW_RESHARD_ADD, CLS_METHOD_RD | CLS_METHOD_WR, rgw_reshard_add, &h_rgw_reshard_add);
   cls_register_cxx_method(h_class, RGW_RESHARD_LIST, CLS_METHOD_RD, rgw_reshard_list, &h_rgw_reshard_list);
index 4b544b3b20af856a3edc6a4aeed99a273e74c0a8..08374b361391adca11fd5503199cf713fb78fa20 100644 (file)
@@ -1068,6 +1068,26 @@ int cls_rgw_lc_list(IoCtx& io_ctx, const string& oid,
   return r;
 }
 
+int cls_rgw_mp_upload_part_info_update(librados::IoCtx& io_ctx,
+                                       const std::string& oid,
+                                       const std::string& part_key,
+                                       const RGWUploadPartInfo& info)
+{
+  buffer::list in, out;
+  cls_rgw_mp_upload_part_info_update_op op;
+
+  // For now, there doesn't appear to be a need for an encoded
+  // result -- we might in future want to return a copy of the final
+  // RGWUploadPartInfo
+  op.part_key = part_key;
+  op.info = info;
+  encode(op, in);
+
+  int r = io_ctx.exec(oid, RGW_CLASS, RGW_MP_UPLOAD_PART_INFO_UPDATE, in, out);
+  return r;
+}
+
 void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry)
 {
   bufferlist in;
index 57d55548b1eeb2cff681f9a6cead595447c1a9be..674533ae3dd256a63938ac1d5311a9e979dcf428 100644 (file)
@@ -618,6 +618,9 @@ int cls_rgw_lc_list(librados::IoCtx& io_ctx, const std::string& oid,
                     std::vector<cls_rgw_lc_entry>& entries);
 #endif
 
+/* multipart */
+int cls_rgw_mp_upload_part_info_update(librados::IoCtx& io_ctx, const std::string& oid, const std::string& part_key, const RGWUploadPartInfo& info);
+
 /* resharding */
 void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry);
 void cls_rgw_reshard_remove(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry);
index 6015872f8be811f6f891dc701610f950fdbecc96..8595db3c9e8b930ddb44ba51a85012b9eaa6fcae 100644 (file)
@@ -64,6 +64,9 @@ constexpr int RGWBIAdvanceAndRetryError = -EFBIG;
 #define RGW_LC_GET_HEAD "lc_get_head"
 #define RGW_LC_LIST_ENTRIES "lc_list_entries"
 
+/* multipart */
+#define RGW_MP_UPLOAD_PART_INFO_UPDATE "mp_upload_part_info_update"
+
 /* resharding */
 #define RGW_RESHARD_ADD "reshard_add"
 #define RGW_RESHARD_LIST "reshard_list"
index c3df98411167ca2e0de92c1b2e4a985d2686fd35..15bcba33330da80d11c2605fd1ff0992f3d89e21 100644 (file)
@@ -430,6 +430,21 @@ void cls_rgw_bi_log_list_ret::generate_test_instances(list<cls_rgw_bi_log_list_r
   ls.back()->truncated = true;
 }
 
+// Appends two heap-allocated sample ops to `ls`; they differ only in their
+// part key ("part1", then "part2"). The raw pointers are handed to the list
+// as-is -- presumably the caller is responsible for freeing them.
+void cls_rgw_mp_upload_part_info_update_op::generate_test_instances(std::list<cls_rgw_mp_upload_part_info_update_op*>& ls)
+{
+  for (const char* sample_key : {"part1", "part2"}) {
+    auto* sample = new cls_rgw_mp_upload_part_info_update_op;
+    sample->part_key = sample_key;
+    ls.push_back(sample);
+  }
+}
+
+// Emits the op through `f` as three fields, in this order: the part key,
+// the part number and the prefix of the part's manifest.
+void cls_rgw_mp_upload_part_info_update_op::dump(Formatter* f) const
+{
+  encode_json("part_key", part_key, f);
+
+  const auto& part_number = info.num;
+  encode_json("part_num", part_number, f);
+
+  const auto& manifest_prefix = info.manifest.get_prefix();
+  encode_json("part_prefix", manifest_prefix, f);
+}
+
 void cls_rgw_reshard_add_op::generate_test_instances(list<cls_rgw_reshard_add_op*>& ls)
 {
   ls.push_back(new cls_rgw_reshard_add_op);
index f6015eacea022d9eedfe9c35e00a4bd15d14c65f..2891a3b61049ba06f5c9fd015de9f787a628670b 100644 (file)
@@ -1287,6 +1287,31 @@ cls_rgw_lc_list_entries_ret(uint8_t compat_v = 3)
 };
 WRITE_CLASS_ENCODER(cls_rgw_lc_list_entries_ret)
 
+struct cls_rgw_mp_upload_part_info_update_op {  // request decoded by the rgw_mp_upload_part_info_update class method
+  std::string part_key;  // omap key of the part entry to read and overwrite
+  RGWUploadPartInfo info;  // part info to store under part_key; the handler merges stored past prefixes into it first
+
+  cls_rgw_mp_upload_part_info_update_op() {}
+
+  void encode(buffer::list& bl) const {  // serializes part_key, then info
+    ENCODE_START(1, 1, bl);  // NOTE(review): presumably (struct version, compat version) -- confirm against the encoding macros
+    encode(part_key, bl);
+    encode(info, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(buffer::list::const_iterator& bl) {  // reads the fields in the same order encode() writes them
+    DECODE_START(1, bl);
+    decode(part_key, bl);
+    decode(info, bl);
+    DECODE_FINISH(bl);
+  }
+
+  static void generate_test_instances(std::list<cls_rgw_mp_upload_part_info_update_op*>& ls);  // appends sample instances to ls
+  void dump(Formatter* f) const;  // emits part_key, the part number and the manifest prefix
+};
+WRITE_CLASS_ENCODER(cls_rgw_mp_upload_part_info_update_op)
+
 struct cls_rgw_reshard_add_op {
  cls_rgw_reshard_entry entry;
 
index 8a6a157018ef024a0bce8d56a94ea55e40fc3716..fae8d557bba5cff4d784930f4003bdce86bfb16c 100644 (file)
@@ -497,13 +497,23 @@ int MultipartObjectProcessor::complete(size_t accounted_size,
     return r;
   }
 
-  encode(info, bl);
-
   std::unique_ptr<rgw::sal::Object> meta_obj =
     head_obj->get_bucket()->get_object(rgw_obj_key(mp.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART));
   meta_obj->set_in_extra_data(true);
 
-  r = meta_obj->omap_set_val_by_key(dpp, p, bl, true, null_yield);
+  rgw_raw_obj meta_raw_obj;
+  dynamic_cast<rgw::sal::RadosObject*>(meta_obj.get())->get_raw_obj(&meta_raw_obj);
+
+  rgw_rados_ref meta_obj_ref;
+  r = store->getRados()->get_raw_obj_ref(dpp, meta_raw_obj, &meta_obj_ref);
+  if (r < 0) {
+    ldpp_dout(dpp, -1) << "ERROR: failed to get obj ref of meta obj with ret=" << r << dendl;
+    return r;
+  }
+
+  r = cls_rgw_mp_upload_part_info_update(meta_obj_ref.pool.ioctx(), meta_obj_ref.obj.oid, p, info);
+  ldpp_dout(dpp, 20) << "Update meta: " << meta_obj_ref.obj.oid << " part " << p << " prefix " << info.manifest.get_prefix() << " return " << r << dendl;
+
   if (r < 0) {
     return r == -ENOENT ? -ERR_NO_SUCH_UPLOAD : r;
   }
index 53d5d2464009c5a49062bfe84d59ee97d2d1a4b7..949125b7df82e8916378279efc6e8982410c16d3 100644 (file)
@@ -2283,6 +2283,48 @@ int RadosObject::swift_versioning_copy(const DoutPrefixProvider* dpp, optional_y
                                         y);
 }
 
+int RadosMultipartUpload::cleanup_part_history(const DoutPrefixProvider* dpp,
+                                               optional_yield y,
+                                               RadosMultipartPart *part,
+                                               list<rgw_obj_index_key>& remove_objs)
+{
+  cls_rgw_obj_chain chain;
+  for (auto& ppfx : part->get_past_prefixes()) {
+    rgw_obj past_obj;
+    past_obj.init_ns(bucket->get_key(), ppfx + "." + std::to_string(part->info.num), mp_ns);
+    rgw_obj_index_key past_key;
+    past_obj.key.get_index_key(&past_key);
+    // Remove past upload part objects from index, too.
+    remove_objs.push_back(past_key);
+
+    RGWObjManifest manifest = part->get_manifest();
+    manifest.set_prefix(ppfx);
+    RGWObjManifest::obj_iterator miter = manifest.obj_begin(dpp);
+    for (; miter != manifest.obj_end(dpp); ++miter) {
+      rgw_raw_obj raw_part_obj = miter.get_location().get_raw_obj(store);
+      cls_rgw_obj_key part_key(raw_part_obj.oid);
+      chain.push_obj(raw_part_obj.pool.to_str(), part_key, raw_part_obj.loc);
+    }
+  }
+  if (store->getRados()->get_gc() == nullptr) {
+    // Delete objects inline if gc hasn't been initialised (in case when bypass gc is specified)
+    store->getRados()->delete_objs_inline(dpp, chain, mp_obj.get_upload_id());
+  } else {
+    // use upload id as tag and do it synchronously
+    auto [ret, leftover_chain] = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id());
+    if (ret < 0 && leftover_chain) {
+      ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
+      if (ret == -ENOENT) {
+        return -ERR_NO_SUCH_UPLOAD;
+      }
+      // Delete objects inline if send chain to gc fails
+      store->getRados()->delete_objs_inline(dpp, *leftover_chain, mp_obj.get_upload_id());
+    }
+  }
+  return 0;
+}
+
+
 int RadosMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct)
 {
   std::unique_ptr<rgw::sal::Object> meta_obj = get_meta_obj();
@@ -2326,6 +2368,8 @@ int RadosMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct)
           rgw_obj_index_key key;
           head->get_key().get_index_key(&key);
           remove_objs.push_back(key);
+
+          cleanup_part_history(dpp, null_yield, obj_part, remove_objs);
         }
       }
       parts_accounted_size += obj_part->info.accounted_size;
@@ -2605,6 +2649,11 @@ int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp,
         return ret;
       } else {
         manifest.append(dpp, obj_part.manifest, store->svc()->zone->get_zonegroup(), store->svc()->zone->get_zone_params());
+        auto manifest_prefix = part->info.manifest.get_prefix();
+        if (not manifest_prefix.empty()) {
+          // It has an explicit prefix. Override the default one.
+          src_obj.init_ns(bucket->get_key(), manifest_prefix + "." + std::to_string(part->info.num), mp_ns);
+        }
       }
 
       bool part_compressed = (obj_part.cs_info.compression_type != "none");
@@ -2642,6 +2691,8 @@ int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp,
 
       remove_objs.push_back(remove_key);
 
+      cleanup_part_history(dpp, y, part, remove_objs);
+
       ofs += obj_part.size;
       accounted_size += obj_part.accounted_size;
     }
index da1cea5608fa358e9cf010229063d52a3121b9bb..15d8022710b0c1efe047a0d6e5ba20734a9babd4 100644 (file)
@@ -630,6 +630,7 @@ public:
 
   /* For RadosStore code */
   RGWObjManifest& get_manifest() { return info.manifest; }
+  const std::set<std::string>& get_past_prefixes() const { return info.past_prefixes; }
 
   friend class RadosMultipartUpload;
 };
@@ -679,6 +680,11 @@ public:
                          const rgw_placement_rule *ptail_placement_rule,
                          uint64_t part_num,
                          const std::string& part_num_str) override;
+protected:
+  int cleanup_part_history(const DoutPrefixProvider* dpp,
+                           optional_yield y,
+                           RadosMultipartPart* part,
+                           std::list<rgw_obj_index_key>& remove_objs);
 };
 
 class MPRadosSerializer : public StoreMPSerializer {
index 4e473a0a9de77917130b1a851332c9040d154f0f..25d70bdbf1b812d94091a5ac3d8cd751d15b81d3 100644 (file)
@@ -249,10 +249,13 @@ struct RGWUploadPartInfo {
   RGWObjManifest manifest;
   RGWCompressionInfo cs_info;
 
+  // Previous part obj prefixes. Recorded here for later cleanup.
+  std::set<std::string> past_prefixes; 
+
   RGWUploadPartInfo() : num(0), size(0) {}
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(4, 2, bl);
+    ENCODE_START(5, 2, bl);
     encode(num, bl);
     encode(size, bl);
     encode(etag, bl);
@@ -260,10 +263,11 @@ struct RGWUploadPartInfo {
     encode(manifest, bl);
     encode(cs_info, bl);
     encode(accounted_size, bl);
+    encode(past_prefixes, bl);
     ENCODE_FINISH(bl);
   }
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START_LEGACY_COMPAT_LEN(4, 2, 2, bl);
+    DECODE_START_LEGACY_COMPAT_LEN(5, 2, 2, bl);
     decode(num, bl);
     decode(size, bl);
     decode(etag, bl);
@@ -276,6 +280,9 @@ struct RGWUploadPartInfo {
     } else {
       accounted_size = size;
     }
+    if (struct_v >= 5) {
+      decode(past_prefixes, bl);
+    }
     DECODE_FINISH(bl);
   }
   void dump(Formatter *f) const;
index e559bb5573b8e0e4506f0636ff735c7ebecd8e28..6e090d6b545f8376b90f781fc85e74a98e2547ba 100644 (file)
@@ -98,5 +98,6 @@ void RGWUploadPartInfo::dump(Formatter *f) const
   encode_json("etag", etag, f);
   utime_t ut(modified);
   encode_json("modified", ut, f);
+  encode_json("past_prefixes", past_prefixes, f);
 }