obj_op.meta.owner = owner.get_id();
obj_op.meta.flags = PUT_OBJ_CREATE;
+ obj_op.meta.category = RGWObjCategory::Main;
obj_op.meta.modify_tail = true;
obj_op.meta.completeMultipart = true;
parent_op.meta.if_nomatch = if_nomatch;
parent_op.meta.user_data = user_data;
parent_op.meta.zones_trace = zones_trace;
+ parent_op.meta.category = RGWObjCategory::Main;
/* XXX: handle accounted size */
accounted_size = total_data_size;
int ret = 0;
DB *store = target->get_store();
int64_t count = 0;
+ std::string prev_obj;
DBOpParams db_params = {};
store->InitializeParams(dpp, &db_params);
}
for (auto& entry : db_params.op.obj.list_entries) {
+
if (!params.list_versions) {
- if (!(entry.flags & rgw_bucket_dir_entry::FLAG_CURRENT) ||
- (entry.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER)) {
+ if (entry.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
+ prev_obj = entry.key.name;
// skip all non-current entries and delete_marker
continue;
}
+ if (entry.key.name == prev_obj) {
+ // non current versions..skip the entry
+ continue;
+ }
+ entry.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
+ } else {
+ if (entry.key.name != prev_obj) {
+ // current version
+ entry.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
+ } else {
+ entry.flags &= ~(rgw_bucket_dir_entry::FLAG_CURRENT);
+ entry.flags |= rgw_bucket_dir_entry::FLAG_VER;
+ }
}
+ prev_obj = entry.key.name;
+
if (count >= max) {
*is_truncated = true;
next_marker.name = entry.key.name;
DBOpParams params = {};
RGWObjState* s;
- ret = get_object_impl(dpp, params);
- if (ret && ret != -ENOENT) {
- ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
- goto out;
- }
+ if (!obj.key.instance.empty()) {
+ /* Versionid provided. Fetch the object */
+ ret = get_object_impl(dpp, params);
- /* Check if its a versioned object */
- if (follow_olh && params.op.obj.state.obj.key.instance.empty()) {
- if ((ret == -ENOENT) || (params.op.obj.flags & rgw_bucket_dir_entry::FLAG_VER)) { // NEWWWW
- ret = list_versioned_objects(dpp, params.op.obj.list_entries);
-
- if (params.op.obj.list_entries.size() != 0) {
- /* versioned object. Read the latest version object provided its not a
- * delete marker */
- auto& ent = params.op.obj.list_entries.front();
- if (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
- ret = -ENOENT;
- ldpp_dout(dpp, 0) <<" Latest is delete marker - err:(" <<ret<<")" << dendl;
- goto out;
- }
- params.op.obj.state.obj.key.instance = ent.key.instance;
+ if (ret && ret != -ENOENT) {
+ ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
+ goto out;
+ }
+ } else {
+ /* Instance is empty. May or may not be versioned object.
+ * List all the versions and read the most recent entry */
+ ret = list_versioned_objects(dpp, params.op.obj.list_entries);
+
+ if (params.op.obj.list_entries.size() != 0) {
+ /* Ensure its not a delete marker */
+ auto& ent = params.op.obj.list_entries.front();
+ if (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
+ ret = -ENOENT;
+ goto out;
+ }
+ store->InitializeParams(dpp, ¶ms);
+ InitializeParamsfromObject(dpp, ¶ms);
+ params.op.obj.state.obj.key = ent.key;
- ret = get_object_impl(dpp, params);
+ ret = get_object_impl(dpp, params);
- if (ret) {
- ldpp_dout(dpp, 0) <<"get_object_impl of versioned object failed err:(" <<ret<<")" << dendl;
- goto out;
- }
+ if (ret) {
+ ldpp_dout(dpp, 0) <<"get_object_impl of versioned object failed err:(" <<ret<<")" << dendl;
+ goto out;
}
+ } else {
+ ret = -ENOENT;
+ return ret;
}
}
return 0;
}
-/*
- * If versioned,
- * - check if the old version of the object already exists
- * if exists,
- * - set its version-id/instance to "null" if instance was empty
- * - demote it to NON_CURRENT
- * - create versioned object with FLAG_CURRENT set
- *
- * XXX: call this function under a lock or sqlite transaction to prevent
- * parallel object uploads marking both the versions as FLAG_CURRENT;
- *
- */
-int DB::Object::Write::write_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& params) {
- DB *store = target->get_store();
- int ret = -1;
- DBOpParams obj_params = {};
- uint64_t version_num = 0;
-
- store->InitializeParams(dpp, &obj_params);
- target->InitializeParamsfromObject(dpp, &obj_params);
-
- /* Check if already exists */
- obj_params.op.obj.state.obj.key.instance.clear();
- ret = target->list_versioned_objects(dpp, obj_params.op.obj.list_entries);
-
- if (ret && ret != -ENOENT) {
- ldpp_dout(dpp, 0)<<"ListVersionedObjects of object (" << obj_params.op.obj.state.obj.key.name << ") failed err:(" <<ret<<")" << dendl;
- return ret;
- }
-
- for (auto& ent : obj_params.op.obj.list_entries) {
- ret = target->update_obj_next_version(dpp, ent, false, version_num); // demote previous version
- if (ret != ENOENT) {
- break;
- }
- // continue to next object version
- }
-
- if (ret) {
- ldpp_dout(dpp, 0)<<"demote_next_version of object (" << obj_params.op.obj.state.obj.key.name << ") failed err:(" <<ret<<")" << dendl;
- return ret;
- }
-
- params.op.obj.version_num = ++version_num;
-
- /* Put Object */
- params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
- ret = store->ProcessOp(dpp, "PutObject", ¶ms);
- if (ret) {
- ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
- }
-
- return ret;
-}
-
-/* promote or demote next object version */
-int DB::Object::update_obj_next_version(const DoutPrefixProvider *dpp,
- rgw_bucket_dir_entry &obj_entry,
- bool promote, uint64_t& version_num) {
- int ret = -1;
- DBOpParams params = {};
-
- params.op.obj.state.obj.key.name = obj_entry.key.name;
- params.op.obj.state.obj.key.instance = obj_entry.key.instance;
-
- // fetch the last version of the object
- store->InitializeParams(dpp, ¶ms);
- params.op.bucket.info.bucket.name = bucket_info.bucket.name;
-
- ret = get_object_impl(dpp, params);
-
- if (ret) { // do nothing as its best effort?
- ldpp_dout(dpp, 0)<<"In demote_object_next_ver get of prev version(" << params.op.obj.obj_id << ") failed err:(" <<ret<<")" << dendl;
- goto out;
- }
-
- if (params.op.obj.state.obj.key.instance.empty()) {
- /* This means, previous version of object exists which was created before
- * versioning is enabled on the bucket.
- *
- * Set its instance to "null" and mark it as versioned.
- */
- params.op.obj.state.obj.key.instance = "null";
- params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
- }
- version_num = params.op.obj.version_num;
-
- if (promote) {
- params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
- } else {
- params.op.obj.flags &= ~(rgw_bucket_dir_entry::FLAG_CURRENT);
- }
- params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
-
- ret = store->ProcessOp(dpp, "PutObject", ¶ms); // XXX: can be UpdateObject?
-
- if (ret) { // do nothing as its best effort?
- ldpp_dout(dpp, 0)<<"Put of Old version of object failed err:(" <<ret<<")" << dendl;
- }
-out :
- return ret;
-}
-
/* Write metadata & head object data */
int DB::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp,
uint64_t size, uint64_t accounted_size,
params.op.obj.state.size = size;
params.op.obj.state.accounted_size = accounted_size;
params.op.obj.owner = target->get_bucket_info().owner.id;
+ params.op.obj.category = meta.category;
if (meta.mtime) {
*meta.mtime = meta.set_mtime;
params.op.query_str = "meta";
params.op.obj.obj_id = target->obj_id;
- params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
-
/* Check if versioned */
bool is_versioned = !target->obj.key.instance.empty() && (target->obj.key.instance != "null");
params.op.obj.is_versioned = is_versioned;
- if (!is_versioned) {
- ret = store->ProcessOp(dpp, "PutObject", ¶ms);
- if (ret) {
- ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
- goto out;
- }
- return ret;
+ if (is_versioned && (params.op.obj.category == RGWObjCategory::Main)) {
+ /* versioned object */
+ params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
+ }
+ ret = store->ProcessOp(dpp, "PutObject", ¶ms);
+ if (ret) {
+ ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
+ goto out;
}
- /* versioned object */
- params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
- ret = write_versioned_obj(dpp, params);
out:
if (ret < 0) {
int DB::Object::Delete::delete_obj(const DoutPrefixProvider *dpp) {
int ret = 0;
DBOpParams del_params = {};
+ bool versioning_enabled = ((params.versioning_status & BUCKET_VERSIONED) == BUCKET_VERSIONED);
+ bool versioning_suspended = ((params.versioning_status & BUCKET_VERSIONS_SUSPENDED) == BUCKET_VERSIONS_SUSPENDED);
+ bool regular_obj = true;
+ std::string versionid = target->obj.key.instance;
ret = target->get_object_impl(dpp, del_params);
return ret;
}
- if (!ret && !(del_params.op.obj.flags &
- (rgw_bucket_dir_entry::FLAG_DELETE_MARKER | rgw_bucket_dir_entry::FLAG_VER))) {
- /* Non versioned objects. Simple delete */
- ret = delete_obj_impl(dpp, del_params);
- return ret;
- }
-
- /* check if it is versioned object. */
- ret = target->list_versioned_objects(dpp, del_params.op.obj.list_entries);
-
- if (!del_params.op.obj.list_entries.empty()) {
- ret = delete_versioned_obj(dpp, del_params);
- } else {
- ret = -ENOENT;
+ regular_obj = (del_params.op.obj.category == RGWObjCategory::Main);
+ if (!ret) {
+ if (!versionid.empty()) {
+ // version-id is provided
+ ret = delete_obj_impl(dpp, del_params);
+ return ret;
+ } else { // version-id is empty..
+ /*
+ * case: bucket_versioned
+ * create_delete_marker;
+ * case: bucket_suspended
+ * delete entry
+ * create delete marker with version-id null;
+ * default:
+ * just delete the entry
+ */
+ if (versioning_suspended && regular_obj) {
+ ret = delete_obj_impl(dpp, del_params);
+ ret = create_dm(dpp, del_params);
+ } else if (versioning_enabled && regular_obj) {
+ ret = create_dm(dpp, del_params);
+ } else {
+ ret = delete_obj_impl(dpp, del_params);
+ }
+ }
+ } else { // ret == -ENOENT
+ /* case: VersionID given
+ * return -ENOENT
+ * else: // may or may not be versioned object
+ * Listversionedobjects
+ * if (list_entries.empty()) {
+ * nothing to do..return ENOENT
+ * } else {
+ * read top entry
+ * if (top.flags | FLAG_DELETE_MARKER) {
+ * // nothing to do
+ * return -ENOENT;
+ * }
+ * if (bucket_versioned) {
+ * // create delete marker with new version-id
+ * } else if (bucket_suspended) {
+ * // create delete marker with version-id null
+ * }
+ * bucket cannot be in unversioned state post having versions
+ * }
+ */
+ if (!versionid.empty()) {
+ return -ENOENT;
+ }
+ ret = target->list_versioned_objects(dpp, del_params.op.obj.list_entries);
+ if (ret) {
+ ldpp_dout(dpp, 0)<<"ListVersionedObjects failed err:(" <<ret<<")" << dendl;
+ return ret;
+ }
+ if (del_params.op.obj.list_entries.empty()) {
+ return -ENOENT;
+ }
+ auto &ent = del_params.op.obj.list_entries.front();
+ if (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
+ // for now do not create another delete marker..just exit
+ return 0;
+ }
+ ret = create_dm(dpp, del_params);
}
-
return ret;
}
* - create a delete marker with
* - new version/instanceID (if bucket versioned)
* - null versionID (if versioning suspended)
- * - demote the current version entry
- * XXX: If delete_marker is the latest version, another delete_marker
- * is not created for now (for any subsequent deletes) . Will revisit this if
- * needed.
- *
- * b) If versionID provided,
- * - delete that particular entry
- * - Incase the entry is current entry, promote next object version to CURRENT.
- *
- * XXX: call this function under a lock or sqlite transaction to prevent
- * parallel object uploads marking both the versions as FLAG_CURRENT;
*/
-int DB::Object::Delete::delete_versioned_obj(const DoutPrefixProvider *dpp,
+int DB::Object::Delete::create_dm(const DoutPrefixProvider *dpp,
DBOpParams& del_params) {
DB *store = target->get_store();
- bool versioning_enabled = (params.versioning_status & BUCKET_VERSIONED);
+ bool versioning_suspended = ((params.versioning_status & BUCKET_VERSIONS_SUSPENDED) == BUCKET_VERSIONS_SUSPENDED);
int ret = -1;
DBOpParams olh_params = {};
std::string version_id;
DBOpParams next_params = del_params;
version_id = del_params.op.obj.state.obj.key.instance;
- auto &ent = del_params.op.obj.list_entries.front();
- uint64_t version_num = MAX_VERSIONED_OBJECTS;
-
- if (version_id.empty()) {
- /* create delete marker */
-
- if (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
- // for now do not create another delete marker..just exit
- return 0;
- }
- DBOpParams dm_params = del_params;
-
- //demote current entry
- target->update_obj_next_version(dpp, ent, false, version_num); //XXX: not checking return status
-
- // create delete marker
- store->InitializeParams(dpp, &dm_params);
- target->InitializeParamsfromObject(dpp, &dm_params);
- if (!versioning_enabled) {
- dm_params.op.obj.state.obj.key.instance = "null";
- } else {
- store->gen_rand_obj_instance_name(&dm_params.op.obj.state.obj.key);
- dm_params.op.obj.obj_id = dm_params.op.obj.state.obj.key.instance;
- }
+ DBOpParams dm_params = del_params;
- dm_params.op.obj.flags |= (rgw_bucket_dir_entry::FLAG_DELETE_MARKER);
- dm_params.op.obj.flags |= (rgw_bucket_dir_entry::FLAG_CURRENT);
- dm_params.op.obj.version_num = ++version_num; // max obj version list
+ // create delete marker
- ret = store->ProcessOp(dpp, "PutObject", &dm_params);
+ store->InitializeParams(dpp, &dm_params);
+ target->InitializeParamsfromObject(dpp, &dm_params);
+ dm_params.op.obj.category = RGWObjCategory::None;
- if (ret) {
- ldpp_dout(dpp, 0) << "delete_olh: failed to create delete marker - err:(" <<ret<<")" << dendl;
- return ret;
- }
+ if (versioning_suspended) {
+ dm_params.op.obj.state.obj.key.instance = "null";
} else {
- // delete paritcular version-id
- DBOpParams dm_params = {};
-
- store->InitializeParams(dpp, &dm_params);
- target->InitializeParamsfromObject(dpp, &dm_params);
+ store->gen_rand_obj_instance_name(&dm_params.op.obj.state.obj.key);
+ dm_params.op.obj.obj_id = dm_params.op.obj.state.obj.key.instance;
+ }
- dm_params.op.obj.state.obj.key.instance = version_id;
+ dm_params.op.obj.flags |= (rgw_bucket_dir_entry::FLAG_DELETE_MARKER);
- ret = target->get_object_impl(dpp, dm_params);
+ ret = store->ProcessOp(dpp, "PutObject", &dm_params);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << "get obj of versioned object failed - err:(" <<ret<<")" << dendl;
- return ret;
- }
- ret = delete_obj_impl(dpp, dm_params);
- if (dm_params.op.obj.flags & rgw_bucket_dir_entry::FLAG_CURRENT) {
- bool found = false;
- std::list<rgw_bucket_dir_entry>::iterator e;
- for (e = del_params.op.obj.list_entries.begin(); e != del_params.op.obj.list_entries.end(); e++) {
-
- if (e->key.instance == version_id) {
- e++;
- if (e != del_params.op.obj.list_entries.end()) {
- found = true;
- }
- break;
- }
- }
- if (found) {
- uint64_t version_num = 0;
- rgw_bucket_dir_entry& e1 = *e;
- target->update_obj_next_version(dpp, e1, true, version_num);
- }
- }
+ if (ret) {
+ ldpp_dout(dpp, 0) << "delete_olh: failed to create delete marker - err:(" <<ret<<")" << dendl;
+ return ret;
}
-
+ result.delete_marker = true;
+ result.version_id = dm_params.op.obj.state.obj.key.instance;
return ret;
}
/* for versioned objects */
bool is_versioned;
- uint64_t version_num = 1; // default value for plain entries (non-versioned)
+ uint64_t version_num = 0;
};
struct DBOpObjectDataInfo {
DBOpInfo op;
std::string objectdata_table;
+ std::string object_trigger;
std::string object_view;
std::string quota_table;
std::string lc_head_table;
static constexpr const char* new_obj_name = ":new_obj_name";
static constexpr const char* new_obj_instance = ":new_obj_instance";
static constexpr const char* new_obj_ns = ":new_obj_ns";
- static constexpr const char* versions = ":versions";
};
struct DBOpObjectDataPrepareInfo {
std::string objectdata_table;
+ std::string object_trigger;
std::string object_view;
std::string quota_table;
std::string lc_head_table;
FOREIGN KEY (OwnerID) \
REFERENCES '{}' (UserID) ON DELETE CASCADE ON UPDATE CASCADE \n);";
+ static constexpr std::string_view CreateObjectTableTriggerQ =
+ "CREATE TRIGGER IF NOT EXISTS '{}' \
+ AFTER INSERT ON '{}' \
+ BEGIN \
+ UPDATE '{}' \
+ SET VersionNum = (SELECT COALESCE(max(VersionNum), 0) from '{}' where ObjName = new.ObjName) + 1 \
+ where ObjName = new.ObjName and ObjInstance = new.ObjInstance; \
+ END;";
+
static constexpr std::string_view CreateObjectTableQ =
/* Corresponds to rgw::sal::Object
*
IsMultipart BOOL, \
MPPartsList BLOB, \
HeadData BLOB, \
- VERSIONS BLOB, \
PRIMARY KEY (ObjName, ObjInstance, BucketName), \
FOREIGN KEY (BucketName) \
REFERENCES '{}' (BucketName) ON DELETE CASCADE ON UPDATE CASCADE \n);";
return fmt::format(CreateObjectTableQ,
params->object_table,
params->bucket_table);
+ if (!type.compare("ObjectTrigger"))
+ return fmt::format(CreateObjectTableTriggerQ,
+ params->object_trigger,
+ params->object_table,
+ params->object_table,
+ params->object_table);
if (!type.compare("ObjectData"))
return fmt::format(CreateObjectDataTableQ,
params->objectdata_table,
ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
TailPlacementRuleName, TailPlacementStorageClass, \
ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \
- HeadData, Versions) \
+ HeadData) \
VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
- {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
+ {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
+ {}, {}, {}, \
{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {})";
public:
params.op.obj.accounted_size, params.op.obj.mtime,
params.op.obj.epoch, params.op.obj.obj_tag, params.op.obj.tail_tag,
params.op.obj.write_tag, params.op.obj.fake_tag, params.op.obj.shadow_obj,
- params.op.obj.has_data, params.op.obj.is_versioned, params.op.obj.version_num,
+ params.op.obj.has_data, params.op.obj.is_versioned,
+ params.op.obj.version_num,
params.op.obj.pg_ver, params.op.obj.zone_short_id,
params.op.obj.obj_version, params.op.obj.obj_version_tag,
params.op.obj.obj_attrs, params.op.obj.head_size,
params.op.obj.manifest_part_objs,
params.op.obj.manifest_part_rules, params.op.obj.omap,
params.op.obj.is_multipart, params.op.obj.mp_parts,
- params.op.obj.head_data, params.op.obj.versions);
+ params.op.obj.head_data);
}
};
ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
TailPlacementRuleName, TailPlacementStorageClass, \
ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \
- HeadData, Versions from '{}' \
+ HeadData from '{}' \
where BucketName = {} and ObjName = {} and ObjInstance = {}";
public:
ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
TailPlacementRuleName, TailPlacementStorageClass, \
ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData from '{}' \
- where BucketName = {} and ObjName >= {} and ObjName LIKE {} ORDER BY ObjName ASC LIMIT {}";
+ where BucketName = {} and ObjName >= {} and ObjName LIKE {} ORDER BY ObjName ASC, VersionNum DESC LIMIT {}";
public:
virtual ~ListBucketObjectsOp() {}
ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
TailPlacementRuleName, TailPlacementStorageClass, \
ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \
- HeadData, Versions from '{}' \
+ HeadData from '{}' \
where BucketName = {} and ObjName = {} ORDER BY VersionNum DESC LIMIT {}";
public:
virtual ~ListVersionedObjectsOp() {}
HeadPlacementRuleName = {}, HeadPlacementRuleStorageClass = {}, \
TailPlacementRuleName = {}, TailPlacementStorageClass = {}, \
ManifestPartObjs = {}, ManifestPartRules = {}, Omap = {}, \
- IsMultipart = {}, MPPartsList = {}, HeadData = {}, Versions = {} \
+ IsMultipart = {}, MPPartsList = {}, HeadData = {} \
WHERE ObjName = {} and ObjInstance = {} and BucketName = {}";
public:
params.op.obj.manifest_part_objs,
params.op.obj.manifest_part_rules, params.op.obj.omap,
params.op.obj.is_multipart, params.op.obj.mp_parts,
- params.op.obj.head_data, params.op.obj.versions,
+ params.op.obj.head_data,
params.op.obj.obj_name, params.op.obj.obj_instance,
params.op.bucket.bucket_name);
}
return db_name+"_"+bucket+"_objectdata_table"; }
const std::string getObjectView(std::string bucket) {
return db_name+"_"+bucket+"_object_view"; }
+ const std::string getObjectTrigger(std::string bucket) {
+ return db_name+"_"+bucket+"_object_trigger"; }
std::map<std::string, class ObjectOp*> getObjectMap();
bool assume_noent, bool modify_tail);
int write_meta(const DoutPrefixProvider *dpp, uint64_t size,
uint64_t accounted_size, std::map<std::string, bufferlist>& attrs);
- int write_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& params);
};
struct Delete {
int delete_obj(const DoutPrefixProvider *dpp);
int delete_obj_impl(const DoutPrefixProvider *dpp, DBOpParams& del_params);
- int delete_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& del_params);
+ int create_dm(const DoutPrefixProvider *dpp, DBOpParams& del_params);
};
/* XXX: the parameters may be subject to change. All we need is bucket name
const RGWBucketInfo& bucket_info, const rgw_obj& obj,
off_t ofs, off_t end, uint64_t max_chunk_size,
iterate_obj_cb cb, void *arg);
- int update_obj_next_version(const DoutPrefixProvider *dpp,
- rgw_bucket_dir_entry &obj_entry,
- bool promote, uint64_t& version_num);
};
int get_obj_iterate_cb(const DoutPrefixProvider *dpp,
const raw_obj& read_obj, off_t obj_ofs,
#define SQL_BIND_TEXT(dpp, stmt, index, str, sdb) \
do { \
- rc = sqlite3_bind_text(stmt, index, str, -1, SQLITE_TRANSIENT); \
+ rc = sqlite3_bind_text(stmt, index, str, -1, SQLITE_TRANSIENT); \
if (rc != SQLITE_OK) { \
ldpp_dout(dpp, 0)<<"sqlite bind text failed for index(" \
<<index<<"), str("<<str<<") in stmt(" \
if (params->object_view.empty()) {
params->object_view = getObjectView(bucket);
}
+ if (params->object_trigger.empty()) {
+ params->object_trigger = getObjectTrigger(bucket);
+ }
p_params.object_table = params->object_table;
p_params.objectdata_table = params->objectdata_table;
p_params.object_view = params->object_view;
return ret;
}
+int SQLiteDB::createObjectTableTrigger(const DoutPrefixProvider *dpp, DBOpParams *params)
+{
+ int ret = -1;
+ string schema;
+
+ schema = CreateTableSchema("ObjectTrigger", params);
+
+ ret = exec(dpp, schema.c_str(), NULL);
+ if (ret)
+ ldpp_dout(dpp, 0)<<"CreateObjectTableTrigger failed " << dendl;
+
+ ldpp_dout(dpp, 20)<<"CreateObjectTableTrigger suceeded " << dendl;
+
+ return ret;
+}
+
int SQLiteDB::createObjectView(const DoutPrefixProvider *dpp, DBOpParams *params)
{
int ret = -1;
(void)createObjectTable(dpp, params);
(void)createObjectDataTable(dpp, params);
+ (void)createObjectTableTrigger(dpp, params);
out:
return ret;
}
{
int index = -1;
int rc = 0;
+ int VersionNum = 0;
struct DBOpPrepareParams p_params = PrepareParams;
+ if (params->op.obj.state.obj.key.instance.empty()) {
+ params->op.obj.state.obj.key.instance = "null";
+ }
+
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_name, sdb);
SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.name.c_str(), sdb);
SQL_BIND_INT(dpp, stmt, index, params->op.obj.is_versioned, sdb);
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.version_num, sdb);
- SQL_BIND_INT(dpp, stmt, index, params->op.obj.version_num, sdb);
+ SQL_BIND_INT(dpp, stmt, index, VersionNum, sdb);
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.pg_ver, sdb);
SQL_BIND_INT(dpp, stmt, index, params->op.obj.state.pg_ver, sdb);
int rc = 0;
struct DBOpPrepareParams p_params = PrepareParams;
+ if (params->op.obj.state.obj.key.instance.empty()) {
+ params->op.obj.state.obj.key.instance = "null";
+ }
+
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
int rc = 0;
struct DBOpPrepareParams p_params = PrepareParams;
+ if (params->op.obj.state.obj.key.instance.empty()) {
+ params->op.obj.state.obj.key.instance = "null";
+ }
+
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
goto out;
}
+ if (params->op.obj.state.obj.key.instance.empty()) {
+ params->op.obj.state.obj.key.instance = "null";
+ }
+
SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.bucket.bucket_name, sdb);
SQL_BIND_TEXT(dpp, *stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
int rc = 0;
struct DBOpPrepareParams p_params = PrepareParams;
+ if (params->op.obj.state.obj.key.instance.empty()) {
+ params->op.obj.state.obj.key.instance = "null";
+ }
+
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
int rc = 0;
struct DBOpPrepareParams p_params = PrepareParams;
+ if (params->op.obj.state.obj.key.instance.empty()) {
+ params->op.obj.state.obj.key.instance = "null";
+ }
+
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
int rc = 0;
struct DBOpPrepareParams p_params = PrepareParams;
+ if (params->op.obj.state.obj.key.instance.empty()) {
+ params->op.obj.state.obj.key.instance = "null";
+ }
+
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_name, sdb);
SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.name.c_str(), sdb);
int rc = 0;
struct DBOpPrepareParams p_params = PrepareParams;
+ if (params->op.obj.state.obj.key.instance.empty()) {
+ params->op.obj.state.obj.key.instance = "null";
+ }
+
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_name, sdb);
SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.name.c_str(), sdb);
int rc = 0;
struct DBOpPrepareParams p_params = PrepareParams;
+ if (params->op.obj.state.obj.key.instance.empty()) {
+ params->op.obj.state.obj.key.instance = "null";
+ }
+
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
int rc = 0;
struct DBOpPrepareParams p_params = PrepareParams;
+ if (params->op.obj.state.obj.key.instance.empty()) {
+ params->op.obj.state.obj.key.instance = "null";
+ }
+
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
int createObjectTable(const DoutPrefixProvider *dpp, DBOpParams *params);
int createObjectDataTable(const DoutPrefixProvider *dpp, DBOpParams *params);
int createObjectView(const DoutPrefixProvider *dpp, DBOpParams *params);
+ int createObjectTableTrigger(const DoutPrefixProvider *dpp, DBOpParams *params);
int createQuotaTable(const DoutPrefixProvider *dpp, DBOpParams *params);
void populate_object_params(const DoutPrefixProvider *dpp,
struct DBOpPrepareParams& p_params,
void TearDown() {
}
+
+ int write_object(const DoutPrefixProvider *dpp, DBOpParams params) {
+ DB::Object op_target(db, params.op.bucket.info,
+ params.op.obj.state.obj);
+ DB::Object::Write write_op(&op_target);
+ map<string, bufferlist> setattrs;
+ ret = write_op.prepare(dpp);
+ if (ret)
+ return ret;
+
+ write_op.meta.mtime = &bucket_mtime;
+ write_op.meta.category = RGWObjCategory::Main;
+ write_op.meta.owner = params.op.user.uinfo.user_id;
+
+ bufferlist b1 = params.op.obj.head_data;
+ write_op.meta.data = &b1;
+
+ bufferlist b2;
+ encode("ACL", b2);
+ setattrs[RGW_ATTR_ACL] = b2;
+
+ ret = write_op.write_meta(0, params.op.obj.state.size, b1.length()+1, setattrs);
+ return ret;
+ }
};
}
decode(data, params.op.obj.head_data);
ASSERT_EQ(data, "HELLO WORLD");
ASSERT_EQ(params.op.obj.state.size, 12);
+ cout << "versionNum :" << params.op.obj.version_num << "\n";
}
TEST_F(DBStoreTest, GetObjectState) {
ASSERT_EQ(ret, 0);
ASSERT_EQ(s->size, 12);
ASSERT_EQ(s->is_olh, false);
+ cout << "versionNum :" << params.op.obj.version_num << "\n";
/* Recheck with get_state API */
ret = op_target.get_state(dpp, &s, false);
ASSERT_EQ(ret, 0);
ASSERT_EQ(s->size, 12);
ASSERT_EQ(s->is_olh, false);
+ cout << "versionNum :" << params.op.obj.version_num << "\n";
}
TEST_F(DBStoreTest, ObjAttrs) {
TEST_F(DBStoreTest, WriteObject) {
struct DBOpParams params = GlobalParams;
int ret = -1;
- map<string, bufferlist> setattrs;
params.op.obj.state.obj.key.name = "object3";
params.op.obj.state.obj.key.instance = "inst3";
DB::Object op_target(db, params.op.bucket.info,
params.op.obj.state.obj);
- DB::Object::Write write_op(&op_target);
- ret = write_op.prepare(dpp);
- ASSERT_EQ(ret, 0);
-
- write_op.meta.mtime = &bucket_mtime;
- write_op.meta.category = RGWObjCategory::Main;
- write_op.meta.owner = params.op.user.uinfo.user_id;
bufferlist b1;
encode("HELLO WORLD - Object3", b1);
- cout<<"XXXXXXXXX Insert b1.length " << b1.length() << "\n";
- write_op.meta.data = &b1;
-
- bufferlist b2;
- encode("ACL", b2);
- setattrs[RGW_ATTR_ACL] = b2;
+ params.op.obj.head_data = b1;
+ params.op.obj.state.size = 22;
- ret = write_op.write_meta(0, 22, 25, setattrs);
+ ret = write_object(dpp, params);
ASSERT_EQ(ret, 0);
}
encode("HELLO WORLD", b1);
params.op.obj.head_data = b1;
params.op.obj.state.size = 12;
- ret = write_op.write_versioned_obj(dpp, params);
+ ret = write_object(dpp, params);
ASSERT_EQ(ret, 0);
/* Version2 */
encode("HELLO WORLD ABC", b1);
params.op.obj.head_data = b1;
params.op.obj.state.size = 16;
- ret = write_op.write_versioned_obj(dpp, params);
+ ret = write_object(dpp, params);
ASSERT_EQ(ret, 0);
/* Version3 */
encode("HELLO WORLD A", b1);
params.op.obj.head_data = b1;
params.op.obj.state.size = 14;
- ret = write_op.write_versioned_obj(dpp, params);
+ ret = write_object(dpp, params);
ASSERT_EQ(ret, 0);
}
i = 2;
for (auto ent: params.op.obj.list_entries) {
- string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false";
- cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is:current = " << is_current << "\n";
- if (i == 2) {
- ASSERT_EQ(is_current, "true");
- } else {
- ASSERT_EQ(is_current, "false");
- }
ASSERT_EQ(ent.key.instance, instances[i]);
i--;
i = 3;
for (auto ent: params.op.obj.list_entries) {
- string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false";
string is_delete_marker = (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER)? "true" : "false";
cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is_delete_marker = " << is_delete_marker << "\n";
if (i == 3) {
ASSERT_EQ(is_delete_marker, "true");
- ASSERT_EQ(is_current, "true");
dm_instance = ent.key.instance;
} else {
- ASSERT_EQ(is_current, "false");
ASSERT_EQ(is_delete_marker, "false");
ASSERT_EQ(ent.key.instance, instances[i]);
}
i = 1;
for (auto ent: params.op.obj.list_entries) {
- string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false";
- cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is_current = " << is_current << "\n";
if (i == 1) {
- ASSERT_EQ(is_current, "true");
dm_instance = ent.key.instance;
} else {
- ASSERT_EQ(is_current, "false");
ASSERT_EQ(ent.key.instance, instances[i]);
}