return out << o.bucket.name << ":" << o.get_oid();
}
/* Blob stored in the head of the multipart meta object.
 * It records the placement rule chosen at upload-initiation time so that
 * complete()/get_info() can later place/read tail data correctly.
 * Uses the standard ceph versioned encoding (currently v1/compat-v1). */
struct multipart_upload_info
{
  rgw_placement_rule dest_placement;  // destination placement of the final object

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(dest_placement, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(dest_placement, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(multipart_upload_info)
+
static inline void buf_to_hex(const unsigned char* const buf,
const size_t len,
char* const str)
const std::string& oid,
std::optional<std::string> upload_id,
ACLOwner owner, ceph::real_time mtime) {
- return nullptr;
+ return std::make_unique<DBMultipartUpload>(this->store, this, oid, upload_id,
+ std::move(owner), mtime);
}
int DBBucket::list_multiparts(const DoutPrefixProvider *dpp,
/* Return a multipart serializer for this object.
 * NOTE: MPDBSerializer is currently a no-op stub (try_lock()/unlock()
 * always succeed), so no real cross-request locking happens yet.
 * The caller owns the returned pointer.
 * (Removed leftover `return nullptr` stub from the previous revision.) */
MPSerializer* DBObject::get_serializer(const DoutPrefixProvider *dpp, const std::string& lock_name)
{
  return new MPDBSerializer(dpp, store, this, lock_name);
}
int DBObject::transition(RGWObjectCtx& rctx,
return 0;
}
/* Abort this multipart upload by deleting its meta object.
 * All part/tail objects are keyed off the meta object until
 * MultipartUpload::Complete(), so removing it drops the whole upload.
 * Returns -ERR_NO_SUCH_UPLOAD if the upload does not exist, otherwise
 * the delete result. */
int DBMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct,
                             RGWObjectCtx *obj_ctx)
{
  std::unique_ptr<rgw::sal::Object> meta_obj = get_meta_obj();
  meta_obj->set_in_extra_data(true);
  meta_obj->set_hash_source(mp_obj.get_key());
  int ret;

  std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = meta_obj->get_delete_op(obj_ctx);
  del_op->params.bucket_owner = bucket->get_acl_owner();
  del_op->params.versioning_status = 0;

  // Since the data objects are associated with meta obj till
  // MultipartUpload::Complete() is done, removing the metadata obj
  // should remove all the uploads so far.
  ret = del_op->delete_obj(dpp, null_yield);
  if (ret < 0) {
    ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " <<
      ret << dendl;
  }
  // Map a missing meta object onto the S3-level "no such upload" error.
  return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
}
+
/* Namespace under which all multipart meta objects are stored. */
static string mp_ns = RGW_OBJ_NS_MULTIPART;

/* Return a handle to this upload's meta object
 * ("<oid>.<upload_id>" in the multipart namespace). */
std::unique_ptr<rgw::sal::Object> DBMultipartUpload::get_meta_obj()
{
  return bucket->get_object(rgw_obj_key(get_meta(), string(), mp_ns));
}
+
/* Initialize a new multipart upload: generate a random upload id and
 * create the meta object ("<oid>.<upload_id>") whose head data stores
 * the encoded multipart_upload_info (i.e. the destination placement).
 * Returns 0 on success, negative errno otherwise. */
int DBMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs)
{
  int ret;
  std::string oid = mp_obj.get_key();

  char buf[33];
  std::unique_ptr<rgw::sal::Object> obj; // create meta obj
  // buf gets 32 random chars + NUL terminator
  gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
  std::string upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */
  upload_id.append(buf);

  mp_obj.init(oid, upload_id);
  obj = get_meta_obj();

  DB::Object op_target(store->getDB(), obj->get_bucket()->get_info(),
                       obj->get_obj());
  DB::Object::Write obj_op(&op_target);

  obj_op.meta.owner = owner.get_id();
  obj_op.meta.category = RGWObjCategory::MultiMeta;
  // exclusive create: don't clobber an existing upload with the same id
  obj_op.meta.flags = PUT_OBJ_CREATE_EXCL;
  obj_op.meta.mtime = &mtime;

  multipart_upload_info upload_info;
  upload_info.dest_placement = dest_placement;

  bufferlist bl;
  encode(upload_info, bl);
  obj_op.meta.data = &bl; // placement info lives in the meta obj head data
  ret = obj_op.prepare(dpp);
  if (ret < 0)
    return ret;
  ret = obj_op.write_meta(dpp, bl.length(), bl.length(), attrs);

  return ret;
}
+
/* Load this upload's parts from the meta object into the 'parts' map.
 * Parts with number <= marker are skipped and at most num_parts entries
 * are kept. *next_marker receives the last part number returned and
 * *truncated reports whether more parts remain beyond this page.
 * Returns 0 on success, negative errno on failure. */
int DBMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct,
                                  int num_parts, int marker,
                                  int *next_marker, bool *truncated,
                                  bool assume_unsorted)
{
  std::list<RGWUploadPartInfo> parts_map;

  std::unique_ptr<rgw::sal::Object> obj = get_meta_obj();

  parts.clear(); // drop results of any previous listing
  int ret;

  DB::Object op_target(store->getDB(),
                       obj->get_bucket()->get_info(), obj->get_obj());
  ret = op_target.get_mp_parts_list(dpp, parts_map);
  if (ret < 0) {
    return ret;
  }

  int last_num = 0;

  /* Keep only entries past the marker; the std::map orders them by
   * part number. */
  while (!parts_map.empty()) {
    std::unique_ptr<DBMultipartPart> part = std::make_unique<DBMultipartPart>();
    RGWUploadPartInfo &pinfo = parts_map.front();
    part->set_info(pinfo);
    if ((int)pinfo.num > marker) {
      last_num = pinfo.num;
      parts[pinfo.num] = std::move(part);
    }
    parts_map.pop_front();
  }

  /* rebuild a map with only num_parts entries */
  std::map<uint32_t, std::unique_ptr<MultipartPart>> new_parts;
  std::map<uint32_t, std::unique_ptr<MultipartPart>>::iterator piter;
  int i;
  for (i = 0, piter = parts.begin();
       i < num_parts && piter != parts.end();
       ++i, ++piter) {
    last_num = piter->first;
    new_parts[piter->first] = std::move(piter->second);
  }

  if (truncated) {
    // more parts remained after the first num_parts entries
    *truncated = (piter != parts.end());
  }

  parts.swap(new_parts);

  if (next_marker) {
    *next_marker = last_num;
  }

  return 0;
}
+
+ int DBMultipartUpload::complete(const DoutPrefixProvider *dpp,
+ optional_yield y, CephContext* cct,
+ map<int, string>& part_etags,
+ list<rgw_obj_index_key>& remove_objs,
+ uint64_t& accounted_size, bool& compressed,
+ RGWCompressionInfo& cs_info, off_t& ofs,
+ std::string& tag, ACLOwner& owner,
+ uint64_t olh_epoch,
+ rgw::sal::Object* target_obj,
+ RGWObjectCtx* obj_ctx)
+ {
+ char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
+ char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
+ std::string etag;
+ bufferlist etag_bl;
+ MD5 hash;
+ bool truncated;
+ int ret;
+
+ int total_parts = 0;
+ int handled_parts = 0;
+ int max_parts = 1000;
+ int marker = 0;
+ uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
+ auto etags_iter = part_etags.begin();
+ rgw::sal::Attrs attrs = target_obj->get_attrs();
+
+ ofs = 0;
+ accounted_size = 0;
+ do {
+ ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated);
+ if (ret == -ENOENT) {
+ ret = -ERR_NO_SUCH_UPLOAD;
+ }
+ if (ret < 0)
+ return ret;
+
+ total_parts += parts.size();
+ if (!truncated && total_parts != (int)part_etags.size()) {
+ ldpp_dout(dpp, 0) << "NOTICE: total parts mismatch: have: " << total_parts
+ << " expected: " << part_etags.size() << dendl;
+ ret = -ERR_INVALID_PART;
+ return ret;
+ }
+
+ for (auto obj_iter = parts.begin(); etags_iter != part_etags.end() && obj_iter != parts.end(); ++etags_iter, ++obj_iter, ++handled_parts) {
+ DBMultipartPart* part = dynamic_cast<rgw::sal::DBMultipartPart*>(obj_iter->second.get());
+ uint64_t part_size = part->get_size();
+ if (handled_parts < (int)part_etags.size() - 1 &&
+ part_size < min_part_size) {
+ ret = -ERR_TOO_SMALL;
+ return ret;
+ }
+
+ char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
+ if (etags_iter->first != (int)obj_iter->first) {
+ ldpp_dout(dpp, 0) << "NOTICE: parts num mismatch: next requested: "
+ << etags_iter->first << " next uploaded: "
+ << obj_iter->first << dendl;
+ ret = -ERR_INVALID_PART;
+ return ret;
+ }
+ string part_etag = rgw_string_unquote(etags_iter->second);
+ if (part_etag.compare(part->get_etag()) != 0) {
+ ldpp_dout(dpp, 0) << "NOTICE: etag mismatch: part: " << etags_iter->first
+ << " etag: " << etags_iter->second << dendl;
+ ret = -ERR_INVALID_PART;
+ return ret;
+ }
+
+ hex_to_buf(part->get_etag().c_str(), petag,
+ CEPH_CRYPTO_MD5_DIGESTSIZE);
+ hash.Update((const unsigned char *)petag, sizeof(petag));
+
+ RGWUploadPartInfo& obj_part = part->get_info();
+
+ ofs += obj_part.size;
+ accounted_size += obj_part.accounted_size;
+ }
+ } while (truncated);
+ hash.Final((unsigned char *)final_etag);
+
+ buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
+ snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2],
+ sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
+ "-%lld", (long long)part_etags.size());
+ etag = final_etag_str;
+ ldpp_dout(dpp, 10) << "calculated etag: " << etag << dendl;
+
+ etag_bl.append(etag);
+
+ attrs[RGW_ATTR_ETAG] = etag_bl;
+
+ /* XXX: handle compression ? */
+
+ /* Rename all the object data entries with original object name (i.e
+ * from 'head_obj.name + "." + upload_id' to head_obj.name) */
+
+ /* Original head object */
+ DB::Object op_target(store->getDB(),
+ target_obj->get_bucket()->get_info(),
+ target_obj->get_obj());
+ DB::Object::Write obj_op(&op_target);
+ obj_op.prepare(NULL);
+
+ /* Meta object */
+ std::unique_ptr<rgw::sal::Object> meta_obj = get_meta_obj();
+ DB::Object meta_op_target(store->getDB(),
+ meta_obj->get_bucket()->get_info(),
+ meta_obj->get_obj());
+ DB::Object::Write mp_op(&meta_op_target);
+ mp_op.update_mp_parts(dpp, target_obj->get_obj().key);
+
+ obj_op.meta.owner = owner.get_id();
+ obj_op.meta.flags = PUT_OBJ_CREATE;
+ obj_op.meta.modify_tail = true;
+ obj_op.meta.completeMultipart = true;
+
+ ret = obj_op.write_meta(dpp, ofs, accounted_size, attrs);
+ if (ret < 0)
+ return ret;
+
+ /* No need to delete Meta obj here. It is deleted from sal */
+ return ret;
+ }
+
/* Return the upload's placement rule and (optionally) the meta object's
 * attributes.
 * - If 'rule' is requested and already cached in 'placement', no read is
 *   needed for it.
 * - Otherwise the meta object head is read and multipart_upload_info is
 *   decoded to populate the cache.
 * Returns -ERR_NO_SUCH_UPLOAD if the meta object is missing/empty,
 * -EIO on decode failure, 0 on success. */
int DBMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs)
{
  if (!rule && !attrs) {
    return 0;
  }

  if (rule) {
    if (!placement.empty()) {
      *rule = &placement;
      if (!attrs) {
        /* Don't need attrs, done */
        return 0;
      }
    } else {
      *rule = nullptr;
    }
  }

  /* We need either attributes or placement, so we need a read */
  std::unique_ptr<rgw::sal::Object> meta_obj;
  meta_obj = get_meta_obj();
  meta_obj->set_in_extra_data(true);

  multipart_upload_info upload_info;
  bufferlist headbl;

  /* Read the obj head which contains the multipart_upload_info */
  std::unique_ptr<rgw::sal::Object::ReadOp> read_op = meta_obj->get_read_op(obj_ctx);
  int ret = read_op->prepare(y, dpp);
  if (ret < 0) {
    if (ret == -ENOENT) {
      return -ERR_NO_SUCH_UPLOAD;
    }
    return ret;
  }

  if (attrs) {
    /* Attrs are filled in by prepare */
    *attrs = meta_obj->get_attrs();
    if (!rule || *rule != nullptr) {
      /* placement was cached; don't actually read */
      return 0;
    }
  }

  /* Now read the placement from the head */
  ret = read_op->read(0, store->getDB()->get_max_head_size(), headbl, y, dpp);
  if (ret < 0) {
    if (ret == -ENOENT) {
      return -ERR_NO_SUCH_UPLOAD;
    }
    return ret;
  }

  if (headbl.length() <= 0) {
    // head exists but carries no encoded info: treat as missing upload
    return -ERR_NO_SUCH_UPLOAD;
  }

  /* Decode multipart_upload_info */
  auto hiter = headbl.cbegin();
  try {
    decode(upload_info, hiter);
  } catch (buffer::error& err) {
    ldpp_dout(dpp, 0) << "ERROR: failed to decode multipart upload info" << dendl;
    return -EIO;
  }
  placement = upload_info.dest_placement;
  *rule = &placement;

  return 0;
}
+
/* Create a Writer that streams the data of one part of this upload. */
std::unique_ptr<Writer> DBMultipartUpload::get_writer(
    const DoutPrefixProvider *dpp,
    optional_yield y,
    std::unique_ptr<rgw::sal::Object> _head_obj,
    const rgw_user& owner, RGWObjectCtx& obj_ctx,
    const rgw_placement_rule *ptail_placement_rule,
    uint64_t part_num,
    const std::string& part_num_str)
{
  return std::make_unique<DBMultipartWriter>(dpp, y, this,
                                             std::move(_head_obj), store, owner,
                                             obj_ctx, ptail_placement_rule, part_num, part_num_str);
}
+
+ DBMultipartWriter::DBMultipartWriter(const DoutPrefixProvider *dpp,
+ optional_yield y,
+ MultipartUpload* upload,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ DBStore* _store,
+ const rgw_user& _owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *_ptail_placement_rule,
+ uint64_t _part_num, const std::string& _part_num_str):
+ Writer(dpp, y),
+ store(_store),
+ owner(_owner),
+ ptail_placement_rule(_ptail_placement_rule),
+ head_obj(std::move(_head_obj)),
+ upload_id(upload->get_upload_id()),
+ oid(head_obj->get_name() + "." + upload_id +
+ "." + std::to_string(part_num)),
+ meta_obj(((DBMultipartUpload*)upload)->get_meta_obj()),
+ op_target(_store->getDB(), meta_obj->get_bucket()->get_info(), meta_obj->get_obj()),
+ parent_op(&op_target), part_num(_part_num),
+ part_num_str(_part_num_str) { parent_op.prepare(NULL);}
+
/* Tag subsequent data writes with "<upload_id>.<part_num>" so the rows
 * for this part are distinguishable from regular object data. */
