]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
RGW - Zipper - MultipartUpload 42266/head
authorDaniel Gryniewicz <dang@redhat.com>
Tue, 6 Jul 2021 14:32:19 +0000 (10:32 -0400)
committerDaniel Gryniewicz <dang@redhat.com>
Wed, 28 Jul 2021 16:19:18 +0000 (12:19 -0400)
Create a MultipartUpload object in the Zipper API.

Signed-off-by: Daniel Gryniewicz <dang@redhat.com>
12 files changed:
src/rgw/rgw_lc.cc
src/rgw/rgw_multi.cc
src/rgw/rgw_multi.h
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_orphan.cc
src/rgw/rgw_rest.cc
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_rados.cc
src/rgw/rgw_sal_rados.h
src/rgw/services/svc_tier_rados.h

index 9ea78c93b35f0a6f4ffc35322ac1e4e7b0a26bf9..1931dd4d78ba33a22fe28ceedf5c9e0d2e9eee23 100644 (file)
@@ -840,14 +840,11 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
   auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
     auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
     auto& [rule, obj] = wt;
-    RGWMPObj mp_obj;
     if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) {
       rgw_obj_key key(obj.key);
-      if (!mp_obj.from_meta(key.name)) {
-       return;
-      }
+      std::unique_ptr<rgw::sal::MultipartUpload> mpu = store->get_multipart_upload(target, key.name);
       RGWObjectCtx rctx(store);
-      int ret = abort_multipart_upload(this, store, cct, &rctx, target, mp_obj);
+      int ret = mpu->abort(this, cct, &rctx);
       if (ret == 0) {
         if (perfcounter) {
           perfcounter->inc(l_rgw_lc_abort_mpu, 1);
index 3f209b9934a4062964d4e7e62e246dfec42de80b..b6edef13e72e9adbc99a70d0021e11cb6f5e0820 100644 (file)
@@ -79,296 +79,3 @@ bool is_v2_upload_id(const string& upload_id)
          (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX_LEGACY, sizeof(MULTIPART_UPLOAD_ID_PREFIX_LEGACY) - 1) == 0);
 }
 
-int list_multipart_parts(const DoutPrefixProvider *dpp, rgw::sal::Bucket* bucket,
-                        CephContext *cct,
-                        const string& upload_id,
-                        const string& meta_oid, int num_parts,
-                        int marker, map<uint32_t, RGWUploadPartInfo>& parts,
-                        int *next_marker, bool *truncated,
-                        bool assume_unsorted)
-{
-  map<string, bufferlist> parts_map;
-  map<string, bufferlist>::iterator iter;
-
-  std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(
-                     rgw_obj_key(meta_oid, std::string(), RGW_OBJ_NS_MULTIPART));
-  obj->set_in_extra_data(true);
-
-  bool sorted_omap = is_v2_upload_id(upload_id) && !assume_unsorted;
-
-  parts.clear();
-
-  int ret;
-  if (sorted_omap) {
-    string p;
-    p = "part.";
-    char buf[32];
-
-    snprintf(buf, sizeof(buf), "%08d", marker);
-    p.append(buf);
-
-    ret = obj->omap_get_vals(dpp, p, num_parts + 1, &parts_map,
-                                 nullptr, null_yield);
-  } else {
-    ret = obj->omap_get_all(dpp, &parts_map, null_yield);
-  }
-  if (ret < 0) {
-    return ret;
-  }
-
-  int i;
-  int last_num = 0;
-
-  uint32_t expected_next = marker + 1;
-
-  for (i = 0, iter = parts_map.begin();
-       (i < num_parts || !sorted_omap) && iter != parts_map.end();
-       ++iter, ++i) {
-    bufferlist& bl = iter->second;
-    auto bli = bl.cbegin();
-    RGWUploadPartInfo info;
-    try {
-      decode(info, bli);
-    } catch (buffer::error& err) {
-      ldpp_dout(dpp, 0) << "ERROR: could not part info, caught buffer::error" <<
-       dendl;
-      return -EIO;
-    }
-    if (sorted_omap) {
-      if (info.num != expected_next) {
-        /* ouch, we expected a specific part num here, but we got a
-         * different one. Either a part is missing, or it could be a
-         * case of mixed rgw versions working on the same upload,
-         * where one gateway doesn't support correctly sorted omap
-         * keys for multipart upload just assume data is unsorted.
-         */
-        return list_multipart_parts(dpp, bucket, cct, upload_id,
-                                   meta_oid, num_parts, marker, parts,
-                                   next_marker, truncated, true);
-      }
-      expected_next++;
-    }
-    if (sorted_omap ||
-      (int)info.num > marker) {
-      parts[info.num] = info;
-      last_num = info.num;
-    }
-  }
-
-  if (sorted_omap) {
-    if (truncated) {
-      *truncated = (iter != parts_map.end());
-    }
-  } else {
-    /* rebuild a map with only num_parts entries */
-    map<uint32_t, RGWUploadPartInfo> new_parts;
-    map<uint32_t, RGWUploadPartInfo>::iterator piter;
-    for (i = 0, piter = parts.begin();
-        i < num_parts && piter != parts.end();
-        ++i, ++piter) {
-      new_parts[piter->first] = piter->second;
-      last_num = piter->first;
-    }
-
-    if (truncated) {
-      *truncated = (piter != parts.end());
-    }
-
-    parts.swap(new_parts);
-  }
-
-  if (next_marker) {
-    *next_marker = last_num;
-  }
-
-  return 0;
-}
-
-int list_multipart_parts(const DoutPrefixProvider *dpp, struct req_state *s,
-                        const string& upload_id,
-                        const string& meta_oid, int num_parts,
-                        int marker, map<uint32_t, RGWUploadPartInfo>& parts,
-                        int *next_marker, bool *truncated,
-                        bool assume_unsorted)
-{
-  return list_multipart_parts(dpp, s->bucket.get(), s->cct, upload_id,
-                             meta_oid, num_parts, marker, parts,
-                             next_marker, truncated, assume_unsorted);
-}
-
-int abort_multipart_upload(const DoutPrefixProvider *dpp,
-                          rgw::sal::Store* store, CephContext *cct,
-                          RGWObjectCtx *obj_ctx, rgw::sal::Bucket* bucket,
-                          RGWMPObj& mp_obj)
-{
-  std::unique_ptr<rgw::sal::Object> meta_obj = bucket->get_object(
-                   rgw_obj_key(mp_obj.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART));
-  meta_obj->set_in_extra_data(true);
-  meta_obj->set_hash_source(mp_obj.get_key());
-  std::unique_ptr<rgw::sal::GCChain> chain = store->get_gc_chain(meta_obj.get());
-  list<rgw_obj_index_key> remove_objs;
-  map<uint32_t, RGWUploadPartInfo> obj_parts;
-  bool truncated;
-  int marker = 0;
-  int ret;
-  uint64_t parts_accounted_size = 0;
-
-  do {
-    ret = list_multipart_parts(dpp, bucket, cct,
-                              mp_obj.get_upload_id(), mp_obj.get_meta(),
-                              1000, marker, obj_parts, &marker, &truncated);
-    if (ret < 0) {
-      ldpp_dout(dpp, 20) << __func__ << ": list_multipart_parts returned " <<
-       ret << dendl;
-      return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
-    }
-
-    for (auto obj_iter = obj_parts.begin();
-        obj_iter != obj_parts.end();
-        ++obj_iter) {
-      RGWUploadPartInfo& obj_part = obj_iter->second;
-      if (obj_part.manifest.empty()) {
-        string oid = mp_obj.get_part(obj_iter->second.num);
-       std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(
-                                   rgw_obj_key(oid, std::string(), RGW_OBJ_NS_MULTIPART));
-       obj->set_hash_source(mp_obj.get_key());
-       ret = obj->delete_object(dpp, obj_ctx, null_yield);
-        if (ret < 0 && ret != -ENOENT)
-          return ret;
-      } else {
-       chain->update(dpp, &obj_part.manifest);
-        RGWObjManifest::obj_iterator oiter = obj_part.manifest.obj_begin(dpp);
-        if (oiter != obj_part.manifest.obj_end(dpp)) {
-         std::unique_ptr<rgw::sal::Object> head = bucket->get_object(rgw_obj_key());
-          rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store);
-         head->raw_obj_to_obj(raw_head);
-
-          rgw_obj_index_key key;
-          head->get_key().get_index_key(&key);
-          remove_objs.push_back(key);
-        }
-      }
-      parts_accounted_size += obj_part.accounted_size;
-    }
-  } while (truncated);
-
-  /* use upload id as tag and do it synchronously */
-  ret = chain->send(mp_obj.get_upload_id());
-  if (ret < 0) {
-    ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
-    if (ret == -ENOENT) {
-      return -ERR_NO_SUCH_UPLOAD;
-    }
-    //Delete objects inline if send chain to gc fails
-    chain->delete_inline(dpp, mp_obj.get_upload_id());
-  }
-
-  std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = meta_obj->get_delete_op(obj_ctx);
-  del_op->params.bucket_owner = bucket->get_acl_owner();
-  del_op->params.versioning_status = 0;
-  if (!remove_objs.empty()) {
-    del_op->params.remove_objs = &remove_objs;
-  }
-  
-  del_op->params.abortmp = true;
-  del_op->params.parts_accounted_size = parts_accounted_size;
-
-  // and also remove the metadata obj
-  ret = del_op->delete_obj(dpp, null_yield);
-  if (ret < 0) {
-    ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " <<
-      ret << dendl;
-  }
-  return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
-}
-
-int list_bucket_multiparts(const DoutPrefixProvider *dpp,
-                          rgw::sal::Bucket* bucket,
-                          const string& prefix, const string& marker,
-                          const string& delim,
-                          const int& max_uploads,
-                          vector<rgw_bucket_dir_entry> *objs,
-                          map<string, bool> *common_prefixes, bool *is_truncated)
-{
-  rgw::sal::Bucket::ListParams params;
-  rgw::sal::Bucket::ListResults results;
-  MultipartMetaFilter mp_filter;
-
-  params.prefix = prefix;
-  params.delim = delim;
-  params.marker = marker;
-  params.ns = RGW_OBJ_NS_MULTIPART;
-  params.filter = &mp_filter;
-
-  int ret = bucket->list(dpp, params, max_uploads, results, null_yield);
-
-  if (ret < 0)
-    return ret;
-
-  *objs = std::move(results.objs);
-  *common_prefixes = std::move(results.common_prefixes);
-  *is_truncated = results.is_truncated;
-
-  return ret;
-}
-
-int abort_bucket_multiparts(const DoutPrefixProvider *dpp,
-                           rgw::sal::Store* store, CephContext *cct,
-                           rgw::sal::Bucket* bucket, string& prefix, string& delim)
-{
-  constexpr int max = 1000;
-  int ret, num_deleted = 0;
-  vector<rgw_bucket_dir_entry> objs;
-  RGWObjectCtx obj_ctx(store);
-  string marker;
-  bool is_truncated;
-
-  do {
-    ret = list_bucket_multiparts(dpp, bucket, prefix, marker, delim,
-                                max, &objs, nullptr, &is_truncated);
-    if (ret < 0) {
-      ldpp_dout(dpp, 0) << __func__ <<
-       " ERROR : calling list_bucket_multiparts; ret=" << ret <<
-       "; bucket=\"" << bucket << "\"; prefix=\"" <<
-       prefix << "\"; delim=\"" << delim << "\"" << dendl;
-      return ret;
-    }
-    ldpp_dout(dpp, 20) << __func__ <<
-      " INFO: aborting and cleaning up multipart upload(s); bucket=\"" <<
-      bucket << "\"; objs.size()=" << objs.size() <<
-      "; is_truncated=" << is_truncated << dendl;
-
-    if (!objs.empty()) {
-      RGWMPObj mp;
-      for (const auto& obj : objs) {
-        rgw_obj_key key(obj.key);
-        if (!mp.from_meta(key.name))
-          continue;
-        ret = abort_multipart_upload(dpp, store, cct, &obj_ctx, bucket, mp);
-        if (ret < 0) {
-         // we're doing a best-effort; if something cannot be found,
-         // log it and keep moving forward
-         if (ret != -ENOENT && ret != -ERR_NO_SUCH_UPLOAD) {
-           ldpp_dout(dpp, 0) << __func__ <<
-             " ERROR : failed to abort and clean-up multipart upload \"" <<
-             key.get_oid() << "\"" << dendl;
-           return ret;
-         } else {
-           ldpp_dout(dpp, 10) << __func__ <<
-             " NOTE : unable to find part(s) of "
-             "aborted multipart upload of \"" << key.get_oid() <<
-             "\" for cleaning up" << dendl;
-         }
-        }
-        num_deleted++;
-      }
-      if (num_deleted) {
-        ldpp_dout(dpp, 0) << __func__ <<
-         " WARNING : aborted " << num_deleted <<
-         " incomplete multipart uploads" << dendl;
-      }
-    }
-  } while (is_truncated);
-
-  return 0;
-}
index aeb983e0009b146e46a38ae5e6b5c0da34b97aaa..7174dfbb80c7f775deabfac0ab1c1bf0bccdd6a3 100644 (file)
@@ -109,38 +109,4 @@ public:
 
 extern bool is_v2_upload_id(const string& upload_id);
 
