From: Soumya Koduri Date: Mon, 22 Nov 2021 16:58:19 +0000 (+0530) Subject: rgw/dbstore: Multipart upload APIs X-Git-Tag: v17.1.0~263^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=a7100972ad9f4ce359770cc1f0fe6ba0d353c07c;p=ceph-ci.git rgw/dbstore: Multipart upload APIs For multipart upload processing, below is the method applied - MultipartUpload::Init - create head object entry for meta obj (src_obj_name + "." + upload_id) [ Meta object stores all the parts upload info] MultipartWriter::process - create all data/tail objects with obj_name same as meta obj (so that they can all be identified & deleted during abort) MultipartUpload::Abort - Just delete meta obj .. that will indirectly delete all the uploads associated with that upload id / meta obj so far. MultipartUpload::Complete - Create head object of the original object (if not exists). Rename all data/tail object entries' obj name to orig object name and update metadata of the orig object. Signed-off-by: Soumya Koduri --- diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 3d90d5728ae..c1bfb55e106 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -1891,6 +1891,24 @@ inline std::ostream& operator<<(std::ostream& out, const rgw_obj &o) { return out << o.bucket.name << ":" << o.get_oid(); } +struct multipart_upload_info +{ + rgw_placement_rule dest_placement; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(dest_placement, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(dest_placement, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(multipart_upload_info) + static inline void buf_to_hex(const unsigned char* const buf, const size_t len, char* const str) diff --git a/src/rgw/rgw_sal_dbstore.cc b/src/rgw/rgw_sal_dbstore.cc index 92f85b641e0..0dce21ec768 100644 --- a/src/rgw/rgw_sal_dbstore.cc +++ b/src/rgw/rgw_sal_dbstore.cc @@ -446,7 +446,8 @@ namespace rgw::sal { const std::string& oid, std::optional upload_id, ACLOwner owner, ceph::real_time mtime) { - return nullptr; + return std::make_unique(this->store, this, oid, upload_id, + std::move(owner), mtime); } int DBBucket::list_multiparts(const DoutPrefixProvider *dpp, @@ -653,7 +654,7 @@ namespace rgw::sal { MPSerializer* DBObject::get_serializer(const DoutPrefixProvider *dpp, const std::string& lock_name) { - return nullptr; + return new MPDBSerializer(dpp, store, this, lock_name); } int DBObject::transition(RGWObjectCtx& rctx, @@ -839,6 +840,489 @@ namespace rgw::sal { return 0; } + int DBMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct, + RGWObjectCtx *obj_ctx) + { + std::unique_ptr meta_obj = get_meta_obj(); + meta_obj->set_in_extra_data(true); + meta_obj->set_hash_source(mp_obj.get_key()); + int ret; + + std::unique_ptr del_op = meta_obj->get_delete_op(obj_ctx); + del_op->params.bucket_owner = bucket->get_acl_owner(); + del_op->params.versioning_status = 0; + + // Since the data objects are associated with meta obj till + // MultipartUpload::Complete() is done, removing the metadata obj + // should remove all the uploads so far. + ret = del_op->delete_obj(dpp, null_yield); + if (ret < 0) { + ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " << + ret << dendl; + } + return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret; + } + + static string mp_ns = RGW_OBJ_NS_MULTIPART; + + std::unique_ptr DBMultipartUpload::get_meta_obj() + { + return bucket->get_object(rgw_obj_key(get_meta(), string(), mp_ns)); + } + + int DBMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) + { + int ret; + std::string oid = mp_obj.get_key(); + + char buf[33]; + std::unique_ptr obj; // create meta obj + gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1); + std::string upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */ + upload_id.append(buf); + + mp_obj.init(oid, upload_id); + obj = get_meta_obj(); + + DB::Object op_target(store->getDB(), obj->get_bucket()->get_info(), + obj->get_obj()); + DB::Object::Write obj_op(&op_target); + + obj_op.meta.owner = owner.get_id(); + obj_op.meta.category = RGWObjCategory::MultiMeta; + obj_op.meta.flags = PUT_OBJ_CREATE_EXCL; + obj_op.meta.mtime = &mtime; + + multipart_upload_info upload_info; + upload_info.dest_placement = dest_placement; + + bufferlist bl; + encode(upload_info, bl); + obj_op.meta.data = &bl; + ret = obj_op.prepare(dpp); + if (ret < 0) + return ret; + ret = obj_op.write_meta(dpp, bl.length(), bl.length(), attrs); + + return ret; + } + + int DBMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct, + int num_parts, int marker, + int *next_marker, bool *truncated, + bool assume_unsorted) + { + std::list parts_map; + + std::unique_ptr obj = get_meta_obj(); + + parts.clear(); + int ret; + + DB::Object op_target(store->getDB(), + obj->get_bucket()->get_info(), obj->get_obj()); + ret = op_target.get_mp_parts_list(dpp, parts_map); + if (ret < 0) { + return ret; + } + + int last_num = 0; + + while (!parts_map.empty()) { + std::unique_ptr part = std::make_unique(); + RGWUploadPartInfo &pinfo = parts_map.front(); + part->set_info(pinfo); + if ((int)pinfo.num > marker) { + last_num = pinfo.num; + parts[pinfo.num] = std::move(part); + } + parts_map.pop_front(); + } + + /* rebuild a map with only num_parts entries */ + std::map> new_parts; + std::map>::iterator piter; + int i; + for (i = 0, piter = parts.begin(); + i < num_parts && piter != parts.end(); + ++i, ++piter) { + last_num = piter->first; + new_parts[piter->first] = std::move(piter->second); + } + + if (truncated) { + *truncated = (piter != parts.end()); + } + + parts.swap(new_parts); + + if (next_marker) { + *next_marker = last_num; + } + + return 0; + } + + int DBMultipartUpload::complete(const DoutPrefixProvider *dpp, + optional_yield y, CephContext* cct, + map& part_etags, + list& remove_objs, + uint64_t& accounted_size, bool& compressed, + RGWCompressionInfo& cs_info, off_t& ofs, + std::string& tag, ACLOwner& owner, + uint64_t olh_epoch, + rgw::sal::Object* target_obj, + RGWObjectCtx* obj_ctx) + { + char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE]; + char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16]; + std::string etag; + bufferlist etag_bl; + MD5 hash; + bool truncated; + int ret; + + int total_parts = 0; + int handled_parts = 0; + int max_parts = 1000; + int marker = 0; + uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size; + auto etags_iter = part_etags.begin(); + rgw::sal::Attrs attrs = target_obj->get_attrs(); + + ofs = 0; + accounted_size = 0; + do { + ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated); + if (ret == -ENOENT) { + ret = -ERR_NO_SUCH_UPLOAD; + } + if (ret < 0) + return ret; + + total_parts += parts.size(); + if (!truncated && total_parts != (int)part_etags.size()) { + ldpp_dout(dpp, 0) << "NOTICE: total parts mismatch: have: " << total_parts + << " expected: " << part_etags.size() << dendl; + ret = -ERR_INVALID_PART; + return ret; + } + + for (auto obj_iter = parts.begin(); etags_iter != part_etags.end() && obj_iter != parts.end(); ++etags_iter, ++obj_iter, ++handled_parts) { + DBMultipartPart* part = dynamic_cast(obj_iter->second.get()); + uint64_t part_size = part->get_size(); + if (handled_parts < (int)part_etags.size() - 1 && + part_size < min_part_size) { + ret = -ERR_TOO_SMALL; + return ret; + } + + char petag[CEPH_CRYPTO_MD5_DIGESTSIZE]; + if (etags_iter->first != (int)obj_iter->first) { + ldpp_dout(dpp, 0) << "NOTICE: parts num mismatch: next requested: " + << etags_iter->first << " next uploaded: " + << obj_iter->first << dendl; + ret = -ERR_INVALID_PART; + return ret; + } + string part_etag = rgw_string_unquote(etags_iter->second); + if (part_etag.compare(part->get_etag()) != 0) { + ldpp_dout(dpp, 0) << "NOTICE: etag mismatch: part: " << etags_iter->first + << " etag: " << etags_iter->second << dendl; + ret = -ERR_INVALID_PART; + return ret; + } + + hex_to_buf(part->get_etag().c_str(), petag, + CEPH_CRYPTO_MD5_DIGESTSIZE); + hash.Update((const unsigned char *)petag, sizeof(petag)); + + RGWUploadPartInfo& obj_part = part->get_info(); + + ofs += obj_part.size; + accounted_size += obj_part.accounted_size; + } + } while (truncated); + hash.Final((unsigned char *)final_etag); + + buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str); + snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], + sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2, + "-%lld", (long long)part_etags.size()); + etag = final_etag_str; + ldpp_dout(dpp, 10) << "calculated etag: " << etag << dendl; + + etag_bl.append(etag); + + attrs[RGW_ATTR_ETAG] = etag_bl; + + /* XXX: handle compression ? */ + + /* Rename all the object data entries with original object name (i.e + * from 'head_obj.name + "." + upload_id' to head_obj.name) */ + + /* Original head object */ + DB::Object op_target(store->getDB(), + target_obj->get_bucket()->get_info(), + target_obj->get_obj()); + DB::Object::Write obj_op(&op_target); + obj_op.prepare(NULL); + + /* Meta object */ + std::unique_ptr meta_obj = get_meta_obj(); + DB::Object meta_op_target(store->getDB(), + meta_obj->get_bucket()->get_info(), + meta_obj->get_obj()); + DB::Object::Write mp_op(&meta_op_target); + mp_op.update_mp_parts(dpp, target_obj->get_obj().key); + + obj_op.meta.owner = owner.get_id(); + obj_op.meta.flags = PUT_OBJ_CREATE; + obj_op.meta.modify_tail = true; + obj_op.meta.completeMultipart = true; + + ret = obj_op.write_meta(dpp, ofs, accounted_size, attrs); + if (ret < 0) + return ret; + + /* No need to delete Meta obj here. It is deleted from sal */ + return ret; + } + + int DBMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs) + { + if (!rule && !attrs) { + return 0; + } + + if (rule) { + if (!placement.empty()) { + *rule = &placement; + if (!attrs) { + /* Don't need attrs, done */ + return 0; + } + } else { + *rule = nullptr; + } + } + + /* We need either attributes or placement, so we need a read */ + std::unique_ptr meta_obj; + meta_obj = get_meta_obj(); + meta_obj->set_in_extra_data(true); + + multipart_upload_info upload_info; + bufferlist headbl; + + /* Read the obj head which contains the multipart_upload_info */ + std::unique_ptr read_op = meta_obj->get_read_op(obj_ctx); + int ret = read_op->prepare(y, dpp); + if (ret < 0) { + if (ret == -ENOENT) { + return -ERR_NO_SUCH_UPLOAD; + } + return ret; + } + + if (attrs) { + /* Attrs are filled in by prepare */ + *attrs = meta_obj->get_attrs(); + if (!rule || *rule != nullptr) { + /* placement was cached; don't actually read */ + return 0; + } + } + + /* Now read the placement from the head */ + ret = read_op->read(0, store->getDB()->get_max_head_size(), headbl, y, dpp); + if (ret < 0) { + if (ret == -ENOENT) { + return -ERR_NO_SUCH_UPLOAD; + } + return ret; + } + + if (headbl.length() <= 0) { + return -ERR_NO_SUCH_UPLOAD; + } + + /* Decode multipart_upload_info */ + auto hiter = headbl.cbegin(); + try { + decode(upload_info, hiter); + } catch (buffer::error& err) { + ldpp_dout(dpp, 0) << "ERROR: failed to decode multipart upload info" << dendl; + return -EIO; + } + placement = upload_info.dest_placement; + *rule = &placement; + + return 0; + } + + std::unique_ptr DBMultipartUpload::get_writer( + const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _head_obj, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + uint64_t part_num, + const std::string& part_num_str) + { + return std::make_unique(dpp, y, this, + std::move(_head_obj), store, owner, + obj_ctx, ptail_placement_rule, part_num, part_num_str); + } + + DBMultipartWriter::DBMultipartWriter(const DoutPrefixProvider *dpp, + optional_yield y, + MultipartUpload* upload, + std::unique_ptr _head_obj, + DBStore* _store, + const rgw_user& _owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *_ptail_placement_rule, + uint64_t _part_num, const std::string& _part_num_str): + Writer(dpp, y), + store(_store), + owner(_owner), + ptail_placement_rule(_ptail_placement_rule), + head_obj(std::move(_head_obj)), + upload_id(upload->get_upload_id()), + oid(head_obj->get_name() + "." + upload_id + + "." + std::to_string(part_num)), + meta_obj(((DBMultipartUpload*)upload)->get_meta_obj()), + op_target(_store->getDB(), meta_obj->get_bucket()->get_info(), meta_obj->get_obj()), + parent_op(&op_target), part_num(_part_num), + part_num_str(_part_num_str) { parent_op.prepare(NULL);} + + int DBMultipartWriter::prepare(optional_yield y) + { + parent_op.set_mp_part_str(upload_id + "." + std::to_string(part_num)); + // XXX: do we need to handle part_num_str?? + return 0; + } + + int DBMultipartWriter::process(bufferlist&& data, uint64_t offset) + { + /* XXX: same as AtomicWriter..consolidate code */ + total_data_size += data.length(); + + /* XXX: Optimize all bufferlist copies in this function */ + + /* copy head_data into meta. But for multipart we do not + * need to write head_data */ + uint64_t max_chunk_size = store->getDB()->get_max_chunk_size(); + int excess_size = 0; + + /* Accumulate tail_data till max_chunk_size or flush op */ + bufferlist tail_data; + + if (data.length() != 0) { + parent_op.meta.data = &head_data; /* Null data ?? */ + + /* handle tail )parts. + * First accumulate and write data into dbstore in its chunk_size + * parts + */ + if (!tail_part_size) { /* new tail part */ + tail_part_offset = offset; + } + data.begin(0).copy(data.length(), tail_data); + tail_part_size += tail_data.length(); + tail_part_data.append(tail_data); + + if (tail_part_size < max_chunk_size) { + return 0; + } else { + int write_ofs = 0; + while (tail_part_size >= max_chunk_size) { + excess_size = tail_part_size - max_chunk_size; + bufferlist tmp; + tail_part_data.begin(write_ofs).copy(max_chunk_size, tmp); + /* write tail objects data */ + int ret = parent_op.write_data(dpp, tmp, tail_part_offset); + + if (ret < 0) { + return ret; + } + + tail_part_size -= max_chunk_size; + write_ofs += max_chunk_size; + tail_part_offset += max_chunk_size; + } + /* reset tail parts or update if excess data */ + if (excess_size > 0) { /* wrote max_chunk_size data */ + tail_part_size = excess_size; + bufferlist tmp; + tail_part_data.begin(write_ofs).copy(excess_size, tmp); + tail_part_data = tmp; + } else { + tail_part_size = 0; + tail_part_data.clear(); + tail_part_offset = 0; + } + } + } else { + if (tail_part_size == 0) { + return 0; /* nothing more to write */ + } + + /* flush watever tail data is present */ + int ret = parent_op.write_data(dpp, tail_part_data, tail_part_offset); + if (ret < 0) { + return ret; + } + tail_part_size = 0; + tail_part_data.clear(); + tail_part_offset = 0; + } + + return 0; + } + + int DBMultipartWriter::complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) + { + int ret = 0; + /* XXX: same as AtomicWriter..consolidate code */ + parent_op.meta.mtime = mtime; + parent_op.meta.delete_at = delete_at; + parent_op.meta.if_match = if_match; + parent_op.meta.if_nomatch = if_nomatch; + parent_op.meta.user_data = user_data; + parent_op.meta.zones_trace = zones_trace; + + /* XXX: handle accounted size */ + accounted_size = total_data_size; + + if (ret < 0) + return ret; + + RGWUploadPartInfo info; + info.num = part_num; + info.etag = etag; + info.size = total_data_size; + info.accounted_size = accounted_size; + info.modified = real_clock::now(); + //info.manifest = manifest; + + DB::Object op_target(store->getDB(), + meta_obj->get_bucket()->get_info(), meta_obj->get_obj()); + ret = op_target.add_mp_part(dpp, info); + if (ret < 0) { + return ret == -ENOENT ? -ERR_NO_SUCH_UPLOAD : ret; + } + + return 0; + } + DBAtomicWriter::DBAtomicWriter(const DoutPrefixProvider *dpp, optional_yield y, std::unique_ptr _head_obj, diff --git a/src/rgw/rgw_sal_dbstore.h b/src/rgw/rgw_sal_dbstore.h index 8007b6c48bc..bc2e952fcf1 100644 --- a/src/rgw/rgw_sal_dbstore.h +++ b/src/rgw/rgw_sal_dbstore.h @@ -19,6 +19,7 @@ #include "rgw_oidc_provider.h" #include "rgw_role.h" #include "rgw_lc.h" +#include "rgw_multi.h" #include "store/dbstore/common/dbstore.h" #include "store/dbstore/dbstore_mgr.h" @@ -304,6 +305,133 @@ protected: } }; + /* + * For multipart upload, below is the process flow - + * + * MultipartUpload::Init - create head object of meta obj (src_obj_name + "." + upload_id) + * [ Meta object stores all the parts upload info] + * MultipartWriter::process - create all data/tail objects with obj_name same as + * meta obj (so that they can all be identified & deleted + * during abort) + * MultipartUpload::Abort - Just delete meta obj .. that will indirectly delete all the + * uploads associated with that upload id / meta obj so far. + * MultipartUpload::Complete - create head object of the original object (if not exists) & + * rename all data/tail objects to orig object name and update + * metadata of the orig object. + */ + class DBMultipartPart : public MultipartPart { + protected: + RGWUploadPartInfo info; /* XXX: info contains manifest also which is not needed */ + + public: + DBMultipartPart() = default; + virtual ~DBMultipartPart() = default; + + virtual RGWUploadPartInfo& get_info() { return info; } + virtual void set_info(const RGWUploadPartInfo& _info) { info = _info; } + virtual uint32_t get_num() { return info.num; } + virtual uint64_t get_size() { return info.accounted_size; } + virtual const std::string& get_etag() { return info.etag; } + virtual ceph::real_time& get_mtime() { return info.modified; } + + }; + + class DBMPObj { + std::string oid; // object name + std::string upload_id; + std::string meta; // multipart meta object = . + public: + DBMPObj() {} + DBMPObj(const std::string& _oid, const std::string& _upload_id) { + init(_oid, _upload_id, _upload_id); + } + DBMPObj(const std::string& _oid, std::optional _upload_id) { + if (_upload_id) { + init(_oid, *_upload_id, *_upload_id); + } else { + from_meta(_oid); + } + } + void init(const std::string& _oid, const std::string& _upload_id) { + init(_oid, _upload_id, _upload_id); + } + void init(const std::string& _oid, const std::string& _upload_id, const std::string& part_unique_str) { + if (_oid.empty()) { + clear(); + return; + } + oid = _oid; + upload_id = _upload_id; + meta = oid + "." + upload_id; + } + const std::string& get_upload_id() const { + return upload_id; + } + const std::string& get_key() const { + return oid; + } + const std::string& get_meta() const { return meta; } + bool from_meta(const std::string& meta) { + int end_pos = meta.length(); + int mid_pos = meta.rfind('.', end_pos - 1); // . + if (mid_pos < 0) + return false; + oid = meta.substr(0, mid_pos); + upload_id = meta.substr(mid_pos + 1, end_pos - mid_pos - 1); + init(oid, upload_id, upload_id); + return true; + } + void clear() { + oid = ""; + meta = ""; + upload_id = ""; + } + }; + + class DBMultipartUpload : public MultipartUpload { + DBStore* store; + DBMPObj mp_obj; + ACLOwner owner; + ceph::real_time mtime; + rgw_placement_rule placement; + + public: + DBMultipartUpload(DBStore* _store, Bucket* _bucket, const std::string& oid, std::optional upload_id, ACLOwner _owner, ceph::real_time _mtime) : MultipartUpload(_bucket), store(_store), mp_obj(oid, upload_id), owner(_owner), mtime(_mtime) {} + virtual ~DBMultipartUpload() = default; + + virtual const std::string& get_meta() const { return mp_obj.get_meta(); } + virtual const std::string& get_key() const { return mp_obj.get_key(); } + virtual const std::string& get_upload_id() const { return mp_obj.get_upload_id(); } + virtual const ACLOwner& get_owner() const override { return owner; } + virtual ceph::real_time& get_mtime() { return mtime; } + virtual std::unique_ptr get_meta_obj() override; + virtual int init(const DoutPrefixProvider* dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) override; + virtual int list_parts(const DoutPrefixProvider* dpp, CephContext* cct, + int num_parts, int marker, + int* next_marker, bool* truncated, + bool assume_unsorted = false) override; + virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct, + RGWObjectCtx* obj_ctx) override; + virtual int complete(const DoutPrefixProvider* dpp, + optional_yield y, CephContext* cct, + std::map& part_etags, + std::list& remove_objs, + uint64_t& accounted_size, bool& compressed, + RGWCompressionInfo& cs_info, off_t& ofs, + std::string& tag, ACLOwner& owner, + uint64_t olh_epoch, + rgw::sal::Object* target_obj, + RGWObjectCtx* obj_ctx) override; + virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) override; + virtual std::unique_ptr get_writer(const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _head_obj, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + uint64_t part_num, + const std::string& part_num_str) override; + }; + class DBObject : public Object { private: DBStore* store; @@ -432,6 +560,15 @@ protected: int read_attrs(const DoutPrefixProvider* dpp, DB::Object::Read &read_op, optional_yield y, rgw_obj* target_obj = nullptr); }; + class MPDBSerializer : public MPSerializer { + + public: + MPDBSerializer(const DoutPrefixProvider *dpp, DBStore* store, DBObject* obj, const std::string& lock_name) {} + + virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override {return 0; } + virtual int unlock() override { return 0;} + }; + class DBAtomicWriter : public Writer { protected: rgw::sal::DBStore* store; @@ -477,6 +614,54 @@ protected: optional_yield y) override; }; + class DBMultipartWriter : public Writer { + protected: + rgw::sal::DBStore* store; + const rgw_user& owner; + const rgw_placement_rule *ptail_placement_rule; + uint64_t olh_epoch; + std::unique_ptr head_obj; + string upload_id; + string oid; /* object->name() + "." + "upload_id" + "." + part_num */ + std::unique_ptr meta_obj; + DB::Object op_target; + DB::Object::Write parent_op; + int part_num; + string part_num_str; + uint64_t total_data_size = 0; /* for total data being uploaded */ + bufferlist head_data; + bufferlist tail_part_data; + uint64_t tail_part_offset; + uint64_t tail_part_size = 0; /* corresponds to each tail part being + written to dbstore */ + +public: + DBMultipartWriter(const DoutPrefixProvider *dpp, + optional_yield y, MultipartUpload* upload, + std::unique_ptr _head_obj, + DBStore* _store, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + uint64_t part_num, const std::string& part_num_str); + ~DBMultipartWriter() = default; + + // prepare to start processing object data + virtual int prepare(optional_yield y) override; + + // Process a bufferlist + virtual int process(bufferlist&& data, uint64_t offset) override; + + // complete the operation and make its result visible to clients + virtual int complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) override; + }; + class DBStore : public Store { private: /* DBStoreManager is used in case multiple diff --git a/src/rgw/rgw_sal_rados.cc b/src/rgw/rgw_sal_rados.cc index 55932b64918..036b95f80c8 100644 --- a/src/rgw/rgw_sal_rados.cc +++ b/src/rgw/rgw_sal_rados.cc @@ -52,24 +52,6 @@ static string mp_ns = RGW_OBJ_NS_MULTIPART; namespace rgw::sal { -struct multipart_upload_info -{ - rgw_placement_rule dest_placement; - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(dest_placement, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(dest_placement, bl); - DECODE_FINISH(bl); - } -}; -WRITE_CLASS_ENCODER(multipart_upload_info) - // default number of entries to list with each bucket listing call // (use marker to bridge between calls) static constexpr size_t listing_max_entries = 1000; diff --git a/src/rgw/store/dbstore/common/dbstore.cc b/src/rgw/store/dbstore/common/dbstore.cc index 1b851fa550c..6a56bc338a8 100644 --- a/src/rgw/store/dbstore/common/dbstore.cc +++ b/src/rgw/store/dbstore/common/dbstore.cc @@ -185,6 +185,8 @@ DBOp *DB::getDBOp(const DoutPrefixProvider *dpp, string Op, struct DBOpParams *p return Ob->ListBucketObjects; if (!Op.compare("PutObjectData")) return Ob->PutObjectData; + if (!Op.compare("UpdateObjectData")) + return Ob->UpdateObjectData; if (!Op.compare("GetObjectData")) return Ob->GetObjectData; if (!Op.compare("DeleteObjectData")) @@ -790,13 +792,13 @@ int DB::raw_obj::InitializeParamsfromRawObj(const DoutPrefixProvider *dpp, params->op.obj.state.obj.key.instance = obj_instance; params->op.obj.state.obj.key.ns = obj_ns; - if (multipart_partnum != 0) { + if (multipart_part_str != "0.0") { params->op.obj.is_multipart = true; } else { params->op.obj.is_multipart = false; } - params->op.obj_data.multipart_part_num = multipart_partnum; + params->op.obj_data.multipart_part_str = multipart_part_str; params->op.obj_data.part_num = part_num; return ret; @@ -894,6 +896,72 @@ out: return ret; } +int DB::Object::add_mp_part(const DoutPrefixProvider *dpp, + RGWUploadPartInfo info) { + int ret = 0; + + DBOpParams params = {}; + + store->InitializeParams(dpp, "GetObject", ¶ms); + InitializeParamsfromObject(dpp, ¶ms); + + ret = store->ProcessOp(dpp, "GetObject", ¶ms); + + if (ret) { + ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <ProcessOp(dpp, "UpdateObject", ¶ms); + + if (ret) { + ldpp_dout(dpp, 0)<<"In UpdateObject failed err:(" <& info) +{ + int ret = 0; + DBOpParams params = {}; + std::map omap; + + store->InitializeParams(dpp, "GetObject", ¶ms); + InitializeParamsfromObject(dpp, ¶ms); + + ret = store->ProcessOp(dpp, "GetObject", ¶ms); + + if (ret) { + ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <get_bucket_info().bucket.name, astate->obj.key.name, - astate->obj.key.instance, astate->obj.key.ns, 0, part_num); + astate->obj.key.instance, astate->obj.key.ns, "0.0", part_num); read_len = len; @@ -1469,9 +1537,9 @@ int DB::Object::iterate_obj(const DoutPrefixProvider *dpp, part_num = (ofs / max_chunk_size); uint64_t read_len = std::min(len, max_chunk_size); - /* XXX: Handle multipart_num */ + /* XXX: Handle multipart_str */ raw_obj read_obj(store, get_bucket_info().bucket.name, astate->obj.key.name, - astate->obj.key.instance, astate->obj.key.ns, 0, part_num); + astate->obj.key.instance, astate->obj.key.ns, "0.0", part_num); bool reading_from_head = (ofs < head_data_size); r = cb(dpp, read_obj, ofs, read_len, reading_from_head, astate, arg); @@ -1533,14 +1601,14 @@ int DB::Object::Write::write_data(const DoutPrefixProvider* dpp, /* XXX: Split into parts each of max_chunk_size. But later make tail * object chunk size limit to sqlite blob limit */ int part_num = 0; - uint64_t max_chunk_size, max_head_size; - max_head_size = store->get_max_head_size(); - max_chunk_size = store->get_max_chunk_size(); + uint64_t max_chunk_size = store->get_max_chunk_size(); /* tail_obj ofs should be greater than max_head_size */ - if (ofs < max_head_size) { - return -1; + if (mp_part_str == "0.0") { // ensure not multipart meta object + if (ofs < store->get_max_head_size()) { + return -1; + } } uint64_t end = data.length(); @@ -1552,9 +1620,9 @@ int DB::Object::Write::write_data(const DoutPrefixProvider* dpp, part_num = (ofs / max_chunk_size); uint64_t len = std::min(end, max_chunk_size); - /* XXX: Handle multipart_num */ + /* XXX: Handle multipart_str */ raw_obj write_obj(store, target->get_bucket_info().bucket.name, obj_state.obj.key.name, - obj_state.obj.key.instance, obj_state.obj.key.ns, 0, part_num); + obj_state.obj.key.instance, obj_state.obj.key.ns, mp_part_str, part_num); ldpp_dout(dpp, 20) << "dbstore->write obj-ofs=" << ofs << " write_len=" << len << dendl; @@ -1714,6 +1782,27 @@ int DB::Object::Write::write_meta(const DoutPrefixProvider *dpp, uint64_t size, return r; } +int DB::Object::Write::update_mp_parts(const DoutPrefixProvider *dpp, rgw_obj_key new_obj_key) +{ + int ret = 0; + DBOpParams params = {}; + DB *store = target->get_store(); + + store->InitializeParams(dpp, "UpdateObjectData", ¶ms); + target->InitializeParamsfromObject(dpp, ¶ms); + + params.op.obj.new_obj_key = new_obj_key; + + ret = store->ProcessOp(dpp, "UpdateObjectData", ¶ms); + + if (ret) { + ldpp_dout(dpp, 0)<<"In UpdateObjectData failed err:(" <get_store(); diff --git a/src/rgw/store/dbstore/common/dbstore.h b/src/rgw/store/dbstore/common/dbstore.h index 18aef417cd7..01b55908c5b 100644 --- a/src/rgw/store/dbstore/common/dbstore.h +++ b/src/rgw/store/dbstore/common/dbstore.h @@ -23,6 +23,7 @@ #include "global/global_init.h" #include "common/ceph_context.h" #include "rgw/rgw_obj_manifest.h" +#include "rgw/rgw_multi.h" using namespace std; @@ -83,16 +84,21 @@ struct DBOpObjectInfo { /* Extra fields */ bool is_multipart; + std::list mp_parts; + bufferlist head_data; string min_marker; string max_marker; list list_entries; + /* Below used to update mp_parts obj name + * from meta object to src object on completion */ + rgw_obj_key new_obj_key; }; struct DBOpObjectDataInfo { RGWObjState state; uint64_t part_num; - uint64_t multipart_part_num; + string multipart_part_str; uint64_t offset; uint64_t size; bufferlist data{}; @@ -274,9 +280,15 @@ struct DBOpObjectPrepareInfo { string manifest_part_rules = ":manifest_part_rules"; string omap = ":omap"; string is_multipart = ":is_multipart"; + string mp_parts = ":mp_parts"; string head_data = ":head_data"; string min_marker = ":min_marker"; string max_marker = ":max_marker"; + /* Below used to update mp_parts obj name + * from meta object to src object on completion */ + string new_obj_name = ":new_obj_name"; + string new_obj_instance = ":new_obj_instance"; + string new_obj_ns = ":new_obj_ns"; }; struct DBOpObjectDataPrepareInfo { @@ -284,7 +296,7 @@ struct DBOpObjectDataPrepareInfo { string offset = ":offset"; string data = ":data"; string size = ":size"; - string multipart_part_num = ":multipart_part_num"; + string multipart_part_str = ":multipart_part_str"; }; struct DBOpLCEntryPrepareInfo { @@ -359,6 +371,7 @@ class ObjectOp { class UpdateObjectOp *UpdateObject; class ListBucketObjectsOp *ListBucketObjects; class PutObjectDataOp *PutObjectData; + class UpdateObjectDataOp *UpdateObjectData; class GetObjectDataOp *GetObjectData; class DeleteObjectDataOp *DeleteObjectData; @@ -546,14 +559,15 @@ class DBOp { ManifestPartRules BLOB, \ Omap BLOB, \ IsMultipart BOOL, \ + MPPartsList BLOB, \ HeadData BLOB, \ PRIMARY KEY (ObjName, ObjInstance, BucketName), \ FOREIGN KEY (BucketName) \ REFERENCES '{}' (BucketName) ON DELETE CASCADE ON UPDATE CASCADE \n);"; const string CreateObjectDataTableQ = - /* Extra field 'MultipartPartNum' added which signifies multipart upload - * part number. For regular object, it is '0' + /* Extra field 'MultipartPartStr' added which signifies multipart + * . For regular object, it is '0.0' * * - part: a collection of stripes that make a contiguous part of an object. A regular object will only have one part (although might have @@ -566,12 +580,12 @@ class DBOp { ObjInstance TEXT, \ ObjNS TEXT, \ BucketName TEXT NOT NULL , \ + MultipartPartStr TEXT, \ PartNum INTEGER NOT NULL, \ Offset INTEGER, \ - Data BLOB, \ Size INTEGER, \ - MultipartPartNum INTEGER, \ - PRIMARY KEY (ObjName, BucketName, ObjInstance, MultipartPartNum, PartNum), \ + Data BLOB, \ + PRIMARY KEY (ObjName, BucketName, ObjInstance, MultipartPartStr, PartNum), \ FOREIGN KEY (BucketName, ObjName, ObjInstance) \ REFERENCES '{}' (BucketName, ObjName, ObjInstance) ON DELETE CASCADE ON UPDATE CASCADE \n);"; @@ -933,10 +947,10 @@ class PutObjectOp: public DBOp { ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \ Prefix, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \ TailPlacementRuleName, TailPlacementStorageClass, \ - ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, HeadData ) \ + ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData ) \ VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \ {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \ - {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {})"; + {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {})"; public: virtual ~PutObjectOp() {} @@ -966,7 +980,7 @@ class PutObjectOp: public DBOp { params.op.obj.tail_placement_storage_class, params.op.obj.manifest_part_objs, params.op.obj.manifest_part_rules, params.op.obj.omap, - params.op.obj.is_multipart, params.op.obj.head_data); + params.op.obj.is_multipart, params.op.obj.mp_parts, params.op.obj.head_data); } }; @@ -998,7 +1012,7 @@ class GetObjectOp: public DBOp { ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \ Prefix, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \ TailPlacementRuleName, TailPlacementStorageClass, \ - ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, HeadData from '{}' \ + ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData from '{}' \ where BucketName = {} and ObjName = {} and ObjInstance = {}"; public: @@ -1027,7 +1041,7 @@ class ListBucketObjectsOp: public DBOp { ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \ Prefix, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \ TailPlacementRuleName, TailPlacementStorageClass, \ - ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, HeadData from '{}' \ + ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData from '{}' \ where BucketName = {} and ObjName > {} ORDER BY ObjName ASC LIMIT {}"; public: virtual ~ListBucketObjectsOp() {} @@ -1051,6 +1065,9 @@ class UpdateObjectOp: public DBOp { const string AttrsQuery = "UPDATE '{}' SET ObjAttrs = {}, Mtime = {} \ where BucketName = {} and ObjName = {} and ObjInstance = {}"; + const string MPQuery = + "UPDATE '{}' SET MPPartsList = {}, Mtime = {} \ + where BucketName = {} and ObjName = {} and ObjInstance = {}"; const string MetaQuery = "UPDATE '{}' SET \ ObjNS = {}, ACLs = {}, IndexVer = {}, Tag = {}, Flags = {}, VersionedEpoch = {}, \ @@ -1064,7 +1081,7 @@ class UpdateObjectOp: public DBOp { HeadPlacementRuleName = {}, HeadPlacementRuleStorageClass = {}, \ TailPlacementRuleName = {}, TailPlacementStorageClass = {}, \ ManifestPartObjs = {}, ManifestPartRules = {}, Omap = {}, \ - IsMultipart = {}, HeadData = {} \ + IsMultipart = {}, MPPartsList = {}, HeadData = {} \ WHERE ObjName = {} and ObjInstance = {} and BucketName = {}"; public: @@ -1087,6 +1104,14 @@ class UpdateObjectOp: public DBOp { params.op.obj.obj_name.c_str(), params.op.obj.obj_instance.c_str()); } + if (params.op.query_str == "mp") { + return fmt::format(MPQuery.c_str(), + params.object_table.c_str(), params.op.obj.mp_parts.c_str(), + params.op.obj.mtime.c_str(), + params.op.bucket.bucket_name.c_str(), + params.op.obj.obj_name.c_str(), + params.op.obj.obj_instance.c_str()); + } if (params.op.query_str == "meta") { return fmt::format(MetaQuery.c_str(), params.object_table.c_str(), @@ -1111,7 +1136,7 @@ class UpdateObjectOp: public DBOp { params.op.obj.tail_placement_storage_class, params.op.obj.manifest_part_objs, params.op.obj.manifest_part_rules, params.op.obj.omap, - params.op.obj.is_multipart, params.op.obj.head_data, + params.op.obj.is_multipart, params.op.obj.mp_parts, params.op.obj.head_data, params.op.obj.obj_name, params.op.obj.obj_instance, params.op.bucket.bucket_name); } @@ -1123,7 +1148,7 @@ class PutObjectDataOp: public DBOp { private: const string Query = "INSERT OR REPLACE INTO '{}' \ - (ObjName, ObjInstance, ObjNS, BucketName, PartNum, Offset, Data, Size, MultipartPartNum) \ + (ObjName, ObjInstance, ObjNS, BucketName, MultipartPartStr, PartNum, Offset, Size, Data) \ VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {})"; public: @@ -1135,18 +1160,41 @@ class PutObjectDataOp: public DBOp { params.op.obj.obj_name, params.op.obj.obj_instance, params.op.obj.obj_ns, params.op.bucket.bucket_name.c_str(), + params.op.obj_data.multipart_part_str.c_str(), params.op.obj_data.part_num, - params.op.obj_data.offset.c_str(), params.op.obj_data.data.c_str(), - params.op.obj_data.size, params.op.obj_data.multipart_part_num); + params.op.obj_data.offset.c_str(), + params.op.obj_data.size, + params.op.obj_data.data.c_str()); } }; +class UpdateObjectDataOp: public DBOp { + private: + const string Query = + "UPDATE '{}' \ + SET ObjName = {}, ObjInstance = {}, ObjNS = {} \ + WHERE ObjName = {} and ObjInstance = {} and ObjNS = {} and \ + BucketName = {}"; + + public: + virtual ~UpdateObjectDataOp() {} + + string Schema(DBOpPrepareParams ¶ms) { + return fmt::format(Query.c_str(), + params.objectdata_table.c_str(), + params.op.obj.new_obj_name, params.op.obj.new_obj_instance, + params.op.obj.new_obj_ns, + params.op.obj.obj_name, params.op.obj.obj_instance, + params.op.obj.obj_ns, + params.op.bucket.bucket_name.c_str()); + } +}; class GetObjectDataOp: public DBOp { private: const string Query = "SELECT \ - ObjName, ObjInstance, ObjNS, BucketName, PartNum, Offset, Data, Size, \ - MultipartPartNum from '{}' where BucketName = {} and ObjName = {} and ObjInstance = {}"; + ObjName, ObjInstance, ObjNS, BucketName, MultipartPartStr, PartNum, Offset, Size, Data \ + from '{}' where BucketName = {} and ObjName = {} and ObjInstance = {} ORDER BY MultipartPartStr, PartNum"; public: virtual ~GetObjectDataOp() {} @@ -1460,28 +1508,28 @@ class DB { const rgw_user* powner_id, map* pattrs, ceph::real_time* pmtime, RGWObjVersionTracker* pobjv); - int get_max_head_size() { return ObjHeadSize; } - int get_max_chunk_size() { return ObjChunkSize; } + uint64_t get_max_head_size() { return ObjHeadSize; } + uint64_t get_max_chunk_size() { return ObjChunkSize; } void gen_rand_obj_instance_name(rgw_obj_key *target_key); // db raw obj string is of format - - // "____" + // "____" const string raw_obj_oid = "{0}_{1}_{2}_{3}_{4}"; inline string to_oid(const string& bucket, const string& obj_name, const string& obj_instance, - uint64_t mp_num, uint64_t partnum) { - string s = fmt::format(raw_obj_oid.c_str(), bucket, obj_name, obj_instance, mp_num, partnum); + string mp_str, uint64_t partnum) { + string s = fmt::format(raw_obj_oid.c_str(), bucket, obj_name, obj_instance, mp_str, partnum); return s; } inline int from_oid(const string& oid, string& bucket, string& obj_name, string& obj_instance, - uint64_t& mp_num, uint64_t& partnum) { + string& mp_str, uint64_t& partnum) { vector result; boost::split(result, oid, boost::is_any_of("_")); bucket = result[0]; obj_name = result[1]; obj_instance = result[2]; - mp_num = stoi(result[3]); + mp_str = result[3]; partnum = stoi(result[4]); return 0; @@ -1494,7 +1542,7 @@ class DB { string obj_name; string obj_instance; string obj_ns; - uint64_t multipart_partnum; + string multipart_part_str; uint64_t part_num; string obj_table; @@ -1505,13 +1553,13 @@ class DB { } raw_obj(DB* _db, string& _bname, string& _obj_name, string& _obj_instance, - string& _obj_ns, int _mp_partnum, int _part_num) { + string& _obj_ns, string _mp_part_str, int _part_num) { db = _db; bucket_name = _bname; obj_name = _obj_name; obj_instance = _obj_instance; obj_ns = _obj_ns; - multipart_partnum = _mp_partnum; + multipart_part_str = _mp_part_str; part_num = _part_num; obj_table = bucket_name+".object.table"; @@ -1522,10 +1570,10 @@ class DB { int r; db = _db; - r = db->from_oid(oid, bucket_name, obj_name, obj_instance, multipart_partnum, + r = db->from_oid(oid, bucket_name, obj_name, obj_instance, multipart_part_str, part_num); if (r < 0) { - multipart_partnum = 0; + multipart_part_str = "0.0"; part_num = 0; } @@ -1659,6 +1707,7 @@ class DB { struct Write { DB::Object *target; RGWObjState obj_state; + string mp_part_str = "0.0"; // multipart num struct MetaParams { ceph::real_time *mtime; @@ -1690,6 +1739,7 @@ class DB { explicit Write(DB::Object *_target) : target(_target) {} + void set_mp_part_str(string _mp_part_str) { mp_part_str = _mp_part_str;} int prepare(const DoutPrefixProvider* dpp); int write_data(const DoutPrefixProvider* dpp, bufferlist& data, uint64_t ofs); @@ -1699,6 +1749,11 @@ class DB { bool assume_noent, bool modify_tail); int write_meta(const DoutPrefixProvider *dpp, uint64_t size, uint64_t accounted_size, map& attrs); + /* Below are used to update mp data rows object name + * from meta to src object name on multipart upload + * completion + */ + int update_mp_parts(const DoutPrefixProvider *dpp, rgw_obj_key new_obj_key); }; struct Delete { @@ -1764,6 +1819,8 @@ class DB { std::map *m, bool* pmore); using iterate_obj_cb = int (*)(const DoutPrefixProvider*, const raw_obj&, off_t, off_t, bool, RGWObjState*, void*); + int add_mp_part(const DoutPrefixProvider *dpp, RGWUploadPartInfo info); + int get_mp_parts_list(const DoutPrefixProvider *dpp, std::list& info); int iterate_obj(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, diff --git a/src/rgw/store/dbstore/sqlite/sqliteDB.cc b/src/rgw/store/dbstore/sqlite/sqliteDB.cc index dc5463f5b09..417a3f78a9b 100644 --- a/src/rgw/store/dbstore/sqlite/sqliteDB.cc +++ b/src/rgw/store/dbstore/sqlite/sqliteDB.cc @@ -259,6 +259,7 @@ enum GetObject { ManifestPartRules, Omap, IsMultipart, + MPPartsList, HeadData }; @@ -267,11 +268,11 @@ enum GetObjectData { ObjDataInstance, ObjDataNS, ObjDataBucketName, + MultipartPartStr, PartNum, Offset, - ObjData, ObjDataSize, - MultipartPartNum + ObjData }; enum GetLCEntry { @@ -444,6 +445,7 @@ static int list_object(const DoutPrefixProvider *dpp, DBOpInfo &op, sqlite3_stmt SQL_DECODE_BLOB_PARAM(dpp, stmt, ManifestPartRules, op.obj.rules, sdb); SQL_DECODE_BLOB_PARAM(dpp, stmt, Omap, op.obj.omap, sdb); op.obj.is_multipart = sqlite3_column_int(stmt, IsMultipart); + SQL_DECODE_BLOB_PARAM(dpp, stmt, MPPartsList, op.obj.mp_parts, sdb); SQL_DECODE_BLOB_PARAM(dpp, stmt, HeadData, op.obj.head_data, sdb); op.obj.state.data = op.obj.head_data; @@ -481,7 +483,7 @@ static int get_objectdata(const DoutPrefixProvider *dpp, DBOpInfo &op, sqlite3_s op.obj_data.part_num = sqlite3_column_int(stmt, PartNum); op.obj_data.offset = sqlite3_column_int(stmt, Offset); op.obj_data.size = sqlite3_column_int(stmt, ObjDataSize); - op.obj_data.multipart_part_num = sqlite3_column_int(stmt, MultipartPartNum); + op.obj_data.multipart_part_str = (const char*)sqlite3_column_text(stmt, MultipartPartStr); SQL_DECODE_BLOB_PARAM(dpp, stmt, ObjData, op.obj_data.data, sdb); return 0; @@ -977,6 +979,7 @@ int SQLObjectOp::InitializeObjectOps(string db_name, const DoutPrefixProvider *d UpdateObject = new SQLUpdateObject(sdb, db_name, cct); ListBucketObjects = new SQLListBucketObjects(sdb, db_name, cct); PutObjectData = new SQLPutObjectData(sdb, db_name, cct); + UpdateObjectData = new SQLUpdateObjectData(sdb, db_name, cct); GetObjectData = new SQLGetObjectData(sdb, db_name, cct); DeleteObjectData = new SQLDeleteObjectData(sdb, db_name, cct); @@ -990,6 +993,7 @@ int SQLObjectOp::FreeObjectOps(const DoutPrefixProvider *dpp) delete GetObject; delete UpdateObject; delete PutObjectData; + delete UpdateObjectData; delete GetObjectData; delete DeleteObjectData; @@ -1835,6 +1839,9 @@ int SQLPutObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params) SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.is_multipart.c_str(), sdb); SQL_BIND_INT(dpp, stmt, index, params->op.obj.is_multipart, sdb); + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.mp_parts.c_str(), sdb); + SQL_ENCODE_BLOB_PARAM(dpp, stmt, index, params->op.obj.mp_parts, sdb); + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.head_data.c_str(), sdb); SQL_ENCODE_BLOB_PARAM(dpp, stmt, index, params->op.obj.head_data, sdb); @@ -1980,6 +1987,8 @@ int SQLUpdateObject::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *p SQL_PREPARE(dpp, p_params, sdb, attrs_stmt, ret, "PrepareUpdateObject"); } else if (params->op.query_str == "meta") { SQL_PREPARE(dpp, p_params, sdb, meta_stmt, ret, "PrepareUpdateObject"); + } else if (params->op.query_str == "mp") { + SQL_PREPARE(dpp, p_params, sdb, mp_stmt, ret, "PrepareUpdateObject"); } else { ldpp_dout(dpp, 0)<<"In SQLUpdateObject invalid query_str:" << params->op.query_str << dendl; @@ -2004,6 +2013,8 @@ int SQLUpdateObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *para stmt = &attrs_stmt; } else if (params->op.query_str == "meta") { stmt = &meta_stmt; + } else if (params->op.query_str == "mp") { + stmt = &mp_stmt; } else { ldpp_dout(dpp, 0)<<"In SQLUpdateObject invalid query_str:" << params->op.query_str << dendl; @@ -2030,6 +2041,10 @@ int SQLUpdateObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *para SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.obj_attrs.c_str(), sdb); SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.state.attrset, sdb); } + if (params->op.query_str == "mp") { + SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.mp_parts.c_str(), sdb); + SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.mp_parts, sdb); + } if (params->op.query_str == "meta") { SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.obj_ns.c_str(), sdb); SQL_BIND_TEXT(dpp, *stmt, index, params->op.obj.state.obj.key.ns.c_str(), sdb); @@ -2157,6 +2172,9 @@ int SQLUpdateObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *para SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.is_multipart.c_str(), sdb); SQL_BIND_INT(dpp, *stmt, index, params->op.obj.is_multipart, sdb); + SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.mp_parts.c_str(), sdb); + SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.mp_parts, sdb); + SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.head_data.c_str(), sdb); SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.head_data, sdb); } @@ -2176,6 +2194,8 @@ int SQLUpdateObject::Execute(const DoutPrefixProvider *dpp, struct DBOpParams *p stmt = &attrs_stmt; } else if (params->op.query_str == "meta") { stmt = &meta_stmt; + } else if (params->op.query_str == "mp") { + stmt = &mp_stmt; } else { ldpp_dout(dpp, 0)<<"In SQLUpdateObject invalid query_str:" << params->op.query_str << dendl; @@ -2312,9 +2332,9 @@ int SQLPutObjectData::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *par SQL_BIND_INT(dpp, stmt, index, params->op.obj_data.size, sdb); - SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj_data.multipart_part_num.c_str(), sdb); + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj_data.multipart_part_str.c_str(), sdb); - SQL_BIND_INT(dpp, stmt, index, params->op.obj_data.multipart_part_num, sdb); + SQL_BIND_TEXT(dpp, stmt, index, params->op.obj_data.multipart_part_str.c_str(), sdb); out: return rc; @@ -2329,6 +2349,82 @@ out: return ret; } +int SQLUpdateObjectData::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *params) +{ + int ret = -1; + struct DBOpPrepareParams p_params = PrepareParams; + struct DBOpParams copy = *params; + string bucket_name = params->op.bucket.info.bucket.name; + + if (!*sdb) { + ldpp_dout(dpp, 0)<<"In SQLUpdateObjectData - no db" << dendl; + goto out; + } + + if (p_params.object_table.empty()) { + p_params.object_table = getObjectTable(bucket_name); + } + if (p_params.objectdata_table.empty()) { + p_params.objectdata_table = getObjectDataTable(bucket_name); + } + params->bucket_table = p_params.bucket_table; + params->object_table = p_params.object_table; + params->objectdata_table = p_params.objectdata_table; + (void)createObjectDataTable(dpp, params); + + SQL_PREPARE(dpp, p_params, sdb, stmt, ret, "PrepareUpdateObjectData"); + +out: + return ret; +} + +int SQLUpdateObjectData::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params) +{ + int index = -1; + int rc = 0; + struct DBOpPrepareParams p_params = PrepareParams; + + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_name.c_str(), sdb); + + SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.name.c_str(), sdb); + + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_instance.c_str(), sdb); + + SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.instance.c_str(), sdb); + + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_ns.c_str(), sdb); + + SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.ns.c_str(), sdb); + + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name.c_str(), sdb); + + SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb); + + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.new_obj_name.c_str(), sdb); + + SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.new_obj_key.name.c_str(), sdb); + + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.new_obj_instance.c_str(), sdb); + + SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.new_obj_key.instance.c_str(), sdb); + + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.new_obj_ns.c_str(), sdb); + + SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.new_obj_key.ns.c_str(), sdb); + +out: + return rc; +} + +int SQLUpdateObjectData::Execute(const DoutPrefixProvider *dpp, struct DBOpParams *params) +{ + int ret = -1; + + SQL_EXECUTE(dpp, params, stmt, NULL); +out: + return ret; +} + int SQLGetObjectData::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *params) { int ret = -1; diff --git a/src/rgw/store/dbstore/sqlite/sqliteDB.h b/src/rgw/store/dbstore/sqlite/sqliteDB.h index 55e3dc33aa3..3636cebade6 100644 --- a/src/rgw/store/dbstore/sqlite/sqliteDB.h +++ b/src/rgw/store/dbstore/sqlite/sqliteDB.h @@ -286,6 +286,7 @@ class SQLUpdateObject : public SQLiteDB, public UpdateObjectOp { sqlite3_stmt *omap_stmt = NULL; // Prepared statement sqlite3_stmt *attrs_stmt = NULL; // Prepared statement sqlite3_stmt *meta_stmt = NULL; // Prepared statement + sqlite3_stmt *mp_stmt = NULL; // Prepared statement public: SQLUpdateObject(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {} @@ -341,6 +342,24 @@ class SQLPutObjectData : public SQLiteDB, public PutObjectDataOp { int Bind(const DoutPrefixProvider *dpp, DBOpParams *params); }; +class SQLUpdateObjectData : public SQLiteDB, public UpdateObjectDataOp { + private: + sqlite3 **sdb = NULL; + sqlite3_stmt *stmt = NULL; // Prepared statement + + public: + SQLUpdateObjectData(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {} + SQLUpdateObjectData(sqlite3 **sdbi, string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {} + + ~SQLUpdateObjectData() { + if (stmt) + sqlite3_finalize(stmt); + } + int Prepare(const DoutPrefixProvider *dpp, DBOpParams *params); + int Execute(const DoutPrefixProvider *dpp, DBOpParams *params); + int Bind(const DoutPrefixProvider *dpp, DBOpParams *params); +}; + class SQLGetObjectData : public SQLiteDB, public GetObjectDataOp { private: sqlite3 **sdb = NULL; diff --git a/src/rgw/store/dbstore/tests/dbstore_tests.cc b/src/rgw/store/dbstore/tests/dbstore_tests.cc index fcdd6a079a6..4591abdc5d7 100644 --- a/src/rgw/store/dbstore/tests/dbstore_tests.cc +++ b/src/rgw/store/dbstore/tests/dbstore_tests.cc @@ -609,11 +609,16 @@ TEST_F(DBStoreTest, PutObject) { ret = db->ProcessOp(dpp, "PutObject", ¶ms); ASSERT_EQ(ret, 0); - /* Insert another object */ + /* Insert another objects */ params.op.obj.state.obj.key.name = "object2"; params.op.obj.state.obj.key.instance = "inst2"; ret = db->ProcessOp(dpp, "PutObject", ¶ms); ASSERT_EQ(ret, 0); + + params.op.obj.state.obj.key.name = "object3"; + params.op.obj.state.obj.key.instance = "inst3"; + ret = db->ProcessOp(dpp, "PutObject", ¶ms); + ASSERT_EQ(ret, 0); } TEST_F(DBStoreTest, ListAllObjects) { @@ -957,7 +962,7 @@ TEST_F(DBStoreTest, PutObjectData) { params.op.obj_data.part_num = 1; params.op.obj_data.offset = 10; - params.op.obj_data.multipart_part_num = 2; + params.op.obj_data.multipart_part_str = "2"; bufferlist b1; encode("HELLO WORLD", b1); params.op.obj_data.data = b1; @@ -966,15 +971,29 @@ TEST_F(DBStoreTest, PutObjectData) { ASSERT_EQ(ret, 0); } +TEST_F(DBStoreTest, UpdateObjectData) { + struct DBOpParams params = GlobalParams; + int ret = -1; + + params.op.obj.new_obj_key.name = "object3"; + params.op.obj.new_obj_key.instance = "inst3"; + ret = db->ProcessOp(dpp, "UpdateObjectData", ¶ms); + ASSERT_EQ(ret, 0); +} + TEST_F(DBStoreTest, GetObjectData) { struct DBOpParams params = GlobalParams; int ret = -1; + params.op.obj.state.obj.key.instance = "inst3"; + params.op.obj.state.obj.key.name = "object3"; ret = db->ProcessOp(dpp, "GetObjectData", ¶ms); ASSERT_EQ(ret, 0); ASSERT_EQ(params.op.obj_data.part_num, 1); ASSERT_EQ(params.op.obj_data.offset, 10); - ASSERT_EQ(params.op.obj_data.multipart_part_num, 2); + ASSERT_EQ(params.op.obj_data.multipart_part_str, "2"); + ASSERT_EQ(params.op.obj.state.obj.key.instance, "inst3"); + ASSERT_EQ(params.op.obj.state.obj.key.name, "object3"); string data; decode(data, params.op.obj_data.data); ASSERT_EQ(data, "HELLO WORLD");