]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/dbstore: Multipart upload APIs 43258/head
authorSoumya Koduri <skoduri@redhat.com>
Mon, 22 Nov 2021 16:58:19 +0000 (22:28 +0530)
committerSoumya Koduri <skoduri@redhat.com>
Wed, 8 Dec 2021 17:33:12 +0000 (23:03 +0530)
For multipart upload processing, below is the method applied -

MultipartUpload::Init - create head object entry for meta obj (src_obj_name + "." + upload_id)
[ Meta object stores all the parts upload info]

MultipartWriter::process - create all data/tail objects with obj_name same as
meta obj (so that they can all be identified & deleted during abort)

MultipartUpload::Abort - Just delete meta obj .. that will indirectly delete all the uploads
associated with that upload id / meta obj so far.

MultipartUpload::Complete - Create head object of the original object (if not exists).
Rename all data/tail object entries' obj name to orig object name and update metadata of the orig object.

Signed-off-by: Soumya Koduri <skoduri@redhat.com>
src/rgw/rgw_common.h
src/rgw/rgw_sal_dbstore.cc
src/rgw/rgw_sal_dbstore.h
src/rgw/rgw_sal_rados.cc
src/rgw/store/dbstore/common/dbstore.cc
src/rgw/store/dbstore/common/dbstore.h
src/rgw/store/dbstore/sqlite/sqliteDB.cc
src/rgw/store/dbstore/sqlite/sqliteDB.h
src/rgw/store/dbstore/tests/dbstore_tests.cc

index 3d90d5728ae9dc64b0e000116dd47515cd9b59fe..c1bfb55e106d2284d925573727c606815b6479d9 100644 (file)
@@ -1891,6 +1891,24 @@ inline std::ostream& operator<<(std::ostream& out, const rgw_obj &o) {
   return out << o.bucket.name << ":" << o.get_oid();
 }
 
