return Ob->UpdateObject;
if (!Op.compare("ListBucketObjects"))
return Ob->ListBucketObjects;
+ if (!Op.compare("ListVersionedObjects"))
+ return Ob->ListVersionedObjects;
if (!Op.compare("PutObjectData"))
return Ob->PutObjectData;
if (!Op.compare("UpdateObjectData"))
goto out;
}
-
for (auto& entry : db_params.op.obj.list_entries) {
+ if (!params.list_versions) {
+ if (!(entry.flags & rgw_bucket_dir_entry::FLAG_CURRENT) ||
+ (entry.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER)) {
+ // skip all non-current entries and delete_marker
+ continue;
+ }
+ }
+
if (count >= max) {
*is_truncated = true;
next_marker.name = entry.key.name;
next_marker.instance = entry.key.instance;
break;
}
+
if (!params.delim.empty()) {
const std::string& objname = entry.key.name;
const int delim_pos = objname.find(params.delim, params.prefix.size());
return ret;
}
+int DB::Object::get_object_impl(const DoutPrefixProvider *dpp, DBOpParams& params) {
+ int ret = 0;
+
+ if (params.op.obj.state.obj.key.name.empty()) {
+ /* Initialize */
+ store->InitializeParams(dpp, ¶ms);
+ InitializeParamsfromObject(dpp, ¶ms);
+ }
+
+ ret = store->ProcessOp(dpp, "GetObject", ¶ms);
+
+ /* pick one field check if object exists */
+ if (!ret && !params.op.obj.state.exists) {
+ ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
+ ret = -ENOENT;
+ }
+
+ return ret;
+}
+
int DB::Object::obj_omap_set_val_by_key(const DoutPrefixProvider *dpp,
const std::string& key, bufferlist& val,
bool must_exist) {
DBOpParams params = {};
- store->InitializeParams(dpp, ¶ms);
- InitializeParamsfromObject(dpp, ¶ms);
-
- ret = store->ProcessOp(dpp, "GetObject", ¶ms);
+ ret = get_object_impl(dpp, params);
if (ret) {
- ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<")" << dendl;
+ ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
goto out;
}
- /* pick one field check if object exists */
- if (!params.op.obj.state.exists) {
- ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
- return -1;
- }
-
params.op.obj.omap[key] = val;
params.op.query_str = "omap";
params.op.obj.state.mtime = real_clock::now();
if (!vals)
return -1;
- store->InitializeParams(dpp, ¶ms);
- InitializeParamsfromObject(dpp, ¶ms);
-
- ret = store->ProcessOp(dpp, "GetObject", ¶ms);
+ ret = get_object_impl(dpp, params);
if (ret) {
- ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<") " << dendl;
+ ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
goto out;
}
- /* pick one field check if object exists */
- if (!params.op.obj.state.exists) {
- ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
- return -1;
- }
-
omap = params.op.obj.omap;
for (const auto& k : keys) {
DBOpParams params = {};
- store->InitializeParams(dpp, ¶ms);
- InitializeParamsfromObject(dpp, ¶ms);
-
- ret = store->ProcessOp(dpp, "GetObject", ¶ms);
+ ret = get_object_impl(dpp, params);
if (ret) {
- ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<")" << dendl;
+ ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
goto out;
}
- /* pick one field check if object exists */
- if (!params.op.obj.state.exists) {
- ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
- return -1;
- }
-
params.op.obj.mp_parts.push_back(info);
params.op.query_str = "mp";
params.op.obj.state.mtime = real_clock::now();
DBOpParams params = {};
std::map<std::string, bufferlist> omap;
- store->InitializeParams(dpp, ¶ms);
- InitializeParamsfromObject(dpp, ¶ms);
-
- ret = store->ProcessOp(dpp, "GetObject", ¶ms);
+ ret = get_object_impl(dpp, params);
if (ret) {
- ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<") " << dendl;
+ ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
goto out;
}
- /* pick one field check if object exists */
- if (!params.op.obj.state.exists) {
- ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
- return -1;
- }
-
info = params.op.obj.mp_parts;
out:
if (!m)
return -1;
- store->InitializeParams(dpp, ¶ms);
- InitializeParamsfromObject(dpp, ¶ms);
-
- ret = store->ProcessOp(dpp, "GetObject", ¶ms);
+ ret = get_object_impl(dpp, params);
if (ret) {
- ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <<ret<<")" << dendl;
+ ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
goto out;
}
- /* pick one field check if object exists */
- if (!params.op.obj.state.exists) {
- ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
- return -1;
- }
-
(*m) = params.op.obj.omap;
out:
if (!m)
return -1;
- store->InitializeParams(dpp, ¶ms);
- InitializeParamsfromObject(dpp, ¶ms);
-
- ret = store->ProcessOp(dpp, "GetObject", ¶ms);
+ ret = get_object_impl(dpp, params);
if (ret) {
- ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <<ret<<")" << dendl;
+ ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
goto out;
}
- /* pick one field check if object exists */
- if (!params.op.obj.state.exists) {
- ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
- return -1;
- }
-
omap = params.op.obj.omap;
for (iter = omap.begin(); iter != omap.end(); ++iter) {
rgw::sal::Attrs *attrs;
map<string, bufferlist>::iterator iter;
- store->InitializeParams(dpp, ¶ms);
- InitializeParamsfromObject(dpp, ¶ms);
-
- ret = store->ProcessOp(dpp, "GetObject", ¶ms);
+ ret = get_object_impl(dpp, params);
if (ret) {
- ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<")" << dendl;
+ ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
goto out;
}
- /* pick one field check if object exists */
- if (!params.op.obj.state.exists) {
- ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
- return -1;
- }
-
/* For now lets keep it simple..rmattrs & setattrs ..
* XXX: Check rgw_rados::set_attrs
*/
return write_len;
}
-int DB::Object::follow_olh(const DoutPrefixProvider *dpp,
- const RGWBucketInfo& bucket_info, RGWObjState* state,
- const rgw_obj& olh_obj, rgw_obj *target)
-{
- auto iter = state->attrset.find(RGW_ATTR_OLH_INFO);
- if (iter == state->attrset.end()) {
- return -EINVAL;
- }
-
- DBOLHInfo olh;
- string s;
- const bufferlist& bl = iter->second;
- try {
- auto biter = bl.cbegin();
- decode(olh, biter);
- } catch (buffer::error& err) {
- return -EIO;
- }
-
- if (olh.removed) {
- return -ENOENT;
- }
-
- *target = olh.target;
+int DB::Object::list_versioned_objects(const DoutPrefixProvider *dpp,
+ std::list<rgw_bucket_dir_entry>& list_entries) {
+ int ret = 0;
+ store = get_store();
+ DBOpParams db_params = {};
- return 0;
-}
+ store->InitializeParams(dpp, &db_params);
+ InitializeParamsfromObject(dpp, &db_params);
-int DB::Object::get_olh_target_state(const DoutPrefixProvider *dpp,
- const RGWBucketInfo& bucket_info, const rgw_obj& obj,
- RGWObjState* olh_state, RGWObjState** target)
-{
- int ret = 0;
- rgw_obj target_obj;
+ db_params.op.list_max_count = MAX_VERSIONED_OBJECTS;
- if (!olh_state->is_olh) {
- return EINVAL;
- }
+ ret = store->ProcessOp(dpp, "ListVersionedObjects", &db_params);
- ret = follow_olh(dpp, bucket_info, olh_state, obj, &target_obj); /* might return -EAGAIN */
- if (ret < 0) {
- ldpp_dout(dpp, 0)<<"In get_olh_target_state follow_olh() failed err:(" <<ret<<")" << dendl;
- return ret;
+ if (ret) {
+ ldpp_dout(dpp, 0)<<"In ListVersionedObjects failed err:(" <<ret<<") " << dendl;
+ } else {
+ list_entries = db_params.op.obj.list_entries;
}
- ret = get_obj_state(dpp, bucket_info, target_obj, false, target);
-
return ret;
}
DBOpParams params = {};
RGWObjState* s;
- store->InitializeParams(dpp, ¶ms);
- InitializeParamsfromObject(dpp, ¶ms);
-
- ret = store->ProcessOp(dpp, "GetObject", ¶ms);
+ ret = get_object_impl(dpp, params);
- if (ret) {
- ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <<ret<<")" << dendl;
+ if (ret && ret != -ENOENT) {
+ ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
goto out;
}
- if (!params.op.obj.state.exists) {
- return -ENOENT;
+ /* Check if its a versioned object */
+ if (follow_olh && params.op.obj.state.obj.key.instance.empty()) {
+ if ((ret == -ENOENT) || (params.op.obj.flags & rgw_bucket_dir_entry::FLAG_VER)) { // NEWWWW
+ ret = list_versioned_objects(dpp, params.op.obj.list_entries);
+
+ if (params.op.obj.list_entries.size() != 0) {
+ /* versioned object. Read the latest version object provided its not a
+ * delete marker */
+ auto& ent = params.op.obj.list_entries.front();
+ if (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
+ ret = -ENOENT;
+ ldpp_dout(dpp, 0) <<" Latest is delete marker - err:(" <<ret<<")" << dendl;
+ goto out;
+ }
+ params.op.obj.state.obj.key.instance = ent.key.instance;
+
+ ret = get_object_impl(dpp, params);
+
+ if (ret) {
+ ldpp_dout(dpp, 0) <<"get_object_impl of versioned object failed err:(" <<ret<<")" << dendl;
+ goto out;
+ }
+ }
+ }
}
s = ¶ms.op.obj.state;
*state = &obj_state;
**state = *s;
- if (follow_olh && params.op.obj.state.obj.key.instance.empty()) {
- /* fetch current version obj details */
- ret = get_olh_target_state(dpp, bucket_info, obj, s, state);
-
- if (ret < 0) {
- ldpp_dout(dpp, 0)<<"get_olh_target_state failed err:(" <<ret<<")" << dendl;
- }
- }
-
out:
return ret;
RGWObjState* astate;
- /* XXX Read obj_id too */
int r = source->get_state(dpp, &astate, true);
if (r < 0)
return r;
obj_state.obj = target->obj;
- if (target->obj_id.empty()) {
- // generate obj_id
- char buf[33];
- gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
- target->obj_id = target->obj.key.name + "." + buf;
+ if (target->obj_id.empty()) {
+ if (!target->obj.key.instance.empty() && (target->obj.key.instance != "null")) {
+ /* versioned object. Set obj_id same as versionID/instance */
+ target->obj_id = target->obj.key.instance;
+ } else {
+ // generate obj_id
+ char buf[33];
+ gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
+ target->obj_id = buf;
+ }
}
ret = 0;
return 0;
}
+/*
+ * If versioned,
+ * - check if the old version of the object already exists
+ * if exists,
+ * - set its version-id/instance to "null" if instance was empty
+ * - demote it to NON_CURRENT
+ * - create versioned object with FLAG_CURRENT set
+ *
+ * XXX: call this function under a lock or sqlite transaction to prevent
+ * parallel object uploads marking both the versions as FLAG_CURRENT;
+ *
+ */
+int DB::Object::Write::write_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& params) {
+ DB *store = target->get_store();
+ int ret = -1;
+ DBOpParams obj_params = {};
+ uint64_t version_num = 0;
+
+ store->InitializeParams(dpp, &obj_params);
+ target->InitializeParamsfromObject(dpp, &obj_params);
+
+ /* Check if already exists */
+ obj_params.op.obj.state.obj.key.instance.clear();
+ ret = target->list_versioned_objects(dpp, obj_params.op.obj.list_entries);
+
+ if (ret && ret != -ENOENT) {
+ ldpp_dout(dpp, 0)<<"ListVersionedObjects of object (" << obj_params.op.obj.state.obj.key.name << ") failed err:(" <<ret<<")" << dendl;
+ return ret;
+ }
+
+ for (auto& ent : obj_params.op.obj.list_entries) {
+ ret = target->update_obj_next_version(dpp, ent, false, version_num); // demote previous version
+ if (ret != ENOENT) {
+ break;
+ }
+ // continue to next object version
+ }
+
+ if (ret) {
+ ldpp_dout(dpp, 0)<<"demote_next_version of object (" << obj_params.op.obj.state.obj.key.name << ") failed err:(" <<ret<<")" << dendl;
+ return ret;
+ }
+
+ params.op.obj.version_num = ++version_num;
+
+ /* Put Object */
+ params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
+ ret = store->ProcessOp(dpp, "PutObject", ¶ms);
+ if (ret) {
+ ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
+ }
+
+ return ret;
+}
+
+/* promote or demote next object version */
+int DB::Object::update_obj_next_version(const DoutPrefixProvider *dpp,
+ rgw_bucket_dir_entry &obj_entry,
+ bool promote, uint64_t& version_num) {
+ int ret = -1;
+ DBOpParams params = {};
+
+ params.op.obj.state.obj.key.name = obj_entry.key.name;
+ params.op.obj.state.obj.key.instance = obj_entry.key.instance;
+
+ // fetch the last version of the object
+ store->InitializeParams(dpp, ¶ms);
+ params.op.bucket.info.bucket.name = bucket_info.bucket.name;
+
+ ret = get_object_impl(dpp, params);
+
+ if (ret) { // do nothing as its best effort?
+ ldpp_dout(dpp, 0)<<"In demote_object_next_ver get of prev version(" << params.op.obj.obj_id << ") failed err:(" <<ret<<")" << dendl;
+ goto out;
+ }
+
+ if (params.op.obj.state.obj.key.instance.empty()) {
+ /* This means, previous version of object exists which was created before
+ * versioning is enabled on the bucket.
+ *
+ * Set its instance to "null" and mark it as versioned.
+ */
+ params.op.obj.state.obj.key.instance = "null";
+ params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
+ }
+ version_num = params.op.obj.version_num;
+
+ if (promote) {
+ params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
+ } else {
+ params.op.obj.flags &= ~(rgw_bucket_dir_entry::FLAG_CURRENT);
+ }
+ params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
+
+ ret = store->ProcessOp(dpp, "PutObject", ¶ms); // XXX: can be UpdateObject?
+
+ if (ret) { // do nothing as its best effort?
+ ldpp_dout(dpp, 0)<<"Put of Old version of object failed err:(" <<ret<<")" << dendl;
+ }
+out :
+ return ret;
+}
+
/* Write metadata & head object data */
int DB::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp,
uint64_t size, uint64_t accounted_size,
}
}
- if (state->is_olh) {
- (*attrset)[RGW_ATTR_OLH_ID_TAG] = state->olh_tag;
- }
-
state->mtime = meta.set_mtime;
if (meta.data) {
params.op.obj.state.accounted_size = accounted_size;
params.op.obj.owner = target->get_bucket_info().owner.id;
- /* XXX: handle versioning */
if (meta.mtime) {
*meta.mtime = meta.set_mtime;
}
params.op.query_str = "meta";
params.op.obj.obj_id = target->obj_id;
- ret = store->ProcessOp(dpp, "PutObject", ¶ms);
- if (ret) {
- ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
- goto out;
+ params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
+
+ /* Check if versioned */
+ bool is_versioned = !target->obj.key.instance.empty() && (target->obj.key.instance != "null");
+ params.op.obj.is_versioned = is_versioned;
+
+ if (!is_versioned) {
+ ret = store->ProcessOp(dpp, "PutObject", ¶ms);
+ if (ret) {
+ ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
+ goto out;
+ }
+ return ret;
}
- /* pick one field check if object exists */
- return 0;
+ /* versioned object */
+ params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
+ ret = write_versioned_obj(dpp, params);
out:
if (ret < 0) {
int DB::Object::Delete::delete_obj(const DoutPrefixProvider *dpp) {
int ret = 0;
- DB *store = target->get_store();
- RGWObjState* astate;
+ DBOpParams del_params = {};
- int r = target->get_state(dpp, &astate, true);
- if (r < 0)
- return r;
+ ret = target->get_object_impl(dpp, del_params);
- if (!astate->exists) {
- return -ENOENT;
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(dpp, 0)<<"GetObject during delete failed err:(" <<ret<<")" << dendl;
+ return ret;
}
- /* XXX: handle versioned objects. Create delete marker */
+ if (!ret && !(del_params.op.obj.flags &
+ (rgw_bucket_dir_entry::FLAG_DELETE_MARKER | rgw_bucket_dir_entry::FLAG_VER))) {
+ /* Non versioned objects. Simple delete */
+ ret = delete_obj_impl(dpp, del_params);
+ return ret;
+ }
- /* XXX: check params conditions */
- DBOpParams del_params = {};
+ /* check if it is versioned object. */
+ ret = target->list_versioned_objects(dpp, del_params.op.obj.list_entries);
+
+ if (!del_params.op.obj.list_entries.empty()) {
+ ret = delete_versioned_obj(dpp, del_params);
+ } else {
+ ret = -ENOENT;
+ }
+
+ return ret;
+}
- store->InitializeParams(dpp, &del_params);
- target->InitializeParamsfromObject(dpp, &del_params);
+int DB::Object::Delete::delete_obj_impl(const DoutPrefixProvider *dpp,
+ DBOpParams& del_params) {
+ int ret = 0;
+ DB *store = target->get_store();
ret = store->ProcessOp(dpp, "DeleteObject", &del_params);
if (ret) {
* its corresponding head object is deleted (like here in this case).
*/
DBOpParams update_params = del_params;
- update_params.op.obj.obj_id = astate->shadow_obj; // objectID is copied here in get_state()
update_params.op.obj.state.mtime = real_clock::now();
ret = store->ProcessOp(dpp, "UpdateObjectData", &update_params);
if (ret) {
ldpp_dout(dpp, 0) << "Updating tail objects mtime failed err:(" <<ret<<")" << dendl;
- return ret;
}
+ return ret;
+}
- return 0;
+/*
+ * a) if no versionID specified,
+ * - create a delete marker with
+ * - new version/instanceID (if bucket versioned)
+ * - null versionID (if versioning suspended)
+ * - demote the current version entry
+ * XXX: If delete_marker is the latest version, another delete_marker
+ * is not created for now (for any subsequent deletes) . Will revisit this if
+ * needed.
+ *
+ * b) If versionID provided,
+ * - delete that particular entry
+ * - Incase the entry is current entry, promote next object version to CURRENT.
+ *
+ * XXX: call this function under a lock or sqlite transaction to prevent
+ * parallel object uploads marking both the versions as FLAG_CURRENT;
+ */
+int DB::Object::Delete::delete_versioned_obj(const DoutPrefixProvider *dpp,
+ DBOpParams& del_params) {
+
+ DB *store = target->get_store();
+ bool versioning_enabled = (params.versioning_status & BUCKET_VERSIONED);
+ int ret = -1;
+ DBOpParams olh_params = {};
+ std::string version_id;
+ DBOpParams next_params = del_params;
+
+ version_id = del_params.op.obj.state.obj.key.instance;
+ auto &ent = del_params.op.obj.list_entries.front();
+ uint64_t version_num = MAX_VERSIONED_OBJECTS;
+
+ if (version_id.empty()) {
+ /* create delete marker */
+
+ if (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
+ // for now do not create another delete marker..just exit
+ return 0;
+ }
+ DBOpParams dm_params = del_params;
+
+ //demote current entry
+ target->update_obj_next_version(dpp, ent, false, version_num); //XXX: not checking return status
+
+ // create delete marker
+ store->InitializeParams(dpp, &dm_params);
+ target->InitializeParamsfromObject(dpp, &dm_params);
+
+ if (!versioning_enabled) {
+ dm_params.op.obj.state.obj.key.instance = "null";
+ } else {
+ store->gen_rand_obj_instance_name(&dm_params.op.obj.state.obj.key);
+ dm_params.op.obj.obj_id = dm_params.op.obj.state.obj.key.instance;
+ }
+
+ dm_params.op.obj.flags |= (rgw_bucket_dir_entry::FLAG_DELETE_MARKER);
+ dm_params.op.obj.flags |= (rgw_bucket_dir_entry::FLAG_CURRENT);
+ dm_params.op.obj.version_num = ++version_num; // max obj version list
+
+ ret = store->ProcessOp(dpp, "PutObject", &dm_params);
+
+ if (ret) {
+ ldpp_dout(dpp, 0) << "delete_olh: failed to create delete marker - err:(" <<ret<<")" << dendl;
+ return ret;
+ }
+ } else {
+ // delete paritcular version-id
+ DBOpParams dm_params = {};
+
+ store->InitializeParams(dpp, &dm_params);
+ target->InitializeParamsfromObject(dpp, &dm_params);
+
+ dm_params.op.obj.state.obj.key.instance = version_id;
+
+ ret = target->get_object_impl(dpp, dm_params);
+
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "get obj of versioned object failed - err:(" <<ret<<")" << dendl;
+ return ret;
+ }
+ ret = delete_obj_impl(dpp, dm_params);
+ if (dm_params.op.obj.flags & rgw_bucket_dir_entry::FLAG_CURRENT) {
+ bool found = false;
+ std::list<rgw_bucket_dir_entry>::iterator e;
+ for (e = del_params.op.obj.list_entries.begin(); e != del_params.op.obj.list_entries.end(); e++) {
+
+ if (e->key.instance == version_id) {
+ e++;
+ if (e != del_params.op.obj.list_entries.end()) {
+ found = true;
+ }
+ break;
+ }
+ }
+ if (found) {
+ uint64_t version_num = 0;
+ rgw_bucket_dir_entry& e1 = *e;
+ target->update_obj_next_version(dpp, e1, true, version_num);
+ }
+ }
+ }
+
+ return ret;
}
int DB::get_entry(const std::string& oid, const std::string& marker,
ret = ProcessOp(dpp, "GetBucket", ¶ms);
if (ret) {
ldpp_dout(dpp, 0) << "In GetBucket failed err:(" <<ret<<")" << dendl;
+ return ret;
}
ldpp_dout(dpp, 20) << " Deleting stale_objs of bucket( " << bucket <<")" << dendl;
struct DBOpObjectInfo {
RGWAccessControlPolicy acls;
- RGWObjState state;
+ RGWObjState state = {};
/* Below are taken from rgw_bucket_dir_entry */
RGWObjCategory category;
std::string prefix;
std::list<rgw_bucket_dir_entry> list_entries;
/* XXX: Maybe use std::vector instead of std::list */
+
+ /* for versioned objects */
+ bool is_versioned;
+ uint64_t version_num = 1; // default value for plain entries (non-versioned)
};
struct DBOpObjectDataInfo {
static constexpr const char* fake_tag = ":fake_tag";
static constexpr const char* shadow_obj = ":shadow_obj";
static constexpr const char* has_data = ":has_data";
- static constexpr const char* is_olh = ":is_ols";
- static constexpr const char* olh_tag = ":olh_tag";
+ static constexpr const char* is_versioned = ":is_versioned";
+ static constexpr const char* version_num = ":version_num";
static constexpr const char* pg_ver = ":pg_ver";
static constexpr const char* zone_short_id = ":zone_short_id";
static constexpr const char* obj_version = ":obj_version";
static constexpr const char* new_obj_name = ":new_obj_name";
static constexpr const char* new_obj_instance = ":new_obj_instance";
static constexpr const char* new_obj_ns = ":new_obj_ns";
+ static constexpr const char* versions = ":versions";
};
struct DBOpObjectDataPrepareInfo {
std::shared_ptr<class GetObjectOp> GetObject;
std::shared_ptr<class UpdateObjectOp> UpdateObject;
std::shared_ptr<class ListBucketObjectsOp> ListBucketObjects;
+ std::shared_ptr<class ListVersionedObjectsOp> ListVersionedObjects;
std::shared_ptr<class PutObjectDataOp> PutObjectData;
std::shared_ptr<class UpdateObjectDataOp> UpdateObjectData;
std::shared_ptr<class GetObjectDataOp> GetObjectData;
FakeTag BOOL, \
ShadowObj TEXT, \
HasData BOOL, \
- IsOLH BOOL, \
- OLHTag BLOB, \
+ IsVersioned BOOL, \
+ VersionNum INTEGER, \
PGVer INTEGER, \
ZoneShortID INTEGER, \
ObjVersion INTEGER, \
IsMultipart BOOL, \
MPPartsList BLOB, \
HeadData BLOB, \
+ VERSIONS BLOB, \
PRIMARY KEY (ObjName, ObjInstance, BucketName), \
FOREIGN KEY (BucketName) \
REFERENCES '{}' (BucketName) ON DELETE CASCADE ON UPDATE CASCADE \n);";
System, PlacementName, PlacementStorageClass, PlacementTags, \
BucketQuota, TempURLKeys, UserQuota, Type, MfaIDs, AssumedRoleARN, \
UserAttrs, UserVersion, UserVersionTag \
- from '{}' where Tenant = {} and UserID = {} and NS = {}";
+ from '{}' where UserID = {}";
public:
virtual ~GetUserOp() {}
} else if (params.op.query_str == "user_id") {
return fmt::format(QueryByUserID,
params.user_table,
- params.op.user.tenant,
- params.op.user.user_id,
- params.op.user.ns);
+ params.op.user.user_id);
} else {
return fmt::format(Query, params.user_table,
params.op.user.user_id);
Flags, VersionedEpoch, ObjCategory, Etag, Owner, OwnerDisplayName, \
StorageClass, Appendable, ContentType, IndexHashSource, ObjSize, \
AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \
- ShadowObj, HasData, IsOLH, OLHTag, PGVer, ZoneShortID, \
+ ShadowObj, HasData, IsVersioned, VersionNum, PGVer, ZoneShortID, \
ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
TailPlacementRuleName, TailPlacementStorageClass, \
- ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData ) \
+ ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \
+ HeadData, Versions) \
VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
- {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
+ {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {})";
public:
params.op.obj.accounted_size, params.op.obj.mtime,
params.op.obj.epoch, params.op.obj.obj_tag, params.op.obj.tail_tag,
params.op.obj.write_tag, params.op.obj.fake_tag, params.op.obj.shadow_obj,
- params.op.obj.has_data, params.op.obj.is_olh, params.op.obj.olh_tag,
+ params.op.obj.has_data, params.op.obj.is_versioned, params.op.obj.version_num,
params.op.obj.pg_ver, params.op.obj.zone_short_id,
params.op.obj.obj_version, params.op.obj.obj_version_tag,
params.op.obj.obj_attrs, params.op.obj.head_size,
params.op.obj.tail_placement_storage_class,
params.op.obj.manifest_part_objs,
params.op.obj.manifest_part_rules, params.op.obj.omap,
- params.op.obj.is_multipart, params.op.obj.mp_parts, params.op.obj.head_data);
+ params.op.obj.is_multipart, params.op.obj.mp_parts,
+ params.op.obj.head_data, params.op.obj.versions);
}
};
Flags, VersionedEpoch, ObjCategory, Etag, Owner, OwnerDisplayName, \
StorageClass, Appendable, ContentType, IndexHashSource, ObjSize, \
AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \
- ShadowObj, HasData, IsOLH, OLHTag, PGVer, ZoneShortID, \
+ ShadowObj, HasData, IsVersioned, VersionNum, PGVer, ZoneShortID, \
ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
TailPlacementRuleName, TailPlacementStorageClass, \
- ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData from '{}' \
+ ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \
+ HeadData, Versions from '{}' \
where BucketName = {} and ObjName = {} and ObjInstance = {}";
public:
Flags, VersionedEpoch, ObjCategory, Etag, Owner, OwnerDisplayName, \
StorageClass, Appendable, ContentType, IndexHashSource, ObjSize, \
AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \
- ShadowObj, HasData, IsOLH, OLHTag, PGVer, ZoneShortID, \
+ ShadowObj, HasData, IsVersioned, VersionNum, PGVer, ZoneShortID, \
ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
TailPlacementRuleName, TailPlacementStorageClass, \
}
};
+#define MAX_VERSIONED_OBJECTS 20
+class ListVersionedObjectsOp: virtual public DBOp {
+ private:
+ // once we have stats also stored, may have to update this query to join
+ // these two tables.
+ static constexpr std::string_view Query =
+ "SELECT \
+ ObjName, ObjInstance, ObjNS, BucketName, ACLs, IndexVer, Tag, \
+ Flags, VersionedEpoch, ObjCategory, Etag, Owner, OwnerDisplayName, \
+ StorageClass, Appendable, ContentType, IndexHashSource, ObjSize, \
+ AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \
+ ShadowObj, HasData, IsVersioned, VersionNum, PGVer, ZoneShortID, \
+ ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
+ ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
+ TailPlacementRuleName, TailPlacementStorageClass, \
+ ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \
+ HeadData, Versions from '{}' \
+ where BucketName = {} and ObjName = {} ORDER BY VersionNum DESC LIMIT {}";
+ public:
+ virtual ~ListVersionedObjectsOp() {}
+
+ static std::string Schema(DBOpPrepareParams ¶ms) {
+ /* XXX: Include obj_id, delim */
+ return fmt::format(Query,
+ params.object_table,
+ params.op.bucket.bucket_name,
+ params.op.obj.obj_name,
+ params.op.list_max_count);
+ }
+};
+
class UpdateObjectOp: virtual public DBOp {
private:
// Updates Omap
StorageClass = {}, Appendable = {}, ContentType = {}, \
IndexHashSource = {}, ObjSize = {}, AccountedSize = {}, Mtime = {}, \
Epoch = {}, ObjTag = {}, TailTag = {}, WriteTag = {}, FakeTag = {}, \
- ShadowObj = {}, HasData = {}, IsOLH = {}, OLHTag = {}, PGVer = {}, \
+ ShadowObj = {}, HasData = {}, IsVersioned = {}, VersionNum = {}, PGVer = {}, \
ZoneShortID = {}, ObjVersion = {}, ObjVersionTag = {}, ObjAttrs = {}, \
HeadSize = {}, MaxHeadSize = {}, ObjID = {}, TailInstance = {}, \
HeadPlacementRuleName = {}, HeadPlacementRuleStorageClass = {}, \
TailPlacementRuleName = {}, TailPlacementStorageClass = {}, \
ManifestPartObjs = {}, ManifestPartRules = {}, Omap = {}, \
- IsMultipart = {}, MPPartsList = {}, HeadData = {} \
+ IsMultipart = {}, MPPartsList = {}, HeadData = {}, Versions = {} \
WHERE ObjName = {} and ObjInstance = {} and BucketName = {}";
public:
params.op.obj.accounted_size, params.op.obj.mtime,
params.op.obj.epoch, params.op.obj.obj_tag, params.op.obj.tail_tag,
params.op.obj.write_tag, params.op.obj.fake_tag, params.op.obj.shadow_obj,
- params.op.obj.has_data, params.op.obj.is_olh, params.op.obj.olh_tag,
+ params.op.obj.has_data, params.op.obj.is_versioned, params.op.obj.version_num,
params.op.obj.pg_ver, params.op.obj.zone_short_id,
params.op.obj.obj_version, params.op.obj.obj_version_tag,
params.op.obj.obj_attrs, params.op.obj.head_size,
params.op.obj.tail_placement_storage_class,
params.op.obj.manifest_part_objs,
params.op.obj.manifest_part_rules, params.op.obj.omap,
- params.op.obj.is_multipart, params.op.obj.mp_parts, params.op.obj.head_data,
+ params.op.obj.is_multipart, params.op.obj.mp_parts,
+ params.op.obj.head_data, params.op.obj.versions,
params.op.obj.obj_name, params.op.obj.obj_instance,
params.op.bucket.bucket_name);
}
params.op.obj.obj_id);
}
};
+
class GetObjectDataOp: virtual public DBOp {
private:
static constexpr std::string_view Query =
bool assume_noent, bool modify_tail);
int write_meta(const DoutPrefixProvider *dpp, uint64_t size,
uint64_t accounted_size, std::map<std::string, bufferlist>& attrs);
+ int write_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& params);
};
struct Delete {
explicit Delete(DB::Object *_target) : target(_target) {}
int delete_obj(const DoutPrefixProvider *dpp);
+ int delete_obj_impl(const DoutPrefixProvider *dpp, DBOpParams& del_params);
+ int delete_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& del_params);
};
/* XXX: the parameters may be subject to change. All we need is bucket name
* & obj name,instance - keys */
+ int get_object_impl(const DoutPrefixProvider *dpp, DBOpParams& params);
int get_obj_state(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
const rgw_obj& obj,
bool follow_olh, RGWObjState **state);
- int get_olh_target_state(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
- RGWObjState* olh_state, RGWObjState** target);
- int follow_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState *state,
- const rgw_obj& olh_obj, rgw_obj *target);
-
int get_state(const DoutPrefixProvider *dpp, RGWObjState **pstate, bool follow_olh);
+ int list_versioned_objects(const DoutPrefixProvider *dpp,
+ std::list<rgw_bucket_dir_entry>& list_entries);
DB *get_store() { return store; }
rgw_obj& get_obj() { return obj; }
const RGWBucketInfo& bucket_info, const rgw_obj& obj,
off_t ofs, off_t end, uint64_t max_chunk_size,
iterate_obj_cb cb, void *arg);
+ int update_obj_next_version(const DoutPrefixProvider *dpp,
+ rgw_bucket_dir_entry &obj_entry,
+ bool promote, uint64_t& version_num);
};
int get_obj_iterate_cb(const DoutPrefixProvider *dpp,
const raw_obj& read_obj, off_t obj_ofs,
#define SQL_BIND_TEXT(dpp, stmt, index, str, sdb) \
do { \
- if (strcmp(str, "null") == 0) { \
- rc = sqlite3_bind_text(stmt, index, "", -1, SQLITE_TRANSIENT); \
- } else { \
rc = sqlite3_bind_text(stmt, index, str, -1, SQLITE_TRANSIENT); \
- } \
- \
if (rc != SQLITE_OK) { \
ldpp_dout(dpp, 0)<<"sqlite bind text failed for index(" \
<<index<<"), str("<<str<<") in stmt(" \
FakeTag,
ShadowObj,
HasData,
- IsOLH,
- OLHTag,
+ IsVersioned,
+ VersionNum,
PGVer,
ZoneShortID,
ObjVersion,
Omap,
IsMultipart,
MPPartsList,
- HeadData
+ HeadData,
+ Versions
};
enum GetObjectData {
op.obj.state.fake_tag = sqlite3_column_int(stmt, FakeTag);
op.obj.state.shadow_obj = (const char*)sqlite3_column_text(stmt, ShadowObj);
op.obj.state.has_data = sqlite3_column_int(stmt, HasData);
- op.obj.state.is_olh = sqlite3_column_int(stmt, IsOLH);
- SQL_DECODE_BLOB_PARAM(dpp, stmt, OLHTag, op.obj.state.olh_tag, sdb);
+ op.obj.is_versioned = sqlite3_column_int(stmt, IsVersioned);
+ op.obj.version_num = sqlite3_column_int(stmt, VersionNum);
op.obj.state.pg_ver = sqlite3_column_int(stmt, PGVer);
op.obj.state.zone_short_id = sqlite3_column_int(stmt, ZoneShortID);
op.obj.state.objv_tracker.read_version.ver = sqlite3_column_int(stmt, ObjVersion);
GetObject = make_shared<SQLGetObject>(sdb, db_name, cct);
UpdateObject = make_shared<SQLUpdateObject>(sdb, db_name, cct);
ListBucketObjects = make_shared<SQLListBucketObjects>(sdb, db_name, cct);
+ ListVersionedObjects = make_shared<SQLListVersionedObjects>(sdb, db_name, cct);
PutObjectData = make_shared<SQLPutObjectData>(sdb, db_name, cct);
UpdateObjectData = make_shared<SQLUpdateObjectData>(sdb, db_name, cct);
GetObjectData = make_shared<SQLGetObjectData>(sdb, db_name, cct);
SQL_BIND_TEXT(dpp, ak_stmt, index, access_key.c_str(), sdb);
}
} else if (params->op.query_str == "user_id") {
- SQL_BIND_INDEX(dpp, userid_stmt, index, p_params.op.user.tenant, sdb);
- SQL_BIND_TEXT(dpp, userid_stmt, index, params->op.user.uinfo.user_id.tenant.c_str(), sdb);
-
SQL_BIND_INDEX(dpp, userid_stmt, index, p_params.op.user.user_id, sdb);
SQL_BIND_TEXT(dpp, userid_stmt, index, params->op.user.uinfo.user_id.id.c_str(), sdb);
-
- SQL_BIND_INDEX(dpp, userid_stmt, index, p_params.op.user.ns, sdb);
- SQL_BIND_TEXT(dpp, userid_stmt, index, params->op.user.uinfo.user_id.ns.c_str(), sdb);
} else { // by default by userid
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.user.user_id, sdb);
SQL_BIND_TEXT(dpp, stmt, index, params->op.user.uinfo.user_id.id.c_str(), sdb);
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.has_data, sdb);
SQL_BIND_INT(dpp, stmt, index, params->op.obj.state.has_data, sdb);
- SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.is_olh, sdb);
- SQL_BIND_INT(dpp, stmt, index, params->op.obj.state.is_olh, sdb);
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.is_versioned, sdb);
+ SQL_BIND_INT(dpp, stmt, index, params->op.obj.is_versioned, sdb);
- SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.olh_tag, sdb);
- SQL_ENCODE_BLOB_PARAM(dpp, stmt, index, params->op.obj.state.olh_tag, sdb);
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.version_num, sdb);
+ SQL_BIND_INT(dpp, stmt, index, params->op.obj.version_num, sdb);
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.pg_ver, sdb);
SQL_BIND_INT(dpp, stmt, index, params->op.obj.state.pg_ver, sdb);
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.head_data, sdb);
SQL_ENCODE_BLOB_PARAM(dpp, stmt, index, params->op.obj.head_data, sdb);
-
out:
return rc;
}
SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.has_data, sdb);
SQL_BIND_INT(dpp, *stmt, index, params->op.obj.state.has_data, sdb);
- SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.is_olh, sdb);
- SQL_BIND_INT(dpp, *stmt, index, params->op.obj.state.is_olh, sdb);
+ SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.is_versioned, sdb);
+ SQL_BIND_INT(dpp, *stmt, index, params->op.obj.is_versioned, sdb);
- SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.olh_tag, sdb);
- SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.state.olh_tag, sdb);
+ SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.version_num, sdb);
+ SQL_BIND_INT(dpp, *stmt, index, params->op.obj.version_num, sdb);
SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.pg_ver, sdb);
SQL_BIND_INT(dpp, *stmt, index, params->op.obj.state.pg_ver, sdb);
return ret;
}
+int SQLListVersionedObjects::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *params)
+{
+ int ret = -1;
+ struct DBOpPrepareParams p_params = PrepareParams;
+
+ if (!*sdb) {
+ ldpp_dout(dpp, 0)<<"In SQLListVersionedObjects - no db" << dendl;
+ goto out;
+ }
+
+ InitPrepareParams(dpp, p_params, params);
+
+ SQL_PREPARE(dpp, p_params, sdb, stmt, ret, "PrepareListVersionedObjects");
+
+out:
+ return ret;
+}
+
+int SQLListVersionedObjects::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params)
+{
+ int index = -1;
+ int rc = 0;
+ struct DBOpPrepareParams p_params = PrepareParams;
+
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
+ SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
+
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_name, sdb);
+ SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.name.c_str(), sdb);
+
+ SQL_BIND_INDEX(dpp, stmt, index, p_params.op.list_max_count, sdb);
+ SQL_BIND_INT(dpp, stmt, index, params->op.list_max_count, sdb);
+
+out:
+ return rc;
+}
+
+int SQLListVersionedObjects::Execute(const DoutPrefixProvider *dpp, struct DBOpParams *params)
+{
+ int ret = -1;
+
+ SQL_EXECUTE(dpp, params, stmt, list_object);
+out:
+ return ret;
+}
+
int SQLPutObjectData::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *params)
{
int ret = -1;
int Bind(const DoutPrefixProvider *dpp, DBOpParams *params);
};
+class SQLListVersionedObjects : public SQLiteDB, public ListVersionedObjectsOp {
+ private:
+ sqlite3 **sdb = NULL;
+ sqlite3_stmt *stmt = NULL; // Prepared statement
+
+ public:
+ SQLListVersionedObjects(void **db, std::string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
+ SQLListVersionedObjects(sqlite3 **sdbi, std::string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {}
+
+ ~SQLListVersionedObjects() {
+ if (stmt)
+ sqlite3_finalize(stmt);
+ }
+ int Prepare(const DoutPrefixProvider *dpp, DBOpParams *params);
+ int Execute(const DoutPrefixProvider *dpp, DBOpParams *params);
+ int Bind(const DoutPrefixProvider *dpp, DBOpParams *params);
+};
+
class SQLPutObjectData : public SQLiteDB, public PutObjectDataOp {
private:
sqlite3 **sdb = NULL;
ASSERT_EQ(ret, -2);
}
+TEST_F(DBStoreTest, WriteVersionedObject) {
+ struct DBOpParams params = GlobalParams;
+ int ret = -1;
+ std::string instances[] = {"inst1", "inst2", "inst3"};
+ bufferlist b1;
+
+ params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
+ params.op.obj.state.obj.key.name = "object1";
+
+ /* Write versioned objects */
+ DB::Object op_target(db, params.op.bucket.info, params.op.obj.state.obj);
+ DB::Object::Write write_op(&op_target);
+
+ /* Version1 */
+ params.op.obj.state.obj.key.instance = instances[0];
+ encode("HELLO WORLD", b1);
+ params.op.obj.head_data = b1;
+ params.op.obj.state.size = 12;
+ ret = write_op.write_versioned_obj(dpp, params);
+ ASSERT_EQ(ret, 0);
+
+ /* Version2 */
+ params.op.obj.state.obj.key.instance = instances[1];
+ b1.clear();
+ encode("HELLO WORLD ABC", b1);
+ params.op.obj.head_data = b1;
+ params.op.obj.state.size = 16;
+ ret = write_op.write_versioned_obj(dpp, params);
+ ASSERT_EQ(ret, 0);
+
+ /* Version3 */
+ params.op.obj.state.obj.key.instance = instances[2];
+ b1.clear();
+ encode("HELLO WORLD A", b1);
+ params.op.obj.head_data = b1;
+ params.op.obj.state.size = 14;
+ ret = write_op.write_versioned_obj(dpp, params);
+ ASSERT_EQ(ret, 0);
+}
+
+TEST_F(DBStoreTest, ListVersionedObject) {
+ struct DBOpParams params = GlobalParams;
+ int ret = -1;
+ std::string instances[] = {"inst1", "inst2", "inst3"};
+ int i = 0;
+
+ /* list versioned objects */
+ params.op.obj.state.obj.key.instance.clear();
+ params.op.list_max_count = MAX_VERSIONED_OBJECTS;
+ ret = db->ProcessOp(dpp, "ListVersionedObjects", ¶ms);
+ ASSERT_EQ(ret, 0);
+
+ i = 2;
+ for (auto ent: params.op.obj.list_entries) {
+ string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false";
+ cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is:current = " << is_current << "\n";
+
+ if (i == 2) {
+ ASSERT_EQ(is_current, "true");
+ } else {
+ ASSERT_EQ(is_current, "false");
+ }
+
+ ASSERT_EQ(ent.key.instance, instances[i]);
+ i--;
+ }
+}
+
+TEST_F(DBStoreTest, ReadVersionedObject) {
+ struct DBOpParams params = GlobalParams;
+ int ret = -1;
+ std::string instances[] = {"inst1", "inst2", "inst3"};
+ std::string data;
+
+ /* read object.. should fetch latest version */
+ RGWObjState* s;
+ params = GlobalParams;
+ params.op.obj.state.obj.key.instance.clear();
+ DB::Object op_target2(db, params.op.bucket.info, params.op.obj.state.obj);
+ ret = op_target2.get_obj_state(dpp, params.op.bucket.info, params.op.obj.state.obj,
+ true, &s);
+ ASSERT_EQ(ret, 0);
+ ASSERT_EQ(s->obj.key.instance, instances[2]);
+ decode(data, s->data);
+ ASSERT_EQ(data, "HELLO WORLD A");
+ ASSERT_EQ(s->size, 14);
+
+ /* read a particular non-current version */
+ params.op.obj.state.obj.key.instance = instances[1];
+ DB::Object op_target3(db, params.op.bucket.info, params.op.obj.state.obj);
+ ret = op_target3.get_obj_state(dpp, params.op.bucket.info, params.op.obj.state.obj,
+ true, &s);
+ ASSERT_EQ(ret, 0);
+ decode(data, s->data);
+ ASSERT_EQ(data, "HELLO WORLD ABC");
+ ASSERT_EQ(s->size, 16);
+}
+
+TEST_F(DBStoreTest, DeleteVersionedObject) {
+ struct DBOpParams params = GlobalParams;
+ int ret = -1;
+ std::string instances[] = {"inst1", "inst2", "inst3"};
+ std::string data;
+ std::string dm_instance;
+ int i = 0;
+
+ /* Delete object..should create delete marker */
+ params.op.obj.state.obj.key.instance.clear();
+ DB::Object op_target(db, params.op.bucket.info, params.op.obj.state.obj);
+ DB::Object::Delete delete_op(&op_target);
+ delete_op.params.versioning_status |= BUCKET_VERSIONED;
+
+ ret = delete_op.delete_obj(dpp);
+ ASSERT_EQ(ret, 0);
+
+ /* list versioned objects */
+ params = GlobalParams;
+ params.op.obj.state.obj.key.instance.clear();
+ params.op.list_max_count = MAX_VERSIONED_OBJECTS;
+ ret = db->ProcessOp(dpp, "ListVersionedObjects", ¶ms);
+
+ i = 3;
+ for (auto ent: params.op.obj.list_entries) {
+ string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false";
+ string is_delete_marker = (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER)? "true" : "false";
+ cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is_delete_marker = " << is_delete_marker << "\n";
+
+ if (i == 3) {
+ ASSERT_EQ(is_delete_marker, "true");
+ ASSERT_EQ(is_current, "true");
+ dm_instance = ent.key.instance;
+ } else {
+ ASSERT_EQ(is_current, "false");
+ ASSERT_EQ(is_delete_marker, "false");
+ ASSERT_EQ(ent.key.instance, instances[i]);
+ }
+
+ i--;
+ }
+
+ /* read object.. should return -ENOENT */
+ RGWObjState* s;
+ params = GlobalParams;
+ params.op.obj.state.obj.key.instance.clear();
+ DB::Object op_target2(db, params.op.bucket.info, params.op.obj.state.obj);
+ ret = op_target2.get_obj_state(dpp, params.op.bucket.info, params.op.obj.state.obj,
+ true, &s);
+ ASSERT_EQ(ret, -ENOENT);
+
+ /* Delete delete marker..should be able to read object now */
+ params.op.obj.state.obj.key.instance = dm_instance;
+ DB::Object op_target3(db, params.op.bucket.info, params.op.obj.state.obj);
+ DB::Object::Delete delete_op2(&op_target3);
+ delete_op2.params.versioning_status |= BUCKET_VERSIONED;
+
+ ret = delete_op2.delete_obj(dpp);
+ ASSERT_EQ(ret, 0);
+
+ /* read object.. should fetch latest version */
+ params = GlobalParams;
+ params.op.obj.state.obj.key.instance.clear();
+ DB::Object op_target4(db, params.op.bucket.info, params.op.obj.state.obj);
+ ret = op_target4.get_obj_state(dpp, params.op.bucket.info, params.op.obj.state.obj,
+ true, &s);
+ ASSERT_EQ(s->obj.key.instance, instances[2]);
+ decode(data, s->data);
+ ASSERT_EQ(data, "HELLO WORLD A");
+ ASSERT_EQ(s->size, 14);
+
+ /* delete latest version using version-id. Next version should get promoted */
+ params.op.obj.state.obj.key.instance = instances[2];
+ DB::Object op_target5(db, params.op.bucket.info, params.op.obj.state.obj);
+ DB::Object::Delete delete_op3(&op_target5);
+ delete_op3.params.versioning_status |= BUCKET_VERSIONED;
+
+ ret = delete_op3.delete_obj(dpp);
+ ASSERT_EQ(ret, 0);
+
+ /* list versioned objects..only two versions should be present
+ * with second version marked as CURRENT */
+ params = GlobalParams;
+ params.op.obj.state.obj.key.instance.clear();
+ params.op.list_max_count = MAX_VERSIONED_OBJECTS;
+ ret = db->ProcessOp(dpp, "ListVersionedObjects", ¶ms);
+
+ i = 1;
+ for (auto ent: params.op.obj.list_entries) {
+ string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false";
+ cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is_current = " << is_current << "\n";
+
+ if (i == 1) {
+ ASSERT_EQ(is_current, "true");
+ dm_instance = ent.key.instance;
+ } else {
+ ASSERT_EQ(is_current, "false");
+ ASSERT_EQ(ent.key.instance, instances[i]);
+ }
+
+ i--;
+ }
+
+}
+
TEST_F(DBStoreTest, ObjectOmapSetVal) {
struct DBOpParams params = GlobalParams;
int ret = -1;