From b43a755acc6a31a538f53c2da5e19e0ee803f2ee Mon Sep 17 00:00:00 2001 From: Soumya Koduri Date: Wed, 25 May 2022 14:00:34 +0530 Subject: [PATCH] rgw/dbstore: Object versioning feature support This commit adds support for Object versioning feature in DBStore In DBStore, each object is uniquely identified by In addition, for each object upload, a unique objectID is created to handle racing writes. Note: For non-versioned objects, both head and tail entries have instance-id empty "" Versioned objects: - same as non-versioned objects but with instance-id & objectID set to version-id i.e, each version upload will have a unique versionID created which will act as that object's intanceID and objectID as well. - In addition a version-number is stored (starting with '1' & incremented sequentially) for each version/delete-marker being created for that object. This version-number is used to identify and promote/demote next object version as CURRENT. Current status: - Basic functionality seem to be working when the bucket is versioned. - If an object is removed, only one delete-marker is created for now. Subsequent deletes will be no-op (unless the object is re-created & deleted with the same name) - Added test cases to test put/get/delete/list of versioned objects TODO (not addressed as part of this PR): - Test various cases with versioning suspended & enabled. Signed-off-by: Soumya Koduri --- src/rgw/store/dbstore/common/dbstore.cc | 513 +++++++++++++------ src/rgw/store/dbstore/common/dbstore.h | 97 +++- src/rgw/store/dbstore/sqlite/sqliteDB.cc | 86 +++- src/rgw/store/dbstore/sqlite/sqliteDB.h | 18 + src/rgw/store/dbstore/tests/dbstore_tests.cc | 203 ++++++++ 5 files changed, 698 insertions(+), 219 deletions(-) diff --git a/src/rgw/store/dbstore/common/dbstore.cc b/src/rgw/store/dbstore/common/dbstore.cc index ad71cd0bcbd..c9217a76060 100644 --- a/src/rgw/store/dbstore/common/dbstore.cc +++ b/src/rgw/store/dbstore/common/dbstore.cc @@ -151,6 +151,8 @@ std::shared_ptr DB::getDBOp(const DoutPrefixProvider *dpp, std::stri return Ob->UpdateObject; if (!Op.compare("ListBucketObjects")) return Ob->ListBucketObjects; + if (!Op.compare("ListVersionedObjects")) + return Ob->ListVersionedObjects; if (!Op.compare("PutObjectData")) return Ob->PutObjectData; if (!Op.compare("UpdateObjectData")) @@ -766,14 +768,22 @@ int DB::Bucket::List::list_objects(const DoutPrefixProvider *dpp, int64_t max, goto out; } - for (auto& entry : db_params.op.obj.list_entries) { + if (!params.list_versions) { + if (!(entry.flags & rgw_bucket_dir_entry::FLAG_CURRENT) || + (entry.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER)) { + // skip all non-current entries and delete_marker + continue; + } + } + if (count >= max) { *is_truncated = true; next_marker.name = entry.key.name; next_marker.instance = entry.key.instance; break; } + if (!params.delim.empty()) { const std::string& objname = entry.key.name; const int delim_pos = objname.find(params.delim, params.prefix.size()); @@ -845,6 +855,26 @@ int DB::Object::InitializeParamsfromObject(const DoutPrefixProvider *dpp, return ret; } +int DB::Object::get_object_impl(const DoutPrefixProvider *dpp, DBOpParams& params) { + int ret = 0; + + if (params.op.obj.state.obj.key.name.empty()) { + /* Initialize */ + store->InitializeParams(dpp, ¶ms); + InitializeParamsfromObject(dpp, ¶ms); + } + + ret = store->ProcessOp(dpp, "GetObject", ¶ms); + + /* pick one field check if object exists */ + if (!ret && !params.op.obj.state.exists) { + ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl; + ret = -ENOENT; + } + + return ret; +} + int DB::Object::obj_omap_set_val_by_key(const DoutPrefixProvider *dpp, const std::string& key, bufferlist& val, bool must_exist) { @@ -852,22 +882,13 @@ int DB::Object::obj_omap_set_val_by_key(const DoutPrefixProvider *dpp, DBOpParams params = {}; - store->InitializeParams(dpp, ¶ms); - InitializeParamsfromObject(dpp, ¶ms); - - ret = store->ProcessOp(dpp, "GetObject", ¶ms); + ret = get_object_impl(dpp, params); if (ret) { - ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <InitializeParams(dpp, ¶ms); - InitializeParamsfromObject(dpp, ¶ms); - - ret = store->ProcessOp(dpp, "GetObject", ¶ms); + ret = get_object_impl(dpp, params); if (ret) { - ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <InitializeParams(dpp, ¶ms); - InitializeParamsfromObject(dpp, ¶ms); - - ret = store->ProcessOp(dpp, "GetObject", ¶ms); + ret = get_object_impl(dpp, params); if (ret) { - ldpp_dout(dpp, 0) <<"In GetObject failed err:(" < omap; - store->InitializeParams(dpp, ¶ms); - InitializeParamsfromObject(dpp, ¶ms); - - ret = store->ProcessOp(dpp, "GetObject", ¶ms); + ret = get_object_impl(dpp, params); if (ret) { - ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <InitializeParams(dpp, ¶ms); - InitializeParamsfromObject(dpp, ¶ms); - - ret = store->ProcessOp(dpp, "GetObject", ¶ms); + ret = get_object_impl(dpp, params); if (ret) { - ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <InitializeParams(dpp, ¶ms); - InitializeParamsfromObject(dpp, ¶ms); - - ret = store->ProcessOp(dpp, "GetObject", ¶ms); + ret = get_object_impl(dpp, params); if (ret) { - ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <::iterator iter; - store->InitializeParams(dpp, ¶ms); - InitializeParamsfromObject(dpp, ¶ms); - - ret = store->ProcessOp(dpp, "GetObject", ¶ms); + ret = get_object_impl(dpp, params); if (ret) { - ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <attrset.find(RGW_ATTR_OLH_INFO); - if (iter == state->attrset.end()) { - return -EINVAL; - } - - DBOLHInfo olh; - string s; - const bufferlist& bl = iter->second; - try { - auto biter = bl.cbegin(); - decode(olh, biter); - } catch (buffer::error& err) { - return -EIO; - } - - if (olh.removed) { - return -ENOENT; - } - - *target = olh.target; +int DB::Object::list_versioned_objects(const DoutPrefixProvider *dpp, + std::list& list_entries) { + int ret = 0; + store = get_store(); + DBOpParams db_params = {}; - return 0; -} + store->InitializeParams(dpp, &db_params); + InitializeParamsfromObject(dpp, &db_params); -int DB::Object::get_olh_target_state(const DoutPrefixProvider *dpp, - const RGWBucketInfo& bucket_info, const rgw_obj& obj, - RGWObjState* olh_state, RGWObjState** target) -{ - int ret = 0; - rgw_obj target_obj; + db_params.op.list_max_count = MAX_VERSIONED_OBJECTS; - if (!olh_state->is_olh) { - return EINVAL; - } + ret = store->ProcessOp(dpp, "ListVersionedObjects", &db_params); - ret = follow_olh(dpp, bucket_info, olh_state, obj, &target_obj); /* might return -EAGAIN */ - if (ret < 0) { - ldpp_dout(dpp, 0)<<"In get_olh_target_state follow_olh() failed err:(" <InitializeParams(dpp, ¶ms); - InitializeParamsfromObject(dpp, ¶ms); - - ret = store->ProcessOp(dpp, "GetObject", ¶ms); + ret = get_object_impl(dpp, params); - if (ret) { - ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <get_state(dpp, &astate, true); if (r < 0) return r; @@ -1640,11 +1588,16 @@ int DB::Object::Write::prepare(const DoutPrefixProvider* dpp) obj_state.obj = target->obj; - if (target->obj_id.empty()) { - // generate obj_id - char buf[33]; - gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1); - target->obj_id = target->obj.key.name + "." + buf; + if (target->obj_id.empty()) { + if (!target->obj.key.instance.empty() && (target->obj.key.instance != "null")) { + /* versioned object. Set obj_id same as versionID/instance */ + target->obj_id = target->obj.key.instance; + } else { + // generate obj_id + char buf[33]; + gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1); + target->obj_id = buf; + } } ret = 0; @@ -1699,6 +1652,109 @@ int DB::Object::Write::write_data(const DoutPrefixProvider* dpp, return 0; } +/* + * If versioned, + * - check if the old version of the object already exists + * if exists, + * - set its version-id/instance to "null" if instance was empty + * - demote it to NON_CURRENT + * - create versioned object with FLAG_CURRENT set + * + * XXX: call this function under a lock or sqlite transaction to prevent + * parallel object uploads marking both the versions as FLAG_CURRENT; + * + */ +int DB::Object::Write::write_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& params) { + DB *store = target->get_store(); + int ret = -1; + DBOpParams obj_params = {}; + uint64_t version_num = 0; + + store->InitializeParams(dpp, &obj_params); + target->InitializeParamsfromObject(dpp, &obj_params); + + /* Check if already exists */ + obj_params.op.obj.state.obj.key.instance.clear(); + ret = target->list_versioned_objects(dpp, obj_params.op.obj.list_entries); + + if (ret && ret != -ENOENT) { + ldpp_dout(dpp, 0)<<"ListVersionedObjects of object (" << obj_params.op.obj.state.obj.key.name << ") failed err:(" <update_obj_next_version(dpp, ent, false, version_num); // demote previous version + if (ret != ENOENT) { + break; + } + // continue to next object version + } + + if (ret) { + ldpp_dout(dpp, 0)<<"demote_next_version of object (" << obj_params.op.obj.state.obj.key.name << ") failed err:(" <ProcessOp(dpp, "PutObject", ¶ms); + if (ret) { + ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <InitializeParams(dpp, ¶ms); + params.op.bucket.info.bucket.name = bucket_info.bucket.name; + + ret = get_object_impl(dpp, params); + + if (ret) { // do nothing as its best effort? + ldpp_dout(dpp, 0)<<"In demote_object_next_ver get of prev version(" << params.op.obj.obj_id << ") failed err:(" <ProcessOp(dpp, "PutObject", ¶ms); // XXX: can be UpdateObject? + + if (ret) { // do nothing as its best effort? + ldpp_dout(dpp, 0)<<"Put of Old version of object failed err:(" <is_olh) { - (*attrset)[RGW_ATTR_OLH_ID_TAG] = state->olh_tag; - } - state->mtime = meta.set_mtime; if (meta.data) { @@ -1804,22 +1856,31 @@ int DB::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp, params.op.obj.state.accounted_size = accounted_size; params.op.obj.owner = target->get_bucket_info().owner.id; - /* XXX: handle versioning */ if (meta.mtime) { *meta.mtime = meta.set_mtime; } params.op.query_str = "meta"; params.op.obj.obj_id = target->obj_id; - ret = store->ProcessOp(dpp, "PutObject", ¶ms); - if (ret) { - ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <obj.key.instance.empty() && (target->obj.key.instance != "null"); + params.op.obj.is_versioned = is_versioned; + + if (!is_versioned) { + ret = store->ProcessOp(dpp, "PutObject", ¶ms); + if (ret) { + ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <get_store(); - RGWObjState* astate; + DBOpParams del_params = {}; - int r = target->get_state(dpp, &astate, true); - if (r < 0) - return r; + ret = target->get_object_impl(dpp, del_params); - if (!astate->exists) { - return -ENOENT; + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(dpp, 0)<<"GetObject during delete failed err:(" <list_versioned_objects(dpp, del_params.op.obj.list_entries); + + if (!del_params.op.obj.list_entries.empty()) { + ret = delete_versioned_obj(dpp, del_params); + } else { + ret = -ENOENT; + } + + return ret; +} - store->InitializeParams(dpp, &del_params); - target->InitializeParamsfromObject(dpp, &del_params); +int DB::Object::Delete::delete_obj_impl(const DoutPrefixProvider *dpp, + DBOpParams& del_params) { + int ret = 0; + DB *store = target->get_store(); ret = store->ProcessOp(dpp, "DeleteObject", &del_params); if (ret) { @@ -1876,16 +1951,117 @@ int DB::Object::Delete::delete_obj(const DoutPrefixProvider *dpp) { * its corresponding head object is deleted (like here in this case). */ DBOpParams update_params = del_params; - update_params.op.obj.obj_id = astate->shadow_obj; // objectID is copied here in get_state() update_params.op.obj.state.mtime = real_clock::now(); ret = store->ProcessOp(dpp, "UpdateObjectData", &update_params); if (ret) { ldpp_dout(dpp, 0) << "Updating tail objects mtime failed err:(" <get_store(); + bool versioning_enabled = (params.versioning_status & BUCKET_VERSIONED); + int ret = -1; + DBOpParams olh_params = {}; + std::string version_id; + DBOpParams next_params = del_params; + + version_id = del_params.op.obj.state.obj.key.instance; + auto &ent = del_params.op.obj.list_entries.front(); + uint64_t version_num = MAX_VERSIONED_OBJECTS; + + if (version_id.empty()) { + /* create delete marker */ + + if (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) { + // for now do not create another delete marker..just exit + return 0; + } + DBOpParams dm_params = del_params; + + //demote current entry + target->update_obj_next_version(dpp, ent, false, version_num); //XXX: not checking return status + + // create delete marker + store->InitializeParams(dpp, &dm_params); + target->InitializeParamsfromObject(dpp, &dm_params); + + if (!versioning_enabled) { + dm_params.op.obj.state.obj.key.instance = "null"; + } else { + store->gen_rand_obj_instance_name(&dm_params.op.obj.state.obj.key); + dm_params.op.obj.obj_id = dm_params.op.obj.state.obj.key.instance; + } + + dm_params.op.obj.flags |= (rgw_bucket_dir_entry::FLAG_DELETE_MARKER); + dm_params.op.obj.flags |= (rgw_bucket_dir_entry::FLAG_CURRENT); + dm_params.op.obj.version_num = ++version_num; // max obj version list + + ret = store->ProcessOp(dpp, "PutObject", &dm_params); + + if (ret) { + ldpp_dout(dpp, 0) << "delete_olh: failed to create delete marker - err:(" <InitializeParams(dpp, &dm_params); + target->InitializeParamsfromObject(dpp, &dm_params); + + dm_params.op.obj.state.obj.key.instance = version_id; + + ret = target->get_object_impl(dpp, dm_params); + + if (ret < 0) { + ldpp_dout(dpp, 0) << "get obj of versioned object failed - err:(" <::iterator e; + for (e = del_params.op.obj.list_entries.begin(); e != del_params.op.obj.list_entries.end(); e++) { + + if (e->key.instance == version_id) { + e++; + if (e != del_params.op.obj.list_entries.end()) { + found = true; + } + break; + } + } + if (found) { + uint64_t version_num = 0; + rgw_bucket_dir_entry& e1 = *e; + target->update_obj_next_version(dpp, e1, true, version_num); + } + } + } + + return ret; } int DB::get_entry(const std::string& oid, const std::string& marker, @@ -2089,6 +2265,7 @@ int DB::delete_stale_objs(const DoutPrefixProvider *dpp, const std::string& buck ret = ProcessOp(dpp, "GetBucket", ¶ms); if (ret) { ldpp_dout(dpp, 0) << "In GetBucket failed err:(" < list_entries; /* XXX: Maybe use std::vector instead of std::list */ + + /* for versioned objects */ + bool is_versioned; + uint64_t version_num = 1; // default value for plain entries (non-versioned) }; struct DBOpObjectDataInfo { @@ -259,8 +263,8 @@ struct DBOpObjectPrepareInfo { static constexpr const char* fake_tag = ":fake_tag"; static constexpr const char* shadow_obj = ":shadow_obj"; static constexpr const char* has_data = ":has_data"; - static constexpr const char* is_olh = ":is_ols"; - static constexpr const char* olh_tag = ":olh_tag"; + static constexpr const char* is_versioned = ":is_versioned"; + static constexpr const char* version_num = ":version_num"; static constexpr const char* pg_ver = ":pg_ver"; static constexpr const char* zone_short_id = ":zone_short_id"; static constexpr const char* obj_version = ":obj_version"; @@ -288,6 +292,7 @@ struct DBOpObjectPrepareInfo { static constexpr const char* new_obj_name = ":new_obj_name"; static constexpr const char* new_obj_instance = ":new_obj_instance"; static constexpr const char* new_obj_ns = ":new_obj_ns"; + static constexpr const char* versions = ":versions"; }; struct DBOpObjectDataPrepareInfo { @@ -369,6 +374,7 @@ class ObjectOp { std::shared_ptr GetObject; std::shared_ptr UpdateObject; std::shared_ptr ListBucketObjects; + std::shared_ptr ListVersionedObjects; std::shared_ptr PutObjectData; std::shared_ptr UpdateObjectData; std::shared_ptr GetObjectData; @@ -542,8 +548,8 @@ class DBOp { FakeTag BOOL, \ ShadowObj TEXT, \ HasData BOOL, \ - IsOLH BOOL, \ - OLHTag BLOB, \ + IsVersioned BOOL, \ + VersionNum INTEGER, \ PGVer INTEGER, \ ZoneShortID INTEGER, \ ObjVersion INTEGER, \ @@ -563,6 +569,7 @@ class DBOp { IsMultipart BOOL, \ MPPartsList BLOB, \ HeadData BLOB, \ + VERSIONS BLOB, \ PRIMARY KEY (ObjName, ObjInstance, BucketName), \ FOREIGN KEY (BucketName) \ REFERENCES '{}' (BucketName) ON DELETE CASCADE ON UPDATE CASCADE \n);"; @@ -784,7 +791,7 @@ class GetUserOp: virtual public DBOp { System, PlacementName, PlacementStorageClass, PlacementTags, \ BucketQuota, TempURLKeys, UserQuota, Type, MfaIDs, AssumedRoleARN, \ UserAttrs, UserVersion, UserVersionTag \ - from '{}' where Tenant = {} and UserID = {} and NS = {}"; + from '{}' where UserID = {}"; public: virtual ~GetUserOp() {} @@ -800,9 +807,7 @@ class GetUserOp: virtual public DBOp { } else if (params.op.query_str == "user_id") { return fmt::format(QueryByUserID, params.user_table, - params.op.user.tenant, - params.op.user.user_id, - params.op.user.ns); + params.op.user.user_id); } else { return fmt::format(Query, params.user_table, params.op.user.user_id); @@ -985,13 +990,14 @@ class PutObjectOp: virtual public DBOp { Flags, VersionedEpoch, ObjCategory, Etag, Owner, OwnerDisplayName, \ StorageClass, Appendable, ContentType, IndexHashSource, ObjSize, \ AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \ - ShadowObj, HasData, IsOLH, OLHTag, PGVer, ZoneShortID, \ + ShadowObj, HasData, IsVersioned, VersionNum, PGVer, ZoneShortID, \ ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \ ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \ TailPlacementRuleName, TailPlacementStorageClass, \ - ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData ) \ + ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \ + HeadData, Versions) \ VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \ - {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \ + {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \ {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {})"; public: @@ -1010,7 +1016,7 @@ class PutObjectOp: virtual public DBOp { params.op.obj.accounted_size, params.op.obj.mtime, params.op.obj.epoch, params.op.obj.obj_tag, params.op.obj.tail_tag, params.op.obj.write_tag, params.op.obj.fake_tag, params.op.obj.shadow_obj, - params.op.obj.has_data, params.op.obj.is_olh, params.op.obj.olh_tag, + params.op.obj.has_data, params.op.obj.is_versioned, params.op.obj.version_num, params.op.obj.pg_ver, params.op.obj.zone_short_id, params.op.obj.obj_version, params.op.obj.obj_version_tag, params.op.obj.obj_attrs, params.op.obj.head_size, @@ -1022,7 +1028,8 @@ class PutObjectOp: virtual public DBOp { params.op.obj.tail_placement_storage_class, params.op.obj.manifest_part_objs, params.op.obj.manifest_part_rules, params.op.obj.omap, - params.op.obj.is_multipart, params.op.obj.mp_parts, params.op.obj.head_data); + params.op.obj.is_multipart, params.op.obj.mp_parts, + params.op.obj.head_data, params.op.obj.versions); } }; @@ -1050,11 +1057,12 @@ class GetObjectOp: virtual public DBOp { Flags, VersionedEpoch, ObjCategory, Etag, Owner, OwnerDisplayName, \ StorageClass, Appendable, ContentType, IndexHashSource, ObjSize, \ AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \ - ShadowObj, HasData, IsOLH, OLHTag, PGVer, ZoneShortID, \ + ShadowObj, HasData, IsVersioned, VersionNum, PGVer, ZoneShortID, \ ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \ ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \ TailPlacementRuleName, TailPlacementStorageClass, \ - ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData from '{}' \ + ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \ + HeadData, Versions from '{}' \ where BucketName = {} and ObjName = {} and ObjInstance = {}"; public: @@ -1079,7 +1087,7 @@ class ListBucketObjectsOp: virtual public DBOp { Flags, VersionedEpoch, ObjCategory, Etag, Owner, OwnerDisplayName, \ StorageClass, Appendable, ContentType, IndexHashSource, ObjSize, \ AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \ - ShadowObj, HasData, IsOLH, OLHTag, PGVer, ZoneShortID, \ + ShadowObj, HasData, IsVersioned, VersionNum, PGVer, ZoneShortID, \ ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \ ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \ TailPlacementRuleName, TailPlacementStorageClass, \ @@ -1099,6 +1107,37 @@ class ListBucketObjectsOp: virtual public DBOp { } }; +#define MAX_VERSIONED_OBJECTS 20 +class ListVersionedObjectsOp: virtual public DBOp { + private: + // once we have stats also stored, may have to update this query to join + // these two tables. + static constexpr std::string_view Query = + "SELECT \ + ObjName, ObjInstance, ObjNS, BucketName, ACLs, IndexVer, Tag, \ + Flags, VersionedEpoch, ObjCategory, Etag, Owner, OwnerDisplayName, \ + StorageClass, Appendable, ContentType, IndexHashSource, ObjSize, \ + AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \ + ShadowObj, HasData, IsVersioned, VersionNum, PGVer, ZoneShortID, \ + ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \ + ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \ + TailPlacementRuleName, TailPlacementStorageClass, \ + ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \ + HeadData, Versions from '{}' \ + where BucketName = {} and ObjName = {} ORDER BY VersionNum DESC LIMIT {}"; + public: + virtual ~ListVersionedObjectsOp() {} + + static std::string Schema(DBOpPrepareParams ¶ms) { + /* XXX: Include obj_id, delim */ + return fmt::format(Query, + params.object_table, + params.op.bucket.bucket_name, + params.op.obj.obj_name, + params.op.list_max_count); + } +}; + class UpdateObjectOp: virtual public DBOp { private: // Updates Omap @@ -1118,13 +1157,13 @@ class UpdateObjectOp: virtual public DBOp { StorageClass = {}, Appendable = {}, ContentType = {}, \ IndexHashSource = {}, ObjSize = {}, AccountedSize = {}, Mtime = {}, \ Epoch = {}, ObjTag = {}, TailTag = {}, WriteTag = {}, FakeTag = {}, \ - ShadowObj = {}, HasData = {}, IsOLH = {}, OLHTag = {}, PGVer = {}, \ + ShadowObj = {}, HasData = {}, IsVersioned = {}, VersionNum = {}, PGVer = {}, \ ZoneShortID = {}, ObjVersion = {}, ObjVersionTag = {}, ObjAttrs = {}, \ HeadSize = {}, MaxHeadSize = {}, ObjID = {}, TailInstance = {}, \ HeadPlacementRuleName = {}, HeadPlacementRuleStorageClass = {}, \ TailPlacementRuleName = {}, TailPlacementStorageClass = {}, \ ManifestPartObjs = {}, ManifestPartRules = {}, Omap = {}, \ - IsMultipart = {}, MPPartsList = {}, HeadData = {} \ + IsMultipart = {}, MPPartsList = {}, HeadData = {}, Versions = {} \ WHERE ObjName = {} and ObjInstance = {} and BucketName = {}"; public: @@ -1167,7 +1206,7 @@ class UpdateObjectOp: virtual public DBOp { params.op.obj.accounted_size, params.op.obj.mtime, params.op.obj.epoch, params.op.obj.obj_tag, params.op.obj.tail_tag, params.op.obj.write_tag, params.op.obj.fake_tag, params.op.obj.shadow_obj, - params.op.obj.has_data, params.op.obj.is_olh, params.op.obj.olh_tag, + params.op.obj.has_data, params.op.obj.is_versioned, params.op.obj.version_num, params.op.obj.pg_ver, params.op.obj.zone_short_id, params.op.obj.obj_version, params.op.obj.obj_version_tag, params.op.obj.obj_attrs, params.op.obj.head_size, @@ -1179,7 +1218,8 @@ class UpdateObjectOp: virtual public DBOp { params.op.obj.tail_placement_storage_class, params.op.obj.manifest_part_objs, params.op.obj.manifest_part_rules, params.op.obj.omap, - params.op.obj.is_multipart, params.op.obj.mp_parts, params.op.obj.head_data, + params.op.obj.is_multipart, params.op.obj.mp_parts, + params.op.obj.head_data, params.op.obj.versions, params.op.obj.obj_name, params.op.obj.obj_instance, params.op.bucket.bucket_name); } @@ -1233,6 +1273,7 @@ class UpdateObjectDataOp: virtual public DBOp { params.op.obj.obj_id); } }; + class GetObjectDataOp: virtual public DBOp { private: static constexpr std::string_view Query = @@ -1853,6 +1894,7 @@ class DB { bool assume_noent, bool modify_tail); int write_meta(const DoutPrefixProvider *dpp, uint64_t size, uint64_t accounted_size, std::map& attrs); + int write_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& params); }; struct Delete { @@ -1887,19 +1929,19 @@ class DB { explicit Delete(DB::Object *_target) : target(_target) {} int delete_obj(const DoutPrefixProvider *dpp); + int delete_obj_impl(const DoutPrefixProvider *dpp, DBOpParams& del_params); + int delete_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& del_params); }; /* XXX: the parameters may be subject to change. All we need is bucket name * & obj name,instance - keys */ + int get_object_impl(const DoutPrefixProvider *dpp, DBOpParams& params); int get_obj_state(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, bool follow_olh, RGWObjState **state); - int get_olh_target_state(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, - RGWObjState* olh_state, RGWObjState** target); - int follow_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState *state, - const rgw_obj& olh_obj, rgw_obj *target); - int get_state(const DoutPrefixProvider *dpp, RGWObjState **pstate, bool follow_olh); + int list_versioned_objects(const DoutPrefixProvider *dpp, + std::list& list_entries); DB *get_store() { return store; } rgw_obj& get_obj() { return obj; } @@ -1927,6 +1969,9 @@ class DB { const RGWBucketInfo& bucket_info, const rgw_obj& obj, off_t ofs, off_t end, uint64_t max_chunk_size, iterate_obj_cb cb, void *arg); + int update_obj_next_version(const DoutPrefixProvider *dpp, + rgw_bucket_dir_entry &obj_entry, + bool promote, uint64_t& version_num); }; int get_obj_iterate_cb(const DoutPrefixProvider *dpp, const raw_obj& read_obj, off_t obj_ofs, diff --git a/src/rgw/store/dbstore/sqlite/sqliteDB.cc b/src/rgw/store/dbstore/sqlite/sqliteDB.cc index 649c644bebf..f0aa39aac8b 100644 --- a/src/rgw/store/dbstore/sqlite/sqliteDB.cc +++ b/src/rgw/store/dbstore/sqlite/sqliteDB.cc @@ -42,12 +42,7 @@ using namespace std; #define SQL_BIND_TEXT(dpp, stmt, index, str, sdb) \ do { \ - if (strcmp(str, "null") == 0) { \ - rc = sqlite3_bind_text(stmt, index, "", -1, SQLITE_TRANSIENT); \ - } else { \ rc = sqlite3_bind_text(stmt, index, str, -1, SQLITE_TRANSIENT); \ - } \ - \ if (rc != SQLITE_OK) { \ ldpp_dout(dpp, 0)<<"sqlite bind text failed for index(" \ <(sdb, db_name, cct); UpdateObject = make_shared(sdb, db_name, cct); ListBucketObjects = make_shared(sdb, db_name, cct); + ListVersionedObjects = make_shared(sdb, db_name, cct); PutObjectData = make_shared(sdb, db_name, cct); UpdateObjectData = make_shared(sdb, db_name, cct); GetObjectData = make_shared(sdb, db_name, cct); @@ -1270,14 +1267,8 @@ int SQLGetUser::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params) SQL_BIND_TEXT(dpp, ak_stmt, index, access_key.c_str(), sdb); } } else if (params->op.query_str == "user_id") { - SQL_BIND_INDEX(dpp, userid_stmt, index, p_params.op.user.tenant, sdb); - SQL_BIND_TEXT(dpp, userid_stmt, index, params->op.user.uinfo.user_id.tenant.c_str(), sdb); - SQL_BIND_INDEX(dpp, userid_stmt, index, p_params.op.user.user_id, sdb); SQL_BIND_TEXT(dpp, userid_stmt, index, params->op.user.uinfo.user_id.id.c_str(), sdb); - - SQL_BIND_INDEX(dpp, userid_stmt, index, p_params.op.user.ns, sdb); - SQL_BIND_TEXT(dpp, userid_stmt, index, params->op.user.uinfo.user_id.ns.c_str(), sdb); } else { // by default by userid SQL_BIND_INDEX(dpp, stmt, index, p_params.op.user.user_id, sdb); SQL_BIND_TEXT(dpp, stmt, index, params->op.user.uinfo.user_id.id.c_str(), sdb); @@ -1853,11 +1844,11 @@ int SQLPutObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params) SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.has_data, sdb); SQL_BIND_INT(dpp, stmt, index, params->op.obj.state.has_data, sdb); - SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.is_olh, sdb); - SQL_BIND_INT(dpp, stmt, index, params->op.obj.state.is_olh, sdb); + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.is_versioned, sdb); + SQL_BIND_INT(dpp, stmt, index, params->op.obj.is_versioned, sdb); - SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.olh_tag, sdb); - SQL_ENCODE_BLOB_PARAM(dpp, stmt, index, params->op.obj.state.olh_tag, sdb); + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.version_num, sdb); + SQL_BIND_INT(dpp, stmt, index, params->op.obj.version_num, sdb); SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.pg_ver, sdb); SQL_BIND_INT(dpp, stmt, index, params->op.obj.state.pg_ver, sdb); @@ -1916,7 +1907,6 @@ int SQLPutObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params) SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.head_data, sdb); SQL_ENCODE_BLOB_PARAM(dpp, stmt, index, params->op.obj.head_data, sdb); - out: return rc; } @@ -2167,11 +2157,11 @@ int SQLUpdateObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *para SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.has_data, sdb); SQL_BIND_INT(dpp, *stmt, index, params->op.obj.state.has_data, sdb); - SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.is_olh, sdb); - SQL_BIND_INT(dpp, *stmt, index, params->op.obj.state.is_olh, sdb); + SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.is_versioned, sdb); + SQL_BIND_INT(dpp, *stmt, index, params->op.obj.is_versioned, sdb); - SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.olh_tag, sdb); - SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.state.olh_tag, sdb); + SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.version_num, sdb); + SQL_BIND_INT(dpp, *stmt, index, params->op.obj.version_num, sdb); SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.pg_ver, sdb); SQL_BIND_INT(dpp, *stmt, index, params->op.obj.state.pg_ver, sdb); @@ -2308,6 +2298,52 @@ out: return ret; } +int SQLListVersionedObjects::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *params) +{ + int ret = -1; + struct DBOpPrepareParams p_params = PrepareParams; + + if (!*sdb) { + ldpp_dout(dpp, 0)<<"In SQLListVersionedObjects - no db" << dendl; + goto out; + } + + InitPrepareParams(dpp, p_params, params); + + SQL_PREPARE(dpp, p_params, sdb, stmt, ret, "PrepareListVersionedObjects"); + +out: + return ret; +} + +int SQLListVersionedObjects::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params) +{ + int index = -1; + int rc = 0; + struct DBOpPrepareParams p_params = PrepareParams; + + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb); + SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb); + + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_name, sdb); + SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.name.c_str(), sdb); + + SQL_BIND_INDEX(dpp, stmt, index, p_params.op.list_max_count, sdb); + SQL_BIND_INT(dpp, stmt, index, params->op.list_max_count, sdb); + +out: + return rc; +} + +int SQLListVersionedObjects::Execute(const DoutPrefixProvider *dpp, struct DBOpParams *params) +{ + int ret = -1; + + SQL_EXECUTE(dpp, params, stmt, list_object); +out: + return ret; +} + int SQLPutObjectData::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *params) { int ret = -1; diff --git a/src/rgw/store/dbstore/sqlite/sqliteDB.h b/src/rgw/store/dbstore/sqlite/sqliteDB.h index 836a8702a03..444400b04eb 100644 --- a/src/rgw/store/dbstore/sqlite/sqliteDB.h +++ b/src/rgw/store/dbstore/sqlite/sqliteDB.h @@ -327,6 +327,24 @@ class SQLListBucketObjects : public SQLiteDB, public ListBucketObjectsOp { int Bind(const DoutPrefixProvider *dpp, DBOpParams *params); }; +class SQLListVersionedObjects : public SQLiteDB, public ListVersionedObjectsOp { + private: + sqlite3 **sdb = NULL; + sqlite3_stmt *stmt = NULL; // Prepared statement + + public: + SQLListVersionedObjects(void **db, std::string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {} + SQLListVersionedObjects(sqlite3 **sdbi, std::string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {} + + ~SQLListVersionedObjects() { + if (stmt) + sqlite3_finalize(stmt); + } + int Prepare(const DoutPrefixProvider *dpp, DBOpParams *params); + int Execute(const DoutPrefixProvider *dpp, DBOpParams *params); + int Bind(const DoutPrefixProvider *dpp, DBOpParams *params); +}; + class SQLPutObjectData : public SQLiteDB, public PutObjectDataOp { private: sqlite3 **sdb = NULL; diff --git a/src/rgw/store/dbstore/tests/dbstore_tests.cc b/src/rgw/store/dbstore/tests/dbstore_tests.cc index f99383b6218..2e8c535f648 100644 --- a/src/rgw/store/dbstore/tests/dbstore_tests.cc +++ b/src/rgw/store/dbstore/tests/dbstore_tests.cc @@ -895,6 +895,209 @@ TEST_F(DBStoreTest, DeleteObj) { ASSERT_EQ(ret, -2); } +TEST_F(DBStoreTest, WriteVersionedObject) { + struct DBOpParams params = GlobalParams; + int ret = -1; + std::string instances[] = {"inst1", "inst2", "inst3"}; + bufferlist b1; + + params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_CURRENT; + params.op.obj.state.obj.key.name = "object1"; + + /* Write versioned objects */ + DB::Object op_target(db, params.op.bucket.info, params.op.obj.state.obj); + DB::Object::Write write_op(&op_target); + + /* Version1 */ + params.op.obj.state.obj.key.instance = instances[0]; + encode("HELLO WORLD", b1); + params.op.obj.head_data = b1; + params.op.obj.state.size = 12; + ret = write_op.write_versioned_obj(dpp, params); + ASSERT_EQ(ret, 0); + + /* Version2 */ + params.op.obj.state.obj.key.instance = instances[1]; + b1.clear(); + encode("HELLO WORLD ABC", b1); + params.op.obj.head_data = b1; + params.op.obj.state.size = 16; + ret = write_op.write_versioned_obj(dpp, params); + ASSERT_EQ(ret, 0); + + /* Version3 */ + params.op.obj.state.obj.key.instance = instances[2]; + b1.clear(); + encode("HELLO WORLD A", b1); + params.op.obj.head_data = b1; + params.op.obj.state.size = 14; + ret = write_op.write_versioned_obj(dpp, params); + ASSERT_EQ(ret, 0); +} + +TEST_F(DBStoreTest, ListVersionedObject) { + struct DBOpParams params = GlobalParams; + int ret = -1; + std::string instances[] = {"inst1", "inst2", "inst3"}; + int i = 0; + + /* list versioned objects */ + params.op.obj.state.obj.key.instance.clear(); + params.op.list_max_count = MAX_VERSIONED_OBJECTS; + ret = db->ProcessOp(dpp, "ListVersionedObjects", ¶ms); + ASSERT_EQ(ret, 0); + + i = 2; + for (auto ent: params.op.obj.list_entries) { + string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false"; + cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is:current = " << is_current << "\n"; + + if (i == 2) { + ASSERT_EQ(is_current, "true"); + } else { + ASSERT_EQ(is_current, "false"); + } + + ASSERT_EQ(ent.key.instance, instances[i]); + i--; + } +} + +TEST_F(DBStoreTest, ReadVersionedObject) { + struct DBOpParams params = GlobalParams; + int ret = -1; + std::string instances[] = {"inst1", "inst2", "inst3"}; + std::string data; + + /* read object.. should fetch latest version */ + RGWObjState* s; + params = GlobalParams; + params.op.obj.state.obj.key.instance.clear(); + DB::Object op_target2(db, params.op.bucket.info, params.op.obj.state.obj); + ret = op_target2.get_obj_state(dpp, params.op.bucket.info, params.op.obj.state.obj, + true, &s); + ASSERT_EQ(ret, 0); + ASSERT_EQ(s->obj.key.instance, instances[2]); + decode(data, s->data); + ASSERT_EQ(data, "HELLO WORLD A"); + ASSERT_EQ(s->size, 14); + + /* read a particular non-current version */ + params.op.obj.state.obj.key.instance = instances[1]; + DB::Object op_target3(db, params.op.bucket.info, params.op.obj.state.obj); + ret = op_target3.get_obj_state(dpp, params.op.bucket.info, params.op.obj.state.obj, + true, &s); + ASSERT_EQ(ret, 0); + decode(data, s->data); + ASSERT_EQ(data, "HELLO WORLD ABC"); + ASSERT_EQ(s->size, 16); +} + +TEST_F(DBStoreTest, DeleteVersionedObject) { + struct DBOpParams params = GlobalParams; + int ret = -1; + std::string instances[] = {"inst1", "inst2", "inst3"}; + std::string data; + std::string dm_instance; + int i = 0; + + /* Delete object..should create delete marker */ + params.op.obj.state.obj.key.instance.clear(); + DB::Object op_target(db, params.op.bucket.info, params.op.obj.state.obj); + DB::Object::Delete delete_op(&op_target); + delete_op.params.versioning_status |= BUCKET_VERSIONED; + + ret = delete_op.delete_obj(dpp); + ASSERT_EQ(ret, 0); + + /* list versioned objects */ + params = GlobalParams; + params.op.obj.state.obj.key.instance.clear(); + params.op.list_max_count = MAX_VERSIONED_OBJECTS; + ret = db->ProcessOp(dpp, "ListVersionedObjects", ¶ms); + + i = 3; + for (auto ent: params.op.obj.list_entries) { + string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false"; + string is_delete_marker = (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER)? "true" : "false"; + cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is_delete_marker = " << is_delete_marker << "\n"; + + if (i == 3) { + ASSERT_EQ(is_delete_marker, "true"); + ASSERT_EQ(is_current, "true"); + dm_instance = ent.key.instance; + } else { + ASSERT_EQ(is_current, "false"); + ASSERT_EQ(is_delete_marker, "false"); + ASSERT_EQ(ent.key.instance, instances[i]); + } + + i--; + } + + /* read object.. should return -ENOENT */ + RGWObjState* s; + params = GlobalParams; + params.op.obj.state.obj.key.instance.clear(); + DB::Object op_target2(db, params.op.bucket.info, params.op.obj.state.obj); + ret = op_target2.get_obj_state(dpp, params.op.bucket.info, params.op.obj.state.obj, + true, &s); + ASSERT_EQ(ret, -ENOENT); + + /* Delete delete marker..should be able to read object now */ + params.op.obj.state.obj.key.instance = dm_instance; + DB::Object op_target3(db, params.op.bucket.info, params.op.obj.state.obj); + DB::Object::Delete delete_op2(&op_target3); + delete_op2.params.versioning_status |= BUCKET_VERSIONED; + + ret = delete_op2.delete_obj(dpp); + ASSERT_EQ(ret, 0); + + /* read object.. should fetch latest version */ + params = GlobalParams; + params.op.obj.state.obj.key.instance.clear(); + DB::Object op_target4(db, params.op.bucket.info, params.op.obj.state.obj); + ret = op_target4.get_obj_state(dpp, params.op.bucket.info, params.op.obj.state.obj, + true, &s); + ASSERT_EQ(s->obj.key.instance, instances[2]); + decode(data, s->data); + ASSERT_EQ(data, "HELLO WORLD A"); + ASSERT_EQ(s->size, 14); + + /* delete latest version using version-id. Next version should get promoted */ + params.op.obj.state.obj.key.instance = instances[2]; + DB::Object op_target5(db, params.op.bucket.info, params.op.obj.state.obj); + DB::Object::Delete delete_op3(&op_target5); + delete_op3.params.versioning_status |= BUCKET_VERSIONED; + + ret = delete_op3.delete_obj(dpp); + ASSERT_EQ(ret, 0); + + /* list versioned objects..only two versions should be present + * with second version marked as CURRENT */ + params = GlobalParams; + params.op.obj.state.obj.key.instance.clear(); + params.op.list_max_count = MAX_VERSIONED_OBJECTS; + ret = db->ProcessOp(dpp, "ListVersionedObjects", ¶ms); + + i = 1; + for (auto ent: params.op.obj.list_entries) { + string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false"; + cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is_current = " << is_current << "\n"; + + if (i == 1) { + ASSERT_EQ(is_current, "true"); + dm_instance = ent.key.instance; + } else { + ASSERT_EQ(is_current, "false"); + ASSERT_EQ(ent.key.instance, instances[i]); + } + + i--; + } + +} + TEST_F(DBStoreTest, ObjectOmapSetVal) { struct DBOpParams params = GlobalParams; int ret = -1; -- 2.39.5