int DBMultipartWriter::prepare(optional_yield y)
{
  parent_op.set_mp_part_str(upload_id + "." + std::to_string(part_num));
  // XXX: do we need to handle part_num_str??
  return 0;
}
+
+ int DBMultipartWriter::process(bufferlist&& data, uint64_t offset)
+ {
+ /* XXX: same as AtomicWriter..consolidate code */
+ total_data_size += data.length();
+
+ /* XXX: Optimize all bufferlist copies in this function */
+
+ /* copy head_data into meta. But for multipart we do not
+ * need to write head_data */
+ uint64_t max_chunk_size = store->getDB()->get_max_chunk_size();
+ int excess_size = 0;
+
+ /* Accumulate tail_data till max_chunk_size or flush op */
+ bufferlist tail_data;
+
+ if (data.length() != 0) {
+ parent_op.meta.data = &head_data; /* Null data ?? */
+
+ /* handle tail )parts.
+ * First accumulate and write data into dbstore in its chunk_size
+ * parts
+ */
+ if (!tail_part_size) { /* new tail part */
+ tail_part_offset = offset;
+ }
+ data.begin(0).copy(data.length(), tail_data);
+ tail_part_size += tail_data.length();
+ tail_part_data.append(tail_data);
+
+ if (tail_part_size < max_chunk_size) {
+ return 0;
+ } else {
+ int write_ofs = 0;
+ while (tail_part_size >= max_chunk_size) {
+ excess_size = tail_part_size - max_chunk_size;
+ bufferlist tmp;
+ tail_part_data.begin(write_ofs).copy(max_chunk_size, tmp);
+ /* write tail objects data */
+ int ret = parent_op.write_data(dpp, tmp, tail_part_offset);
+
+ if (ret < 0) {
+ return ret;
+ }
+
+ tail_part_size -= max_chunk_size;
+ write_ofs += max_chunk_size;
+ tail_part_offset += max_chunk_size;
+ }
+ /* reset tail parts or update if excess data */
+ if (excess_size > 0) { /* wrote max_chunk_size data */
+ tail_part_size = excess_size;
+ bufferlist tmp;
+ tail_part_data.begin(write_ofs).copy(excess_size, tmp);
+ tail_part_data = tmp;
+ } else {
+ tail_part_size = 0;
+ tail_part_data.clear();
+ tail_part_offset = 0;
+ }
+ }
+ } else {
+ if (tail_part_size == 0) {
+ return 0; /* nothing more to write */
+ }
+
+ /* flush watever tail data is present */
+ int ret = parent_op.write_data(dpp, tail_part_data, tail_part_offset);
+ if (ret < 0) {
+ return ret;
+ }
+ tail_part_size = 0;
+ tail_part_data.clear();
+ tail_part_offset = 0;
+ }
+
+ return 0;
+ }
+
+ int DBMultipartWriter::complete(size_t accounted_size, const std::string& etag,
+ ceph::real_time *mtime, ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match, const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace, bool *canceled,
+ optional_yield y)
+ {
+ int ret = 0;
+ /* XXX: same as AtomicWriter..consolidate code */
+ parent_op.meta.mtime = mtime;
+ parent_op.meta.delete_at = delete_at;
+ parent_op.meta.if_match = if_match;
+ parent_op.meta.if_nomatch = if_nomatch;
+ parent_op.meta.user_data = user_data;
+ parent_op.meta.zones_trace = zones_trace;
+
+ /* XXX: handle accounted size */
+ accounted_size = total_data_size;
+
+ if (ret < 0)
+ return ret;
+
+ RGWUploadPartInfo info;
+ info.num = part_num;
+ info.etag = etag;
+ info.size = total_data_size;
+ info.accounted_size = accounted_size;
+ info.modified = real_clock::now();
+ //info.manifest = manifest;
+
+ DB::Object op_target(store->getDB(),
+ meta_obj->get_bucket()->get_info(), meta_obj->get_obj());
+ ret = op_target.add_mp_part(dpp, info);
+ if (ret < 0) {
+ return ret == -ENOENT ? -ERR_NO_SUCH_UPLOAD : ret;
+ }
+
+ return 0;
+ }
+
DBAtomicWriter::DBAtomicWriter(const DoutPrefixProvider *dpp,
optional_yield y,
std::unique_ptr<rgw::sal::Object> _head_obj,
#include "rgw_oidc_provider.h"
#include "rgw_role.h"
#include "rgw_lc.h"
+#include "rgw_multi.h"
#include "store/dbstore/common/dbstore.h"
#include "store/dbstore/dbstore_mgr.h"
}
};
+ /*
+ * For multipart upload, below is the process flow -
+ *
+ * MultipartUpload::Init - create head object of meta obj (src_obj_name + "." + upload_id)
+ * [ Meta object stores all the parts upload info]
+ * MultipartWriter::process - create all data/tail objects with obj_name same as
+ * meta obj (so that they can all be identified & deleted
+ * during abort)
+ * MultipartUpload::Abort - Just delete meta obj .. that will indirectly delete all the
+ * uploads associated with that upload id / meta obj so far.
+ * MultipartUpload::Complete - create head object of the original object (if not exists) &
+ * rename all data/tail objects to orig object name and update
+ * metadata of the orig object.
+ */
/* In-memory representation of one uploaded part, backed by the
 * RGWUploadPartInfo entries stored in the meta object's parts list. */
class DBMultipartPart : public MultipartPart {
protected:
  RGWUploadPartInfo info; /* XXX: info contains manifest also which is not needed */

public:
  DBMultipartPart() = default;
  virtual ~DBMultipartPart() = default;

  virtual RGWUploadPartInfo& get_info() { return info; }
  virtual void set_info(const RGWUploadPartInfo& _info) { info = _info; }
  virtual uint32_t get_num() { return info.num; }
  // NOTE: reports the accounted (billable) size, not the raw stored size.
  virtual uint64_t get_size() { return info.accounted_size; }
  virtual const std::string& get_etag() { return info.etag; }
  virtual ceph::real_time& get_mtime() { return info.modified; }

};
+
/* Helper that tracks the (object name, upload id) pair of a multipart
 * upload and the derived meta-object name "<oid>.<upload_id>".
 * Mirrors RGWMPObj, simplified for dbstore. */
class DBMPObj {
  std::string oid;       // object name
  std::string upload_id; // multipart upload id
  std::string meta;      // multipart meta object = <oid>.<upload_id>
public:
  DBMPObj() {}
  DBMPObj(const std::string& _oid, const std::string& _upload_id) {
    init(_oid, _upload_id, _upload_id);
  }
  /* Without an upload id, _oid is interpreted as a meta-object name and
   * split back into its components. */
  DBMPObj(const std::string& _oid, std::optional<std::string> _upload_id) {
    if (_upload_id) {
      init(_oid, *_upload_id, *_upload_id);
    } else {
      from_meta(_oid);
    }
  }
  void init(const std::string& _oid, const std::string& _upload_id) {
    init(_oid, _upload_id, _upload_id);
  }
  /* part_unique_str is currently unused; kept for interface parity with
   * RGWMPObj::init(). */
  void init(const std::string& _oid, const std::string& _upload_id, const std::string& part_unique_str) {
    if (_oid.empty()) {
      clear();
      return;
    }
    oid = _oid;
    upload_id = _upload_id;
    meta = oid + "." + upload_id;
  }
  const std::string& get_upload_id() const {
    return upload_id;
  }
  const std::string& get_key() const {
    return oid;
  }
  const std::string& get_meta() const { return meta; }
  /* Split a meta-object name "<key>.<upload_id>" at the last '.'.
   * Returns false (leaving the object untouched) if no separator exists.
   * FIX: compare rfind's result against std::string::npos directly
   * instead of relying on an implementation-defined npos->int cast. */
  bool from_meta(const std::string& meta_str) {
    const size_t mid_pos = meta_str.rfind('.'); // <key>.<upload_id>
    if (mid_pos == std::string::npos)
      return false;
    oid = meta_str.substr(0, mid_pos);
    upload_id = meta_str.substr(mid_pos + 1);
    init(oid, upload_id, upload_id);
    return true;
  }
  void clear() {
    oid = "";
    meta = "";
    upload_id = "";
  }
};
+
/* dbstore implementation of a multipart upload. Identification state
 * (oid / upload_id / meta-object name) lives in mp_obj; 'placement'
 * caches the destination placement rule once read by get_info(). */