-extern int list_multipart_parts(const DoutPrefixProvider *dpp,
-                                rgw::sal::Bucket* bucket,
-                               CephContext *cct,
-                                const string& upload_id,
-                                const string& meta_oid, int num_parts,
-                                int marker, map<uint32_t, RGWUploadPartInfo>& parts,
-                                int *next_marker, bool *truncated,
-                                bool assume_unsorted = false);
-
-extern int list_multipart_parts(const DoutPrefixProvider *dpp,
-                                struct req_state *s,
-                                const string& upload_id,
-                                const string& meta_oid, int num_parts,
-                                int marker, map<uint32_t, RGWUploadPartInfo>& parts,
-                                int *next_marker, bool *truncated,
-                                bool assume_unsorted = false);
-
-extern int abort_multipart_upload(const DoutPrefixProvider *dpp, rgw::sal::Store* store,
-                                 CephContext *cct, RGWObjectCtx *obj_ctx,
-                                 rgw::sal::Bucket* bucket, RGWMPObj& mp_obj);
-
-extern int list_bucket_multiparts(const DoutPrefixProvider *dpp,
-                                 rgw::sal::Bucket* bucket,
-                                 const string& prefix,
-                                 const string& marker,
-                                 const string& delim,
-                                 const int& max_uploads,
-                                 vector<rgw_bucket_dir_entry> *objs,
-                                 map<string, bool> *common_prefixes, bool *is_truncated);
-
-extern int abort_bucket_multiparts(const DoutPrefixProvider *dpp,
-                                  rgw::sal::Store* store, CephContext *cct,
-                                  rgw::sal::Bucket* bucket,
-                                  string& prefix, string& delim);
 #endif
index 829774cbe2a095c267b1cf68d43208a4ce4b4d6f..0b578d1bab93ea8a4ee1b4bb79343c59df612eb5 100644 (file)
@@ -329,95 +329,6 @@ vector<Policy> get_iam_user_policy_from_attr(CephContext* cct,
   return policies;
 }
 
-static int get_obj_head(const DoutPrefixProvider *dpp,
-                        struct req_state *s,
-                        rgw::sal::Object* obj,
-                       bufferlist *pbl)
-{
-  std::unique_ptr<rgw::sal::Object::ReadOp> read_op = obj->get_read_op(s->obj_ctx);
-  obj->set_prefetch_data(s->obj_ctx);
-
-  int ret = read_op->prepare(s->yield, dpp);
-  if (ret < 0) {
-    return ret;
-  }
-
-  if (!pbl) {
-    return 0;
-  }
-
-  ret = read_op->read(0, s->cct->_conf->rgw_max_chunk_size, *pbl, s->yield, dpp);
-
-  return 0;
-}
-
-struct multipart_upload_info
-{
-  rgw_placement_rule dest_placement;
-
-  void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
-    encode(dest_placement, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(dest_placement, bl);
-    DECODE_FINISH(bl);
-  }
-};
-WRITE_CLASS_ENCODER(multipart_upload_info)
-
-static int get_multipart_info(const DoutPrefixProvider *dpp, struct req_state *s,
-                             rgw::sal::Object* obj,
-                              multipart_upload_info *upload_info)
-{
-  bufferlist header;
-
-  bufferlist headbl;
-  bufferlist *pheadbl = (upload_info ? &headbl : nullptr);
-
-  int op_ret = get_obj_head(dpp, s, obj, pheadbl);
-  if (op_ret < 0) {
-    if (op_ret == -ENOENT) {
-      return -ERR_NO_SUCH_UPLOAD;
-    }
-    return op_ret;
-  }
-
-  if (upload_info && headbl.length() > 0) {
-    auto hiter = headbl.cbegin();
-    try {
-      decode(*upload_info, hiter);
-    } catch (buffer::error& err) {
-      ldpp_dout(s, 0) << "ERROR: failed to decode multipart upload info" << dendl;
-      return -EIO;
-    }
-  }
-
-  return 0;
-}
-
-static int get_multipart_info(const DoutPrefixProvider *dpp, struct req_state *s,
-                              const string& meta_oid,
-                              multipart_upload_info *upload_info,
-                              rgw::sal::Attrs* attrs = nullptr)
-{
-  map<string, bufferlist>::iterator iter;
-  bufferlist header;
-
-  std::unique_ptr<rgw::sal::Object> meta_obj;
-  meta_obj = s->bucket->get_object(rgw_obj_key(meta_oid, string(), mp_ns));
-  meta_obj->set_in_extra_data(true);
-
-  int ret = get_multipart_info(dpp, s, meta_obj.get(), upload_info);
-  if (ret >= 0 && attrs) {
-    *attrs = meta_obj->get_attrs();
-  }
-  return ret;
-}
-
 static int read_bucket_policy(const DoutPrefixProvider *dpp, 
                               rgw::sal::Store* store,
                               struct req_state *s,
@@ -473,9 +384,9 @@ static int read_obj_policy(const DoutPrefixProvider *dpp,
   // 'copy_src' is used to make this function backward compatible.
   if (!upload_id.empty() && !copy_src) {
     /* multipart upload */
-    RGWMPObj mp(object->get_name(), upload_id);
-    string oid = mp.get_meta();
-    mpobj = bucket->get_object(rgw_obj_key(oid, string(), mp_ns));
+    std::unique_ptr<rgw::sal::MultipartUpload> upload;
+    upload = store->get_multipart_upload(bucket, object->get_name(), upload_id);
+    mpobj = upload->get_meta_obj();
     mpobj->set_in_extra_data(true);
     object = mpobj.get();
   }
@@ -3830,11 +3741,11 @@ void RGWPutObj::execute(optional_yield y)
 
   rgw_placement_rule *pdest_placement;
 
-  multipart_upload_info upload_info;
   if (multipart) {
-    RGWMPObj mp(s->object->get_name(), multipart_upload_id);
-
-    op_ret = get_multipart_info(this, s, mp.get_meta(), &upload_info);
+    std::unique_ptr<rgw::sal::MultipartUpload> upload;
+    upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(),
+                                        multipart_upload_id);
+    op_ret = upload->get_info(this, s->yield, s->obj_ctx, &pdest_placement);
     if (op_ret < 0) {
       if (op_ret != -ENOENT) {
         ldpp_dout(this, 0) << "ERROR: get_multipart_info returned " << op_ret << ": " << cpp_strerror(-op_ret) << dendl;
@@ -3843,8 +3754,10 @@ void RGWPutObj::execute(optional_yield y)
       }
       return;
     }
-    pdest_placement = &upload_info.dest_placement;
-    ldpp_dout(this, 20) << "dest_placement for part=" << upload_info.dest_placement << dendl;
+    /* upload will go out of scope, so copy the dest placement for later use */
+    s->dest_placement = *pdest_placement;
+    pdest_placement = &s->dest_placement;
+    ldpp_dout(this, 20) << "dest_placement for part=" << *pdest_placement << dendl;
     processor.emplace<MultipartObjectProcessor>(
         &*aio, store, s->bucket.get(), pdest_placement,
         s->owner.get_id(), obj_ctx, s->object->clone(),
@@ -5998,43 +5911,15 @@ void RGWInitMultipart::execute(optional_yield y)
     return;
   }
 
-  do {
-    char buf[33];
-    std::unique_ptr<rgw::sal::Object> obj;
-    gen_rand_alphanumeric(s->cct, buf, sizeof(buf) - 1);
-    upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */
-    upload_id.append(buf);
-
-    string tmp_obj_name;
-    RGWMPObj mp(s->object->get_name(), upload_id);
-    tmp_obj_name = mp.get_meta();
-
-    obj = s->bucket->get_object(rgw_obj_key(tmp_obj_name, string(), mp_ns));
-    // the meta object will be indexed with 0 size, we c
-    obj->set_in_extra_data(true);
-    obj->set_hash_source(s->object->get_name());
-
-    std::unique_ptr<rgw::sal::Object::WriteOp> obj_op = obj->get_write_op(s->obj_ctx);
-
-    obj_op->params.versioning_disabled = true; /* no versioning for multipart meta */
-    obj_op->params.owner = s->owner;
-    obj_op->params.category = RGWObjCategory::MultiMeta;
-    obj_op->params.flags = PUT_OBJ_CREATE_EXCL;
-    obj_op->params.mtime = &mtime;
-    obj_op->params.attrs = &attrs;
+  std::unique_ptr<rgw::sal::MultipartUpload> upload;
+  upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(),
+                                      upload_id);
+  op_ret = upload->init(this, s->yield, s->obj_ctx, s->owner, s->dest_placement, attrs);
 
-    multipart_upload_info upload_info;
-    upload_info.dest_placement = s->dest_placement;
-
-    bufferlist bl;
-    encode(upload_info, bl);
-    obj_op->params.data = &bl;
-
-    op_ret = obj_op->prepare(s->yield);
+  if (op_ret == 0) {
+    upload_id = upload->get_upload_id();
+  }
 
-    op_ret = obj_op->write_meta(this, bl.length(), 0, s->yield);
-  } while (op_ret == -EEXIST);
-  
 }
 
 int RGWCompleteMultipart::verify_permission(optional_yield y)