+struct multipart_upload_info
+{
+  rgw_placement_rule dest_placement;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(dest_placement, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(dest_placement, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(multipart_upload_info)
+
 static inline void buf_to_hex(const unsigned char* const buf,
                               const size_t len,
                               char* const str)
index 92f85b641e093f80a0897395a5cbf392bb54919f..0dce21ec7684783b8822388bf8d51ad3ac99b911 100644 (file)
@@ -446,7 +446,8 @@ namespace rgw::sal {
                                const std::string& oid,
                                std::optional<std::string> upload_id,
                                ACLOwner owner, ceph::real_time mtime) {
-    return nullptr;
+    return std::make_unique<DBMultipartUpload>(this->store, this, oid, upload_id,
+                                               std::move(owner), mtime);
   }
 
   int DBBucket::list_multiparts(const DoutPrefixProvider *dpp,
@@ -653,7 +654,7 @@ namespace rgw::sal {
 
   MPSerializer* DBObject::get_serializer(const DoutPrefixProvider *dpp, const std::string& lock_name)
   {
-    return nullptr;
+    return new MPDBSerializer(dpp, store, this, lock_name);
   }
 
   int DBObject::transition(RGWObjectCtx& rctx,
@@ -839,6 +840,489 @@ namespace rgw::sal {
     return 0;
   }
 
+  int DBMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct,
+                               RGWObjectCtx *obj_ctx)
+  {
+    std::unique_ptr<rgw::sal::Object> meta_obj = get_meta_obj();
+    meta_obj->set_in_extra_data(true);
+    meta_obj->set_hash_source(mp_obj.get_key());
+    int ret;
+
+    std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = meta_obj->get_delete_op(obj_ctx);
+    del_op->params.bucket_owner = bucket->get_acl_owner();
+    del_op->params.versioning_status = 0;
+  
+    // Since the data objects are associated with meta obj till
+    // MultipartUpload::Complete() is done, removing the metadata obj
+    // should remove all the uploads so far.
+    ret = del_op->delete_obj(dpp, null_yield);
+    if (ret < 0) {
+      ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " <<
+        ret << dendl;
+    }
+    return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
+  }
+
+  static string mp_ns = RGW_OBJ_NS_MULTIPART;
+
+  std::unique_ptr<rgw::sal::Object> DBMultipartUpload::get_meta_obj()
+  {
+    return bucket->get_object(rgw_obj_key(get_meta(), string(), mp_ns));
+  }
+
+  int DBMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs)
+  {
+    int ret;
+    std::string oid = mp_obj.get_key();
+
+    char buf[33];
+    std::unique_ptr<rgw::sal::Object> obj; // create meta obj
+    gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
+    std::string upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */
+    upload_id.append(buf);
+
+    mp_obj.init(oid, upload_id);
+    obj = get_meta_obj();
+
+    DB::Object op_target(store->getDB(), obj->get_bucket()->get_info(),
+                              obj->get_obj());
+    DB::Object::Write obj_op(&op_target);
+
+    obj_op.meta.owner = owner.get_id();
+    obj_op.meta.category = RGWObjCategory::MultiMeta;
+    obj_op.meta.flags = PUT_OBJ_CREATE_EXCL;
+    obj_op.meta.mtime = &mtime;
+
+    multipart_upload_info upload_info;
+    upload_info.dest_placement = dest_placement;
+
+    bufferlist bl;
+    encode(upload_info, bl);
+    obj_op.meta.data = &bl; 
+    ret = obj_op.prepare(dpp);
+    if (ret < 0)
+      return ret;
+    ret = obj_op.write_meta(dpp, bl.length(), bl.length(), attrs);
+
+    return ret;
+  }
+
+  int DBMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct,
+                                    int num_parts, int marker,
+                                    int *next_marker, bool *truncated,
+                                    bool assume_unsorted)
+  {
+    std::list<RGWUploadPartInfo> parts_map;
+
+    std::unique_ptr<rgw::sal::Object> obj = get_meta_obj();
+
+    parts.clear();
+    int ret;
+
+    DB::Object op_target(store->getDB(),
+        obj->get_bucket()->get_info(), obj->get_obj());
+    ret = op_target.get_mp_parts_list(dpp, parts_map);
+    if (ret < 0) {
+      return ret;
+    }
+
+    int last_num = 0;
+
+    while (!parts_map.empty()) {
+      std::unique_ptr<DBMultipartPart> part = std::make_unique<DBMultipartPart>();
+      RGWUploadPartInfo &pinfo = parts_map.front();
+      part->set_info(pinfo);
+      if ((int)pinfo.num > marker) {
+        last_num = pinfo.num;
+        parts[pinfo.num] = std::move(part);
+      }
+      parts_map.pop_front();
+    }
+
+    /* rebuild a map with only num_parts entries */
+    std::map<uint32_t, std::unique_ptr<MultipartPart>> new_parts;
+    std::map<uint32_t, std::unique_ptr<MultipartPart>>::iterator piter;
+    int i;
+    for (i = 0, piter = parts.begin();
+        i < num_parts && piter != parts.end();
+        ++i, ++piter) {
+      last_num = piter->first;
+      new_parts[piter->first] = std::move(piter->second);
+    }
+
+    if (truncated) {
+      *truncated = (piter != parts.end());
+    }
+
+    parts.swap(new_parts);
+
+    if (next_marker) {
+      *next_marker = last_num;
+    }
+
+    return 0;
+  }
+
+  int DBMultipartUpload::complete(const DoutPrefixProvider *dpp,
+                                  optional_yield y, CephContext* cct,
+                                  map<int, string>& part_etags,
+                                  list<rgw_obj_index_key>& remove_objs,
+                                  uint64_t& accounted_size, bool& compressed,
+                                  RGWCompressionInfo& cs_info, off_t& ofs,
+                                  std::string& tag, ACLOwner& owner,
+                                  uint64_t olh_epoch,
+                                  rgw::sal::Object* target_obj,
+                                  RGWObjectCtx* obj_ctx)
+  {
+    char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
+    char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
+    std::string etag;
+    bufferlist etag_bl;
+    MD5 hash;
+    bool truncated;
+    int ret;
+
+    int total_parts = 0;
+    int handled_parts = 0;
+    int max_parts = 1000;
+    int marker = 0;
+    uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
+    auto etags_iter = part_etags.begin();
+    rgw::sal::Attrs attrs = target_obj->get_attrs();
+
+    ofs = 0;
+    accounted_size = 0;
+    do {
+      ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated);
+      if (ret == -ENOENT) {
+        ret = -ERR_NO_SUCH_UPLOAD;
+      }
+      if (ret < 0)
+        return ret;
+
+      total_parts += parts.size();
+      if (!truncated && total_parts != (int)part_etags.size()) {
+        ldpp_dout(dpp, 0) << "NOTICE: total parts mismatch: have: " << total_parts
+                      << " expected: " << part_etags.size() << dendl;
+        ret = -ERR_INVALID_PART;
+        return ret;
+      }
+
+      for (auto obj_iter = parts.begin(); etags_iter != part_etags.end() && obj_iter != parts.end(); ++etags_iter, ++obj_iter, ++handled_parts) {
+        DBMultipartPart* part = dynamic_cast<rgw::sal::DBMultipartPart*>(obj_iter->second.get());
+        uint64_t part_size = part->get_size();
+        if (handled_parts < (int)part_etags.size() - 1 &&
+            part_size < min_part_size) {
+          ret = -ERR_TOO_SMALL;
+          return ret;
+        }
+
+        char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
+        if (etags_iter->first != (int)obj_iter->first) {
+          ldpp_dout(dpp, 0) << "NOTICE: parts num mismatch: next requested: "
+                        << etags_iter->first << " next uploaded: "
+                        << obj_iter->first << dendl;
+          ret = -ERR_INVALID_PART;
+          return ret;
+        }
+        string part_etag = rgw_string_unquote(etags_iter->second);
+        if (part_etag.compare(part->get_etag()) != 0) {
+          ldpp_dout(dpp, 0) << "NOTICE: etag mismatch: part: " << etags_iter->first
+                        << " etag: " << etags_iter->second << dendl;
+          ret = -ERR_INVALID_PART;
+          return ret;
+        }
+
+        hex_to_buf(part->get_etag().c_str(), petag,
+                 CEPH_CRYPTO_MD5_DIGESTSIZE);
+        hash.Update((const unsigned char *)petag, sizeof(petag));
+
+        RGWUploadPartInfo& obj_part = part->get_info();
+
+        ofs += obj_part.size;
+        accounted_size += obj_part.accounted_size;
+      }
+    } while (truncated);
+    hash.Final((unsigned char *)final_etag);
+
+    buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
+    snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2],
+            sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
+           "-%lld", (long long)part_etags.size());
+    etag = final_etag_str;
+    ldpp_dout(dpp, 10) << "calculated etag: " << etag << dendl;
+
+    etag_bl.append(etag);
+
+    attrs[RGW_ATTR_ETAG] = etag_bl;
+
+    /* XXX: handle compression ? */
+
+    /* Rename all the object data entries with original object name (i.e
+     * from 'head_obj.name + "." + upload_id' to head_obj.name) */
+
+    /* Original head object */
+    DB::Object op_target(store->getDB(),
+                            target_obj->get_bucket()->get_info(),
+                            target_obj->get_obj());
+    DB::Object::Write obj_op(&op_target);
+    obj_op.prepare(NULL);
+
+    /* Meta object */
+    std::unique_ptr<rgw::sal::Object> meta_obj = get_meta_obj();
+    DB::Object meta_op_target(store->getDB(),
+                            meta_obj->get_bucket()->get_info(),
+                            meta_obj->get_obj());
+    DB::Object::Write mp_op(&meta_op_target);
+    mp_op.update_mp_parts(dpp, target_obj->get_obj().key);
+
+    obj_op.meta.owner = owner.get_id();
+    obj_op.meta.flags = PUT_OBJ_CREATE;
+    obj_op.meta.modify_tail = true;
+    obj_op.meta.completeMultipart = true;
+
+    ret = obj_op.write_meta(dpp, ofs, accounted_size, attrs);
+    if (ret < 0)
+      return ret;
+
+    /* No need to delete Meta obj here. It is deleted from sal */
+    return ret;
+  }
+
+  int DBMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs)
+  {
+    if (!rule && !attrs) {
+      return 0;
+    }
+
+    if (rule) {
+      if (!placement.empty()) {
+        *rule = &placement;
+        if (!attrs) {
+           /* Don't need attrs, done */
+             return 0;
+        }
+      } else {
+        *rule = nullptr;
+      }
+    }
+
+    /* We need either attributes or placement, so we need a read */
+    std::unique_ptr<rgw::sal::Object> meta_obj;
+    meta_obj = get_meta_obj();
+    meta_obj->set_in_extra_data(true);
+
+    multipart_upload_info upload_info;
+    bufferlist headbl;
+
+    /* Read the obj head which contains the multipart_upload_info */
+    std::unique_ptr<rgw::sal::Object::ReadOp> read_op = meta_obj->get_read_op(obj_ctx);
+    int ret = read_op->prepare(y, dpp);
+    if (ret < 0) {
+      if (ret == -ENOENT) {
+        return -ERR_NO_SUCH_UPLOAD;
+      }
+      return ret;
+    }
+
+    if (attrs) {
+      /* Attrs are filled in by prepare */
+      *attrs = meta_obj->get_attrs();
+      if (!rule || *rule != nullptr) {
+        /* placement was cached; don't actually read */
+        return 0;
+      }
+    }
+
+    /* Now read the placement from the head */
+    ret = read_op->read(0, store->getDB()->get_max_head_size(), headbl, y, dpp);
+    if (ret < 0) {
+      if (ret == -ENOENT) {
+        return -ERR_NO_SUCH_UPLOAD;
+      }
+      return ret;
+    }
+
+    if (headbl.length() <= 0) {
+      return -ERR_NO_SUCH_UPLOAD;
+    }
+
+    /* Decode multipart_upload_info */
+    auto hiter = headbl.cbegin();
+    try {
+      decode(upload_info, hiter);
+    } catch (buffer::error& err) {
+      ldpp_dout(dpp, 0) << "ERROR: failed to decode multipart upload info" << dendl;
+      return -EIO;
+    }
+    placement = upload_info.dest_placement;
+    *rule = &placement;
+
+    return 0;
+  }
+
+  std::unique_ptr<Writer> DBMultipartUpload::get_writer(
+                                 const DoutPrefixProvider *dpp,
+                                 optional_yield y,
+                                 std::unique_ptr<rgw::sal::Object> _head_obj,
+                                 const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                                 const rgw_placement_rule *ptail_placement_rule,
+                                 uint64_t part_num,
+                                 const std::string& part_num_str)
+  {
+    return std::make_unique<DBMultipartWriter>(dpp, y, this,
+                                std::move(_head_obj), store, owner,
+                                obj_ctx, ptail_placement_rule, part_num, part_num_str);
+  }
+
+  DBMultipartWriter::DBMultipartWriter(const DoutPrefixProvider *dpp,
+                   optional_yield y,
+                MultipartUpload* upload,
+                       std::unique_ptr<rgw::sal::Object> _head_obj,
+                       DBStore* _store,
+                   const rgw_user& _owner, RGWObjectCtx& obj_ctx,
+                   const rgw_placement_rule *_ptail_placement_rule,
+                uint64_t _part_num, const std::string& _part_num_str):
+                       Writer(dpp, y),
+                       store(_store),
+                owner(_owner),
+                ptail_placement_rule(_ptail_placement_rule),
+                head_obj(std::move(_head_obj)),
+                upload_id(upload->get_upload_id()),
+                oid(head_obj->get_name() + "." + upload_id +
+                    "." + std::to_string(part_num)),
+                meta_obj(((DBMultipartUpload*)upload)->get_meta_obj()),
+                op_target(_store->getDB(), meta_obj->get_bucket()->get_info(), meta_obj->get_obj()),
+                parent_op(&op_target), part_num(_part_num),
+                part_num_str(_part_num_str) { parent_op.prepare(NULL);}
+
+  int DBMultipartWriter::prepare(optional_yield y)
+  {
+    parent_op.set_mp_part_str(upload_id + "." + std::to_string(part_num));
+    // XXX: do we need to handle part_num_str??
+    return 0;
+  }
+
+  int DBMultipartWriter::process(bufferlist&& data, uint64_t offset)
+  {
+    /* XXX: same as AtomicWriter..consolidate code */
+    total_data_size += data.length();
+
+    /* XXX: Optimize all bufferlist copies in this function */
+
+    /* copy head_data into meta. But for multipart we do not
+     * need to write head_data */
+    uint64_t max_chunk_size = store->getDB()->get_max_chunk_size();
+    int excess_size = 0;
+
+    /* Accumulate tail_data till max_chunk_size or flush op */
+    bufferlist tail_data;
+
+    if (data.length() != 0) {
+        parent_op.meta.data = &head_data; /* Null data ?? */
+
+      /* handle tail )parts.
+       * First accumulate and write data into dbstore in its chunk_size
+       * parts
+       */
+      if (!tail_part_size) { /* new tail part */
+        tail_part_offset = offset;
+      }
+      data.begin(0).copy(data.length(), tail_data);
+      tail_part_size += tail_data.length();
+      tail_part_data.append(tail_data);
+
+      if (tail_part_size < max_chunk_size)  {
+        return 0;
+      } else {
+        int write_ofs = 0;
+        while (tail_part_size >= max_chunk_size) {
+          excess_size = tail_part_size - max_chunk_size;
+          bufferlist tmp;
+          tail_part_data.begin(write_ofs).copy(max_chunk_size, tmp);
+          /* write tail objects data */
+          int ret = parent_op.write_data(dpp, tmp, tail_part_offset);
+
+          if (ret < 0) {
+            return ret;
+          }
+
+          tail_part_size -= max_chunk_size;
+          write_ofs += max_chunk_size;
+          tail_part_offset += max_chunk_size;
+        }
+        /* reset tail parts or update if excess data */
+        if (excess_size > 0) { /* wrote max_chunk_size data */
+          tail_part_size = excess_size;
+          bufferlist tmp;
+          tail_part_data.begin(write_ofs).copy(excess_size, tmp);
+          tail_part_data = tmp;
+        } else {
+          tail_part_size = 0;
+          tail_part_data.clear();
+          tail_part_offset = 0;
+        }
+      }
+    } else {
+      if (tail_part_size == 0) {
+        return 0; /* nothing more to write */
+      }
+
+      /* flush watever tail data is present */
+      int ret = parent_op.write_data(dpp, tail_part_data, tail_part_offset);
+      if (ret < 0) {
+        return ret;
+      }
+      tail_part_size = 0;
+      tail_part_data.clear();
+      tail_part_offset = 0;
+    }
+
+    return 0;
+  }
+
+  int DBMultipartWriter::complete(size_t accounted_size, const std::string& etag,
+                       ceph::real_time *mtime, ceph::real_time set_mtime,
+                       std::map<std::string, bufferlist>& attrs,
+                       ceph::real_time delete_at,
+                       const char *if_match, const char *if_nomatch,
+                       const std::string *user_data,
+                       rgw_zone_set *zones_trace, bool *canceled,
+                       optional_yield y)
+  {
+    int ret = 0;
+    /* XXX: same as AtomicWriter..consolidate code */
+    parent_op.meta.mtime = mtime;
+    parent_op.meta.delete_at = delete_at;
+    parent_op.meta.if_match = if_match;
+    parent_op.meta.if_nomatch = if_nomatch;
+    parent_op.meta.user_data = user_data;
+    parent_op.meta.zones_trace = zones_trace;
+    
+    /* XXX: handle accounted size */
+    accounted_size = total_data_size;
+
+    if (ret < 0)
+      return ret;
+
+    RGWUploadPartInfo info;
+    info.num = part_num;
+    info.etag = etag;
+    info.size = total_data_size;
+    info.accounted_size = accounted_size;
+    info.modified = real_clock::now();
+    //info.manifest = manifest;
+
+    DB::Object op_target(store->getDB(),
+        meta_obj->get_bucket()->get_info(), meta_obj->get_obj());
+    ret = op_target.add_mp_part(dpp, info);
+    if (ret < 0) {
+      return ret == -ENOENT ? -ERR_NO_SUCH_UPLOAD : ret;
+    }
+
+    return 0;
+  }
+
   DBAtomicWriter::DBAtomicWriter(const DoutPrefixProvider *dpp,
                    optional_yield y,
                        std::unique_ptr<rgw::sal::Object> _head_obj,
index 8007b6c48bc40e8badcbc086ee76d848ace10a02..bc2e952fcf1b9127190519ea58a72a16367b97aa 100644 (file)
@@ -19,6 +19,7 @@
 #include "rgw_oidc_provider.h"
 #include "rgw_role.h"
 #include "rgw_lc.h"
+#include "rgw_multi.h"
 
 #include "store/dbstore/common/dbstore.h"
 #include "store/dbstore/dbstore_mgr.h"
@@ -304,6 +305,133 @@ protected:
     }
   };
 
