]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/dbstore: Atomically update version number for versioned objects
authorSoumya Koduri <skoduri@redhat.com>
Wed, 15 Jun 2022 12:20:09 +0000 (17:50 +0530)
committerSoumya Koduri <skoduri@redhat.com>
Mon, 25 Jul 2022 06:07:08 +0000 (11:37 +0530)
VersionNum is used to track the latest version available of the versioned
object. In order to handle racing updates/overwrites, have used sqlite
trigger which atomically updates the version number for every object insert.

Signed-off-by: Soumya Koduri <skoduri@redhat.com>
src/rgw/rgw_sal_dbstore.cc
src/rgw/store/dbstore/common/dbstore.cc
src/rgw/store/dbstore/common/dbstore.h
src/rgw/store/dbstore/sqlite/sqliteDB.cc
src/rgw/store/dbstore/sqlite/sqliteDB.h
src/rgw/store/dbstore/tests/dbstore_tests.cc

index dbf0f812920c8af3c9521ad7e65f7f2d74a88460..60deb84d13cd1b2028e96062b01c05aa929c54d7 100644 (file)
@@ -1148,6 +1148,7 @@ namespace rgw::sal {
 
     obj_op.meta.owner = owner.get_id();
     obj_op.meta.flags = PUT_OBJ_CREATE;
+    obj_op.meta.category = RGWObjCategory::Main;
     obj_op.meta.modify_tail = true;
     obj_op.meta.completeMultipart = true;
 
@@ -1525,6 +1526,7 @@ namespace rgw::sal {
     parent_op.meta.if_nomatch = if_nomatch;
     parent_op.meta.user_data = user_data;
     parent_op.meta.zones_trace = zones_trace;
+    parent_op.meta.category = RGWObjCategory::Main;
     
     /* XXX: handle accounted size */
     accounted_size = total_data_size;
index c9217a760605474dfb6168956bdd2a27bfea68f3..3936368e6f7f8057fe394820684ecc1a994c76a2 100644 (file)
@@ -750,6 +750,7 @@ int DB::Bucket::List::list_objects(const DoutPrefixProvider *dpp, int64_t max,
   int ret = 0;
   DB *store = target->get_store();
   int64_t count = 0;
+  std::string prev_obj;
 
   DBOpParams db_params = {};
   store->InitializeParams(dpp, &db_params);
@@ -769,14 +770,30 @@ int DB::Bucket::List::list_objects(const DoutPrefixProvider *dpp, int64_t max,
   }
 
   for (auto& entry : db_params.op.obj.list_entries) {
+
     if (!params.list_versions) {
-      if (!(entry.flags & rgw_bucket_dir_entry::FLAG_CURRENT) || 
-           (entry.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER)) {
+      if (entry.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
+        prev_obj = entry.key.name;
         // skip all non-current entries and delete_marker
         continue;
       }
+      if (entry.key.name == prev_obj) {
+        // non current versions..skip the entry
+        continue;
+      }
+      entry.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
+    } else {
+      if (entry.key.name != prev_obj) {
+        // current version
+        entry.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
+      } else {
+        entry.flags &= ~(rgw_bucket_dir_entry::FLAG_CURRENT);
+        entry.flags |= rgw_bucket_dir_entry::FLAG_VER;
+      }
     }
 
+    prev_obj = entry.key.name;
+
     if (count >= max) {
       *is_truncated = true;
       next_marker.name = entry.key.name;
@@ -1241,36 +1258,40 @@ int DB::Object::get_obj_state(const DoutPrefixProvider *dpp,
 
   DBOpParams params = {};
   RGWObjState* s;
-  ret = get_object_impl(dpp, params);
 
-  if (ret && ret != -ENOENT) {
-    ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
-    goto out;
-  }
+  if (!obj.key.instance.empty()) {
+    /* Versionid provided. Fetch the object */
+    ret = get_object_impl(dpp, params);
 
-  /* Check if its a versioned object */
-  if (follow_olh && params.op.obj.state.obj.key.instance.empty()) {
-    if ((ret == -ENOENT) || (params.op.obj.flags & rgw_bucket_dir_entry::FLAG_VER)) { // NEWWWW
-      ret = list_versioned_objects(dpp, params.op.obj.list_entries);
-
-      if (params.op.obj.list_entries.size() != 0) {
-        /* versioned object. Read the latest version object provided its not a
-         * delete marker */
-        auto& ent = params.op.obj.list_entries.front();
-        if (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
-          ret = -ENOENT;
-          ldpp_dout(dpp, 0) <<" Latest is delete marker -  err:(" <<ret<<")" << dendl;
-          goto out;
-        }
-        params.op.obj.state.obj.key.instance = ent.key.instance;
+    if (ret && ret != -ENOENT) {
+      ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
+      goto out;
+    }
+  } else {
+    /* Instance is empty. May or may not be versioned object.
+     * List all the versions and read the most recent entry */
+    ret = list_versioned_objects(dpp, params.op.obj.list_entries);
+
+    if (params.op.obj.list_entries.size() != 0) {
+       /* Ensure its not a delete marker */
+      auto& ent = params.op.obj.list_entries.front();
+      if (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
+        ret = -ENOENT;
+        goto out;
+      }
+      store->InitializeParams(dpp, &params);
+      InitializeParamsfromObject(dpp, &params);
+      params.op.obj.state.obj.key = ent.key;
     
-        ret = get_object_impl(dpp, params);
+      ret = get_object_impl(dpp, params);
 
-        if (ret) {
-          ldpp_dout(dpp, 0) <<"get_object_impl of versioned object failed err:(" <<ret<<")" << dendl;
-          goto out;
-        }
+      if (ret) {
+        ldpp_dout(dpp, 0) <<"get_object_impl of versioned object failed err:(" <<ret<<")" << dendl;
+        goto out;
       }
+    } else {
+      ret = -ENOENT;
+      return ret;
     }
   }
 
@@ -1652,109 +1673,6 @@ int DB::Object::Write::write_data(const DoutPrefixProvider* dpp,
   return 0;
 }
 
-/*
- * If versioned, 
- * - check if the old version of the object already exists
- *   if exists,
- *     - set its version-id/instance to "null" if instance was empty
- *     - demote it to NON_CURRENT 
- * - create versioned object with FLAG_CURRENT set 
- *
- * XXX: call this function under a lock or sqlite transaction to prevent
- * parallel object uploads marking both the versions as FLAG_CURRENT;
- *
- */
-int DB::Object::Write::write_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& params) {
-  DB *store = target->get_store();
-  int ret = -1;
-  DBOpParams obj_params = {};
-  uint64_t version_num = 0;
-
-  store->InitializeParams(dpp, &obj_params);
-  target->InitializeParamsfromObject(dpp, &obj_params);
-
-  /* Check if already exists */
-  obj_params.op.obj.state.obj.key.instance.clear();
-  ret = target->list_versioned_objects(dpp, obj_params.op.obj.list_entries);
-  
-  if (ret && ret != -ENOENT) {
-    ldpp_dout(dpp, 0)<<"ListVersionedObjects of object (" << obj_params.op.obj.state.obj.key.name << ") failed err:(" <<ret<<")" << dendl;
-    return ret;
-  }
-
-  for (auto& ent : obj_params.op.obj.list_entries) {
-    ret = target->update_obj_next_version(dpp, ent, false, version_num); // demote previous version
-    if (ret != ENOENT) {
-      break;
-    }
-    // continue to next object version
-  }
-
-  if (ret) {
-    ldpp_dout(dpp, 0)<<"demote_next_version of object (" << obj_params.op.obj.state.obj.key.name << ") failed err:(" <<ret<<")" << dendl;
-    return ret;
-  }
-
-  params.op.obj.version_num = ++version_num;
-
-  /* Put Object */
-  params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
-  ret = store->ProcessOp(dpp, "PutObject", &params);
-  if (ret) {
-    ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
-  }
-
-  return ret;
-}
-
-/* promote or demote next object version */
-int DB::Object::update_obj_next_version(const DoutPrefixProvider *dpp,
-                                rgw_bucket_dir_entry &obj_entry,
-                                bool promote, uint64_t& version_num) {
-  int ret = -1;
-  DBOpParams params = {};
-
-  params.op.obj.state.obj.key.name  = obj_entry.key.name;
-  params.op.obj.state.obj.key.instance  = obj_entry.key.instance;
-
-  // fetch the last version of the object
-  store->InitializeParams(dpp, &params);
-  params.op.bucket.info.bucket.name = bucket_info.bucket.name;
-
-  ret = get_object_impl(dpp, params);
-
-  if (ret) { // do nothing as its best effort? 
-    ldpp_dout(dpp, 0)<<"In demote_object_next_ver get of prev version(" << params.op.obj.obj_id << ") failed err:(" <<ret<<")" << dendl;
-    goto out;
-  }
-
-  if (params.op.obj.state.obj.key.instance.empty()) {
-    /* This means, previous version of object exists which was created before
-     * versioning is enabled on the bucket.
-     *
-     * Set its instance to "null" and mark it as versioned.
-     */
-    params.op.obj.state.obj.key.instance = "null";
-    params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
-  }
-  version_num = params.op.obj.version_num;
-
-  if (promote) {
-    params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
-  } else {
-    params.op.obj.flags &= ~(rgw_bucket_dir_entry::FLAG_CURRENT);
-  }
-  params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
-
-  ret = store->ProcessOp(dpp, "PutObject", &params); // XXX: can be UpdateObject?
-
-  if (ret) { // do nothing as its best effort? 
-    ldpp_dout(dpp, 0)<<"Put of Old version of object failed err:(" <<ret<<")" << dendl;
-  }
-out :
-    return ret;
-}
-
 /* Write metadata & head object data */
 int DB::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp,
     uint64_t size, uint64_t accounted_size,
@@ -1855,6 +1773,7 @@ int DB::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp,
   params.op.obj.state.size = size;
   params.op.obj.state.accounted_size = accounted_size;
   params.op.obj.owner = target->get_bucket_info().owner.id;
+  params.op.obj.category = meta.category;
 
   if (meta.mtime) {
     *meta.mtime = meta.set_mtime;
@@ -1863,24 +1782,20 @@ int DB::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp,
   params.op.query_str = "meta";
   params.op.obj.obj_id = target->obj_id;
 
-  params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
-
   /* Check if versioned */
   bool is_versioned = !target->obj.key.instance.empty() && (target->obj.key.instance != "null");
   params.op.obj.is_versioned = is_versioned;
 
-  if (!is_versioned) {
-    ret = store->ProcessOp(dpp, "PutObject", &params);
-    if (ret) {
-      ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
-      goto out;
-    }
-    return ret;
+  if (is_versioned && (params.op.obj.category == RGWObjCategory::Main)) {
+    /* versioned object */
+    params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
+  }
+  ret = store->ProcessOp(dpp, "PutObject", &params);
+  if (ret) {
+    ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
+    goto out;
   }
 
-  /* versioned object */
-  params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
-  ret = write_versioned_obj(dpp, params);
 
 out:
   if (ret < 0) {
@@ -1904,6 +1819,10 @@ int DB::Object::Write::write_meta(const DoutPrefixProvider *dpp, uint64_t size,
 int DB::Object::Delete::delete_obj(const DoutPrefixProvider *dpp) {
   int ret = 0;
   DBOpParams del_params = {};
+  bool versioning_enabled = ((params.versioning_status & BUCKET_VERSIONED) == BUCKET_VERSIONED); 
+  bool versioning_suspended = ((params.versioning_status & BUCKET_VERSIONS_SUSPENDED) == BUCKET_VERSIONS_SUSPENDED); 
+  bool regular_obj = true;
+  std::string versionid = target->obj.key.instance;
 
   ret = target->get_object_impl(dpp, del_params);
 
@@ -1912,22 +1831,70 @@ int DB::Object::Delete::delete_obj(const DoutPrefixProvider *dpp) {
     return ret;
   }
 
-  if (!ret && !(del_params.op.obj.flags & 
-         (rgw_bucket_dir_entry::FLAG_DELETE_MARKER | rgw_bucket_dir_entry::FLAG_VER))) {
-    /* Non versioned objects. Simple delete */
-    ret = delete_obj_impl(dpp, del_params);
-    return ret;
-  }
-
-  /* check if it is versioned object. */
-  ret = target->list_versioned_objects(dpp, del_params.op.obj.list_entries);
-
-  if (!del_params.op.obj.list_entries.empty()) {
-    ret = delete_versioned_obj(dpp, del_params);
-  } else {
-    ret = -ENOENT;
+  regular_obj = (del_params.op.obj.category == RGWObjCategory::Main);
+  if (!ret) {
+    if (!versionid.empty()) {
+      // version-id is provided
+      ret = delete_obj_impl(dpp, del_params);
+      return ret;
+    } else { // version-id is empty..
+      /*
+       * case: bucket_versioned
+       *    create_delete_marker;
+       * case: bucket_suspended
+       *    delete entry
+       *    create delete marker with version-id null;
+       * default:
+       *   just delete the entry
+       */
+      if (versioning_suspended && regular_obj) {
+        ret = delete_obj_impl(dpp, del_params);
+        ret = create_dm(dpp, del_params);
+      } else if (versioning_enabled && regular_obj) {
+        ret = create_dm(dpp, del_params);
+      } else {
+        ret = delete_obj_impl(dpp, del_params);
+      }
+    }
+  } else { // ret == -ENOENT
+     /* case: VersionID given
+      *     return -ENOENT
+      * else: // may or may not be versioned object
+      *     Listversionedobjects
+      *     if (list_entries.empty()) {
+      *         nothing to do..return ENOENT
+      *     } else {
+      *         read top entry
+      *         if (top.flags | FLAG_DELETE_MARKER) {
+      *            // nothing to do
+      *            return -ENOENT;
+      *          }
+      *          if (bucket_versioned)  {
+      *            // create delete marker with new version-id
+      *          } else if (bucket_suspended) {
+      *            // create delete marker with version-id null
+      *          }
+      *          bucket cannot be in unversioned state post having versions
+      *     }
+      */
+     if (!versionid.empty()) {
+       return -ENOENT;
+     }
+     ret = target->list_versioned_objects(dpp, del_params.op.obj.list_entries);
+     if (ret) {
+        ldpp_dout(dpp, 0)<<"ListVersionedObjects failed err:(" <<ret<<")" << dendl;
+        return ret;
+     }
+    if (del_params.op.obj.list_entries.empty()) {
+      return -ENOENT;
+    }
+    auto &ent = del_params.op.obj.list_entries.front();
+    if (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
+      // for now do not create another delete marker..just exit
+      return 0;
+    }
+    ret = create_dm(dpp, del_params);
   }
-
   return ret;
 }
 
@@ -1965,102 +1932,44 @@ int DB::Object::Delete::delete_obj_impl(const DoutPrefixProvider *dpp,
  *  - create a delete marker with 
  *    - new version/instanceID (if bucket versioned)
  *    - null versionID (if versioning suspended)
- *    - demote the current version entry
- *  XXX: If delete_marker is the latest version, another delete_marker
- *    is not created for now (for any subsequent deletes) . Will revisit this if
- *    needed.
- *
- * b) If versionID provided,
- *  - delete that particular entry
- *  - Incase the entry is current entry, promote next object version to CURRENT.
- *
- * XXX: call this function under a lock or sqlite transaction to prevent
- * parallel object uploads marking both the versions as FLAG_CURRENT;
  */
-int DB::Object::Delete::delete_versioned_obj(const DoutPrefixProvider *dpp,
+int DB::Object::Delete::create_dm(const DoutPrefixProvider *dpp,
                                              DBOpParams& del_params) {
 
   DB *store = target->get_store();
-  bool versioning_enabled = (params.versioning_status & BUCKET_VERSIONED); 
+  bool versioning_suspended = ((params.versioning_status & BUCKET_VERSIONS_SUSPENDED) == BUCKET_VERSIONS_SUSPENDED); 
   int ret = -1;
   DBOpParams olh_params = {};
   std::string version_id;
   DBOpParams next_params = del_params;
 
   version_id = del_params.op.obj.state.obj.key.instance;
-  auto &ent = del_params.op.obj.list_entries.front();
-  uint64_t version_num = MAX_VERSIONED_OBJECTS;
-
-  if (version_id.empty()) {
-    /* create delete marker */
-
-    if (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
-      // for now do not create another delete marker..just exit
-      return 0;
-    }
-    DBOpParams dm_params = del_params;
-
-    //demote current entry
-    target->update_obj_next_version(dpp, ent, false, version_num); //XXX: not checking return status
-
-    // create delete marker
-    store->InitializeParams(dpp, &dm_params);
-    target->InitializeParamsfromObject(dpp, &dm_params);
 
-    if (!versioning_enabled) {
-      dm_params.op.obj.state.obj.key.instance = "null";
-    } else {
-      store->gen_rand_obj_instance_name(&dm_params.op.obj.state.obj.key);
-      dm_params.op.obj.obj_id = dm_params.op.obj.state.obj.key.instance;
-    }
+  DBOpParams dm_params = del_params;
 
-    dm_params.op.obj.flags |= (rgw_bucket_dir_entry::FLAG_DELETE_MARKER);
-    dm_params.op.obj.flags |= (rgw_bucket_dir_entry::FLAG_CURRENT);
-    dm_params.op.obj.version_num = ++version_num; // max obj version list
+  // create delete marker
 
-    ret = store->ProcessOp(dpp, "PutObject", &dm_params);
+  store->InitializeParams(dpp, &dm_params);
+  target->InitializeParamsfromObject(dpp, &dm_params);
+  dm_params.op.obj.category = RGWObjCategory::None;
 
-    if (ret) {
-      ldpp_dout(dpp, 0) << "delete_olh: failed to create delete marker - err:(" <<ret<<")" << dendl;
-      return ret;
-    }
+  if (versioning_suspended) {
+    dm_params.op.obj.state.obj.key.instance = "null";
   } else {
-    // delete paritcular version-id
-    DBOpParams dm_params = {};
-
-    store->InitializeParams(dpp, &dm_params);
-    target->InitializeParamsfromObject(dpp, &dm_params);
+    store->gen_rand_obj_instance_name(&dm_params.op.obj.state.obj.key);
+    dm_params.op.obj.obj_id = dm_params.op.obj.state.obj.key.instance;
+  }
 
-    dm_params.op.obj.state.obj.key.instance = version_id;
+  dm_params.op.obj.flags |= (rgw_bucket_dir_entry::FLAG_DELETE_MARKER);
 
-    ret = target->get_object_impl(dpp, dm_params);
+  ret = store->ProcessOp(dpp, "PutObject", &dm_params);
 
-    if (ret < 0) {
-      ldpp_dout(dpp, 0) << "get obj of versioned object failed - err:(" <<ret<<")" << dendl;
-      return ret;
-    }
-    ret = delete_obj_impl(dpp, dm_params);
-    if (dm_params.op.obj.flags & rgw_bucket_dir_entry::FLAG_CURRENT) {
-      bool found = false;
-      std::list<rgw_bucket_dir_entry>::iterator e;
-      for (e = del_params.op.obj.list_entries.begin(); e != del_params.op.obj.list_entries.end(); e++) {
-
-        if (e->key.instance == version_id) {
-          e++;
-          if (e != del_params.op.obj.list_entries.end()) {
-            found = true;
-          }
-            break;
-        }
-      }
-      if (found) {
-        uint64_t version_num = 0;
-        rgw_bucket_dir_entry& e1 = *e;
-        target->update_obj_next_version(dpp, e1, true, version_num);
-      }
-    }
+  if (ret) {
+    ldpp_dout(dpp, 0) << "delete_olh: failed to create delete marker - err:(" <<ret<<")" << dendl;
+    return ret;
   }
-
+  result.delete_marker = true;
+  result.version_id = dm_params.op.obj.state.obj.key.instance;
   return ret;
 }
 
index ed90285d162b7dc08a60233f110aff85124bc14c..80517c44eb825a3be8b34f754d0825bd2ea868a9 100644 (file)
@@ -94,7 +94,7 @@ struct DBOpObjectInfo {
 
   /* for versioned objects */
   bool is_versioned;
-  uint64_t version_num = 1; // default value for plain entries (non-versioned)
+  uint64_t version_num = 0;
 };
 
 struct DBOpObjectDataInfo {
@@ -148,6 +148,7 @@ struct DBOpParams {
   DBOpInfo op;
 
   std::string objectdata_table;
+  std::string object_trigger;
   std::string object_view;
   std::string quota_table;
   std::string lc_head_table;
@@ -292,7 +293,6 @@ struct DBOpObjectPrepareInfo {
   static constexpr const char* new_obj_name = ":new_obj_name";
   static constexpr const char* new_obj_instance = ":new_obj_instance";
   static constexpr const char* new_obj_ns  = ":new_obj_ns";
-  static constexpr const char* versions = ":versions";
 };
 
 struct DBOpObjectDataPrepareInfo {
@@ -339,6 +339,7 @@ struct DBOpPrepareParams {
 
 
   std::string objectdata_table;
+  std::string object_trigger;
   std::string object_view;
   std::string quota_table;
   std::string lc_head_table;
@@ -490,6 +491,15 @@ class DBOp {
       FOREIGN KEY (OwnerID) \
       REFERENCES '{}' (UserID) ON DELETE CASCADE ON UPDATE CASCADE \n);";
 
+    static constexpr std::string_view CreateObjectTableTriggerQ =
+      "CREATE TRIGGER IF NOT EXISTS '{}' \
+          AFTER INSERT ON '{}' \
+       BEGIN \
+          UPDATE '{}' \
+          SET VersionNum = (SELECT COALESCE(max(VersionNum), 0) from '{}' where ObjName = new.ObjName) + 1 \
+          where ObjName = new.ObjName and ObjInstance = new.ObjInstance; \
+       END;";
+
     static constexpr std::string_view CreateObjectTableQ =
       /* Corresponds to rgw::sal::Object
        *
@@ -569,7 +579,6 @@ class DBOp {
       IsMultipart     BOOL,   \
       MPPartsList    BLOB,   \
       HeadData  BLOB,   \
-      VERSIONS BLOB,    \
       PRIMARY KEY (ObjName, ObjInstance, BucketName), \
       FOREIGN KEY (BucketName) \
       REFERENCES '{}' (BucketName) ON DELETE CASCADE ON UPDATE CASCADE \n);";
@@ -662,6 +671,12 @@ class DBOp {
         return fmt::format(CreateObjectTableQ,
             params->object_table,
             params->bucket_table);
+      if (!type.compare("ObjectTrigger"))
+        return fmt::format(CreateObjectTableTriggerQ,
+            params->object_trigger,
+            params->object_table,
+            params->object_table,
+            params->object_table);
       if (!type.compare("ObjectData"))
         return fmt::format(CreateObjectDataTableQ,
             params->objectdata_table,
@@ -995,9 +1010,10 @@ class PutObjectOp: virtual public DBOp {
        ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
        TailPlacementRuleName, TailPlacementStorageClass, \
        ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \
-       HeadData, Versions)     \
+       HeadData)     \
       VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
-          {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
+          {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
+          {}, {}, {}, \
           {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {})";
 
   public:
@@ -1016,7 +1032,8 @@ class PutObjectOp: virtual public DBOp {
           params.op.obj.accounted_size, params.op.obj.mtime,
           params.op.obj.epoch, params.op.obj.obj_tag, params.op.obj.tail_tag,
           params.op.obj.write_tag, params.op.obj.fake_tag, params.op.obj.shadow_obj,
-          params.op.obj.has_data, params.op.obj.is_versioned, params.op.obj.version_num,
+          params.op.obj.has_data, params.op.obj.is_versioned,
+          params.op.obj.version_num,
           params.op.obj.pg_ver, params.op.obj.zone_short_id,
           params.op.obj.obj_version, params.op.obj.obj_version_tag,
           params.op.obj.obj_attrs, params.op.obj.head_size,
@@ -1029,7 +1046,7 @@ class PutObjectOp: virtual public DBOp {
           params.op.obj.manifest_part_objs,
           params.op.obj.manifest_part_rules, params.op.obj.omap,
           params.op.obj.is_multipart, params.op.obj.mp_parts,
-          params.op.obj.head_data, params.op.obj.versions);
+          params.op.obj.head_data);
     }
 };
 
@@ -1062,7 +1079,7 @@ class GetObjectOp: virtual public DBOp {
       ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
       TailPlacementRuleName, TailPlacementStorageClass, \
       ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \
-      HeadData, Versions from '{}' \
+      HeadData from '{}' \
       where BucketName = {} and ObjName = {} and ObjInstance = {}";
 
   public:
@@ -1092,7 +1109,7 @@ class ListBucketObjectsOp: virtual public DBOp {
       ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
       TailPlacementRuleName, TailPlacementStorageClass, \
       ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData from '{}' \
-      where BucketName = {} and ObjName >= {} and ObjName LIKE {} ORDER BY ObjName ASC LIMIT {}";
+      where BucketName = {} and ObjName >= {} and ObjName LIKE {} ORDER BY ObjName ASC, VersionNum DESC LIMIT {}";
   public:
     virtual ~ListBucketObjectsOp() {}
 
@@ -1123,7 +1140,7 @@ class ListVersionedObjectsOp: virtual public DBOp {
       ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
       TailPlacementRuleName, TailPlacementStorageClass, \
       ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \
-      HeadData, Versions from '{}' \
+      HeadData from '{}' \
       where BucketName = {} and ObjName = {} ORDER BY VersionNum DESC LIMIT {}";
   public:
     virtual ~ListVersionedObjectsOp() {}
@@ -1163,7 +1180,7 @@ class UpdateObjectOp: virtual public DBOp {
        HeadPlacementRuleName = {}, HeadPlacementRuleStorageClass = {}, \
        TailPlacementRuleName = {}, TailPlacementStorageClass = {}, \
        ManifestPartObjs = {}, ManifestPartRules = {}, Omap = {}, \
-       IsMultipart = {}, MPPartsList = {}, HeadData = {}, Versions = {} \
+       IsMultipart = {}, MPPartsList = {}, HeadData = {} \
        WHERE ObjName = {} and ObjInstance = {} and BucketName = {}";
 
   public:
@@ -1219,7 +1236,7 @@ class UpdateObjectOp: virtual public DBOp {
           params.op.obj.manifest_part_objs,
           params.op.obj.manifest_part_rules, params.op.obj.omap,
           params.op.obj.is_multipart, params.op.obj.mp_parts,
-          params.op.obj.head_data, params.op.obj.versions,
+          params.op.obj.head_data, 
           params.op.obj.obj_name, params.op.obj.obj_instance,
           params.op.bucket.bucket_name);
       }
@@ -1525,6 +1542,8 @@ class DB {
       return db_name+"_"+bucket+"_objectdata_table"; }
     const std::string getObjectView(std::string bucket) {
       return db_name+"_"+bucket+"_object_view"; }
+    const std::string getObjectTrigger(std::string bucket) {
+      return db_name+"_"+bucket+"_object_trigger"; }
 
     std::map<std::string, class ObjectOp*> getObjectMap();
 
@@ -1894,7 +1913,6 @@ class DB {
             bool assume_noent, bool modify_tail);
         int write_meta(const DoutPrefixProvider *dpp, uint64_t size,
            uint64_t accounted_size, std::map<std::string, bufferlist>& attrs);
-        int write_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& params);
       };
 
       struct Delete {
@@ -1930,7 +1948,7 @@ class DB {
 
         int delete_obj(const DoutPrefixProvider *dpp);
         int delete_obj_impl(const DoutPrefixProvider *dpp, DBOpParams& del_params);
-        int delete_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& del_params);
+        int create_dm(const DoutPrefixProvider *dpp, DBOpParams& del_params);
       };
 
       /* XXX: the parameters may be subject to change. All we need is bucket name
@@ -1969,9 +1987,6 @@ class DB {
           const RGWBucketInfo& bucket_info, const rgw_obj& obj,
           off_t ofs, off_t end, uint64_t max_chunk_size,
           iterate_obj_cb cb, void *arg);
-      int update_obj_next_version(const DoutPrefixProvider *dpp,
-                                  rgw_bucket_dir_entry &obj_entry,
-                                  bool promote, uint64_t& version_num);
     };
     int get_obj_iterate_cb(const DoutPrefixProvider *dpp,
         const raw_obj& read_obj, off_t obj_ofs,
index f0aa39aac8b951a59773e7af8140362b03d3c360..27be84842714c0791bc67c69b5783e52b3097048 100644 (file)
@@ -42,7 +42,7 @@ using namespace std;
 
 #define SQL_BIND_TEXT(dpp, stmt, index, str, sdb)                      \
   do {                                                         \
-      rc = sqlite3_bind_text(stmt, index, str, -1, SQLITE_TRANSIENT);  \
+    rc = sqlite3_bind_text(stmt, index, str, -1, SQLITE_TRANSIENT);    \
     if (rc != SQLITE_OK) {                                             \
       ldpp_dout(dpp, 0)<<"sqlite bind text failed for index("          \
       <<index<<"), str("<<str<<") in stmt("    \
@@ -195,6 +195,9 @@ int SQLiteDB::InitPrepareParams(const DoutPrefixProvider *dpp,
     if (params->object_view.empty()) {
       params->object_view = getObjectView(bucket);
     }
+    if (params->object_trigger.empty()) {
+      params->object_trigger = getObjectTrigger(bucket);
+    }
     p_params.object_table = params->object_table;
     p_params.objectdata_table = params->objectdata_table;
     p_params.object_view = params->object_view;
@@ -791,6 +794,22 @@ int SQLiteDB::createObjectTable(const DoutPrefixProvider *dpp, DBOpParams *param
   return ret;
 }
 
+int SQLiteDB::createObjectTableTrigger(const DoutPrefixProvider *dpp, DBOpParams *params)
+{
+  int ret = -1;
+  string schema;
+
+  schema = CreateTableSchema("ObjectTrigger", params);
+
+  ret = exec(dpp, schema.c_str(), NULL);
+  if (ret)
+    ldpp_dout(dpp, 0)<<"CreateObjectTableTrigger failed " << dendl;
+
+  ldpp_dout(dpp, 20)<<"CreateObjectTableTrigger suceeded " << dendl;
+
+  return ret;
+}
+
 int SQLiteDB::createObjectView(const DoutPrefixProvider *dpp, DBOpParams *params)
 {
   int ret = -1;
@@ -1428,6 +1447,7 @@ int SQLInsertBucket::Execute(const DoutPrefixProvider *dpp, struct DBOpParams *p
 
   (void)createObjectTable(dpp, params);
   (void)createObjectDataTable(dpp, params);
+  (void)createObjectTableTrigger(dpp, params);
 out:
   return ret;
 }
@@ -1761,8 +1781,13 @@ int SQLPutObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params)
 {
   int index = -1;
   int rc = 0;
+  int VersionNum = 0;
   struct DBOpPrepareParams p_params = PrepareParams;
 
+  if (params->op.obj.state.obj.key.instance.empty()) {
+    params->op.obj.state.obj.key.instance = "null";
+  }
+
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_name, sdb);
   SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.name.c_str(), sdb);
 
@@ -1848,7 +1873,7 @@ int SQLPutObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params)
   SQL_BIND_INT(dpp, stmt, index, params->op.obj.is_versioned, sdb);
 
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.version_num, sdb);
-  SQL_BIND_INT(dpp, stmt, index, params->op.obj.version_num, sdb);
+  SQL_BIND_INT(dpp, stmt, index, VersionNum, sdb);
 
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.pg_ver, sdb);
   SQL_BIND_INT(dpp, stmt, index, params->op.obj.state.pg_ver, sdb);
@@ -1943,6 +1968,10 @@ int SQLDeleteObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *para
   int rc = 0;
   struct DBOpPrepareParams p_params = PrepareParams;
 
+  if (params->op.obj.state.obj.key.instance.empty()) {
+    params->op.obj.state.obj.key.instance = "null";
+  }
+
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
   SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
 
@@ -1987,6 +2016,10 @@ int SQLGetObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params)
   int rc = 0;
   struct DBOpPrepareParams p_params = PrepareParams;
 
+  if (params->op.obj.state.obj.key.instance.empty()) {
+    params->op.obj.state.obj.key.instance = "null";
+  }
+
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
   SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
 
@@ -2063,6 +2096,10 @@ int SQLUpdateObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *para
     goto out;
   }
 
+  if (params->op.obj.state.obj.key.instance.empty()) {
+    params->op.obj.state.obj.key.instance = "null";
+  }
+
   SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.bucket.bucket_name, sdb);
   SQL_BIND_TEXT(dpp, *stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
 
@@ -2273,6 +2310,10 @@ int SQLListBucketObjects::Bind(const DoutPrefixProvider *dpp, struct DBOpParams
   int rc = 0;
   struct DBOpPrepareParams p_params = PrepareParams;
 
+  if (params->op.obj.state.obj.key.instance.empty()) {
+    params->op.obj.state.obj.key.instance = "null";
+  }
+
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
   SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
 
@@ -2322,6 +2363,10 @@ int SQLListVersionedObjects::Bind(const DoutPrefixProvider *dpp, struct DBOpPara
   int rc = 0;
   struct DBOpPrepareParams p_params = PrepareParams;
 
+  if (params->op.obj.state.obj.key.instance.empty()) {
+    params->op.obj.state.obj.key.instance = "null";
+  }
+
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
   SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
 
@@ -2368,6 +2413,10 @@ int SQLPutObjectData::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *par
   int rc = 0;
   struct DBOpPrepareParams p_params = PrepareParams;
 
+  if (params->op.obj.state.obj.key.instance.empty()) {
+    params->op.obj.state.obj.key.instance = "null";
+  }
+
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_name, sdb);
 
   SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.name.c_str(), sdb);
@@ -2445,6 +2494,10 @@ int SQLUpdateObjectData::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *
   int rc = 0;
   struct DBOpPrepareParams p_params = PrepareParams;
 
+  if (params->op.obj.state.obj.key.instance.empty()) {
+    params->op.obj.state.obj.key.instance = "null";
+  }
+
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_name, sdb);
   SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.name.c_str(), sdb);
 
@@ -2496,6 +2549,10 @@ int SQLGetObjectData::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *par
   int rc = 0;
   struct DBOpPrepareParams p_params = PrepareParams;
 
+  if (params->op.obj.state.obj.key.instance.empty()) {
+    params->op.obj.state.obj.key.instance = "null";
+  }
+
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
   SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
 
@@ -2544,6 +2601,10 @@ int SQLDeleteObjectData::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *
   int rc = 0;
   struct DBOpPrepareParams p_params = PrepareParams;
 
+  if (params->op.obj.state.obj.key.instance.empty()) {
+    params->op.obj.state.obj.key.instance = "null";
+  }
+
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
   SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
 
index 444400b04eb59b455d3b7250e10ce337de9eb1d3..038b24fe5b32a00695b4e8da5e268e5f559250c9 100644 (file)
@@ -51,6 +51,7 @@ class SQLiteDB : public DB, virtual public DBOp {
     int createObjectTable(const DoutPrefixProvider *dpp, DBOpParams *params);
     int createObjectDataTable(const DoutPrefixProvider *dpp, DBOpParams *params);
     int createObjectView(const DoutPrefixProvider *dpp, DBOpParams *params);
+    int createObjectTableTrigger(const DoutPrefixProvider *dpp, DBOpParams *params);
     int createQuotaTable(const DoutPrefixProvider *dpp, DBOpParams *params);
     void populate_object_params(const DoutPrefixProvider *dpp,
                                 struct DBOpPrepareParams& p_params,
index 2e8c535f6487b517b6556c97e29f2be1b43c8619..e87002f61b542d4ffd343743cfb2b97b166854b1 100644 (file)
@@ -112,6 +112,30 @@ namespace {
 
       void TearDown() {
       }
+
+      int write_object(const DoutPrefixProvider *dpp, DBOpParams params) {
+        DB::Object op_target(db, params.op.bucket.info,
+                             params.op.obj.state.obj);
+        DB::Object::Write write_op(&op_target);
+        map<string, bufferlist> setattrs;
+        ret = write_op.prepare(dpp);
+        if (ret)
+          return ret;
+
+        write_op.meta.mtime = &bucket_mtime;
+        write_op.meta.category = RGWObjCategory::Main;
+        write_op.meta.owner = params.op.user.uinfo.user_id;
+
+        bufferlist b1 = params.op.obj.head_data;
+        write_op.meta.data = &b1;
+
+        bufferlist b2;
+        encode("ACL", b2);
+        setattrs[RGW_ATTR_ACL] = b2;
+
+        ret = write_op.write_meta(0, params.op.obj.state.size, b1.length()+1, setattrs);
+        return ret;
+      }
   };
 }
 
@@ -681,6 +705,7 @@ TEST_F(DBStoreTest, GetObject) {
   decode(data, params.op.obj.head_data);
   ASSERT_EQ(data, "HELLO WORLD");
   ASSERT_EQ(params.op.obj.state.size, 12);
+  cout << "versionNum :" << params.op.obj.version_num << "\n";
 }
 
 TEST_F(DBStoreTest, GetObjectState) {
@@ -698,12 +723,14 @@ TEST_F(DBStoreTest, GetObjectState) {
   ASSERT_EQ(ret, 0);
   ASSERT_EQ(s->size, 12);
   ASSERT_EQ(s->is_olh, false);
+  cout << "versionNum :" << params.op.obj.version_num << "\n";
 
   /* Recheck with get_state API */
   ret = op_target.get_state(dpp, &s, false);
   ASSERT_EQ(ret, 0);
   ASSERT_EQ(s->size, 12);
   ASSERT_EQ(s->is_olh, false);
+  cout << "versionNum :" << params.op.obj.version_num << "\n";
 }
 
 TEST_F(DBStoreTest, ObjAttrs) {
@@ -762,29 +789,17 @@ TEST_F(DBStoreTest, ObjAttrs) {
 TEST_F(DBStoreTest, WriteObject) {
   struct DBOpParams params = GlobalParams;
   int ret = -1;
-  map<string, bufferlist> setattrs;
   params.op.obj.state.obj.key.name = "object3";
   params.op.obj.state.obj.key.instance = "inst3";
   DB::Object op_target(db, params.op.bucket.info,
       params.op.obj.state.obj);
-  DB::Object::Write write_op(&op_target);
-  ret = write_op.prepare(dpp);
-  ASSERT_EQ(ret, 0);
-
-  write_op.meta.mtime = &bucket_mtime;
-  write_op.meta.category = RGWObjCategory::Main;
-  write_op.meta.owner = params.op.user.uinfo.user_id;
 
   bufferlist b1;
   encode("HELLO WORLD - Object3", b1);
-  cout<<"XXXXXXXXX Insert b1.length " << b1.length() << "\n";
-  write_op.meta.data = &b1;
-
-  bufferlist b2;
-  encode("ACL", b2);
-  setattrs[RGW_ATTR_ACL] = b2;
+  params.op.obj.head_data = b1;
+  params.op.obj.state.size = 22;
 
-  ret = write_op.write_meta(0, 22, 25, setattrs);
+  ret = write_object(dpp, params);
   ASSERT_EQ(ret, 0);
 }
 
@@ -913,7 +928,7 @@ TEST_F(DBStoreTest, WriteVersionedObject) {
   encode("HELLO WORLD", b1);
   params.op.obj.head_data = b1;
   params.op.obj.state.size = 12;
-  ret = write_op.write_versioned_obj(dpp, params);
+  ret = write_object(dpp, params);
   ASSERT_EQ(ret, 0);
 
   /* Version2 */
@@ -922,7 +937,7 @@ TEST_F(DBStoreTest, WriteVersionedObject) {
   encode("HELLO WORLD ABC", b1);
   params.op.obj.head_data = b1;
   params.op.obj.state.size = 16;
-  ret = write_op.write_versioned_obj(dpp, params);
+  ret = write_object(dpp, params);
   ASSERT_EQ(ret, 0);
 
   /* Version3 */
@@ -931,7 +946,7 @@ TEST_F(DBStoreTest, WriteVersionedObject) {
   encode("HELLO WORLD A", b1);
   params.op.obj.head_data = b1;
   params.op.obj.state.size = 14;
-  ret = write_op.write_versioned_obj(dpp, params);
+  ret = write_object(dpp, params);
   ASSERT_EQ(ret, 0);
 }
 
@@ -949,14 +964,7 @@ TEST_F(DBStoreTest, ListVersionedObject) {
 
   i = 2;
   for (auto ent: params.op.obj.list_entries) {
-    string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false";
-    cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is:current = " << is_current << "\n";
 
-    if (i == 2) {
-      ASSERT_EQ(is_current, "true");
-    } else {
-      ASSERT_EQ(is_current, "false");
-    }
 
     ASSERT_EQ(ent.key.instance, instances[i]);
     i--;
@@ -1018,16 +1026,13 @@ TEST_F(DBStoreTest, DeleteVersionedObject) {
 
   i = 3;
   for (auto ent: params.op.obj.list_entries) {
-    string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false";
     string is_delete_marker = (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER)? "true" : "false";
     cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is_delete_marker = " << is_delete_marker << "\n";
 
     if (i == 3) {
       ASSERT_EQ(is_delete_marker, "true");
-      ASSERT_EQ(is_current, "true");
       dm_instance = ent.key.instance;
     } else {
-      ASSERT_EQ(is_current, "false");
       ASSERT_EQ(is_delete_marker, "false");
       ASSERT_EQ(ent.key.instance, instances[i]);
     }
@@ -1082,14 +1087,10 @@ TEST_F(DBStoreTest, DeleteVersionedObject) {
 
   i = 1;
   for (auto ent: params.op.obj.list_entries) {
-    string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false";
-    cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is_current = " << is_current << "\n";
 
     if (i == 1) {
-      ASSERT_EQ(is_current, "true");
       dm_instance = ent.key.instance;
     } else {
-      ASSERT_EQ(is_current, "false");
       ASSERT_EQ(ent.key.instance, instances[i]);
     }