@@ -6106,20 +5991,13 @@ void RGWCompleteMultipart::pre_exec()
 void RGWCompleteMultipart::execute(optional_yield y)
 {
   RGWMultiCompleteUpload *parts;
-  map<int, string>::iterator iter;
   RGWMultiXMLParser parser;
-  string meta_oid;
-  map<uint32_t, RGWUploadPartInfo> obj_parts;
-  map<uint32_t, RGWUploadPartInfo>::iterator obj_iter;
+  std::unique_ptr<rgw::sal::MultipartUpload> upload;
   rgw::sal::Attrs attrs;
   off_t ofs = 0;
-  MD5 hash;
-  char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
-  char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
   bufferlist etag_bl;
   std::unique_ptr<rgw::sal::Object> meta_obj;
   std::unique_ptr<rgw::sal::Object> target_obj;
-  RGWMPObj mp;
   RGWObjManifest manifest;
   uint64_t olh_epoch = 0;
 
@@ -6158,29 +6036,18 @@ void RGWCompleteMultipart::execute(optional_yield y)
     return;
   }
 
-  mp.init(s->object->get_name(), upload_id);
+  upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(), upload_id);
 
 
-  meta_oid = mp.get_meta();
-
-  int total_parts = 0;
-  int handled_parts = 0;
-  int max_parts = 1000;
-  int marker = 0;
-  bool truncated;
   RGWCompressionInfo cs_info;
   bool compressed = false;
   uint64_t accounted_size = 0;
 
-  uint64_t min_part_size = s->cct->_conf->rgw_multipart_min_part_size;
-
   list<rgw_obj_index_key> remove_objs; /* objects to be removed from index listing */
 
   bool versioned_object = s->bucket->versioning_enabled();
 
-  iter = parts->parts.begin();
-
-  meta_obj = s->bucket->get_object(rgw_obj_key(meta_oid, string(), mp_ns));
+  meta_obj = upload->get_meta_obj();
   meta_obj->set_in_extra_data(true);
   meta_obj->set_hash_source(s->object->get_name());
 
@@ -6220,115 +6087,13 @@ void RGWCompleteMultipart::execute(optional_yield y)
     return;
   }
 
-  do {
-    op_ret = list_multipart_parts(this, s, upload_id, meta_oid, max_parts,
-                                 marker, obj_parts, &marker, &truncated);
-    if (op_ret == -ENOENT) {
-      op_ret = -ERR_NO_SUCH_UPLOAD;
-    }
-    if (op_ret < 0)
-      return;
-
-    total_parts += obj_parts.size();
-    if (!truncated && total_parts != (int)parts->parts.size()) {
-      ldpp_dout(this, 0) << "NOTICE: total parts mismatch: have: " << total_parts
-                      << " expected: " << parts->parts.size() << dendl;
-      op_ret = -ERR_INVALID_PART;
-      return;
-    }
-
-    for (obj_iter = obj_parts.begin(); iter != parts->parts.end() && obj_iter != obj_parts.end(); ++iter, ++obj_iter, ++handled_parts) {
-      uint64_t part_size = obj_iter->second.accounted_size;
-      if (handled_parts < (int)parts->parts.size() - 1 &&
-          part_size < min_part_size) {
-        op_ret = -ERR_TOO_SMALL;
-        return;
-      }
-
-      char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
-      if (iter->first != (int)obj_iter->first) {
-        ldpp_dout(this, 0) << "NOTICE: parts num mismatch: next requested: "
-                        << iter->first << " next uploaded: "
-                        << obj_iter->first << dendl;
-        op_ret = -ERR_INVALID_PART;
-        return;
-      }
-      string part_etag = rgw_string_unquote(iter->second);
-      if (part_etag.compare(obj_iter->second.etag) != 0) {
-        ldpp_dout(this, 0) << "NOTICE: etag mismatch: part: " << iter->first
-                        << " etag: " << iter->second << dendl;
-        op_ret = -ERR_INVALID_PART;
-        return;
-      }
-
-      hex_to_buf(obj_iter->second.etag.c_str(), petag,
-               CEPH_CRYPTO_MD5_DIGESTSIZE);
-      hash.Update((const unsigned char *)petag, sizeof(petag));
-
-      RGWUploadPartInfo& obj_part = obj_iter->second;
-
-      /* update manifest for part */
-      string oid = mp.get_part(obj_iter->second.num);
-      rgw_obj src_obj;
-      src_obj.init_ns(s->bucket->get_key(), oid, mp_ns);
-
-      if (obj_part.manifest.empty()) {
-        ldpp_dout(this, 0) << "ERROR: empty manifest for object part: obj="
-                        << src_obj << dendl;
-        op_ret = -ERR_INVALID_PART;
-        return;
-      } else {
-        manifest.append(this, obj_part.manifest, store->get_zone());
-      }
-
-      bool part_compressed = (obj_part.cs_info.compression_type != "none");
-      if ((handled_parts > 0) &&
-          ((part_compressed != compressed) ||
-            (cs_info.compression_type != obj_part.cs_info.compression_type))) {
-          ldpp_dout(this, 0) << "ERROR: compression type was changed during multipart upload ("
-                           << cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << ")" << dendl;
-          op_ret = -ERR_INVALID_PART;
-          return; 
-      }
-      
-      if (part_compressed) {
-        int64_t new_ofs; // offset in compression data for new part
-        if (cs_info.blocks.size() > 0)
-          new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len;
-        else
-          new_ofs = 0;
-        for (const auto& block : obj_part.cs_info.blocks) {
-          compression_block cb;
-          cb.old_ofs = block.old_ofs + cs_info.orig_size;
-          cb.new_ofs = new_ofs;
-          cb.len = block.len;
-          cs_info.blocks.push_back(cb);
-          new_ofs = cb.new_ofs + cb.len;
-        } 
-        if (!compressed)
-          cs_info.compression_type = obj_part.cs_info.compression_type;
-        cs_info.orig_size += obj_part.cs_info.orig_size;
-        compressed = true;
-      }
-
-      rgw_obj_index_key remove_key;
-      src_obj.key.get_index_key(&remove_key);
-
-      remove_objs.push_back(remove_key);
-
-      ofs += obj_part.size;
-      accounted_size += obj_part.accounted_size;
-    }
-  } while (truncated);
-  hash.Final((unsigned char *)final_etag);
-
-  buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
-  snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2],  sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
-           "-%lld", (long long)parts->parts.size());
-  etag = final_etag_str;
-  ldpp_dout(this, 10) << "calculated etag: " << final_etag_str << dendl;
+  op_ret = upload->complete(this, s->cct, etag, manifest, parts->parts, remove_objs, accounted_size, compressed, cs_info, ofs);
+  if (op_ret < 0) {
+    ldpp_dout(this, 0) << "ERROR: upload complete failed ret=" << op_ret << dendl;
+    return;
+  }
 