class DBMultipartUpload : public MultipartUpload {
  DBStore* store;
  DBMPObj mp_obj;               // oid / upload_id / meta-object name
  ACLOwner owner;               // owner recorded at initiation time
  ceph::real_time mtime;        // initiation time
  rgw_placement_rule placement; // cached by get_info()

public:
  DBMultipartUpload(DBStore* _store, Bucket* _bucket, const std::string& oid, std::optional<std::string> upload_id, ACLOwner _owner, ceph::real_time _mtime) : MultipartUpload(_bucket), store(_store), mp_obj(oid, upload_id), owner(_owner), mtime(_mtime) {}
  virtual ~DBMultipartUpload() = default;

  virtual const std::string& get_meta() const { return mp_obj.get_meta(); }
  virtual const std::string& get_key() const { return mp_obj.get_key(); }
  virtual const std::string& get_upload_id() const { return mp_obj.get_upload_id(); }
  virtual const ACLOwner& get_owner() const override { return owner; }
  virtual ceph::real_time& get_mtime() { return mtime; }
  virtual std::unique_ptr<rgw::sal::Object> get_meta_obj() override;
  virtual int init(const DoutPrefixProvider* dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) override;
  virtual int list_parts(const DoutPrefixProvider* dpp, CephContext* cct,
                         int num_parts, int marker,
                         int* next_marker, bool* truncated,
                         bool assume_unsorted = false) override;
  virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct,
                    RGWObjectCtx* obj_ctx) override;
  virtual int complete(const DoutPrefixProvider* dpp,
                       optional_yield y, CephContext* cct,
                       std::map<int, std::string>& part_etags,
                       std::list<rgw_obj_index_key>& remove_objs,
                       uint64_t& accounted_size, bool& compressed,
                       RGWCompressionInfo& cs_info, off_t& ofs,
                       std::string& tag, ACLOwner& owner,
                       uint64_t olh_epoch,
                       rgw::sal::Object* target_obj,
                       RGWObjectCtx* obj_ctx) override;
  virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) override;
  virtual std::unique_ptr<Writer> get_writer(const DoutPrefixProvider *dpp,
                          optional_yield y,
                          std::unique_ptr<rgw::sal::Object> _head_obj,
                          const rgw_user& owner, RGWObjectCtx& obj_ctx,
                          const rgw_placement_rule *ptail_placement_rule,
                          uint64_t part_num,
                          const std::string& part_num_str) override;
};
+
class DBObject : public Object {
private:
DBStore* store;
int read_attrs(const DoutPrefixProvider* dpp, DB::Object::Read &read_op, optional_yield y, rgw_obj* target_obj = nullptr);
};
/* Multipart serializer stub for dbstore.
 * dbstore performs no cross-request locking yet, so try_lock()/unlock()
 * are no-ops that always report success; the constructor parameters are
 * accepted only to satisfy the MPSerializer interface. */
class MPDBSerializer : public MPSerializer {

public:
  MPDBSerializer(const DoutPrefixProvider *dpp, DBStore* store, DBObject* obj, const std::string& lock_name) {}

  virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override {return 0; }
  virtual int unlock() override { return 0;}
};
+
class DBAtomicWriter : public Writer {
protected:
rgw::sal::DBStore* store;
optional_yield y) override;
};
+ class DBMultipartWriter : public Writer {
+ protected:
+ rgw::sal::DBStore* store;
+ const rgw_user& owner;
+ const rgw_placement_rule *ptail_placement_rule;
+ uint64_t olh_epoch;
+ std::unique_ptr<rgw::sal::Object> head_obj;
+ string upload_id;
+ string oid; /* object->name() + "." + "upload_id" + "." + part_num */
+ std::unique_ptr<rgw::sal::Object> meta_obj;
+ DB::Object op_target;
+ DB::Object::Write parent_op;
+ int part_num;
+ string part_num_str;
+ uint64_t total_data_size = 0; /* for total data being uploaded */
+ bufferlist head_data;
+ bufferlist tail_part_data;
+ uint64_t tail_part_offset;
+ uint64_t tail_part_size = 0; /* corresponds to each tail part being
+ written to dbstore */
+
+public:
+ DBMultipartWriter(const DoutPrefixProvider *dpp,
+ optional_yield y, MultipartUpload* upload,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ DBStore* _store,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *ptail_placement_rule,
+ uint64_t part_num, const std::string& part_num_str);
+ ~DBMultipartWriter() = default;
+
+ // prepare to start processing object data
+ virtual int prepare(optional_yield y) override;
+
+ // Process a bufferlist
+ virtual int process(bufferlist&& data, uint64_t offset) override;
+
+ // complete the operation and make its result visible to clients
+ virtual int complete(size_t accounted_size, const std::string& etag,
+ ceph::real_time *mtime, ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match, const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace, bool *canceled,
+ optional_yield y) override;
+ };
+
class DBStore : public Store {
private:
/* DBStoreManager is used in case multiple
namespace rgw::sal {
-struct multipart_upload_info
-{
- rgw_placement_rule dest_placement;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(dest_placement, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(dest_placement, bl);
- DECODE_FINISH(bl);
- }
-};
-WRITE_CLASS_ENCODER(multipart_upload_info)
-
// default number of entries to list with each bucket listing call
// (use marker to bridge between calls)
static constexpr size_t listing_max_entries = 1000;
return Ob->ListBucketObjects;
if (!Op.compare("PutObjectData"))
return Ob->PutObjectData;
+ if (!Op.compare("UpdateObjectData"))
+ return Ob->UpdateObjectData;
if (!Op.compare("GetObjectData"))
return Ob->GetObjectData;
if (!Op.compare("DeleteObjectData"))
params->op.obj.state.obj.key.instance = obj_instance;
params->op.obj.state.obj.key.ns = obj_ns;
- if (multipart_partnum != 0) {
+ if (multipart_part_str != "0.0") {
params->op.obj.is_multipart = true;
} else {
params->op.obj.is_multipart = false;
}
- params->op.obj_data.multipart_part_num = multipart_partnum;
+ params->op.obj_data.multipart_part_str = multipart_part_str;
params->op.obj_data.part_num = part_num;
return ret;
return ret;
}
+int DB::Object::add_mp_part(const DoutPrefixProvider *dpp,
+ RGWUploadPartInfo info) {
+ int ret = 0;
+
+ DBOpParams params = {};
+
+ store->InitializeParams(dpp, "GetObject", ¶ms);
+ InitializeParamsfromObject(dpp, ¶ms);
+
+ ret = store->ProcessOp(dpp, "GetObject", ¶ms);
+
+ if (ret) {
+ ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<")" << dendl;
+ goto out;
+ }
+
+ /* pick one field check if object exists */
+ if (!params.op.obj.state.exists) {
+ ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
+ return -1;
+ }
+
+ params.op.obj.mp_parts.push_back(info);
+ params.op.query_str = "mp";
+ params.op.obj.state.mtime = real_clock::now();
+
+ ret = store->ProcessOp(dpp, "UpdateObject", ¶ms);
+
+ if (ret) {
+ ldpp_dout(dpp, 0)<<"In UpdateObject failed err:(" <<ret<<") " << dendl;
+ goto out;
+ }
+
+out:
+ return ret;
+}
+
+int DB::Object::get_mp_parts_list(const DoutPrefixProvider *dpp,
+ std::list<RGWUploadPartInfo>& info)
+{
+ int ret = 0;
+ DBOpParams params = {};
+ std::map<std::string, bufferlist> omap;
+
+ store->InitializeParams(dpp, "GetObject", ¶ms);
+ InitializeParamsfromObject(dpp, ¶ms);
+
+ ret = store->ProcessOp(dpp, "GetObject", ¶ms);
+
+ if (ret) {
+ ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<") " << dendl;
+ goto out;
+ }
+
+ /* pick one field check if object exists */
+ if (!params.op.obj.state.exists) {
+ ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
+ return -1;
+ }
+
+ info = params.op.obj.mp_parts;
+
+out:
+ return ret;
+}
+
/* Taken from rgw_rados.cc */
void DB::gen_rand_obj_instance_name(rgw_obj_key *target_key)
{
/* tail object */
int part_num = (ofs / max_chunk_size);
- /* XXX: Handle multipart_num */
+ /* XXX: Handle multipart_str */
raw_obj read_obj(store, source->get_bucket_info().bucket.name, astate->obj.key.name,
- astate->obj.key.instance, astate->obj.key.ns, 0, part_num);
+ astate->obj.key.instance, astate->obj.key.ns, "0.0", part_num);
read_len = len;
part_num = (ofs / max_chunk_size);
uint64_t read_len = std::min(len, max_chunk_size);
- /* XXX: Handle multipart_num */
+ /* XXX: Handle multipart_str */
raw_obj read_obj(store, get_bucket_info().bucket.name, astate->obj.key.name,
- astate->obj.key.instance, astate->obj.key.ns, 0, part_num);
+ astate->obj.key.instance, astate->obj.key.ns, "0.0", part_num);
bool reading_from_head = (ofs < head_data_size);
r = cb(dpp, read_obj, ofs, read_len, reading_from_head, astate, arg);
/* XXX: Split into parts each of max_chunk_size. But later make tail
* object chunk size limit to sqlite blob limit */
int part_num = 0;
- uint64_t max_chunk_size, max_head_size;
- max_head_size = store->get_max_head_size();
- max_chunk_size = store->get_max_chunk_size();
+ uint64_t max_chunk_size = store->get_max_chunk_size();
/* tail_obj ofs should be greater than max_head_size */
- if (ofs < max_head_size) {
- return -1;
+ if (mp_part_str == "0.0") { // ensure not multipart meta object
+ if (ofs < store->get_max_head_size()) {
+ return -1;
+ }
}
uint64_t end = data.length();
part_num = (ofs / max_chunk_size);
uint64_t len = std::min(end, max_chunk_size);
- /* XXX: Handle multipart_num */
+ /* XXX: Handle multipart_str */
raw_obj write_obj(store, target->get_bucket_info().bucket.name, obj_state.obj.key.name,
- obj_state.obj.key.instance, obj_state.obj.key.ns, 0, part_num);
+ obj_state.obj.key.instance, obj_state.obj.key.ns, mp_part_str, part_num);
ldpp_dout(dpp, 20) << "dbstore->write obj-ofs=" << ofs << " write_len=" << len << dendl;
return r;
}
+int DB::Object::Write::update_mp_parts(const DoutPrefixProvider *dpp, rgw_obj_key new_obj_key)
+{
+ int ret = 0;
+ DBOpParams params = {};
+ DB *store = target->get_store();
+
+ store->InitializeParams(dpp, "UpdateObjectData", ¶ms);
+ target->InitializeParamsfromObject(dpp, ¶ms);
+
+ params.op.obj.new_obj_key = new_obj_key;
+
+ ret = store->ProcessOp(dpp, "UpdateObjectData", ¶ms);
+
+ if (ret) {
+ ldpp_dout(dpp, 0)<<"In UpdateObjectData failed err:(" <<ret<<")" << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
int DB::Object::Delete::delete_obj(const DoutPrefixProvider *dpp) {
int ret = 0;
DB *store = target->get_store();
#include "global/global_init.h"
#include "common/ceph_context.h"
#include "rgw/rgw_obj_manifest.h"
+#include "rgw/rgw_multi.h"
using namespace std;
/* Extra fields */
bool is_multipart;
+ std::list<RGWUploadPartInfo> mp_parts;
+
bufferlist head_data;
string min_marker;
string max_marker;
list<rgw_bucket_dir_entry> list_entries;
+ /* Below used to update mp_parts obj name
+ * from meta object to src object on completion */
+ rgw_obj_key new_obj_key;
};
struct DBOpObjectDataInfo {
RGWObjState state;
uint64_t part_num;
- uint64_t multipart_part_num;
+ string multipart_part_str;
uint64_t offset;
uint64_t size;
bufferlist data{};
string manifest_part_rules = ":manifest_part_rules";
string omap = ":omap";
string is_multipart = ":is_multipart";
+ string mp_parts = ":mp_parts";
string head_data = ":head_data";
string min_marker = ":min_marker";
string max_marker = ":max_marker";
+ /* Below used to update mp_parts obj name
+ * from meta object to src object on completion */
+ string new_obj_name = ":new_obj_name";
+ string new_obj_instance = ":new_obj_instance";
+ string new_obj_ns = ":new_obj_ns";
};
struct DBOpObjectDataPrepareInfo {
string offset = ":offset";
string data = ":data";
string size = ":size";
- string multipart_part_num = ":multipart_part_num";
+ string multipart_part_str = ":multipart_part_str";
};
struct DBOpLCEntryPrepareInfo {
class UpdateObjectOp *UpdateObject;
class ListBucketObjectsOp *ListBucketObjects;
class PutObjectDataOp *PutObjectData;
+ class UpdateObjectDataOp *UpdateObjectData;
class GetObjectDataOp *GetObjectData;
class DeleteObjectDataOp *DeleteObjectData;
ManifestPartRules BLOB, \
Omap BLOB, \
IsMultipart BOOL, \
+ MPPartsList BLOB, \
HeadData BLOB, \
PRIMARY KEY (ObjName, ObjInstance, BucketName), \
FOREIGN KEY (BucketName) \
REFERENCES '{}' (BucketName) ON DELETE CASCADE ON UPDATE CASCADE \n);";
const string CreateObjectDataTableQ =
- /* Extra field 'MultipartPartNum' added which signifies multipart upload
- * part number. For regular object, it is '0'
+ /* Extra field 'MultipartPartStr' added which signifies multipart
+ * <uploadid + partnum>. For regular object, it is '0.0'
*
* - part: a collection of stripes that make a contiguous part of an
object. A regular object will only have one part (although might have
ObjInstance TEXT, \
ObjNS TEXT, \
BucketName TEXT NOT NULL , \
+ MultipartPartStr TEXT, \
PartNum INTEGER NOT NULL, \
Offset INTEGER, \
- Data BLOB, \
Size INTEGER, \
- MultipartPartNum INTEGER, \
- PRIMARY KEY (ObjName, BucketName, ObjInstance, MultipartPartNum, PartNum), \
+ Data BLOB, \
+ PRIMARY KEY (ObjName, BucketName, ObjInstance, MultipartPartStr, PartNum), \
FOREIGN KEY (BucketName, ObjName, ObjInstance) \
REFERENCES '{}' (BucketName, ObjName, ObjInstance) ON DELETE CASCADE ON UPDATE CASCADE \n);";
ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
Prefix, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
TailPlacementRuleName, TailPlacementStorageClass, \
- ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, HeadData ) \
+ ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData ) \
VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
- {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {})";
+ {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {})";
public:
virtual ~PutObjectOp() {}
params.op.obj.tail_placement_storage_class,
params.op.obj.manifest_part_objs,
params.op.obj.manifest_part_rules, params.op.obj.omap,
- params.op.obj.is_multipart, params.op.obj.head_data);
+ params.op.obj.is_multipart, params.op.obj.mp_parts, params.op.obj.head_data);
}
};
ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
Prefix, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
TailPlacementRuleName, TailPlacementStorageClass, \
- ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, HeadData from '{}' \
+ ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData from '{}' \
where BucketName = {} and ObjName = {} and ObjInstance = {}";
public:
ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
Prefix, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
TailPlacementRuleName, TailPlacementStorageClass, \
- ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, HeadData from '{}' \
+ ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData from '{}' \
where BucketName = {} and ObjName > {} ORDER BY ObjName ASC LIMIT {}";
public:
virtual ~ListBucketObjectsOp() {}
const string AttrsQuery =
"UPDATE '{}' SET ObjAttrs = {}, Mtime = {} \
where BucketName = {} and ObjName = {} and ObjInstance = {}";
+ const string MPQuery =
+ "UPDATE '{}' SET MPPartsList = {}, Mtime = {} \
+ where BucketName = {} and ObjName = {} and ObjInstance = {}";
const string MetaQuery =
"UPDATE '{}' SET \
ObjNS = {}, ACLs = {}, IndexVer = {}, Tag = {}, Flags = {}, VersionedEpoch = {}, \
HeadPlacementRuleName = {}, HeadPlacementRuleStorageClass = {}, \
TailPlacementRuleName = {}, TailPlacementStorageClass = {}, \
ManifestPartObjs = {}, ManifestPartRules = {}, Omap = {}, \
- IsMultipart = {}, HeadData = {} \
+ IsMultipart = {}, MPPartsList = {}, HeadData = {} \
WHERE ObjName = {} and ObjInstance = {} and BucketName = {}";
public:
params.op.obj.obj_name.c_str(),
params.op.obj.obj_instance.c_str());
}
+ if (params.op.query_str == "mp") {
+ return fmt::format(MPQuery.c_str(),
+ params.object_table.c_str(), params.op.obj.mp_parts.c_str(),
+ params.op.obj.mtime.c_str(),
+ params.op.bucket.bucket_name.c_str(),
+ params.op.obj.obj_name.c_str(),
+ params.op.obj.obj_instance.c_str());
+ }
if (params.op.query_str == "meta") {
return fmt::format(MetaQuery.c_str(),
params.object_table.c_str(),
params.op.obj.tail_placement_storage_class,
params.op.obj.manifest_part_objs,
params.op.obj.manifest_part_rules, params.op.obj.omap,
- params.op.obj.is_multipart, params.op.obj.head_data,
+ params.op.obj.is_multipart, params.op.obj.mp_parts, params.op.obj.head_data,
params.op.obj.obj_name, params.op.obj.obj_instance,
params.op.bucket.bucket_name);
}
private:
const string Query =
"INSERT OR REPLACE INTO '{}' \
- (ObjName, ObjInstance, ObjNS, BucketName, PartNum, Offset, Data, Size, MultipartPartNum) \
+ (ObjName, ObjInstance, ObjNS, BucketName, MultipartPartStr, PartNum, Offset, Size, Data) \
VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {})";
public:
params.op.obj.obj_name, params.op.obj.obj_instance,
params.op.obj.obj_ns,
params.op.bucket.bucket_name.c_str(),
+ params.op.obj_data.multipart_part_str.c_str(),
params.op.obj_data.part_num,
- params.op.obj_data.offset.c_str(), params.op.obj_data.data.c_str(),
- params.op.obj_data.size, params.op.obj_data.multipart_part_num);
+ params.op.obj_data.offset.c_str(),
+ params.op.obj_data.size,
+ params.op.obj_data.data.c_str());
}
};
+/* Updates the object key columns (ObjName/ObjInstance/ObjNS) of every row
+ * in the objectdata table belonging to one object. Used on multipart upload
+ * completion to move the part rows from the meta object to the target object
+ * (see DB::Object::Write::update_mp_parts()).
+ */
+class UpdateObjectDataOp: public DBOp {
+  private:
+    const string Query =
+      "UPDATE '{}' \
+      SET ObjName = {}, ObjInstance = {}, ObjNS = {} \
+      WHERE ObjName = {} and ObjInstance = {} and ObjNS = {} and \
+      BucketName = {}";
+
+  public:
+    virtual ~UpdateObjectDataOp() {}
+
+    /* Expand Query: new key for the SET clause, old key for the WHERE clause. */
+    string Schema(DBOpPrepareParams &params) {
+      return fmt::format(Query.c_str(),
+          params.objectdata_table.c_str(),
+          params.op.obj.new_obj_name, params.op.obj.new_obj_instance,
+          params.op.obj.new_obj_ns,
+          params.op.obj.obj_name, params.op.obj.obj_instance,
+          params.op.obj.obj_ns,
+          params.op.bucket.bucket_name.c_str());
+    }
+};
class GetObjectDataOp: public DBOp {
private:
const string Query =
"SELECT \
- ObjName, ObjInstance, ObjNS, BucketName, PartNum, Offset, Data, Size, \
- MultipartPartNum from '{}' where BucketName = {} and ObjName = {} and ObjInstance = {}";
+ ObjName, ObjInstance, ObjNS, BucketName, MultipartPartStr, PartNum, Offset, Size, Data \
+ from '{}' where BucketName = {} and ObjName = {} and ObjInstance = {} ORDER BY MultipartPartStr, PartNum";
public:
virtual ~GetObjectDataOp() {}
const rgw_user* powner_id, map<std::string, bufferlist>* pattrs,
ceph::real_time* pmtime, RGWObjVersionTracker* pobjv);
- int get_max_head_size() { return ObjHeadSize; }
- int get_max_chunk_size() { return ObjChunkSize; }
+ uint64_t get_max_head_size() { return ObjHeadSize; }
+ uint64_t get_max_chunk_size() { return ObjChunkSize; }
void gen_rand_obj_instance_name(rgw_obj_key *target_key);
// db raw obj string is of format -
- // "<bucketname>_<objname>_<objinstance>_<multipart-partnum>_<partnum>"
+ // "<bucketname>_<objname>_<objinstance>_<multipart-part-str>_<partnum>"
const string raw_obj_oid = "{0}_{1}_{2}_{3}_{4}";
inline string to_oid(const string& bucket, const string& obj_name, const string& obj_instance,
- uint64_t mp_num, uint64_t partnum) {
- string s = fmt::format(raw_obj_oid.c_str(), bucket, obj_name, obj_instance, mp_num, partnum);
+ string mp_str, uint64_t partnum) {
+ string s = fmt::format(raw_obj_oid.c_str(), bucket, obj_name, obj_instance, mp_str, partnum);
return s;
}
inline int from_oid(const string& oid, string& bucket, string& obj_name,
string& obj_instance,
- uint64_t& mp_num, uint64_t& partnum) {
+ string& mp_str, uint64_t& partnum) {
vector<std::string> result;
boost::split(result, oid, boost::is_any_of("_"));
bucket = result[0];
obj_name = result[1];
obj_instance = result[2];
- mp_num = stoi(result[3]);
+ mp_str = result[3];
partnum = stoi(result[4]);
return 0;
string obj_name;
string obj_instance;
string obj_ns;
- uint64_t multipart_partnum;
+ string multipart_part_str;
uint64_t part_num;
string obj_table;
}
raw_obj(DB* _db, string& _bname, string& _obj_name, string& _obj_instance,
- string& _obj_ns, int _mp_partnum, int _part_num) {
+ string& _obj_ns, string _mp_part_str, int _part_num) {
db = _db;
bucket_name = _bname;
obj_name = _obj_name;
obj_instance = _obj_instance;
obj_ns = _obj_ns;
- multipart_partnum = _mp_partnum;
+ multipart_part_str = _mp_part_str;
part_num = _part_num;
obj_table = bucket_name+".object.table";
int r;
db = _db;
- r = db->from_oid(oid, bucket_name, obj_name, obj_instance, multipart_partnum,
+ r = db->from_oid(oid, bucket_name, obj_name, obj_instance, multipart_part_str,
part_num);
if (r < 0) {
- multipart_partnum = 0;
+ multipart_part_str = "0.0";
part_num = 0;
}
struct Write {
DB::Object *target;
RGWObjState obj_state;
+ string mp_part_str = "0.0"; // multipart part identifier string; "0.0" = default (non-multipart write)
struct MetaParams {
ceph::real_time *mtime;
explicit Write(DB::Object *_target) : target(_target) {}
+ void set_mp_part_str(string _mp_part_str) { mp_part_str = _mp_part_str;}
int prepare(const DoutPrefixProvider* dpp);
int write_data(const DoutPrefixProvider* dpp,
bufferlist& data, uint64_t ofs);
bool assume_noent, bool modify_tail);
int write_meta(const DoutPrefixProvider *dpp, uint64_t size,
uint64_t accounted_size, map<string, bufferlist>& attrs);
+ /* Below are used to update mp data rows object name
+ * from meta to src object name on multipart upload
+ * completion
+ */
+ int update_mp_parts(const DoutPrefixProvider *dpp, rgw_obj_key new_obj_key);
};
struct Delete {
std::map<std::string, bufferlist> *m, bool* pmore);
using iterate_obj_cb = int (*)(const DoutPrefixProvider*, const raw_obj&, off_t, off_t,
bool, RGWObjState*, void*);
+ int add_mp_part(const DoutPrefixProvider *dpp, RGWUploadPartInfo info);
+ int get_mp_parts_list(const DoutPrefixProvider *dpp, std::list<RGWUploadPartInfo>& info);
int iterate_obj(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info, const rgw_obj& obj,
ManifestPartRules,
Omap,
IsMultipart,
+ MPPartsList,
HeadData
};
ObjDataInstance,
ObjDataNS,
ObjDataBucketName,
+ MultipartPartStr,
PartNum,
Offset,
- ObjData,
ObjDataSize,
- MultipartPartNum
+ ObjData
};
enum GetLCEntry {
SQL_DECODE_BLOB_PARAM(dpp, stmt, ManifestPartRules, op.obj.rules, sdb);
SQL_DECODE_BLOB_PARAM(dpp, stmt, Omap, op.obj.omap, sdb);
op.obj.is_multipart = sqlite3_column_int(stmt, IsMultipart);
+ SQL_DECODE_BLOB_PARAM(dpp, stmt, MPPartsList, op.obj.mp_parts, sdb);
SQL_DECODE_BLOB_PARAM(dpp, stmt, HeadData, op.obj.head_data, sdb);
op.obj.state.data = op.obj.head_data;
op.obj_data.part_num = sqlite3_column_int(stmt, PartNum);
op.obj_data.offset = sqlite3_column_int(stmt, Offset);
op.obj_data.size = sqlite3_column_int(stmt, ObjDataSize);
- op.obj_data.multipart_part_num = sqlite3_column_int(stmt, MultipartPartNum);
+ op.obj_data.multipart_part_str = (const char*)sqlite3_column_text(stmt, MultipartPartStr);
SQL_DECODE_BLOB_PARAM(dpp, stmt, ObjData, op.obj_data.data, sdb);
return 0;
UpdateObject = new SQLUpdateObject(sdb, db_name, cct);
ListBucketObjects = new SQLListBucketObjects(sdb, db_name, cct);
PutObjectData = new SQLPutObjectData(sdb, db_name, cct);
+ UpdateObjectData = new SQLUpdateObjectData(sdb, db_name, cct);
GetObjectData = new SQLGetObjectData(sdb, db_name, cct);
DeleteObjectData = new SQLDeleteObjectData(sdb, db_name, cct);
delete GetObject;
delete UpdateObject;
delete PutObjectData;
+ delete UpdateObjectData;
delete GetObjectData;
delete DeleteObjectData;
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.is_multipart.c_str(), sdb);
SQL_BIND_INT(dpp, stmt, index, params->op.obj.is_multipart, sdb);
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.mp_parts.c_str(), sdb);
+ SQL_ENCODE_BLOB_PARAM(dpp, stmt, index, params->op.obj.mp_parts, sdb);
+
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.head_data.c_str(), sdb);
SQL_ENCODE_BLOB_PARAM(dpp, stmt, index, params->op.obj.head_data, sdb);
SQL_PREPARE(dpp, p_params, sdb, attrs_stmt, ret, "PrepareUpdateObject");
} else if (params->op.query_str == "meta") {
SQL_PREPARE(dpp, p_params, sdb, meta_stmt, ret, "PrepareUpdateObject");
+ } else if (params->op.query_str == "mp") {
+ SQL_PREPARE(dpp, p_params, sdb, mp_stmt, ret, "PrepareUpdateObject");
} else {
ldpp_dout(dpp, 0)<<"In SQLUpdateObject invalid query_str:" <<
params->op.query_str << dendl;
stmt = &attrs_stmt;
} else if (params->op.query_str == "meta") {
stmt = &meta_stmt;
+ } else if (params->op.query_str == "mp") {
+ stmt = &mp_stmt;
} else {
ldpp_dout(dpp, 0)<<"In SQLUpdateObject invalid query_str:" <<
params->op.query_str << dendl;
SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.obj_attrs.c_str(), sdb);
SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.state.attrset, sdb);
}
+ if (params->op.query_str == "mp") {
+ SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.mp_parts.c_str(), sdb);
+ SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.mp_parts, sdb);
+ }
if (params->op.query_str == "meta") {
SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.obj_ns.c_str(), sdb);
SQL_BIND_TEXT(dpp, *stmt, index, params->op.obj.state.obj.key.ns.c_str(), sdb);
SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.is_multipart.c_str(), sdb);
SQL_BIND_INT(dpp, *stmt, index, params->op.obj.is_multipart, sdb);
+ SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.mp_parts.c_str(), sdb);
+ SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.mp_parts, sdb);
+
SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.head_data.c_str(), sdb);
SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.head_data, sdb);
}
stmt = &attrs_stmt;
} else if (params->op.query_str == "meta") {
stmt = &meta_stmt;
+ } else if (params->op.query_str == "mp") {
+ stmt = &mp_stmt;
} else {
ldpp_dout(dpp, 0)<<"In SQLUpdateObject invalid query_str:" <<
params->op.query_str << dendl;
SQL_BIND_INT(dpp, stmt, index, params->op.obj_data.size, sdb);
- SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj_data.multipart_part_num.c_str(), sdb);
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj_data.multipart_part_str.c_str(), sdb);
- SQL_BIND_INT(dpp, stmt, index, params->op.obj_data.multipart_part_num, sdb);
+ SQL_BIND_TEXT(dpp, stmt, index, params->op.obj_data.multipart_part_str.c_str(), sdb);
out:
return rc;
return ret;
}
+/* Prepare the UpdateObjectData statement.
+ * Resolves the per-bucket object/objectdata table names, ensures the
+ * objectdata table exists, and compiles the UPDATE query into 'stmt'.
+ * Returns 0 on success (via SQL_PREPARE), negative on error.
+ */
+int SQLUpdateObjectData::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *params)
+{
+  int ret = -1;
+  struct DBOpPrepareParams p_params = PrepareParams;
+  string bucket_name = params->op.bucket.info.bucket.name;
+
+  if (!*sdb) {
+    ldpp_dout(dpp, 0)<<"In SQLUpdateObjectData - no db" << dendl;
+    goto out;
+  }
+
+  if (p_params.object_table.empty()) {
+    p_params.object_table = getObjectTable(bucket_name);
+  }
+  if (p_params.objectdata_table.empty()) {
+    p_params.objectdata_table = getObjectDataTable(bucket_name);
+  }
+  params->bucket_table = p_params.bucket_table;
+  params->object_table = p_params.object_table;
+  params->objectdata_table = p_params.objectdata_table;
+  /* Best-effort: the table may already exist. */
+  (void)createObjectDataTable(dpp, params);
+
+  SQL_PREPARE(dpp, p_params, sdb, stmt, ret, "PrepareUpdateObjectData");
+
+out:
+  return ret;
+}
+
+/* Bind named parameters for the UpdateObjectData statement.
+ * WHERE side: current key from op.obj.state plus the bucket name.
+ * SET side: op.obj.new_obj_key (name/instance/ns).
+ * NOTE(review): the SQL_BIND_* macros appear to jump to 'out' on failure,
+ * setting rc - macro bodies are not visible here; confirm in the header.
+ */
+int SQLUpdateObjectData::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params)
+{
+ int index = -1;
+ int rc = 0;
+ struct DBOpPrepareParams p_params = PrepareParams;
+
+ /* Old object key (WHERE clause). */
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_name.c_str(), sdb);
+
+ SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.name.c_str(), sdb);
+
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_instance.c_str(), sdb);
+
+ SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.instance.c_str(), sdb);
+
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_ns.c_str(), sdb);
+
+ SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.ns.c_str(), sdb);
+
+ /* Bucket scoping for the WHERE clause. */
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name.c_str(), sdb);
+
+ SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
+
+ /* New object key (SET clause). */
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.new_obj_name.c_str(), sdb);
+
+ SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.new_obj_key.name.c_str(), sdb);
+
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.new_obj_instance.c_str(), sdb);
+
+ SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.new_obj_key.instance.c_str(), sdb);
+
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.new_obj_ns.c_str(), sdb);
+
+ SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.new_obj_key.ns.c_str(), sdb);
+
+out:
+ return rc;
+}
+
+/* Run the prepared UpdateObjectData statement. SQL_EXECUTE sets 'ret'
+ * and jumps to 'out' (macro-driven control flow); no row callback. */
+int SQLUpdateObjectData::Execute(const DoutPrefixProvider *dpp, struct DBOpParams *params)
+{
+ int ret = -1;
+
+ SQL_EXECUTE(dpp, params, stmt, NULL);
+out:
+ return ret;
+}
+
int SQLGetObjectData::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *params)
{
int ret = -1;
sqlite3_stmt *omap_stmt = NULL; // Prepared statement
sqlite3_stmt *attrs_stmt = NULL; // Prepared statement
sqlite3_stmt *meta_stmt = NULL; // Prepared statement
+ sqlite3_stmt *mp_stmt = NULL; // Prepared statement
public:
SQLUpdateObject(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
int Bind(const DoutPrefixProvider *dpp, DBOpParams *params);
};
+/* SQLite backend for UpdateObjectDataOp: re-keys objectdata rows
+ * (e.g. moving multipart part rows to the final object name on upload
+ * completion). Owns a single prepared statement, finalized on destroy.
+ * Mirrors the sibling SQLUpdateObject wrapper style.
+ */
+class SQLUpdateObjectData : public SQLiteDB, public UpdateObjectDataOp {
+ private:
+ sqlite3 **sdb = NULL;
+ sqlite3_stmt *stmt = NULL; // Prepared statement
+
+ public:
+ SQLUpdateObjectData(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
+ SQLUpdateObjectData(sqlite3 **sdbi, string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {}
+
+ ~SQLUpdateObjectData() {
+ if (stmt)
+ sqlite3_finalize(stmt);
+ }
+ int Prepare(const DoutPrefixProvider *dpp, DBOpParams *params);
+ int Execute(const DoutPrefixProvider *dpp, DBOpParams *params);
+ int Bind(const DoutPrefixProvider *dpp, DBOpParams *params);
+};
+
class SQLGetObjectData : public SQLiteDB, public GetObjectDataOp {
private:
sqlite3 **sdb = NULL;
ret = db->ProcessOp(dpp, "PutObject", &params);
ASSERT_EQ(ret, 0);
- /* Insert another object */
+ /* Insert two more objects */
params.op.obj.state.obj.key.name = "object2";
params.op.obj.state.obj.key.instance = "inst2";
ret = db->ProcessOp(dpp, "PutObject", &params);
ASSERT_EQ(ret, 0);
+
+ params.op.obj.state.obj.key.name = "object3";
+ params.op.obj.state.obj.key.instance = "inst3";
+ ret = db->ProcessOp(dpp, "PutObject", &params);
+ ASSERT_EQ(ret, 0);
}
TEST_F(DBStoreTest, ListAllObjects) {
params.op.obj_data.part_num = 1;
params.op.obj_data.offset = 10;
- params.op.obj_data.multipart_part_num = 2;
+ params.op.obj_data.multipart_part_str = "2";
bufferlist b1;
encode("HELLO WORLD", b1);
params.op.obj_data.data = b1;
ASSERT_EQ(ret, 0);
}
+/* Re-key existing object-data rows to object3/inst3 via UpdateObjectData;
+ * the GetObjectData test then reads them back under the new key. */
+TEST_F(DBStoreTest, UpdateObjectData) {
+  struct DBOpParams params = GlobalParams;
+  int ret = -1;
+
+  params.op.obj.new_obj_key.name = "object3";
+  params.op.obj.new_obj_key.instance = "inst3";
+  ret = db->ProcessOp(dpp, "UpdateObjectData", &params);
+  ASSERT_EQ(ret, 0);
+}
+
TEST_F(DBStoreTest, GetObjectData) {
struct DBOpParams params = GlobalParams;
int ret = -1;
+ params.op.obj.state.obj.key.instance = "inst3";
+ params.op.obj.state.obj.key.name = "object3";
ret = db->ProcessOp(dpp, "GetObjectData", &params);
ASSERT_EQ(ret, 0);
ASSERT_EQ(params.op.obj_data.part_num, 1);
ASSERT_EQ(params.op.obj_data.offset, 10);
- ASSERT_EQ(params.op.obj_data.multipart_part_num, 2);
+ ASSERT_EQ(params.op.obj_data.multipart_part_str, "2");
+ ASSERT_EQ(params.op.obj.state.obj.key.instance, "inst3");
+ ASSERT_EQ(params.op.obj.state.obj.key.name, "object3");
string data;
decode(data, params.op.obj_data.data);
ASSERT_EQ(data, "HELLO WORLD");