+  /*
+   * For multipart upload, below is the process flow -
+   *
+   * MultipartUpload::Init - create head object of meta obj (src_obj_name + "." + upload_id)
+   *                     [ Meta object stores all the parts upload info]
+   * MultipartWriter::process - create all data/tail objects with obj_name same as
+   *                        meta obj (so that they can all be identified & deleted 
+   *                        during abort)
+   * MultipartUpload::Abort - Just delete meta obj .. that will indirectly delete all the
+   *                     uploads associated with that upload id / meta obj so far.
+   * MultipartUpload::Complete - create head object of the original object (if not exists) &
+   *                     rename all data/tail objects to orig object name and update
+   *                     metadata of the orig object.
+   */
+  class DBMultipartPart : public MultipartPart {
+  protected:
+    RGWUploadPartInfo info; /* XXX: info contains manifest also which is not needed */
+
+  public:
+    DBMultipartPart() = default;
+    virtual ~DBMultipartPart() = default;
+
+    virtual RGWUploadPartInfo& get_info() { return info; }
+    virtual void set_info(const RGWUploadPartInfo& _info) { info = _info; }
+    virtual uint32_t get_num() { return info.num; }
+    virtual uint64_t get_size() { return info.accounted_size; }
+    virtual const std::string& get_etag() { return info.etag; }
+    virtual ceph::real_time& get_mtime() { return info.modified; }
+
+  };
+
+  class DBMPObj {
+    std::string oid; // object name
+    std::string upload_id;
+    std::string meta; // multipart meta object = <oid>.<upload_id>
+  public:
+    DBMPObj() {}
+    DBMPObj(const std::string& _oid, const std::string& _upload_id) {
+      init(_oid, _upload_id, _upload_id);
+    }
+    DBMPObj(const std::string& _oid, std::optional<std::string> _upload_id) {
+      if (_upload_id) {
+        init(_oid, *_upload_id, *_upload_id);
+      } else {
+        from_meta(_oid);
+      }
+    }
+    void init(const std::string& _oid, const std::string& _upload_id) {
+      init(_oid, _upload_id, _upload_id);
+    }
+    void init(const std::string& _oid, const std::string& _upload_id, const std::string& part_unique_str) {
+      if (_oid.empty()) {
+        clear();
+        return;
+      }
+      oid = _oid;
+      upload_id = _upload_id;
+      meta = oid + "." + upload_id;
+    }
+    const std::string& get_upload_id() const {
+      return upload_id;
+    }
+    const std::string& get_key() const {
+      return oid;
+    }
+    const std::string& get_meta() const { return meta; }
+    bool from_meta(const std::string& meta) {
+      int end_pos = meta.length();
+      int mid_pos = meta.rfind('.', end_pos - 1); // <key>.<upload_id>
+      if (mid_pos < 0)
+        return false;
+      oid = meta.substr(0, mid_pos);
+      upload_id = meta.substr(mid_pos + 1, end_pos - mid_pos - 1);
+      init(oid, upload_id, upload_id);
+      return true;
+    }
+    void clear() {
+      oid = "";
+      meta = "";
+      upload_id = "";
+    }
+  };
+
+  class DBMultipartUpload : public MultipartUpload {
+    DBStore* store;
+    DBMPObj mp_obj;
+    ACLOwner owner;
+    ceph::real_time mtime;
+    rgw_placement_rule placement;
+
+  public:
+    DBMultipartUpload(DBStore* _store, Bucket* _bucket, const std::string& oid, std::optional<std::string> upload_id, ACLOwner _owner, ceph::real_time _mtime) : MultipartUpload(_bucket), store(_store), mp_obj(oid, upload_id), owner(_owner), mtime(_mtime) {}
+    virtual ~DBMultipartUpload() = default;
+
+    virtual const std::string& get_meta() const { return mp_obj.get_meta(); }
+    virtual const std::string& get_key() const { return mp_obj.get_key(); }
+    virtual const std::string& get_upload_id() const { return mp_obj.get_upload_id(); }
+    virtual const ACLOwner& get_owner() const override { return owner; }
+    virtual ceph::real_time& get_mtime() { return mtime; }
+    virtual std::unique_ptr<rgw::sal::Object> get_meta_obj() override;
+    virtual int init(const DoutPrefixProvider* dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) override;
+    virtual int list_parts(const DoutPrefixProvider* dpp, CephContext* cct,
+                        int num_parts, int marker,
+                        int* next_marker, bool* truncated,
+                        bool assume_unsorted = false) override;
+    virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct,
+                   RGWObjectCtx* obj_ctx) override;
+    virtual int complete(const DoutPrefixProvider* dpp,
+                      optional_yield y, CephContext* cct,
+                      std::map<int, std::string>& part_etags,
+                      std::list<rgw_obj_index_key>& remove_objs,
+                      uint64_t& accounted_size, bool& compressed,
+                      RGWCompressionInfo& cs_info, off_t& ofs,
+                      std::string& tag, ACLOwner& owner,
+                      uint64_t olh_epoch,
+                      rgw::sal::Object* target_obj,
+                      RGWObjectCtx* obj_ctx) override;
+    virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) override;
+    virtual std::unique_ptr<Writer> get_writer(const DoutPrefixProvider *dpp,
+                         optional_yield y,
+                         std::unique_ptr<rgw::sal::Object> _head_obj,
+                         const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                         const rgw_placement_rule *ptail_placement_rule,
+                         uint64_t part_num,
+                         const std::string& part_num_str) override;
+  };
+
   class DBObject : public Object {
     private:
       DBStore* store;
@@ -432,6 +560,15 @@ protected:
       int read_attrs(const DoutPrefixProvider* dpp, DB::Object::Read &read_op, optional_yield y, rgw_obj* target_obj = nullptr);
   };
 