-  etag_bl.append(final_etag_str, strlen(final_etag_str));
+  etag_bl.append(etag);
 
   attrs[RGW_ATTR_ETAG] = etag_bl;
 
@@ -6384,7 +6149,7 @@ void RGWCompleteMultipart::execute(optional_yield y)
   }
 
   // send request to notification manager
-  int ret = res->publish_commit(this, ofs, ceph::real_clock::now(), final_etag_str, target_obj->get_instance());
+  int ret = res->publish_commit(this, ofs, target_obj->get_mtime(), etag, target_obj->get_instance());
   if (ret < 0) {
     ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
     // too late to rollback operation, hence op_ret is not set here
@@ -6511,23 +6276,16 @@ void RGWAbortMultipart::execute(optional_yield y)
 {
   op_ret = -EINVAL;
   string upload_id;
-  string meta_oid;
   upload_id = s->info.args.get("uploadId");
   rgw_obj meta_obj;
-  RGWMPObj mp;
+  std::unique_ptr<rgw::sal::MultipartUpload> upload;
 
   if (upload_id.empty() || rgw::sal::Object::empty(s->object.get()))
     return;
 
-  mp.init(s->object->get_name(), upload_id);
-  meta_oid = mp.get_meta();
-
-  op_ret = get_multipart_info(this, s, meta_oid, nullptr);
-  if (op_ret < 0)
-    return;
-
+  upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(), upload_id);
   RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
-  op_ret = abort_multipart_upload(this, store, s->cct, obj_ctx, s->bucket.get(), mp);
+  op_ret = upload->abort(this, s->cct, obj_ctx);
 }
 
 int RGWListMultipart::verify_permission(optional_yield y)
@@ -6545,18 +6303,14 @@ void RGWListMultipart::pre_exec()
 
 void RGWListMultipart::execute(optional_yield y)
 {
-  string meta_oid;
-  RGWMPObj mp;
-
   op_ret = get_params(y);
   if (op_ret < 0)
     return;
 
-  mp.init(s->object->get_name(), upload_id);
-  meta_oid = mp.get_meta();
+  upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(), upload_id);
 
   rgw::sal::Attrs attrs;
-  op_ret = get_multipart_info(this, s, meta_oid, nullptr, &attrs);
+  op_ret = upload->get_info(this, s->yield, s->obj_ctx, nullptr, &attrs);
   /* decode policy */
   map<string, bufferlist>::iterator iter = attrs.find(RGW_ATTR_ACL);
   if (iter != attrs.end()) {
@@ -6571,8 +6325,7 @@ void RGWListMultipart::execute(optional_yield y)
   if (op_ret < 0)
     return;
 
-  op_ret = list_multipart_parts(this, s, upload_id, meta_oid, max_parts,
-                               marker, parts, NULL, &truncated);
+  op_ret = upload->list_parts(this, s->cct, max_parts, marker, NULL, &truncated);
 }
 
 int RGWListBucketMultiparts::verify_permission(optional_yield y)
@@ -6592,9 +6345,6 @@ void RGWListBucketMultiparts::pre_exec()
 
 void RGWListBucketMultiparts::execute(optional_yield y)
 {
-  vector<rgw_bucket_dir_entry> objs;
-  string marker_meta;
-
   op_ret = get_params(y);
   if (op_ret < 0)
     return;
@@ -6611,25 +6361,17 @@ void RGWListBucketMultiparts::execute(optional_yield y)
       delimiter="/";
     }
   }
-  marker_meta = marker.get_meta();
 
-  op_ret = list_bucket_multiparts(this, s->bucket.get(), prefix, marker_meta, delimiter,
-                                  max_uploads, &objs, &common_prefixes, &is_truncated);
+  op_ret = s->bucket->list_multiparts(this, prefix, marker_meta,
+                                     delimiter, max_uploads, uploads,
+                                     &common_prefixes, &is_truncated);
   if (op_ret < 0) {
     return;
   }
 
-  if (!objs.empty()) {
-    vector<rgw_bucket_dir_entry>::iterator iter;
-    RGWMultipartUploadEntry entry;
-    for (iter = objs.begin(); iter != objs.end(); ++iter) {
-      rgw_obj_key key(iter->key);
-      if (!entry.mp.from_meta(key.name))
-        continue;
-      entry.obj = *iter;
-      uploads.push_back(entry);
-    }
-    next_marker = entry;
+  if (!uploads.empty()) {
+    next_marker_key = uploads.back()->get_key();
+    next_marker_upload_id = uploads.back()->get_upload_id();
   }
 }
 
index 156d0c541b0dc426ed469b79f146730ef7e7dd7e..0c8e8ad40b52af2d817d85bdb419ddeefe8ba1b2 100644 (file)
@@ -1884,7 +1884,7 @@ public:
 class RGWListMultipart : public RGWOp {
 protected:
   string upload_id;
-  map<uint32_t, RGWUploadPartInfo> parts;
+  std::unique_ptr<rgw::sal::MultipartUpload> upload;
   int max_parts;
   int marker;
   RGWAccessControlPolicy policy;
@@ -1912,26 +1912,17 @@ public:
   uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
 };
 
-struct RGWMultipartUploadEntry {
-  rgw_bucket_dir_entry obj;
-  RGWMPObj mp;
-
-  friend std::ostream& operator<<(std::ostream& out,
-                                 const RGWMultipartUploadEntry& e) {
-    constexpr char quote = '"';
-    return out << "RGWMultipartUploadEntry{ obj.key=" <<
-      quote << e.obj.key << quote << " mp=" << e.mp << " }";
-  }
-};
-
 class RGWListBucketMultiparts : public RGWOp {
 protected:
   string prefix;
-  RGWMPObj marker; 
-  RGWMultipartUploadEntry next_marker; 
+  string marker_meta;
+  string marker_key;
+  string marker_upload_id;
+  string next_marker_key;
+  string next_marker_upload_id;
   int max_uploads;
   string delimiter;
-  vector<RGWMultipartUploadEntry> uploads;
+  vector<std::unique_ptr<rgw::sal::MultipartUpload>> uploads;
   map<string, bool> common_prefixes;
   bool is_truncated;
   int default_max;
index 39f2bb00a1359e9e00cafca0cbabeacabd699c7d..921b226738948bb975b069da52e4131fd2351759 100644 (file)
@@ -1449,21 +1449,15 @@ int RGWRadosList::do_incomplete_multipart(const DoutPrefixProvider *dpp,
 {
   constexpr int max_uploads = 1000;
   constexpr int max_parts = 1000;
-  static const std::string mp_ns = RGW_OBJ_NS_MULTIPART;
-  static MultipartMetaFilter mp_filter;
-
+  std::string marker;
+  vector<std::unique_ptr<rgw::sal::MultipartUpload>> uploads;
+  bool is_truncated;
   int ret;
 
-  rgw::sal::Bucket::ListParams params;
-  rgw::sal::Bucket::ListResults results;
-
-  params.ns = mp_ns;
-  params.filter = &mp_filter;
-  // use empty string for initial params.marker
   // use empty strings for params.{prefix,delim}
 
   do {
-    ret = bucket->list(dpp, params, max_uploads, results, null_yield);
+    ret = bucket->list_multiparts(dpp, string(), marker, string(), max_uploads, uploads, nullptr, &is_truncated);
     if (ret == -ENOENT) {
       // could bucket have been removed while this is running?
       ldpp_dout(dpp, 5) << "RGWRadosList::" << __func__ <<
@@ -1476,39 +1470,19 @@ int RGWRadosList::do_incomplete_multipart(const DoutPrefixProvider *dpp,
       return ret;
     }
 
-    if (!results.objs.empty()) {
-      std::vector<RGWMultipartUploadEntry> uploads;
-      RGWMultipartUploadEntry entry;
-      for (const rgw_bucket_dir_entry& obj : results.objs) {
-       const rgw_obj_key& key = obj.key;
-       if (!entry.mp.from_meta(key.name)) {
-         // we only want the meta objects, so skip all the components
-         continue;
-       }
-       entry.obj = obj;
-       uploads.push_back(entry);
-       ldpp_dout(dpp, 20) << "RGWRadosList::" << __func__ <<
-         " processing incomplete multipart entry " <<
-         entry << dendl;
-      }
-
+    if (!uploads.empty()) {
       // now process the uploads vector
       for (const auto& upload : uploads) {
-       const RGWMPObj& mp = upload.mp;
        int parts_marker = 0;
        bool is_parts_truncated = false;
 
        do { // while (is_parts_truncated);
-         std::map<uint32_t, RGWUploadPartInfo> parts;
-         ret = list_multipart_parts(dpp, bucket, store->ctx(),
-                                    mp.get_upload_id(), mp.get_meta(),
-                                    max_parts, parts_marker,
-                                    parts, &parts_marker,
-                                    &is_parts_truncated);
+         ret = upload->list_parts(dpp, store->ctx(), max_parts, parts_marker,
+                                  &parts_marker, &is_parts_truncated);
          if (ret == -ENOENT) {
            ldpp_dout(dpp, 5) <<  "RGWRadosList::" << __func__ <<
              ": WARNING: list_multipart_parts returned ret=-ENOENT "
-             "for " << mp.get_upload_id() << ", moving on" << dendl;
+             "for " << upload->get_upload_id() << ", moving on" << dendl;
            break;
          } else if (ret < 0) {
            ldpp_dout(dpp, -1) << "RGWRadosList::" << __func__ <<
@@ -1517,8 +1491,10 @@ int RGWRadosList::do_incomplete_multipart(const DoutPrefixProvider *dpp,
            return ret;
          }
 
-         for (auto& p : parts) {
-           RGWObjManifest& manifest = p.second.manifest;
+         for (auto& p : upload->get_parts()) {
+           rgw::sal::RadosMultipartPart* part =
+             dynamic_cast<rgw::sal::RadosMultipartPart*>(p.second.get());
+           RGWObjManifest& manifest = part->get_manifest();
            for (auto obj_it = manifest.obj_begin(dpp);
                 obj_it != manifest.obj_end(dpp);
                 ++obj_it) {
@@ -1530,7 +1506,7 @@ int RGWRadosList::do_incomplete_multipart(const DoutPrefixProvider *dpp,
        } while (is_parts_truncated);
       } // for (const auto& upload
     } // if objs not empty
-  } while (results.is_truncated);
+  } while (is_truncated);
 
   return 0;
 } // RGWRadosList::do_incomplete_multipart
index e807c295ac4f71ab7349be1cc37110405054baa9..dc1135d9bfd533f2328bb7a76033ffdefd39b34c 100644 (file)
@@ -1612,8 +1612,14 @@ int RGWListBucketMultiparts_ObjStore::get_params(optional_yield y)
 
   string key_marker = s->info.args.get("key-marker");
   string upload_id_marker = s->info.args.get("upload-id-marker");
-  if (!key_marker.empty())
-    marker.init(key_marker, upload_id_marker);
+  if (!key_marker.empty()) {
+    std::unique_ptr<rgw::sal::MultipartUpload> upload;
+    upload = store->get_multipart_upload(s->bucket.get(), key_marker,
+                                        upload_id_marker);
+    marker_meta = upload->get_meta();
+    marker_key = upload->get_key();
+    marker_upload_id = upload->get_upload_id();
+  }
 
   return 0;
 }
index 316b36ef2f4fd0e54eae8c568b11be054f0da692..03608d9e09d83c5276130877c094f4025fb58c37 100644 (file)
@@ -2577,11 +2577,10 @@ int RGWPutObj_ObjStore_S3::get_encrypt_filter(
 {
   int res = 0;
   if (!multipart_upload_id.empty()) {
-    RGWMPObj mp(s->object->get_name(), multipart_upload_id);
-    std::unique_ptr<rgw::sal::Object> obj = s->bucket->get_object(
-                                               rgw_obj_key(mp.get_meta(),
-                                                           std::string(),
-                                                           RGW_OBJ_NS_MULTIPART));
+    std::unique_ptr<rgw::sal::MultipartUpload> upload =
+      store->get_multipart_upload(s->bucket.get(), s->object->get_name(),
+                                 multipart_upload_id);
+    std::unique_ptr<rgw::sal::Object> obj = upload->get_meta_obj();
     obj->set_in_extra_data(true);
     res = obj->get_obj_attrs(s->obj_ctx, s->yield, this);
     if (res == 0) {
@@ -3786,13 +3785,13 @@ void RGWListMultipart_ObjStore_S3::send_response()
   if (op_ret == 0) {
     dump_start(s);
     s->formatter->open_object_section_in_ns("ListPartsResult", XMLNS_AWS_S3);
-    map<uint32_t, RGWUploadPartInfo>::iterator iter;
-    map<uint32_t, RGWUploadPartInfo>::reverse_iterator test_iter;
+    map<uint32_t, std::unique_ptr<rgw::sal::MultipartPart>>::iterator iter;
+    map<uint32_t, std::unique_ptr<rgw::sal::MultipartPart>>::reverse_iterator test_iter;
     int cur_max = 0;
 
-    iter = parts.begin();
-    test_iter = parts.rbegin();
-    if (test_iter != parts.rend()) {
+    iter = upload->get_parts().begin();
+    test_iter = upload->get_parts().rbegin();
+    if (test_iter != upload->get_parts().rend()) {
       cur_max = test_iter->first;
     }
     if (!s->bucket_tenant.empty())
@@ -3809,16 +3808,16 @@ void RGWListMultipart_ObjStore_S3::send_response()
     ACLOwner& owner = policy.get_owner();
     dump_owner(s, owner.get_id(), owner.get_display_name());
 
-    for (; iter != parts.end(); ++iter) {
-      RGWUploadPartInfo& info = iter->second;
+    for (; iter != upload->get_parts().end(); ++iter) {
+      rgw::sal::MultipartPart* part = iter->second.get();
 
       s->formatter->open_object_section("Part");
 
-      dump_time(s, "LastModified", &info.modified);
+      dump_time(s, "LastModified", &part->get_mtime());
 
-      s->formatter->dump_unsigned("PartNumber", info.num);
-      s->formatter->dump_format("ETag", "\"%s\"", info.etag.c_str());
-      s->formatter->dump_unsigned("Size", info.accounted_size);
+      s->formatter->dump_unsigned("PartNumber", part->get_num());
+      s->formatter->dump_format("ETag", "\"%s\"", part->get_etag().c_str());
+      s->formatter->dump_unsigned("Size", part->get_size());
       s->formatter->close_section();
     }
     s->formatter->close_section();
@@ -3845,38 +3844,34 @@ void RGWListBucketMultiparts_ObjStore_S3::send_response()
   s->formatter->dump_string("Bucket", s->bucket_name);
   if (!prefix.empty())
     s->formatter->dump_string("ListMultipartUploadsResult.Prefix", prefix);
-  const string& key_marker = marker.get_key();
-  if (!key_marker.empty())
-    s->formatter->dump_string("KeyMarker", key_marker);
-  const string& upload_id_marker = marker.get_upload_id();
-  if (!upload_id_marker.empty())
-    s->formatter->dump_string("UploadIdMarker", upload_id_marker);
-  string next_key = next_marker.mp.get_key();
-  if (!next_key.empty())
-    s->formatter->dump_string("NextKeyMarker", next_key);
-  string next_upload_id = next_marker.mp.get_upload_id();
-  if (!next_upload_id.empty())
-    s->formatter->dump_string("NextUploadIdMarker", next_upload_id);
+  if (!marker_key.empty())
+    s->formatter->dump_string("KeyMarker", marker_key);
+  if (!marker_upload_id.empty())
+    s->formatter->dump_string("UploadIdMarker", marker_upload_id);
+  if (!next_marker_key.empty())
+    s->formatter->dump_string("NextKeyMarker", next_marker_key);
+  if (!next_marker_upload_id.empty())
+    s->formatter->dump_string("NextUploadIdMarker", next_marker_upload_id);
   s->formatter->dump_int("MaxUploads", max_uploads);
   if (!delimiter.empty())
     s->formatter->dump_string("Delimiter", delimiter);
   s->formatter->dump_string("IsTruncated", (is_truncated ? "true" : "false"));
 
   if (op_ret >= 0) {
-    vector<RGWMultipartUploadEntry>::iterator iter;
+    vector<std::unique_ptr<rgw::sal::MultipartUpload>>::iterator iter;
     for (iter = uploads.begin(); iter != uploads.end(); ++iter) {
-      RGWMPObj& mp = iter->mp;
+      rgw::sal::MultipartUpload* upload = iter->get();
       s->formatter->open_array_section("Upload");
       if (encode_url) {
-        s->formatter->dump_string("Key", url_encode(mp.get_key(), false));
+        s->formatter->dump_string("Key", url_encode(upload->get_key(), false));
       } else {
-        s->formatter->dump_string("Key", mp.get_key());
+        s->formatter->dump_string("Key", upload->get_key());
       }
-      s->formatter->dump_string("UploadId", mp.get_upload_id());
+      s->formatter->dump_string("UploadId", upload->get_upload_id());
       dump_owner(s, s->user->get_id(), s->user->get_display_name(), "Initiator");
       dump_owner(s, s->user->get_id(), s->user->get_display_name());
       s->formatter->dump_string("StorageClass", "STANDARD");
-      dump_time(s, "Initiated", &iter->obj.meta.mtime);
+      dump_time(s, "Initiated", &upload->get_mtime());
       s->formatter->close_section();
     }
     if (!common_prefixes.empty()) {
index 7aa34080240a17b9753677e4f0520b901835830f..79bced546b9c95bab62e34ee3f07fcd46b49d3bb 100644 (file)
@@ -37,6 +37,8 @@ using RGWBucketSyncPolicyHandlerRef = std::shared_ptr<RGWBucketSyncPolicyHandler
 class RGWDataSyncStatusManager;
 class RGWSyncModuleInstance;
 typedef std::shared_ptr<RGWSyncModuleInstance> RGWSyncModuleInstanceRef;
+class RGWCompressionInfo;
+
 namespace rgw {
   class Aio;
   namespace IAM { struct Policy; }
@@ -99,6 +101,7 @@ class User;
 class Bucket;
 class Object;
 class BucketList;
+class MultipartUpload;
 struct MPSerializer;
 class Lifecycle;
 class Notification;
@@ -228,6 +231,7 @@ class Store {
     virtual int get_oidc_providers(const DoutPrefixProvider *dpp,
                                   const std::string& tenant,
                                   vector<std::unique_ptr<RGWOIDCProvider>>& providers) = 0;
+    virtual std::unique_ptr<MultipartUpload> get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional<std::string> upload_id=std::nullopt, ceph::real_time mtime=real_clock::now()) = 0;
 
     virtual void finalize(void) = 0;
 
@@ -438,6 +442,18 @@ class Bucket {
     static bool empty(Bucket* b) { return (!b || b->empty()); }
     virtual std::unique_ptr<Bucket> clone() = 0;
 
+    virtual int list_multiparts(const DoutPrefixProvider *dpp,
+                               const string& prefix,
+                               string& marker,
+                               const string& delim,
+                               const int& max_uploads,
+                               vector<std::unique_ptr<MultipartUpload>>& uploads,
+                               map<string, bool> *common_prefixes,
+                               bool *is_truncated) = 0;
+    virtual int abort_multiparts(const DoutPrefixProvider *dpp,
+                                CephContext *cct,
+                                string& prefix, string& delim) = 0;
+
     /* dang - This is temporary, until the API is completed */
     rgw_bucket& get_key() { return info.bucket; }
     RGWBucketInfo& get_info() { return info; }
@@ -784,6 +800,74 @@ class Object {
     }
 };
 
+class MultipartPart {
+protected:
+  std::string oid;
+
+public:
+  MultipartPart() = default;
+  virtual ~MultipartPart() = default;
+
+  virtual uint32_t get_num() = 0;
+  virtual uint64_t get_size() = 0;
+  virtual const std::string& get_etag() = 0;
+  virtual ceph::real_time& get_mtime() = 0;
+};
+
+class MultipartUpload {
+protected:
+  Bucket* bucket;
+  std::map<uint32_t, std::unique_ptr<MultipartPart>> parts;
+
+public:
+  MultipartUpload(Bucket* _bucket) : bucket(_bucket) {}
+  virtual ~MultipartUpload() = default;
+
+  virtual const std::string& get_meta() const = 0;
+  virtual const std::string& get_key() const = 0;
+  virtual const std::string& get_upload_id() const = 0;
+  virtual ceph::real_time& get_mtime() = 0;
+
+  std::map<uint32_t, std::unique_ptr<MultipartPart>>& get_parts() { return parts; }
+
+  virtual std::unique_ptr<rgw::sal::Object> get_meta_obj() = 0;
+
+  virtual int init(const DoutPrefixProvider* dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) = 0;
+  virtual int list_parts(const DoutPrefixProvider* dpp, CephContext* cct,
+                        int num_parts, int marker,
+                        int* next_marker, bool* truncated,
+                        bool assume_unsorted = false) = 0;
+  virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct,
+                   RGWObjectCtx* obj_ctx) = 0;
+  virtual int complete(const DoutPrefixProvider* dpp, CephContext* cct,
+                      std::string& etag, RGWObjManifest& manifest,
+                      map<int, string>& part_etags,
+                      list<rgw_obj_index_key>& remove_objs,
+                      uint64_t& accounted_size, bool& compressed,
+                      RGWCompressionInfo& cs_info, off_t& ofs) = 0;
+
+  virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) = 0;
+
+  friend inline ostream& operator<<(ostream& out, const MultipartUpload& u) {
+    out << u.get_meta();
+    if (!u.get_upload_id().empty())
+      out << ":" << u.get_upload_id();
+    return out;
+  }
+  friend inline ostream& operator<<(ostream& out, const MultipartUpload* u) {
+    if (!u)
+      out << "<NULL>";
+    else
+      out << *u;
+    return out;
+  }
+  friend inline ostream& operator<<(ostream& out, const
+                                   std::unique_ptr<MultipartUpload>& p) {
+    out << p.get();
+    return out;
+  }
+};
+
 struct Serializer {
   Serializer() = default;
   virtual ~Serializer() = default;
index be0c29eec1023bbb26f6ea2a0d5a0000fd1de9ad..94b0d04e739a36c601d6beab84ba04266db07cba 100644 (file)
 
 #define dout_subsys ceph_subsys_rgw
 
+static string mp_ns = RGW_OBJ_NS_MULTIPART;
+
 namespace rgw::sal {
 
+struct multipart_upload_info
+{
+  rgw_placement_rule dest_placement;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(dest_placement, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(dest_placement, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(multipart_upload_info)
+
 // default number of entries to list with each bucket listing call
 // (use marker to bridge between calls)
 static constexpr size_t listing_max_entries = 1000;
@@ -266,7 +286,7 @@ int RadosBucket::remove_bucket(const DoutPrefixProvider* dpp,
 
   /* If there's a prefix, then we are aborting multiparts as well */
   if (!prefix.empty()) {
-    ret = abort_bucket_multiparts(dpp, store, store->ctx(), this, prefix, delimiter);
+    ret = abort_multiparts(dpp, store->ctx(), prefix, delimiter);
     if (ret < 0) {
       return ret;
     }
@@ -341,7 +361,7 @@ int RadosBucket::remove_bucket_bypass_gc(int concurrent_max, bool
 
   string prefix, delimiter;
 
-  ret = abort_bucket_multiparts(dpp, store, cct, this, prefix, delimiter);
+  ret = abort_multiparts(dpp, cct, prefix, delimiter);
   if (ret < 0) {
     return ret;
   }
@@ -744,6 +764,102 @@ int RadosBucket::list(const DoutPrefixProvider* dpp, ListParams& params, int max
   return ret;
 }
 
+int RadosBucket::list_multiparts(const DoutPrefixProvider *dpp,
+                                const string& prefix,
+                                string& marker,
+                                const string& delim,
+                                const int& max_uploads,
+                                vector<std::unique_ptr<MultipartUpload>>& uploads,
+                                map<string, bool> *common_prefixes,
+                                bool *is_truncated)
+{
+  rgw::sal::Bucket::ListParams params;
+  rgw::sal::Bucket::ListResults results;
+  MultipartMetaFilter mp_filter;
+
+  params.prefix = prefix;
+  params.delim = delim;
+  params.marker = marker;
+  params.ns = RGW_OBJ_NS_MULTIPART;
+  params.filter = &mp_filter;
+
+  int ret = list(dpp, params, max_uploads, results, null_yield);
+
+  if (ret < 0)
+    return ret;
+
+  if (!results.objs.empty()) {
+    for (const rgw_bucket_dir_entry& dentry : results.objs) {
+      rgw_obj_key key(dentry.key);
+      uploads.push_back(store->get_multipart_upload(this, key.name));
+    }
+  }
+  if (common_prefixes) {
+    *common_prefixes = std::move(results.common_prefixes);
+  }
+  *is_truncated = results.is_truncated;
+  marker = params.marker.name;
+
+  return 0;
+}
+
+int RadosBucket::abort_multiparts(const DoutPrefixProvider *dpp,
+                                CephContext *cct,
+                                string& prefix, string& delim)
+{
+  constexpr int max = 1000;
+  int ret, num_deleted = 0;
+  vector<std::unique_ptr<MultipartUpload>> uploads;
+  RGWObjectCtx obj_ctx(store);
+  string marker;
+  bool is_truncated;
+
+  do {
+    ret = list_multiparts(dpp, prefix, marker, delim,
+                                max, uploads, nullptr, &is_truncated);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << __func__ <<
+       " ERROR : calling list_bucket_multiparts; ret=" << ret <<
+       "; bucket=\"" << this << "\"; prefix=\"" <<
+       prefix << "\"; delim=\"" << delim << "\"" << dendl;
+      return ret;
+    }
+    ldpp_dout(dpp, 20) << __func__ <<
+      " INFO: aborting and cleaning up multipart upload(s); bucket=\"" <<
+      this << "\"; uploads.size()=" << uploads.size() <<
+      "; is_truncated=" << is_truncated << dendl;
+
+    if (!uploads.empty()) {
+      for (const auto& upload : uploads) {
+       ret = upload->abort(dpp, cct, &obj_ctx);
+        if (ret < 0) {
+         // we're doing a best-effort; if something cannot be found,
+         // log it and keep moving forward
+         if (ret != -ENOENT && ret != -ERR_NO_SUCH_UPLOAD) {
+           ldpp_dout(dpp, 0) << __func__ <<
+             " ERROR : failed to abort and clean-up multipart upload \"" <<
+             upload->get_meta() << "\"" << dendl;
+           return ret;
+         } else {
+           ldpp_dout(dpp, 10) << __func__ <<
+             " NOTE : unable to find part(s) of "
+             "aborted multipart upload of \"" << upload->get_meta() <<
+             "\" for cleaning up" << dendl;
+         }
+        }
+        num_deleted++;
+      }
+      if (num_deleted) {
+        ldpp_dout(dpp, 0) << __func__ <<
+         " WARNING : aborted " << num_deleted <<
+         " incomplete multipart uploads" << dendl;
+      }
+    }
+  } while (is_truncated);
+
+  return 0;
+}
+
 std::unique_ptr<User> RadosStore::get_user(const rgw_user &u)
 {
   return std::unique_ptr<User>(new RadosUser(this, u));
@@ -1350,6 +1466,11 @@ int RadosStore::get_oidc_providers(const DoutPrefixProvider *dpp,
   return 0;
 }
 
+std::unique_ptr<MultipartUpload> RadosStore::get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional<std::string> upload_id, ceph::real_time mtime)
+{
+  return std::unique_ptr<MultipartUpload>(new RadosMultipartUpload(this, bucket, oid, upload_id, mtime));
+}
+
 int RadosStore::get_obj_head_ioctx(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::IoCtx* ioctx)
 {
   return rados->get_obj_head_ioctx(dpp, bucket_info, obj, ioctx);
@@ -1922,6 +2043,441 @@ int RadosObject::swift_versioning_copy(RGWObjectCtx* obj_ctx,
                                         y);
 }
 
+int RadosMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct,
+                               RGWObjectCtx *obj_ctx)
+{
+  std::unique_ptr<rgw::sal::Object> meta_obj = get_meta_obj();
+  meta_obj->set_in_extra_data(true);
+  meta_obj->set_hash_source(mp_obj.get_key());
+  std::unique_ptr<rgw::sal::GCChain> chain = store->get_gc_chain(meta_obj.get());
+  list<rgw_obj_index_key> remove_objs;
+  bool truncated;
+  int marker = 0;
+  int ret;
+  uint64_t parts_accounted_size = 0;
+
+  do {
+    ret = list_parts(dpp, cct, 1000, marker, &marker, &truncated);
+    if (ret < 0) {
+      ldpp_dout(dpp, 20) << __func__ << ": RadosMultipartUpload::list_parts returned " <<
+       ret << dendl;
+      return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
+    }
+
+    for (auto part_it = parts.begin();
+        part_it != parts.end();
+        ++part_it) {
+      RadosMultipartPart* obj_part = dynamic_cast<RadosMultipartPart*>(part_it->second.get());
+      if (obj_part->info.manifest.empty()) {
+       std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(
+                                   rgw_obj_key(obj_part->oid, std::string(), RGW_OBJ_NS_MULTIPART));
+       obj->set_hash_source(mp_obj.get_key());
+       ret = obj->delete_object(dpp, obj_ctx, null_yield);
+        if (ret < 0 && ret != -ENOENT)
+          return ret;
+      } else {
+       chain->update(dpp, &obj_part->info.manifest);
+        RGWObjManifest::obj_iterator oiter = obj_part->info.manifest.obj_begin(dpp);
+        if (oiter != obj_part->info.manifest.obj_end(dpp)) {
+         std::unique_ptr<rgw::sal::Object> head = bucket->get_object(rgw_obj_key());
+          rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store);
+         head->raw_obj_to_obj(raw_head);
+
+          rgw_obj_index_key key;
+          head->get_key().get_index_key(&key);
+          remove_objs.push_back(key);
+        }
+      }
+      parts_accounted_size += obj_part->info.accounted_size;
+    }
+  } while (truncated);
+
+  /* use upload id as tag and do it synchronously */
+  ret = chain->send(mp_obj.get_upload_id());
+  if (ret < 0) {
+    ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
+    if (ret == -ENOENT) {
+      return -ERR_NO_SUCH_UPLOAD;
+    }
+    //Delete objects inline if send chain to gc fails
+    chain->delete_inline(dpp, mp_obj.get_upload_id());
+  }
+
+  std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = meta_obj->get_delete_op(obj_ctx);
+  del_op->params.bucket_owner = bucket->get_acl_owner();
+  del_op->params.versioning_status = 0;
+  if (!remove_objs.empty()) {
+    del_op->params.remove_objs = &remove_objs;
+  }
+  
+  del_op->params.abortmp = true;
+  del_op->params.parts_accounted_size = parts_accounted_size;
+
+  // and also remove the metadata obj
+  ret = del_op->delete_obj(dpp, null_yield);
+  if (ret < 0) {
+    ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " <<
+      ret << dendl;
+  }
+  return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
+}
+
+std::unique_ptr<rgw::sal::Object> RadosMultipartUpload::get_meta_obj()
+{
+  return bucket->get_object(rgw_obj_key(get_meta(), string(), mp_ns));
+}
+
+int RadosMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs)
+{
+  int ret;
+  std::string oid = mp_obj.get_key();
+
+  do {
+    char buf[33];
+    string tmp_obj_name;
+    std::unique_ptr<rgw::sal::Object> obj;
+    gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
+    std::string upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */
+    upload_id.append(buf);
+
+    mp_obj.init(oid, upload_id);
+    tmp_obj_name = mp_obj.get_meta();
+
+    obj = bucket->get_object(rgw_obj_key(tmp_obj_name, string(), mp_ns));
+    // the meta object will be indexed with 0 size, we c
+    obj->set_in_extra_data(true);
+    obj->set_hash_source(oid);
+
+    std::unique_ptr<rgw::sal::Object::WriteOp> obj_op = obj->get_write_op(obj_ctx);
+
+    obj_op->params.versioning_disabled = true; /* no versioning for multipart meta */
+    obj_op->params.owner = owner;
+    obj_op->params.category = RGWObjCategory::MultiMeta;
+    obj_op->params.flags = PUT_OBJ_CREATE_EXCL;
+    obj_op->params.mtime = &mtime;
+    obj_op->params.attrs = &attrs;
+
+    multipart_upload_info upload_info;
+    upload_info.dest_placement = dest_placement;
+
+    bufferlist bl;
+    encode(upload_info, bl);
+    obj_op->params.data = &bl;
+
+    ret = obj_op->prepare(y);
+
+    ret = obj_op->write_meta(dpp, bl.length(), 0, y);
+  } while (ret == -EEXIST);
+
+  return ret;
+}
+
+int RadosMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct,
+                                    int num_parts, int marker,
+                                    int *next_marker, bool *truncated,
+                                    bool assume_unsorted)
+{
+  map<string, bufferlist> parts_map;
+  map<string, bufferlist>::iterator iter;
+
+  std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(
+                     rgw_obj_key(get_meta(), std::string(), RGW_OBJ_NS_MULTIPART));
+  obj->set_in_extra_data(true);
+
+  bool sorted_omap = is_v2_upload_id(get_upload_id()) && !assume_unsorted;
+
+  parts.clear();
+
+  int ret;
+  if (sorted_omap) {
+    string p;
+    p = "part.";
+    char buf[32];
+
+    snprintf(buf, sizeof(buf), "%08d", marker);
+    p.append(buf);
+
+    ret = obj->omap_get_vals(dpp, p, num_parts + 1, &parts_map,
+                                 nullptr, null_yield);
+  } else {
+    ret = obj->omap_get_all(dpp, &parts_map, null_yield);
+  }
+  if (ret < 0) {
+    return ret;
+  }
+
+  int i;
+  int last_num = 0;
+
+  uint32_t expected_next = marker + 1;
+
+  for (i = 0, iter = parts_map.begin();
+       (i < num_parts || !sorted_omap) && iter != parts_map.end();
+       ++iter, ++i) {
+    bufferlist& bl = iter->second;
+    auto bli = bl.cbegin();
+    std::unique_ptr<RadosMultipartPart> part = std::make_unique<RadosMultipartPart>();
+    try {
+      decode(part->info, bli);
+    } catch (buffer::error& err) {
+      ldpp_dout(dpp, 0) << "ERROR: could not part info, caught buffer::error" <<
+       dendl;
+      return -EIO;
+    }
+    if (sorted_omap) {
+      if (part->info.num != expected_next) {
+        /* ouch, we expected a specific part num here, but we got a
+         * different one. Either a part is missing, or it could be a
+         * case of mixed rgw versions working on the same upload,
+         * where one gateway doesn't support correctly sorted omap
+         * keys for multipart upload just assume data is unsorted.
+         */
+        return list_parts(dpp, cct, num_parts, marker, next_marker, truncated, true);
+      }
+      expected_next++;
+    }
+    if (sorted_omap ||
+      (int)part->info.num > marker) {
+      last_num = part->info.num;
+      parts[part->info.num] = std::move(part);
+    }
+  }
+
+  if (sorted_omap) {
+    if (truncated) {
+      *truncated = (iter != parts_map.end());
+    }
+  } else {
+    /* rebuild a map with only num_parts entries */
+    std::map<uint32_t, std::unique_ptr<MultipartPart>> new_parts;
+    std::map<uint32_t, std::unique_ptr<MultipartPart>>::iterator piter;
+    for (i = 0, piter = parts.begin();
+        i < num_parts && piter != parts.end();
+        ++i, ++piter) {
+      last_num = piter->first;
+      new_parts[piter->first] = std::move(piter->second);
+    }
+
+    if (truncated) {
+      *truncated = (piter != parts.end());
+    }
+
+    parts.swap(new_parts);
+  }
+
+  if (next_marker) {
+    *next_marker = last_num;
+  }
+
+  return 0;
+}
+
+int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp, CephContext* cct,
+                                  std::string& etag, RGWObjManifest& manifest,
+                                  map<int, string>& part_etags,
+                                  list<rgw_obj_index_key>& remove_objs,
+                                  uint64_t& accounted_size, bool& compressed,
+                                  RGWCompressionInfo& cs_info, off_t& ofs)
+{
+  char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
+  char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
+  MD5 hash;
+  bool truncated;
+  int ret;
+
+  int total_parts = 0;
+  int handled_parts = 0;
+  int max_parts = 1000;
+  int marker = 0;
+  uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
+  auto etags_iter = part_etags.begin();
+
+  do {
+    ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated);
+    if (ret == -ENOENT) {
+      ret = -ERR_NO_SUCH_UPLOAD;
+    }
+    if (ret < 0)
+      return ret;
+
+    total_parts += parts.size();
+    if (!truncated && total_parts != (int)part_etags.size()) {
+      ldpp_dout(dpp, 0) << "NOTICE: total parts mismatch: have: " << total_parts
+                      << " expected: " << part_etags.size() << dendl;
+      ret = -ERR_INVALID_PART;
+      return ret;
+    }
+
+    for (auto obj_iter = parts.begin(); etags_iter != part_etags.end() && obj_iter != parts.end(); ++etags_iter, ++obj_iter, ++handled_parts) {
+      RadosMultipartPart* part = dynamic_cast<rgw::sal::RadosMultipartPart*>(obj_iter->second.get());
+      uint64_t part_size = part->get_size();
+      if (handled_parts < (int)part_etags.size() - 1 &&
+          part_size < min_part_size) {
+        ret = -ERR_TOO_SMALL;
+        return ret;
+      }
+
+      char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
+      if (etags_iter->first != (int)obj_iter->first) {
+        ldpp_dout(dpp, 0) << "NOTICE: parts num mismatch: next requested: "
+                        << etags_iter->first << " next uploaded: "
+                        << obj_iter->first << dendl;
+        ret = -ERR_INVALID_PART;
+        return ret;
+      }
+      string part_etag = rgw_string_unquote(etags_iter->second);
+      if (part_etag.compare(part->get_etag()) != 0) {
+        ldpp_dout(dpp, 0) << "NOTICE: etag mismatch: part: " << etags_iter->first
+                        << " etag: " << etags_iter->second << dendl;
+        ret = -ERR_INVALID_PART;
+        return ret;
+      }
+
+      hex_to_buf(part->get_etag().c_str(), petag,
+               CEPH_CRYPTO_MD5_DIGESTSIZE);
+      hash.Update((const unsigned char *)petag, sizeof(petag));
+
+      RGWUploadPartInfo& obj_part = part->info;
+
+      /* update manifest for part */
+      string oid = mp_obj.get_part(part->info.num);
+      rgw_obj src_obj;
+      src_obj.init_ns(bucket->get_key(), oid, mp_ns);
+
+      if (obj_part.manifest.empty()) {
+        ldpp_dout(dpp, 0) << "ERROR: empty manifest for object part: obj="
+                        << src_obj << dendl;
+        ret = -ERR_INVALID_PART;
+        return ret;
+      } else {
+        manifest.append(dpp, obj_part.manifest, store->get_zone());
+      }
+
+      bool part_compressed = (obj_part.cs_info.compression_type != "none");
+      if ((handled_parts > 0) &&
+          ((part_compressed != compressed) ||
+            (cs_info.compression_type != obj_part.cs_info.compression_type))) {
+          ldpp_dout(dpp, 0) << "ERROR: compression type was changed during multipart upload ("
+                           << cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << ")" << dendl;
+          ret = -ERR_INVALID_PART;
+          return ret; 
+      }
+      
+      if (part_compressed) {
+        int64_t new_ofs; // offset in compression data for new part
+        if (cs_info.blocks.size() > 0)
+          new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len;
+        else
+          new_ofs = 0;
+        for (const auto& block : obj_part.cs_info.blocks) {
+          compression_block cb;
+          cb.old_ofs = block.old_ofs + cs_info.orig_size;
+          cb.new_ofs = new_ofs;
+          cb.len = block.len;
+          cs_info.blocks.push_back(cb);
+          new_ofs = cb.new_ofs + cb.len;
+        } 
+        if (!compressed)
+          cs_info.compression_type = obj_part.cs_info.compression_type;
+        cs_info.orig_size += obj_part.cs_info.orig_size;
+        compressed = true;
+      }
+
+      rgw_obj_index_key remove_key;
+      src_obj.key.get_index_key(&remove_key);
+
+      remove_objs.push_back(remove_key);
+
+      ofs += obj_part.size;
+      accounted_size += obj_part.accounted_size;
+    }
+  } while (truncated);
+  hash.Final((unsigned char *)final_etag);
+
+  buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
+  snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2],
+          sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
+           "-%lld", (long long)part_etags.size());
+  etag = final_etag_str;
+  ldpp_dout(dpp, 10) << "calculated etag: " << etag << dendl;
+
+  return ret;
+}
+
+int RadosMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs)
+{
+  if (!rule && !attrs) {
+    return 0;
+  }
+
+  if (rule) {
+    if (!placement.empty()) {
+      *rule = &placement;
+      if (!attrs) {
+       /* Don't need attrs, done */
+       return 0;
+      }
+    } else {
+      *rule = nullptr;
+    }
+  }
+
+  /* We need either attributes or placement, so we need a read */
+  std::unique_ptr<rgw::sal::Object> meta_obj;
+  meta_obj = get_meta_obj();
+  meta_obj->set_in_extra_data(true);
+
+  multipart_upload_info upload_info;
+  bufferlist headbl;
+
+  /* Read the obj head which contains the multipart_upload_info */
+  std::unique_ptr<rgw::sal::Object::ReadOp> read_op = meta_obj->get_read_op(obj_ctx);
+  meta_obj->set_prefetch_data(obj_ctx);
+
+  int ret = read_op->prepare(y, dpp);
+  if (ret < 0) {
+    if (ret == -ENOENT) {
+      return -ERR_NO_SUCH_UPLOAD;
+    }
+    return ret;
+  }
+
+  if (attrs) {
+    /* Attrs are filled in by prepare */
+    *attrs = meta_obj->get_attrs();
+    if (!rule || *rule != nullptr) {
+      /* placement was cached; don't actually read */
+      return 0;
+    }
+  }
+
+  /* Now read the placement from the head */
+  ret = read_op->read(0, store->ctx()->_conf->rgw_max_chunk_size, headbl, y, dpp);
+  if (ret < 0) {
+    if (ret == -ENOENT) {
+      return -ERR_NO_SUCH_UPLOAD;
+    }
+    return ret;
+  }
+
+  if (headbl.length() <= 0) {
+    return -ERR_NO_SUCH_UPLOAD;
+  }
+
+  /* Decode multipart_upload_info */
+  auto hiter = headbl.cbegin();
+  try {
+    decode(upload_info, hiter);
+  } catch (buffer::error& err) {
+    ldpp_dout(dpp, 0) << "ERROR: failed to decode multipart upload info" << dendl;
+    return -EIO;
+  }
+  placement = upload_info.dest_placement;
+  *rule = &placement;
+
+  return 0;
+}
+
 MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store, RadosObject* obj, const std::string& lock_name) :
   lock(lock_name)
 {
index 5fdf440bff5867228ba92ee565459246b72e4d5c..dc057ffba40312d1c86b94393952ab6ca062dafc 100644 (file)
 #include "rgw_notify.h"
 #include "rgw_oidc_provider.h"
 #include "rgw_role.h"
+#include "rgw_multi.h"
+#include "services/svc_tier_rados.h"
 #include "cls/lock/cls_lock_client.h"
 
 namespace rgw { namespace sal {
 
 class RadosStore;
+class RadosMultipartUpload;
 
 class RadosCompletions : public Completions {
   public:
@@ -331,6 +334,17 @@ class RadosBucket : public Bucket {
     virtual std::unique_ptr<Bucket> clone() override {
       return std::make_unique<RadosBucket>(*this);
     }
+    virtual int list_multiparts(const DoutPrefixProvider *dpp,
+                               const string& prefix,
+                               string& marker,
+                               const string& delim,
+                               const int& max_uploads,
+                               vector<std::unique_ptr<MultipartUpload>>& uploads,
+                               map<string, bool> *common_prefixes,
+                               bool *is_truncated) override;
+    virtual int abort_multiparts(const DoutPrefixProvider *dpp,
+                                CephContext *cct,
+                                string& prefix, string& delim) override;
 
   private:
     int link(const DoutPrefixProvider* dpp, User* new_user, optional_yield y, bool update_entrypoint = true, RGWObjVersionTracker* objv = nullptr);
@@ -465,6 +479,7 @@ class RadosStore : public Store {
     virtual int get_oidc_providers(const DoutPrefixProvider *dpp,
                                   const std::string& tenant,
                                   vector<std::unique_ptr<RGWOIDCProvider>>& providers) override;
+    virtual std::unique_ptr<MultipartUpload> get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional<std::string> upload_id=std::nullopt, ceph::real_time mtime=real_clock::now()) override;
 
     virtual void finalize(void) override;
 
@@ -493,6 +508,56 @@ class RadosStore : public Store {
     void setUserCtl(RGWUserCtl *_ctl) { user_ctl = _ctl; }
 };
 
+class RadosMultipartPart : public MultipartPart {
+protected:
+  RGWUploadPartInfo info;
+
+public:
+  RadosMultipartPart() = default;
+  virtual ~RadosMultipartPart() = default;
+
+  virtual uint32_t get_num() { return info.num; }
+  virtual uint64_t get_size() { return info.accounted_size; }
+  virtual const std::string& get_etag() { return info.etag; }
+  virtual ceph::real_time& get_mtime() { return info.modified; }
+
+  /* For RadosStore code */
+  RGWObjManifest& get_manifest() { return info.manifest; }
+
+  friend class RadosMultipartUpload;
+};
+
+class RadosMultipartUpload : public MultipartUpload {
+  RadosStore* store;
+  RGWMPObj mp_obj;
+  ceph::real_time mtime;
+  rgw_placement_rule placement;
+
+public:
+  RadosMultipartUpload(RadosStore* _store, Bucket* _bucket, const std::string& oid, std::optional<std::string> upload_id, ceph::real_time _mtime) : MultipartUpload(_bucket), store(_store), mp_obj(oid, upload_id), mtime(_mtime) {}
+  virtual ~RadosMultipartUpload() = default;
+
+  virtual const std::string& get_meta() const { return mp_obj.get_meta(); }
+  virtual const std::string& get_key() const { return mp_obj.get_key(); }
+  virtual const std::string& get_upload_id() const { return mp_obj.get_upload_id(); }
+  virtual ceph::real_time& get_mtime() { return mtime; }
+  virtual std::unique_ptr<rgw::sal::Object> get_meta_obj() override;
+  virtual int init(const DoutPrefixProvider* dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) override;
+  virtual int list_parts(const DoutPrefixProvider* dpp, CephContext* cct,
+                        int num_parts, int marker,
+                        int* next_marker, bool* truncated,
+                        bool assume_unsorted = false) override;
+  virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct,
+                   RGWObjectCtx* obj_ctx) override;
+  virtual int complete(const DoutPrefixProvider* dpp, CephContext* cct,
+                      std::string& etag, RGWObjManifest& manifest,
+                      map<int, string>& part_etags,
+                      list<rgw_obj_index_key>& remove_objs,
+                      uint64_t& accounted_size, bool& compressed,
+                      RGWCompressionInfo& cs_info, off_t& ofs) override;
+  virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) override;
+};
+
 class MPRadosSerializer : public MPSerializer {
   librados::IoCtx ioctx;
   rados::cls::lock::Lock lock;
index e46868db225ece63be4be217ded282ed6a831b2a..fd48e40e8a76d750bb7aa87a53d18cdd00a95ec1 100644 (file)
@@ -34,6 +34,13 @@ public:
   RGWMPObj(const string& _oid, const string& _upload_id) {
     init(_oid, _upload_id, _upload_id);
   }
+  RGWMPObj(const string& _oid, std::optional<string> _upload_id) {
+    if (_upload_id) {
+      init(_oid, *_upload_id, *_upload_id);
+    } else {
+      from_meta(_oid);
+    }
+  }
   void init(const string& _oid, const string& _upload_id) {
     init(_oid, _upload_id, _upload_id);
   }