+  class MPDBSerializer : public MPSerializer {
+
+  public:
+    MPDBSerializer(const DoutPrefixProvider *dpp, DBStore* store, DBObject* obj, const std::string& lock_name) {}
+
+    virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override {return 0; }
+    virtual int unlock() override { return 0;}
+  };
+
   class DBAtomicWriter : public Writer {
     protected:
     rgw::sal::DBStore* store;
@@ -477,6 +614,54 @@ protected:
                          optional_yield y) override;
   };
 
+  class DBMultipartWriter : public Writer {
+  protected:
+    rgw::sal::DBStore* store;
+    const rgw_user& owner;
+       const rgw_placement_rule *ptail_placement_rule;
+       uint64_t olh_epoch;
+    std::unique_ptr<rgw::sal::Object> head_obj;
+    string upload_id;
+    string oid; /* object->name() + "." + "upload_id" + "." + part_num */
+    std::unique_ptr<rgw::sal::Object> meta_obj;
+    DB::Object op_target;
+    DB::Object::Write parent_op;
+    int part_num;
+    string part_num_str;
+    uint64_t total_data_size = 0; /* for total data being uploaded */
+    bufferlist head_data;
+    bufferlist tail_part_data;
+    uint64_t tail_part_offset;
+    uint64_t tail_part_size = 0; /* corresponds to each tail part being
+                                  written to dbstore */
+
+public:
+    DBMultipartWriter(const DoutPrefixProvider *dpp,
+                      optional_yield y, MultipartUpload* upload,
+                      std::unique_ptr<rgw::sal::Object> _head_obj,
+                      DBStore* _store,
+                      const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                      const rgw_placement_rule *ptail_placement_rule,
+                      uint64_t part_num, const std::string& part_num_str);
+    ~DBMultipartWriter() = default;
+
+    // prepare to start processing object data
+    virtual int prepare(optional_yield y) override;
+
+    // Process a bufferlist
+    virtual int process(bufferlist&& data, uint64_t offset) override;
+
+    // complete the operation and make its result visible to clients
+    virtual int complete(size_t accounted_size, const std::string& etag,
+                       ceph::real_time *mtime, ceph::real_time set_mtime,
+                       std::map<std::string, bufferlist>& attrs,
+                       ceph::real_time delete_at,
+                       const char *if_match, const char *if_nomatch,
+                       const std::string *user_data,
+                       rgw_zone_set *zones_trace, bool *canceled,
+                       optional_yield y) override;
+  };
+
   class DBStore : public Store {
     private:
       /* DBStoreManager is used in case multiple
index 55932b649188f2cb29031e95634838e87f7e9585..036b95f80c862fb91fa220edf2ae17ffd999d814 100644 (file)
@@ -52,24 +52,6 @@ static string mp_ns = RGW_OBJ_NS_MULTIPART;
 
 namespace rgw::sal {
 
-struct multipart_upload_info
-{
-  rgw_placement_rule dest_placement;
-
-  void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
-    encode(dest_placement, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(dest_placement, bl);
-    DECODE_FINISH(bl);
-  }
-};
-WRITE_CLASS_ENCODER(multipart_upload_info)
-
 // default number of entries to list with each bucket listing call
 // (use marker to bridge between calls)
 static constexpr size_t listing_max_entries = 1000;
index 1b851fa550cf5c79a8415c67216ffea7632ba97d..6a56bc338a852d15c54d3c6d0e5b6abdcf2973eb 100644 (file)
@@ -185,6 +185,8 @@ DBOp *DB::getDBOp(const DoutPrefixProvider *dpp, string Op, struct DBOpParams *p
     return Ob->ListBucketObjects;
   if (!Op.compare("PutObjectData"))
     return Ob->PutObjectData;
+  if (!Op.compare("UpdateObjectData"))
+    return Ob->UpdateObjectData;
   if (!Op.compare("GetObjectData"))
     return Ob->GetObjectData;
   if (!Op.compare("DeleteObjectData"))
@@ -790,13 +792,13 @@ int DB::raw_obj::InitializeParamsfromRawObj(const DoutPrefixProvider *dpp,
   params->op.obj.state.obj.key.instance = obj_instance;
   params->op.obj.state.obj.key.ns = obj_ns;
 
-  if (multipart_partnum != 0) {
+  if (multipart_part_str != "0.0") {
     params->op.obj.is_multipart = true;
   } else {
     params->op.obj.is_multipart = false;
   }
 
-  params->op.obj_data.multipart_part_num = multipart_partnum;
+  params->op.obj_data.multipart_part_str = multipart_part_str;
   params->op.obj_data.part_num = part_num;
 
   return ret;
@@ -894,6 +896,72 @@ out:
   return ret;
 }
 
+int DB::Object::add_mp_part(const DoutPrefixProvider *dpp,
+                            RGWUploadPartInfo info) {
+  int ret = 0;
+
+  DBOpParams params = {};
+
+  store->InitializeParams(dpp, "GetObject", &params);
+  InitializeParamsfromObject(dpp, &params);
+
+  ret = store->ProcessOp(dpp, "GetObject", &params);
+
+  if (ret) {
+    ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<")" << dendl;
+    goto out;
+  }
+
+  /* pick one field check if object exists */
+  if (!params.op.obj.state.exists) {
+    ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
+    return -1;
+  }
+
+  params.op.obj.mp_parts.push_back(info);
+  params.op.query_str = "mp";
+  params.op.obj.state.mtime = real_clock::now();
+
+  ret = store->ProcessOp(dpp, "UpdateObject", &params);
+
+  if (ret) {
+    ldpp_dout(dpp, 0)<<"In UpdateObject failed err:(" <<ret<<") " << dendl;
+    goto out;
+  }
+
+out:
+  return ret;
+}
+
+int DB::Object::get_mp_parts_list(const DoutPrefixProvider *dpp,
+                                  std::list<RGWUploadPartInfo>& info)
+{
+  int ret = 0;
+  DBOpParams params = {};
+  std::map<std::string, bufferlist> omap;
+
+  store->InitializeParams(dpp, "GetObject", &params);
+  InitializeParamsfromObject(dpp, &params);
+
+  ret = store->ProcessOp(dpp, "GetObject", &params);
+
+  if (ret) {
+    ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<") " << dendl;
+    goto out;
+  }
+
+  /* pick one field check if object exists */
+  if (!params.op.obj.state.exists) {
+    ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
+    return -1;
+  }
+
+  info = params.op.obj.mp_parts;
+
+out:
+  return ret;
+}
+
 /* Taken from rgw_rados.cc */
 void DB::gen_rand_obj_instance_name(rgw_obj_key *target_key)
 {
@@ -1353,9 +1421,9 @@ int DB::Object::Read::read(int64_t ofs, int64_t end, bufferlist& bl, const DoutP
 
   /* tail object */
   int part_num = (ofs / max_chunk_size);
-  /* XXX: Handle multipart_num */
+  /* XXX: Handle multipart_str */
   raw_obj read_obj(store, source->get_bucket_info().bucket.name, astate->obj.key.name, 
-      astate->obj.key.instance, astate->obj.key.ns, 0, part_num);
+      astate->obj.key.instance, astate->obj.key.ns, "0.0", part_num);
 
   read_len = len;
 
@@ -1469,9 +1537,9 @@ int DB::Object::iterate_obj(const DoutPrefixProvider *dpp,
     part_num = (ofs / max_chunk_size);
     uint64_t read_len = std::min(len, max_chunk_size);
 
-    /* XXX: Handle multipart_num */
+    /* XXX: Handle multipart_str */
     raw_obj read_obj(store, get_bucket_info().bucket.name, astate->obj.key.name, 
-        astate->obj.key.instance, astate->obj.key.ns, 0, part_num);
+        astate->obj.key.instance, astate->obj.key.ns, "0.0", part_num);
     bool reading_from_head = (ofs < head_data_size);
 
     r = cb(dpp, read_obj, ofs, read_len, reading_from_head, astate, arg);
@@ -1533,14 +1601,14 @@ int DB::Object::Write::write_data(const DoutPrefixProvider* dpp,
   /* XXX: Split into parts each of max_chunk_size. But later make tail
    * object chunk size limit to sqlite blob limit */
   int part_num = 0;
-  uint64_t max_chunk_size, max_head_size;
 
-  max_head_size = store->get_max_head_size();
-  max_chunk_size = store->get_max_chunk_size();
+  uint64_t max_chunk_size = store->get_max_chunk_size();
 
   /* tail_obj ofs should be greater than max_head_size */
-  if (ofs < max_head_size) {
-    return -1;
+  if (mp_part_str == "0.0")  { // ensure not multipart meta object
+    if (ofs < store->get_max_head_size()) {
+      return -1;
+    }
   }
   
   uint64_t end = data.length();
@@ -1552,9 +1620,9 @@ int DB::Object::Write::write_data(const DoutPrefixProvider* dpp,
     part_num = (ofs / max_chunk_size);
     uint64_t len = std::min(end, max_chunk_size);
 
-    /* XXX: Handle multipart_num */
+    /* XXX: Handle multipart_str */
     raw_obj write_obj(store, target->get_bucket_info().bucket.name, obj_state.obj.key.name, 
-        obj_state.obj.key.instance, obj_state.obj.key.ns, 0, part_num);
+        obj_state.obj.key.instance, obj_state.obj.key.ns, mp_part_str, part_num);
 
 
     ldpp_dout(dpp, 20) << "dbstore->write obj-ofs=" << ofs << " write_len=" << len << dendl;
@@ -1714,6 +1782,27 @@ int DB::Object::Write::write_meta(const DoutPrefixProvider *dpp, uint64_t size,
   return r;
 }
 
+int DB::Object::Write::update_mp_parts(const DoutPrefixProvider *dpp, rgw_obj_key new_obj_key)
+{
+  int ret = 0;
+  DBOpParams params = {};
+  DB *store = target->get_store();
+
+  store->InitializeParams(dpp, "UpdateObjectData", &params);
+  target->InitializeParamsfromObject(dpp, &params);
+
+  params.op.obj.new_obj_key = new_obj_key;
+
+  ret = store->ProcessOp(dpp, "UpdateObjectData", &params);
+
+  if (ret) {
+    ldpp_dout(dpp, 0)<<"In UpdateObjectData failed err:(" <<ret<<")" << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
 int DB::Object::Delete::delete_obj(const DoutPrefixProvider *dpp) {
   int ret = 0;
   DB *store = target->get_store();
index 18aef417cd7ffb44059b98de61630080576daee9..01b55908c5b73e9c3656e763a832fa66dde2b37e 100644 (file)
@@ -23,6 +23,7 @@
 #include "global/global_init.h"
 #include "common/ceph_context.h"
 #include "rgw/rgw_obj_manifest.h"
+#include "rgw/rgw_multi.h"
 
 using namespace std;
 
@@ -83,16 +84,21 @@ struct DBOpObjectInfo {
 
   /* Extra fields */
   bool is_multipart;
+  std::list<RGWUploadPartInfo> mp_parts;
+
   bufferlist head_data;
   string min_marker;
   string max_marker;
   list<rgw_bucket_dir_entry> list_entries;
+  /* Below used to update mp_parts obj name
+   * from meta object to src object on completion */
+  rgw_obj_key new_obj_key;
 };
 
 struct DBOpObjectDataInfo {
   RGWObjState state;
   uint64_t part_num;
-  uint64_t multipart_part_num;
+  string multipart_part_str;
   uint64_t offset;
   uint64_t size;
   bufferlist data{};
@@ -274,9 +280,15 @@ struct DBOpObjectPrepareInfo {
   string manifest_part_rules = ":manifest_part_rules";
   string omap = ":omap";
   string is_multipart = ":is_multipart";
+  string mp_parts = ":mp_parts";
   string head_data = ":head_data";
   string min_marker = ":min_marker";
   string max_marker = ":max_marker";
+  /* Below used to update mp_parts obj name
+   * from meta object to src object on completion */
+  string new_obj_name = ":new_obj_name";
+  string new_obj_instance = ":new_obj_instance";
+  string new_obj_ns  = ":new_obj_ns";
 };
 
 struct DBOpObjectDataPrepareInfo {
@@ -284,7 +296,7 @@ struct DBOpObjectDataPrepareInfo {
   string offset = ":offset";
   string data = ":data";
   string size = ":size";
-  string multipart_part_num = ":multipart_part_num";
+  string multipart_part_str = ":multipart_part_str";
 };
 
 struct DBOpLCEntryPrepareInfo {
@@ -359,6 +371,7 @@ class ObjectOp {
     class UpdateObjectOp *UpdateObject;
     class ListBucketObjectsOp *ListBucketObjects;
     class PutObjectDataOp *PutObjectData;
+    class UpdateObjectDataOp *UpdateObjectData;
     class GetObjectDataOp *GetObjectData;
     class DeleteObjectDataOp *DeleteObjectData;
 
@@ -546,14 +559,15 @@ class DBOp {
       ManifestPartRules   BLOB,   \
       Omap    BLOB,   \
       IsMultipart     BOOL,   \
+      MPPartsList    BLOB,   \
       HeadData  BLOB,   \
       PRIMARY KEY (ObjName, ObjInstance, BucketName), \
       FOREIGN KEY (BucketName) \
       REFERENCES '{}' (BucketName) ON DELETE CASCADE ON UPDATE CASCADE \n);";
 
     const string CreateObjectDataTableQ =
-      /* Extra field 'MultipartPartNum' added which signifies multipart upload
-       * part number.  For regular object, it is '0'
+      /* Extra field 'MultipartPartStr' added which signifies multipart
+       * <uploadid + partnum>. For regular object, it is '0.0'
        *
        *  - part: a collection of stripes that make a contiguous part of an
        object. A regular object will only have one part (although might have
@@ -566,12 +580,12 @@ class DBOp {
       ObjInstance TEXT, \
       ObjNS TEXT, \
       BucketName TEXT NOT NULL , \
+      MultipartPartStr TEXT, \
       PartNum  INTEGER NOT NULL, \
       Offset   INTEGER, \
-      Data     BLOB,             \
       Size      INTEGER, \
-      MultipartPartNum INTEGER, \
-      PRIMARY KEY (ObjName, BucketName, ObjInstance, MultipartPartNum, PartNum), \
+      Data     BLOB,             \
+      PRIMARY KEY (ObjName, BucketName, ObjInstance, MultipartPartStr, PartNum), \
       FOREIGN KEY (BucketName, ObjName, ObjInstance) \
       REFERENCES '{}' (BucketName, ObjName, ObjInstance) ON DELETE CASCADE ON UPDATE CASCADE \n);";
 
@@ -933,10 +947,10 @@ class PutObjectOp: public DBOp {
        ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
        Prefix, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
        TailPlacementRuleName, TailPlacementStorageClass, \
-       ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, HeadData )     \
+       ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData )     \
       VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
           {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
-          {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {})";
+          {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {})";
 
   public:
     virtual ~PutObjectOp() {}
@@ -966,7 +980,7 @@ class PutObjectOp: public DBOp {
           params.op.obj.tail_placement_storage_class,
           params.op.obj.manifest_part_objs,
           params.op.obj.manifest_part_rules, params.op.obj.omap,
-          params.op.obj.is_multipart, params.op.obj.head_data);
+          params.op.obj.is_multipart, params.op.obj.mp_parts, params.op.obj.head_data);
     }
 };
 
@@ -998,7 +1012,7 @@ class GetObjectOp: public DBOp {
       ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
       Prefix, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
       TailPlacementRuleName, TailPlacementStorageClass, \
-      ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, HeadData from '{}' \
+      ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData from '{}' \
       where BucketName = {} and ObjName = {} and ObjInstance = {}";
 
   public:
@@ -1027,7 +1041,7 @@ class ListBucketObjectsOp: public DBOp {
       ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
       Prefix, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
       TailPlacementRuleName, TailPlacementStorageClass, \
-      ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, HeadData from '{}' \
+      ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData from '{}' \
       where BucketName = {} and ObjName > {} ORDER BY ObjName ASC LIMIT {}";
   public:
     virtual ~ListBucketObjectsOp() {}
@@ -1051,6 +1065,9 @@ class UpdateObjectOp: public DBOp {
     const string AttrsQuery =
       "UPDATE '{}' SET ObjAttrs = {}, Mtime = {}  \
       where BucketName = {} and ObjName = {} and ObjInstance = {}";
+    const string MPQuery =
+      "UPDATE '{}' SET MPPartsList = {}, Mtime = {}  \
+      where BucketName = {} and ObjName = {} and ObjInstance = {}";
     const string MetaQuery =
       "UPDATE '{}' SET \
        ObjNS = {}, ACLs = {}, IndexVer = {}, Tag = {}, Flags = {}, VersionedEpoch = {}, \
@@ -1064,7 +1081,7 @@ class UpdateObjectOp: public DBOp {
        HeadPlacementRuleName = {}, HeadPlacementRuleStorageClass = {}, \
        TailPlacementRuleName = {}, TailPlacementStorageClass = {}, \
        ManifestPartObjs = {}, ManifestPartRules = {}, Omap = {}, \
-       IsMultipart = {}, HeadData = {} \
+       IsMultipart = {}, MPPartsList = {}, HeadData = {} \
        WHERE ObjName = {} and ObjInstance = {} and BucketName = {}";
 
   public:
@@ -1087,6 +1104,14 @@ class UpdateObjectOp: public DBOp {
             params.op.obj.obj_name.c_str(),
             params.op.obj.obj_instance.c_str());
       }
+      if (params.op.query_str == "mp") {
+        return fmt::format(MPQuery.c_str(),
+            params.object_table.c_str(), params.op.obj.mp_parts.c_str(),
+            params.op.obj.mtime.c_str(),
+            params.op.bucket.bucket_name.c_str(),
+            params.op.obj.obj_name.c_str(),
+            params.op.obj.obj_instance.c_str());
+      }
       if (params.op.query_str == "meta") {
         return fmt::format(MetaQuery.c_str(),
           params.object_table.c_str(),
@@ -1111,7 +1136,7 @@ class UpdateObjectOp: public DBOp {
           params.op.obj.tail_placement_storage_class,
           params.op.obj.manifest_part_objs,
           params.op.obj.manifest_part_rules, params.op.obj.omap,
-          params.op.obj.is_multipart, params.op.obj.head_data,
+          params.op.obj.is_multipart, params.op.obj.mp_parts, params.op.obj.head_data,
           params.op.obj.obj_name, params.op.obj.obj_instance,
           params.op.bucket.bucket_name);
       }
@@ -1123,7 +1148,7 @@ class PutObjectDataOp: public DBOp {
   private:
     const string Query =
       "INSERT OR REPLACE INTO '{}' \
-      (ObjName, ObjInstance, ObjNS, BucketName, PartNum, Offset, Data, Size, MultipartPartNum) \
+      (ObjName, ObjInstance, ObjNS, BucketName, MultipartPartStr, PartNum, Offset, Size, Data) \
       VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {})";
 
   public:
@@ -1135,18 +1160,41 @@ class PutObjectDataOp: public DBOp {
           params.op.obj.obj_name, params.op.obj.obj_instance,
           params.op.obj.obj_ns,
           params.op.bucket.bucket_name.c_str(),
+          params.op.obj_data.multipart_part_str.c_str(),
           params.op.obj_data.part_num,
-          params.op.obj_data.offset.c_str(), params.op.obj_data.data.c_str(),
-          params.op.obj_data.size, params.op.obj_data.multipart_part_num);
+          params.op.obj_data.offset.c_str(),
+          params.op.obj_data.size,
+          params.op.obj_data.data.c_str());
     }
 };
 
+class UpdateObjectDataOp: public DBOp {
+  private:
+    const string Query =
+      "UPDATE '{}' \
+      SET ObjName = {}, ObjInstance = {}, ObjNS = {} \
+      WHERE ObjName = {} and ObjInstance = {} and ObjNS = {} and \
+      BucketName = {}";
+
+  public:
+    virtual ~UpdateObjectDataOp() {}
+
+    string Schema(DBOpPrepareParams &params) {
+      return fmt::format(Query.c_str(),
+          params.objectdata_table.c_str(),
+          params.op.obj.new_obj_name, params.op.obj.new_obj_instance,
+          params.op.obj.new_obj_ns,
+          params.op.obj.obj_name, params.op.obj.obj_instance,
+          params.op.obj.obj_ns,
+          params.op.bucket.bucket_name.c_str());
+    }
+};
 class GetObjectDataOp: public DBOp {
   private:
     const string Query =
       "SELECT  \
-      ObjName, ObjInstance, ObjNS, BucketName, PartNum, Offset, Data, Size, \
-      MultipartPartNum from '{}' where BucketName = {} and ObjName = {} and ObjInstance = {}";
+      ObjName, ObjInstance, ObjNS, BucketName, MultipartPartStr, PartNum, Offset, Size, Data \
+      from '{}' where BucketName = {} and ObjName = {} and ObjInstance = {} ORDER BY MultipartPartStr, PartNum";
 
   public:
     virtual ~GetObjectDataOp() {}
@@ -1460,28 +1508,28 @@ class DB {
         const rgw_user* powner_id, map<std::string, bufferlist>* pattrs,
         ceph::real_time* pmtime, RGWObjVersionTracker* pobjv);
 
-    int get_max_head_size() { return ObjHeadSize; }
-    int get_max_chunk_size() { return ObjChunkSize; }
+    uint64_t get_max_head_size() { return ObjHeadSize; }
+    uint64_t get_max_chunk_size() { return ObjChunkSize; }
     void gen_rand_obj_instance_name(rgw_obj_key *target_key);
 
     // db raw obj string is of format -
-    // "<bucketname>_<objname>_<objinstance>_<multipart-partnum>_<partnum>"
+    // "<bucketname>_<objname>_<objinstance>_<multipart-part-str>_<partnum>"
     const string raw_obj_oid = "{0}_{1}_{2}_{3}_{4}";
 
     inline string to_oid(const string& bucket, const string& obj_name, const string& obj_instance,
-        uint64_t mp_num, uint64_t partnum) {
-      string s = fmt::format(raw_obj_oid.c_str(), bucket, obj_name, obj_instance, mp_num, partnum);
+        string mp_str, uint64_t partnum) {
+      string s = fmt::format(raw_obj_oid.c_str(), bucket, obj_name, obj_instance, mp_str, partnum);
       return s;
     }
     inline int from_oid(const string& oid, string& bucket, string& obj_name,
         string& obj_instance,
-        uint64_t& mp_num, uint64_t& partnum) {
+        string& mp_str, uint64_t& partnum) {
       vector<std::string> result;
       boost::split(result, oid, boost::is_any_of("_"));
       bucket = result[0];
       obj_name = result[1];
       obj_instance = result[2];
-      mp_num = stoi(result[3]);
+      mp_str = result[3];
       partnum = stoi(result[4]);
 
       return 0;
@@ -1494,7 +1542,7 @@ class DB {
       string obj_name;
       string obj_instance;
       string obj_ns;
-      uint64_t multipart_partnum;
+      string multipart_part_str;
       uint64_t part_num;
 
       string obj_table;
@@ -1505,13 +1553,13 @@ class DB {
       }
 
       raw_obj(DB* _db, string& _bname, string& _obj_name, string& _obj_instance,
-          string& _obj_ns, int _mp_partnum, int _part_num) {
+          string& _obj_ns, string _mp_part_str, int _part_num) {
         db = _db;
         bucket_name = _bname;
         obj_name = _obj_name;
         obj_instance = _obj_instance;
         obj_ns = _obj_ns;
-        multipart_partnum = _mp_partnum;
+        multipart_part_str = _mp_part_str;
         part_num = _part_num;
 
         obj_table = bucket_name+".object.table";
@@ -1522,10 +1570,10 @@ class DB {
         int r;
 
         db = _db;
-        r = db->from_oid(oid, bucket_name, obj_name, obj_instance, multipart_partnum,
+        r = db->from_oid(oid, bucket_name, obj_name, obj_instance, multipart_part_str,
             part_num);
         if (r < 0) {
-          multipart_partnum = 0;
+          multipart_part_str = "0.0";
           part_num = 0;
         }
 
@@ -1659,6 +1707,7 @@ class DB {
       struct Write {
         DB::Object *target;
         RGWObjState obj_state;
+        string mp_part_str = "0.0"; // multipart num
 
         struct MetaParams {
           ceph::real_time *mtime;
@@ -1690,6 +1739,7 @@ class DB {
 
         explicit Write(DB::Object *_target) : target(_target) {}
 
+        void set_mp_part_str(string _mp_part_str) { mp_part_str = _mp_part_str;}
         int prepare(const DoutPrefixProvider* dpp);
         int write_data(const DoutPrefixProvider* dpp,
                                bufferlist& data, uint64_t ofs);
@@ -1699,6 +1749,11 @@ class DB {
             bool assume_noent, bool modify_tail);
         int write_meta(const DoutPrefixProvider *dpp, uint64_t size,
             uint64_t accounted_size, map<string, bufferlist>& attrs);
+        /* Below are used to update mp data rows object name
+         * from meta to src object name on multipart upload
+         * completion
+         */
+        int update_mp_parts(const DoutPrefixProvider *dpp, rgw_obj_key new_obj_key);
       };
 
       struct Delete {
@@ -1764,6 +1819,8 @@ class DB {
           std::map<std::string, bufferlist> *m, bool* pmore);
       using iterate_obj_cb = int (*)(const DoutPrefixProvider*, const raw_obj&, off_t, off_t,
           bool, RGWObjState*, void*);
+      int add_mp_part(const DoutPrefixProvider *dpp, RGWUploadPartInfo info);
+      int get_mp_parts_list(const DoutPrefixProvider *dpp, std::list<RGWUploadPartInfo>& info);
 
       int iterate_obj(const DoutPrefixProvider *dpp,
           const RGWBucketInfo& bucket_info, const rgw_obj& obj,
index dc5463f5b09ff9587431811a8a6a9539f6abcb84..417a3f78a9b66d6a9a0619ba4fb55e00ddee2b6a 100644 (file)
@@ -259,6 +259,7 @@ enum GetObject {
   ManifestPartRules,
   Omap,
   IsMultipart,
+  MPPartsList,
   HeadData
 };
 
@@ -267,11 +268,11 @@ enum GetObjectData {
   ObjDataInstance,
   ObjDataNS,
   ObjDataBucketName,
+  MultipartPartStr,
   PartNum,
   Offset,
-  ObjData,
   ObjDataSize,
-  MultipartPartNum
+  ObjData
 };
 
 enum GetLCEntry {
@@ -444,6 +445,7 @@ static int list_object(const DoutPrefixProvider *dpp, DBOpInfo &op, sqlite3_stmt
   SQL_DECODE_BLOB_PARAM(dpp, stmt, ManifestPartRules, op.obj.rules, sdb);
   SQL_DECODE_BLOB_PARAM(dpp, stmt, Omap, op.obj.omap, sdb);
   op.obj.is_multipart = sqlite3_column_int(stmt, IsMultipart);
+  SQL_DECODE_BLOB_PARAM(dpp, stmt, MPPartsList, op.obj.mp_parts, sdb);
   SQL_DECODE_BLOB_PARAM(dpp, stmt, HeadData, op.obj.head_data, sdb);
   op.obj.state.data = op.obj.head_data;
 
@@ -481,7 +483,7 @@ static int get_objectdata(const DoutPrefixProvider *dpp, DBOpInfo &op, sqlite3_s
   op.obj_data.part_num = sqlite3_column_int(stmt, PartNum);
   op.obj_data.offset = sqlite3_column_int(stmt, Offset);
   op.obj_data.size = sqlite3_column_int(stmt, ObjDataSize);
-  op.obj_data.multipart_part_num = sqlite3_column_int(stmt, MultipartPartNum);
+  op.obj_data.multipart_part_str = (const char*)sqlite3_column_text(stmt, MultipartPartStr);
   SQL_DECODE_BLOB_PARAM(dpp, stmt, ObjData, op.obj_data.data, sdb);
 
   return 0;
@@ -977,6 +979,7 @@ int SQLObjectOp::InitializeObjectOps(string db_name, const DoutPrefixProvider *d
   UpdateObject = new SQLUpdateObject(sdb, db_name, cct);
   ListBucketObjects = new SQLListBucketObjects(sdb, db_name, cct);
   PutObjectData = new SQLPutObjectData(sdb, db_name, cct);
+  UpdateObjectData = new SQLUpdateObjectData(sdb, db_name, cct);
   GetObjectData = new SQLGetObjectData(sdb, db_name, cct);
   DeleteObjectData = new SQLDeleteObjectData(sdb, db_name, cct);
 
@@ -990,6 +993,7 @@ int SQLObjectOp::FreeObjectOps(const DoutPrefixProvider *dpp)
   delete GetObject;
   delete UpdateObject;
   delete PutObjectData;
+  delete UpdateObjectData;
   delete GetObjectData;
   delete DeleteObjectData;
 
@@ -1835,6 +1839,9 @@ int SQLPutObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params)
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.is_multipart.c_str(), sdb);
   SQL_BIND_INT(dpp, stmt, index, params->op.obj.is_multipart, sdb);
 
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.mp_parts.c_str(), sdb);
+  SQL_ENCODE_BLOB_PARAM(dpp, stmt, index, params->op.obj.mp_parts, sdb);
+
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.head_data.c_str(), sdb);
   SQL_ENCODE_BLOB_PARAM(dpp, stmt, index, params->op.obj.head_data, sdb);
 
@@ -1980,6 +1987,8 @@ int SQLUpdateObject::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *p
     SQL_PREPARE(dpp, p_params, sdb, attrs_stmt, ret, "PrepareUpdateObject");
   } else if (params->op.query_str == "meta") {
     SQL_PREPARE(dpp, p_params, sdb, meta_stmt, ret, "PrepareUpdateObject");
+  } else if (params->op.query_str == "mp") {
+    SQL_PREPARE(dpp, p_params, sdb, mp_stmt, ret, "PrepareUpdateObject");
   } else {
     ldpp_dout(dpp, 0)<<"In SQLUpdateObject invalid query_str:" <<
       params->op.query_str << dendl;
@@ -2004,6 +2013,8 @@ int SQLUpdateObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *para
     stmt = &attrs_stmt;
   } else if (params->op.query_str == "meta") { 
     stmt = &meta_stmt;
+  } else if (params->op.query_str == "mp") { 
+    stmt = &mp_stmt;
   } else {
     ldpp_dout(dpp, 0)<<"In SQLUpdateObject invalid query_str:" <<
       params->op.query_str << dendl;
@@ -2030,6 +2041,10 @@ int SQLUpdateObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *para
     SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.obj_attrs.c_str(), sdb);
     SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.state.attrset, sdb);
   }
+  if (params->op.query_str == "mp") { 
+    SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.mp_parts.c_str(), sdb);
+    SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.mp_parts, sdb);
+  }
   if (params->op.query_str == "meta") { 
     SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.obj_ns.c_str(), sdb);
     SQL_BIND_TEXT(dpp, *stmt, index, params->op.obj.state.obj.key.ns.c_str(), sdb);
@@ -2157,6 +2172,9 @@ int SQLUpdateObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *para
     SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.is_multipart.c_str(), sdb);
     SQL_BIND_INT(dpp, *stmt, index, params->op.obj.is_multipart, sdb);
 
+    SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.mp_parts.c_str(), sdb);
+    SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.mp_parts, sdb);
+
     SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.head_data.c_str(), sdb);
     SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.head_data, sdb);
   }
@@ -2176,6 +2194,8 @@ int SQLUpdateObject::Execute(const DoutPrefixProvider *dpp, struct DBOpParams *p
     stmt = &attrs_stmt;
   } else if (params->op.query_str == "meta") { 
     stmt = &meta_stmt;
+  } else if (params->op.query_str == "mp") { 
+    stmt = &mp_stmt;
   } else {
     ldpp_dout(dpp, 0)<<"In SQLUpdateObject invalid query_str:" <<
       params->op.query_str << dendl;
@@ -2312,9 +2332,9 @@ int SQLPutObjectData::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *par
 
   SQL_BIND_INT(dpp, stmt, index, params->op.obj_data.size, sdb);
 
-  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj_data.multipart_part_num.c_str(), sdb);
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj_data.multipart_part_str.c_str(), sdb);
 
-  SQL_BIND_INT(dpp, stmt, index, params->op.obj_data.multipart_part_num, sdb);
+  SQL_BIND_TEXT(dpp, stmt, index, params->op.obj_data.multipart_part_str.c_str(), sdb);
 
 out:
   return rc;
@@ -2329,6 +2349,82 @@ out:
   return ret;
 }
 
+int SQLUpdateObjectData::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *params)
+{
+  int ret = -1;
+  struct DBOpPrepareParams p_params = PrepareParams;
+  struct DBOpParams copy = *params;
+  string bucket_name = params->op.bucket.info.bucket.name;
+
+  if (!*sdb) {
+    ldpp_dout(dpp, 0)<<"In SQLUpdateObjectData - no db" << dendl;
+    goto out;
+  }
+
+  if (p_params.object_table.empty()) {
+    p_params.object_table = getObjectTable(bucket_name);
+  }
+  if (p_params.objectdata_table.empty()) {
+    p_params.objectdata_table = getObjectDataTable(bucket_name);
+  }
+  params->bucket_table = p_params.bucket_table;
+  params->object_table = p_params.object_table;
+  params->objectdata_table = p_params.objectdata_table;
+  (void)createObjectDataTable(dpp, params);
+
+  SQL_PREPARE(dpp, p_params, sdb, stmt, ret, "PrepareUpdateObjectData");
+
+out:
+  return ret;
+}
+
+int SQLUpdateObjectData::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params)
+{
+  int index = -1;
+  int rc = 0;
+  struct DBOpPrepareParams p_params = PrepareParams;
+
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_name.c_str(), sdb);
+
+  SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.name.c_str(), sdb);
+
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_instance.c_str(), sdb);
+
+  SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.instance.c_str(), sdb);
+
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_ns.c_str(), sdb);
+
+  SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.ns.c_str(), sdb);
+
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name.c_str(), sdb);
+
+  SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
+
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.new_obj_name.c_str(), sdb);
+
+  SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.new_obj_key.name.c_str(), sdb);
+
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.new_obj_instance.c_str(), sdb);
+
+  SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.new_obj_key.instance.c_str(), sdb);
+
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.new_obj_ns.c_str(), sdb);
+
+  SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.new_obj_key.ns.c_str(), sdb);
+
+out:
+  return rc;
+}
+
+int SQLUpdateObjectData::Execute(const DoutPrefixProvider *dpp, struct DBOpParams *params)
+{
+  int ret = -1;
+
+  SQL_EXECUTE(dpp, params, stmt, NULL);
+out:
+  return ret;
+}
+
 int SQLGetObjectData::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *params)
 {
   int ret = -1;
index 55e3dc33aa348fee2ce6e1f14a12f6e56e3117d2..3636cebade629175e14aa96538149b2c9ead8809 100644 (file)
@@ -286,6 +286,7 @@ class SQLUpdateObject : public SQLiteDB, public UpdateObjectOp {
     sqlite3_stmt *omap_stmt = NULL; // Prepared statement
     sqlite3_stmt *attrs_stmt = NULL; // Prepared statement
     sqlite3_stmt *meta_stmt = NULL; // Prepared statement
+    sqlite3_stmt *mp_stmt = NULL; // Prepared statement
 
   public:
     SQLUpdateObject(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
@@ -341,6 +342,24 @@ class SQLPutObjectData : public SQLiteDB, public PutObjectDataOp {
     int Bind(const DoutPrefixProvider *dpp, DBOpParams *params);
 };
 
+class SQLUpdateObjectData : public SQLiteDB, public UpdateObjectDataOp {
+  private:
+    sqlite3 **sdb = NULL;
+    sqlite3_stmt *stmt = NULL; // Prepared statement
+
+  public:
+    SQLUpdateObjectData(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
+    SQLUpdateObjectData(sqlite3 **sdbi, string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {}
+
+    ~SQLUpdateObjectData() {
+      if (stmt)
+        sqlite3_finalize(stmt);
+    }
+    int Prepare(const DoutPrefixProvider *dpp, DBOpParams *params);
+    int Execute(const DoutPrefixProvider *dpp, DBOpParams *params);
+    int Bind(const DoutPrefixProvider *dpp, DBOpParams *params);
+};
+
 class SQLGetObjectData : public SQLiteDB, public GetObjectDataOp {
   private:
     sqlite3 **sdb = NULL;
index fcdd6a079a6da90cff8050d8be333c62327a8df5..4591abdc5d777cebe0127570d3fd9ba7043becab 100644 (file)
@@ -609,11 +609,16 @@ TEST_F(DBStoreTest, PutObject) {
   ret = db->ProcessOp(dpp, "PutObject", &params);
   ASSERT_EQ(ret, 0);
 
-  /* Insert another object */
+  /* Insert another objects */
   params.op.obj.state.obj.key.name = "object2";
   params.op.obj.state.obj.key.instance = "inst2";
   ret = db->ProcessOp(dpp, "PutObject", &params);
   ASSERT_EQ(ret, 0);
+
+  params.op.obj.state.obj.key.name = "object3";
+  params.op.obj.state.obj.key.instance = "inst3";
+  ret = db->ProcessOp(dpp, "PutObject", &params);
+  ASSERT_EQ(ret, 0);
 }
 
 TEST_F(DBStoreTest, ListAllObjects) {
@@ -957,7 +962,7 @@ TEST_F(DBStoreTest, PutObjectData) {
 
   params.op.obj_data.part_num = 1;
   params.op.obj_data.offset = 10;
-  params.op.obj_data.multipart_part_num = 2;
+  params.op.obj_data.multipart_part_str = "2";
   bufferlist b1;
   encode("HELLO WORLD", b1);
   params.op.obj_data.data = b1;
@@ -966,15 +971,29 @@ TEST_F(DBStoreTest, PutObjectData) {
   ASSERT_EQ(ret, 0);
 }
 
+TEST_F(DBStoreTest, UpdateObjectData) {
+  struct DBOpParams params = GlobalParams;
+  int ret = -1;
+
+  params.op.obj.new_obj_key.name = "object3";
+  params.op.obj.new_obj_key.instance = "inst3";
+  ret = db->ProcessOp(dpp, "UpdateObjectData", &params);
+  ASSERT_EQ(ret, 0);
+}
+
 TEST_F(DBStoreTest, GetObjectData) {
   struct DBOpParams params = GlobalParams;
   int ret = -1;
 
+  params.op.obj.state.obj.key.instance = "inst3";
+  params.op.obj.state.obj.key.name = "object3";
   ret = db->ProcessOp(dpp, "GetObjectData", &params);
   ASSERT_EQ(ret, 0);
   ASSERT_EQ(params.op.obj_data.part_num, 1);
   ASSERT_EQ(params.op.obj_data.offset, 10);
-  ASSERT_EQ(params.op.obj_data.multipart_part_num, 2);
+  ASSERT_EQ(params.op.obj_data.multipart_part_str, "2");
+  ASSERT_EQ(params.op.obj.state.obj.key.instance, "inst3");
+  ASSERT_EQ(params.op.obj.state.obj.key.name, "object3");
   string data;
   decode(data, params.op.obj_data.data);
   ASSERT_EQ(data, "HELLO WORLD");