]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/dbstore: Object versioning feature support
authorSoumya Koduri <skoduri@redhat.com>
Wed, 25 May 2022 08:30:34 +0000 (14:00 +0530)
committerSoumya Koduri <skoduri@redhat.com>
Mon, 25 Jul 2022 06:07:08 +0000 (11:37 +0530)
This commit adds support for Object versioning feature in DBStore

In DBStore, each object is uniquely identified by <objectName, objectNS, instance-id>
In addition, for each object upload, a unique objectID is created to handle racing writes.
Note: For non-versioned objects, both head and tail entries have instance-id empty ""

Versioned objects:
 - same as non-versioned objects but with instance-id & objectID set to version-id i.e,
   each version upload will have a unique versionID created which will act as that object's intanceID and objectID as well.
 - In addition a version-number is stored (starting with '1' & incremented sequentially) for each version/delete-marker being created for that object.
   This version-number is used to identify and promote/demote next object version as CURRENT.

Current status:
- Basic functionality seem to be working when the bucket is versioned.
- If an object is removed, only one delete-marker is created for now. Subsequent deletes will be no-op (unless the object is re-created & deleted with the same name)
- Added test cases to test put/get/delete/list of versioned objects

TODO (not addressed as part of this PR):
- Test various cases with versioning suspended & enabled.

Signed-off-by: Soumya Koduri <skoduri@redhat.com>
src/rgw/store/dbstore/common/dbstore.cc
src/rgw/store/dbstore/common/dbstore.h
src/rgw/store/dbstore/sqlite/sqliteDB.cc
src/rgw/store/dbstore/sqlite/sqliteDB.h
src/rgw/store/dbstore/tests/dbstore_tests.cc

index ad71cd0bcbd99f89aaa5db71581c34a531646cf1..c9217a760605474dfb6168956bdd2a27bfea68f3 100644 (file)
@@ -151,6 +151,8 @@ std::shared_ptr<class DBOp> DB::getDBOp(const DoutPrefixProvider *dpp, std::stri
     return Ob->UpdateObject;
   if (!Op.compare("ListBucketObjects"))
     return Ob->ListBucketObjects;
+  if (!Op.compare("ListVersionedObjects"))
+    return Ob->ListVersionedObjects;
   if (!Op.compare("PutObjectData"))
     return Ob->PutObjectData;
   if (!Op.compare("UpdateObjectData"))
@@ -766,14 +768,22 @@ int DB::Bucket::List::list_objects(const DoutPrefixProvider *dpp, int64_t max,
     goto out;
   }
 
-
   for (auto& entry : db_params.op.obj.list_entries) {
+    if (!params.list_versions) {
+      if (!(entry.flags & rgw_bucket_dir_entry::FLAG_CURRENT) || 
+           (entry.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER)) {
+        // skip all non-current entries and delete_marker
+        continue;
+      }
+    }
+
     if (count >= max) {
       *is_truncated = true;
       next_marker.name = entry.key.name;
       next_marker.instance = entry.key.instance;
       break;
     }
+
     if (!params.delim.empty()) {
     const std::string& objname = entry.key.name;
        const int delim_pos = objname.find(params.delim, params.prefix.size());
@@ -845,6 +855,26 @@ int DB::Object::InitializeParamsfromObject(const DoutPrefixProvider *dpp,
   return ret;
 }
 
+int DB::Object::get_object_impl(const DoutPrefixProvider *dpp, DBOpParams& params) {
+  int ret = 0;
+
+  if (params.op.obj.state.obj.key.name.empty()) {
+    /* Initialize */
+    store->InitializeParams(dpp, &params);
+    InitializeParamsfromObject(dpp, &params);
+  }
+
+  ret = store->ProcessOp(dpp, "GetObject", &params);
+
+  /* pick one field check if object exists */
+  if (!ret && !params.op.obj.state.exists) {
+    ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
+    ret = -ENOENT;
+  }
+
+  return ret;
+}
+
 int DB::Object::obj_omap_set_val_by_key(const DoutPrefixProvider *dpp,
                                         const std::string& key, bufferlist& val,
                                         bool must_exist) {
@@ -852,22 +882,13 @@ int DB::Object::obj_omap_set_val_by_key(const DoutPrefixProvider *dpp,
 
   DBOpParams params = {};
 
-  store->InitializeParams(dpp, &params);
-  InitializeParamsfromObject(dpp, &params);
-
-  ret = store->ProcessOp(dpp, "GetObject", &params);
+  ret = get_object_impl(dpp, params);
 
   if (ret) {
-    ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<")" << dendl;
+    ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
     goto out;
   }
 
-  /* pick one field check if object exists */
-  if (!params.op.obj.state.exists) {
-    ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
-    return -1;
-  }
-
   params.op.obj.omap[key] = val;
   params.op.query_str = "omap";
   params.op.obj.state.mtime = real_clock::now();
@@ -895,22 +916,13 @@ int DB::Object::obj_omap_get_vals_by_keys(const DoutPrefixProvider *dpp,
   if (!vals)
     return -1;
 
-  store->InitializeParams(dpp, &params);
-  InitializeParamsfromObject(dpp, &params);
-
-  ret = store->ProcessOp(dpp, "GetObject", &params);
+  ret = get_object_impl(dpp, params);
 
   if (ret) {
-    ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<") " << dendl;
+    ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
     goto out;
   }
 
-  /* pick one field check if object exists */
-  if (!params.op.obj.state.exists) {
-    ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
-    return -1;
-  }
-
   omap = params.op.obj.omap;
 
   for (const auto& k :  keys) {
@@ -927,22 +939,13 @@ int DB::Object::add_mp_part(const DoutPrefixProvider *dpp,
 
   DBOpParams params = {};
 
-  store->InitializeParams(dpp, &params);
-  InitializeParamsfromObject(dpp, &params);
-
-  ret = store->ProcessOp(dpp, "GetObject", &params);
+  ret = get_object_impl(dpp, params);
 
   if (ret) {
-    ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<")" << dendl;
+    ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
     goto out;
   }
 
-  /* pick one field check if object exists */
-  if (!params.op.obj.state.exists) {
-    ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
-    return -1;
-  }
-
   params.op.obj.mp_parts.push_back(info);
   params.op.query_str = "mp";
   params.op.obj.state.mtime = real_clock::now();
@@ -965,22 +968,13 @@ int DB::Object::get_mp_parts_list(const DoutPrefixProvider *dpp,
   DBOpParams params = {};
   std::map<std::string, bufferlist> omap;
 
-  store->InitializeParams(dpp, &params);
-  InitializeParamsfromObject(dpp, &params);
-
-  ret = store->ProcessOp(dpp, "GetObject", &params);
+  ret = get_object_impl(dpp, params);
 
   if (ret) {
-    ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<") " << dendl;
+    ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
     goto out;
   }
 
-  /* pick one field check if object exists */
-  if (!params.op.obj.state.exists) {
-    ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
-    return -1;
-  }
-
   info = params.op.obj.mp_parts;
 
 out:
@@ -1009,22 +1003,13 @@ int DB::Object::obj_omap_get_all(const DoutPrefixProvider *dpp,
   if (!m)
     return -1;
 
-  store->InitializeParams(dpp, &params);
-  InitializeParamsfromObject(dpp, &params);
-
-  ret = store->ProcessOp(dpp, "GetObject", &params);
+  ret = get_object_impl(dpp, params);
 
   if (ret) {
-    ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <<ret<<")" << dendl;
+    ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
     goto out;
   }
 
-  /* pick one field check if object exists */
-  if (!params.op.obj.state.exists) {
-    ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
-    return -1;
-  }
-
   (*m) = params.op.obj.omap;
 
 out:
@@ -1045,22 +1030,13 @@ int DB::Object::obj_omap_get_vals(const DoutPrefixProvider *dpp,
   if (!m)
     return -1;
 
-  store->InitializeParams(dpp, &params);
-  InitializeParamsfromObject(dpp, &params);
-
-  ret = store->ProcessOp(dpp, "GetObject", &params);
+  ret = get_object_impl(dpp, params);
 
   if (ret) {
-    ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <<ret<<")" << dendl;
+    ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
     goto out;
   }
 
-  /* pick one field check if object exists */
-  if (!params.op.obj.state.exists) {
-    ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
-    return -1;
-  }
-
   omap = params.op.obj.omap;
 
   for (iter = omap.begin(); iter != omap.end(); ++iter) {
@@ -1090,22 +1066,13 @@ int DB::Object::set_attrs(const DoutPrefixProvider *dpp,
   rgw::sal::Attrs *attrs;
   map<string, bufferlist>::iterator iter;
 
-  store->InitializeParams(dpp, &params);
-  InitializeParamsfromObject(dpp, &params);
-
-  ret = store->ProcessOp(dpp, "GetObject", &params);
+  ret = get_object_impl(dpp, params);
 
   if (ret) {
-    ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<")" << dendl;
+    ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
     goto out;
   }
 
-  /* pick one field check if object exists */
-  if (!params.op.obj.state.exists) {
-    ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
-    return -1;
-  }
-
   /* For now lets keep it simple..rmattrs & setattrs ..
    * XXX: Check rgw_rados::set_attrs
    */
@@ -1244,53 +1211,25 @@ int DB::raw_obj::write(const DoutPrefixProvider *dpp, int64_t ofs, int64_t write
   return write_len;
 }
 
-int DB::Object::follow_olh(const DoutPrefixProvider *dpp,
-                           const RGWBucketInfo& bucket_info, RGWObjState* state,
-                           const rgw_obj& olh_obj, rgw_obj *target)
-{
-  auto iter = state->attrset.find(RGW_ATTR_OLH_INFO);
-  if (iter == state->attrset.end()) {
-    return -EINVAL;
-  }
-
-  DBOLHInfo olh;
-  string s;
-  const bufferlist& bl = iter->second;
-  try {
-    auto biter = bl.cbegin();
-    decode(olh, biter);
-  } catch (buffer::error& err) {
-    return -EIO;
-  }
-
-  if (olh.removed) {
-    return -ENOENT;
-  }
-
-  *target = olh.target;
+int DB::Object::list_versioned_objects(const DoutPrefixProvider *dpp,
+                                       std::list<rgw_bucket_dir_entry>& list_entries) {
+  int ret = 0;
+  store = get_store();
+  DBOpParams db_params = {};
 
-  return 0;
-}
+  store->InitializeParams(dpp, &db_params);
+  InitializeParamsfromObject(dpp, &db_params);
 
-int DB::Object::get_olh_target_state(const DoutPrefixProvider *dpp,
-                              const RGWBucketInfo& bucket_info, const rgw_obj& obj,
-                              RGWObjState* olh_state, RGWObjState** target)
-{
-  int ret = 0;
-  rgw_obj target_obj;
+  db_params.op.list_max_count = MAX_VERSIONED_OBJECTS;
 
-  if (!olh_state->is_olh) {
-    return EINVAL;
-  }
+  ret = store->ProcessOp(dpp, "ListVersionedObjects", &db_params);
 
-  ret = follow_olh(dpp, bucket_info, olh_state, obj, &target_obj); /* might return -EAGAIN */
-  if (ret < 0) {
-    ldpp_dout(dpp, 0)<<"In get_olh_target_state follow_olh() failed err:(" <<ret<<")" << dendl;
-    return ret;
+  if (ret) {
+    ldpp_dout(dpp, 0)<<"In ListVersionedObjects failed err:(" <<ret<<") " << dendl;
+  } else {
+    list_entries = db_params.op.obj.list_entries;
   }
 
-  ret = get_obj_state(dpp, bucket_info, target_obj, false, target);
-
   return ret;
 }
 
@@ -1302,18 +1241,37 @@ int DB::Object::get_obj_state(const DoutPrefixProvider *dpp,
 
   DBOpParams params = {};
   RGWObjState* s;
-  store->InitializeParams(dpp, &params);
-  InitializeParamsfromObject(dpp, &params);
-
-  ret = store->ProcessOp(dpp, "GetObject", &params);
+  ret = get_object_impl(dpp, params);
 
-  if (ret) {
-    ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <<ret<<")" << dendl;
+  if (ret && ret != -ENOENT) {
+    ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <<ret<<")" << dendl;
     goto out;
   }
 
-  if (!params.op.obj.state.exists) {
-    return -ENOENT;
+  /* Check if its a versioned object */
+  if (follow_olh && params.op.obj.state.obj.key.instance.empty()) {
+    if ((ret == -ENOENT) || (params.op.obj.flags & rgw_bucket_dir_entry::FLAG_VER)) { // NEWWWW
+      ret = list_versioned_objects(dpp, params.op.obj.list_entries);
+
+      if (params.op.obj.list_entries.size() != 0) {
+        /* versioned object. Read the latest version object provided its not a
+         * delete marker */
+        auto& ent = params.op.obj.list_entries.front();
+        if (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
+          ret = -ENOENT;
+          ldpp_dout(dpp, 0) <<" Latest is delete marker -  err:(" <<ret<<")" << dendl;
+          goto out;
+        }
+        params.op.obj.state.obj.key.instance = ent.key.instance;
+    
+        ret = get_object_impl(dpp, params);
+
+        if (ret) {
+          ldpp_dout(dpp, 0) <<"get_object_impl of versioned object failed err:(" <<ret<<")" << dendl;
+          goto out;
+        }
+      }
+    }
   }
 
   s = &params.op.obj.state;
@@ -1323,15 +1281,6 @@ int DB::Object::get_obj_state(const DoutPrefixProvider *dpp,
   *state = &obj_state;
   **state = *s;
 
-  if (follow_olh && params.op.obj.state.obj.key.instance.empty()) {
-    /* fetch current version obj details */
-    ret = get_olh_target_state(dpp, bucket_info, obj, s, state);
-
-    if (ret < 0) {
-      ldpp_dout(dpp, 0)<<"get_olh_target_state failed err:(" <<ret<<")" << dendl;
-    }
-  }
-
 out:
   return ret;
 
@@ -1367,7 +1316,6 @@ int DB::Object::Read::prepare(const DoutPrefixProvider *dpp)
 
   RGWObjState* astate;
 
-  /* XXX Read obj_id too */
   int r = source->get_state(dpp, &astate, true);
   if (r < 0)
     return r;
@@ -1640,11 +1588,16 @@ int DB::Object::Write::prepare(const DoutPrefixProvider* dpp)
 
   obj_state.obj = target->obj;
  
-  if (target->obj_id.empty()) { 
-    // generate obj_id
-    char buf[33];
-    gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
-    target->obj_id = target->obj.key.name + "." + buf;
+  if (target->obj_id.empty()) {
+    if (!target->obj.key.instance.empty() && (target->obj.key.instance != "null")) {
+      /* versioned object. Set obj_id same as versionID/instance */
+      target->obj_id = target->obj.key.instance;
+    } else {
+      // generate obj_id
+      char buf[33];
+      gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
+      target->obj_id = buf;
+    }
   }
 
   ret = 0;
@@ -1699,6 +1652,109 @@ int DB::Object::Write::write_data(const DoutPrefixProvider* dpp,
   return 0;
 }
 
+/*
+ * If versioned, 
+ * - check if the old version of the object already exists
+ *   if exists,
+ *     - set its version-id/instance to "null" if instance was empty
+ *     - demote it to NON_CURRENT 
+ * - create versioned object with FLAG_CURRENT set 
+ *
+ * XXX: call this function under a lock or sqlite transaction to prevent
+ * parallel object uploads marking both the versions as FLAG_CURRENT;
+ *
+ */
+int DB::Object::Write::write_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& params) {
+  DB *store = target->get_store();
+  int ret = -1;
+  DBOpParams obj_params = {};
+  uint64_t version_num = 0;
+
+  store->InitializeParams(dpp, &obj_params);
+  target->InitializeParamsfromObject(dpp, &obj_params);
+
+  /* Check if already exists */
+  obj_params.op.obj.state.obj.key.instance.clear();
+  ret = target->list_versioned_objects(dpp, obj_params.op.obj.list_entries);
+  
+  if (ret && ret != -ENOENT) {
+    ldpp_dout(dpp, 0)<<"ListVersionedObjects of object (" << obj_params.op.obj.state.obj.key.name << ") failed err:(" <<ret<<")" << dendl;
+    return ret;
+  }
+
+  for (auto& ent : obj_params.op.obj.list_entries) {
+    ret = target->update_obj_next_version(dpp, ent, false, version_num); // demote previous version
+    if (ret != ENOENT) {
+      break;
+    }
+    // continue to next object version
+  }
+
+  if (ret) {
+    ldpp_dout(dpp, 0)<<"demote_next_version of object (" << obj_params.op.obj.state.obj.key.name << ") failed err:(" <<ret<<")" << dendl;
+    return ret;
+  }
+
+  params.op.obj.version_num = ++version_num;
+
+  /* Put Object */
+  params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
+  ret = store->ProcessOp(dpp, "PutObject", &params);
+  if (ret) {
+    ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
+  }
+
+  return ret;
+}
+
+/* promote or demote next object version */
+int DB::Object::update_obj_next_version(const DoutPrefixProvider *dpp,
+                                rgw_bucket_dir_entry &obj_entry,
+                                bool promote, uint64_t& version_num) {
+  int ret = -1;
+  DBOpParams params = {};
+
+  params.op.obj.state.obj.key.name  = obj_entry.key.name;
+  params.op.obj.state.obj.key.instance  = obj_entry.key.instance;
+
+  // fetch the last version of the object
+  store->InitializeParams(dpp, &params);
+  params.op.bucket.info.bucket.name = bucket_info.bucket.name;
+
+  ret = get_object_impl(dpp, params);
+
+  if (ret) { // do nothing as its best effort? 
+    ldpp_dout(dpp, 0)<<"In demote_object_next_ver get of prev version(" << params.op.obj.obj_id << ") failed err:(" <<ret<<")" << dendl;
+    goto out;
+  }
+
+  if (params.op.obj.state.obj.key.instance.empty()) {
+    /* This means, previous version of object exists which was created before
+     * versioning is enabled on the bucket.
+     *
+     * Set its instance to "null" and mark it as versioned.
+     */
+    params.op.obj.state.obj.key.instance = "null";
+    params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
+  }
+  version_num = params.op.obj.version_num;
+
+  if (promote) {
+    params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
+  } else {
+    params.op.obj.flags &= ~(rgw_bucket_dir_entry::FLAG_CURRENT);
+  }
+  params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
+
+  ret = store->ProcessOp(dpp, "PutObject", &params); // XXX: can be UpdateObject?
+
+  if (ret) { // do nothing as its best effort? 
+    ldpp_dout(dpp, 0)<<"Put of Old version of object failed err:(" <<ret<<")" << dendl;
+  }
+out :
+    return ret;
+}
+
 /* Write metadata & head object data */
 int DB::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp,
     uint64_t size, uint64_t accounted_size,
@@ -1741,10 +1797,6 @@ int DB::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp,
     }
   }
 
-  if (state->is_olh) {
-    (*attrset)[RGW_ATTR_OLH_ID_TAG] = state->olh_tag;
-  }
-
   state->mtime = meta.set_mtime;
 
   if (meta.data) {
@@ -1804,22 +1856,31 @@ int DB::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp,
   params.op.obj.state.accounted_size = accounted_size;
   params.op.obj.owner = target->get_bucket_info().owner.id;
 
-  /* XXX: handle versioning */
   if (meta.mtime) {
     *meta.mtime = meta.set_mtime;
   }
 
   params.op.query_str = "meta";
   params.op.obj.obj_id = target->obj_id;
-  ret = store->ProcessOp(dpp, "PutObject", &params);
 
-  if (ret) {
-    ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
-    goto out;
+  params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
+
+  /* Check if versioned */
+  bool is_versioned = !target->obj.key.instance.empty() && (target->obj.key.instance != "null");
+  params.op.obj.is_versioned = is_versioned;
+
+  if (!is_versioned) {
+    ret = store->ProcessOp(dpp, "PutObject", &params);
+    if (ret) {
+      ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
+      goto out;
+    }
+    return ret;
   }
 
-  /* pick one field check if object exists */
-  return 0;
+  /* versioned object */
+  params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER;
+  ret = write_versioned_obj(dpp, params);
 
 out:
   if (ret < 0) {
@@ -1842,24 +1903,38 @@ int DB::Object::Write::write_meta(const DoutPrefixProvider *dpp, uint64_t size,
 
 int DB::Object::Delete::delete_obj(const DoutPrefixProvider *dpp) {
   int ret = 0;
-  DB *store = target->get_store();
-  RGWObjState* astate;
+  DBOpParams del_params = {};
 
-  int r = target->get_state(dpp, &astate, true);
-  if (r < 0)
-    return r;
+  ret = target->get_object_impl(dpp, del_params);
 
-  if (!astate->exists) {
-    return -ENOENT;
+  if (ret < 0 && ret != -ENOENT) {
+    ldpp_dout(dpp, 0)<<"GetObject during delete failed err:(" <<ret<<")" << dendl;
+    return ret;
   }
 
-  /* XXX: handle versioned objects. Create delete marker */
+  if (!ret && !(del_params.op.obj.flags & 
+         (rgw_bucket_dir_entry::FLAG_DELETE_MARKER | rgw_bucket_dir_entry::FLAG_VER))) {
+    /* Non versioned objects. Simple delete */
+    ret = delete_obj_impl(dpp, del_params);
+    return ret;
+  }
 
-  /* XXX: check params conditions */
-  DBOpParams del_params = {};
+  /* check if it is versioned object. */
+  ret = target->list_versioned_objects(dpp, del_params.op.obj.list_entries);
+
+  if (!del_params.op.obj.list_entries.empty()) {
+    ret = delete_versioned_obj(dpp, del_params);
+  } else {
+    ret = -ENOENT;
+  }
+
+  return ret;
+}
 
-  store->InitializeParams(dpp, &del_params);
-  target->InitializeParamsfromObject(dpp, &del_params);
+int DB::Object::Delete::delete_obj_impl(const DoutPrefixProvider *dpp,
+                                        DBOpParams& del_params) {
+  int ret = 0;
+  DB *store = target->get_store();
 
   ret = store->ProcessOp(dpp, "DeleteObject", &del_params);
   if (ret) {
@@ -1876,16 +1951,117 @@ int DB::Object::Delete::delete_obj(const DoutPrefixProvider *dpp) {
    * its corresponding head object is deleted (like here in this case).
    */
   DBOpParams update_params = del_params;
-  update_params.op.obj.obj_id = astate->shadow_obj; // objectID is copied here in get_state()
   update_params.op.obj.state.mtime = real_clock::now();
   ret = store->ProcessOp(dpp, "UpdateObjectData", &update_params);
 
   if (ret) {
     ldpp_dout(dpp, 0) << "Updating tail objects mtime failed err:(" <<ret<<")" << dendl;
-    return ret;
   }
+  return ret;
+}
 
-  return 0;
+/*
+ * a) if no versionID specified,
+ *  - create a delete marker with 
+ *    - new version/instanceID (if bucket versioned)
+ *    - null versionID (if versioning suspended)
+ *    - demote the current version entry
+ *  XXX: If delete_marker is the latest version, another delete_marker
+ *    is not created for now (for any subsequent deletes) . Will revisit this if
+ *    needed.
+ *
+ * b) If versionID provided,
+ *  - delete that particular entry
+ *  - Incase the entry is current entry, promote next object version to CURRENT.
+ *
+ * XXX: call this function under a lock or sqlite transaction to prevent
+ * parallel object uploads marking both the versions as FLAG_CURRENT;
+ */
+int DB::Object::Delete::delete_versioned_obj(const DoutPrefixProvider *dpp,
+                                             DBOpParams& del_params) {
+
+  DB *store = target->get_store();
+  bool versioning_enabled = (params.versioning_status & BUCKET_VERSIONED); 
+  int ret = -1;
+  DBOpParams olh_params = {};
+  std::string version_id;
+  DBOpParams next_params = del_params;
+
+  version_id = del_params.op.obj.state.obj.key.instance;
+  auto &ent = del_params.op.obj.list_entries.front();
+  uint64_t version_num = MAX_VERSIONED_OBJECTS;
+
+  if (version_id.empty()) {
+    /* create delete marker */
+
+    if (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER) {
+      // for now do not create another delete marker..just exit
+      return 0;
+    }
+    DBOpParams dm_params = del_params;
+
+    //demote current entry
+    target->update_obj_next_version(dpp, ent, false, version_num); //XXX: not checking return status
+
+    // create delete marker
+    store->InitializeParams(dpp, &dm_params);
+    target->InitializeParamsfromObject(dpp, &dm_params);
+
+    if (!versioning_enabled) {
+      dm_params.op.obj.state.obj.key.instance = "null";
+    } else {
+      store->gen_rand_obj_instance_name(&dm_params.op.obj.state.obj.key);
+      dm_params.op.obj.obj_id = dm_params.op.obj.state.obj.key.instance;
+    }
+
+    dm_params.op.obj.flags |= (rgw_bucket_dir_entry::FLAG_DELETE_MARKER);
+    dm_params.op.obj.flags |= (rgw_bucket_dir_entry::FLAG_CURRENT);
+    dm_params.op.obj.version_num = ++version_num; // max obj version list
+
+    ret = store->ProcessOp(dpp, "PutObject", &dm_params);
+
+    if (ret) {
+      ldpp_dout(dpp, 0) << "delete_olh: failed to create delete marker - err:(" <<ret<<")" << dendl;
+      return ret;
+    }
+  } else {
+    // delete paritcular version-id
+    DBOpParams dm_params = {};
+
+    store->InitializeParams(dpp, &dm_params);
+    target->InitializeParamsfromObject(dpp, &dm_params);
+
+    dm_params.op.obj.state.obj.key.instance = version_id;
+
+    ret = target->get_object_impl(dpp, dm_params);
+
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "get obj of versioned object failed - err:(" <<ret<<")" << dendl;
+      return ret;
+    }
+    ret = delete_obj_impl(dpp, dm_params);
+    if (dm_params.op.obj.flags & rgw_bucket_dir_entry::FLAG_CURRENT) {
+      bool found = false;
+      std::list<rgw_bucket_dir_entry>::iterator e;
+      for (e = del_params.op.obj.list_entries.begin(); e != del_params.op.obj.list_entries.end(); e++) {
+
+        if (e->key.instance == version_id) {
+          e++;
+          if (e != del_params.op.obj.list_entries.end()) {
+            found = true;
+          }
+            break;
+        }
+      }
+      if (found) {
+        uint64_t version_num = 0;
+        rgw_bucket_dir_entry& e1 = *e;
+        target->update_obj_next_version(dpp, e1, true, version_num);
+      }
+    }
+  }
+
+  return ret;
 }
 
 int DB::get_entry(const std::string& oid, const std::string& marker,
@@ -2089,6 +2265,7 @@ int DB::delete_stale_objs(const DoutPrefixProvider *dpp, const std::string& buck
   ret = ProcessOp(dpp, "GetBucket", &params);
   if (ret) {
     ldpp_dout(dpp, 0) << "In GetBucket failed err:(" <<ret<<")" << dendl;
+    return ret;
   }
 
   ldpp_dout(dpp, 20) << " Deleting stale_objs of bucket( " << bucket <<")" << dendl;
index 5f84689134f6571f3aa62b5d038264e8b620e4c3..ed90285d162b7dc08a60233f110aff85124bc14c 100644 (file)
@@ -51,7 +51,7 @@ struct DBOpBucketInfo {
 
 struct DBOpObjectInfo {
   RGWAccessControlPolicy acls;
-  RGWObjState state;
+  RGWObjState state = {};
 
   /* Below are taken from rgw_bucket_dir_entry */
   RGWObjCategory category;
@@ -91,6 +91,10 @@ struct DBOpObjectInfo {
   std::string prefix;
   std::list<rgw_bucket_dir_entry> list_entries;
   /* XXX: Maybe use std::vector instead of std::list */
+
+  /* for versioned objects */
+  bool is_versioned;
+  uint64_t version_num = 1; // default value for plain entries (non-versioned)
 };
 
 struct DBOpObjectDataInfo {
@@ -259,8 +263,8 @@ struct DBOpObjectPrepareInfo {
   static constexpr const char* fake_tag = ":fake_tag";
   static constexpr const char* shadow_obj = ":shadow_obj";
   static constexpr const char* has_data = ":has_data";
-  static constexpr const char* is_olh = ":is_ols";
-  static constexpr const char* olh_tag = ":olh_tag";
+  static constexpr const char* is_versioned = ":is_versioned";
+  static constexpr const char* version_num = ":version_num";
   static constexpr const char* pg_ver = ":pg_ver";
   static constexpr const char* zone_short_id = ":zone_short_id";
   static constexpr const char* obj_version = ":obj_version";
@@ -288,6 +292,7 @@ struct DBOpObjectPrepareInfo {
   static constexpr const char* new_obj_name = ":new_obj_name";
   static constexpr const char* new_obj_instance = ":new_obj_instance";
   static constexpr const char* new_obj_ns  = ":new_obj_ns";
+  static constexpr const char* versions = ":versions";
 };
 
 struct DBOpObjectDataPrepareInfo {
@@ -369,6 +374,7 @@ class ObjectOp {
     std::shared_ptr<class GetObjectOp> GetObject;
     std::shared_ptr<class UpdateObjectOp> UpdateObject;
     std::shared_ptr<class ListBucketObjectsOp> ListBucketObjects;
+    std::shared_ptr<class ListVersionedObjectsOp> ListVersionedObjects;
     std::shared_ptr<class PutObjectDataOp> PutObjectData;
     std::shared_ptr<class UpdateObjectDataOp> UpdateObjectData;
     std::shared_ptr<class GetObjectDataOp> GetObjectData;
@@ -542,8 +548,8 @@ class DBOp {
       FakeTag BOOL,   \
       ShadowObj   TEXT,   \
       HasData  BOOL,  \
-      IsOLH BOOL,  \
-      OLHTag    BLOB, \
+      IsVersioned BOOL,  \
+      VersionNum  INTEGER, \
       PGVer   INTEGER, \
       ZoneShortID  INTEGER,  \
       ObjVersion   INTEGER,    \
@@ -563,6 +569,7 @@ class DBOp {
       IsMultipart     BOOL,   \
       MPPartsList    BLOB,   \
       HeadData  BLOB,   \
+      VERSIONS BLOB,    \
       PRIMARY KEY (ObjName, ObjInstance, BucketName), \
       FOREIGN KEY (BucketName) \
       REFERENCES '{}' (BucketName) ON DELETE CASCADE ON UPDATE CASCADE \n);";
@@ -784,7 +791,7 @@ class GetUserOp: virtual public DBOp {
                                   System, PlacementName, PlacementStorageClass, PlacementTags, \
                                   BucketQuota, TempURLKeys, UserQuota, Type, MfaIDs, AssumedRoleARN, \
                                   UserAttrs, UserVersion, UserVersionTag \
-                                  from '{}' where Tenant = {} and UserID = {} and NS = {}";
+                                  from '{}' where UserID = {}";
 
   public:
     virtual ~GetUserOp() {}
@@ -800,9 +807,7 @@ class GetUserOp: virtual public DBOp {
       } else if (params.op.query_str == "user_id") {
         return fmt::format(QueryByUserID,
             params.user_table,
-            params.op.user.tenant,
-            params.op.user.user_id,
-            params.op.user.ns);
+            params.op.user.user_id);
       } else {
         return fmt::format(Query, params.user_table,
             params.op.user.user_id);
@@ -985,13 +990,14 @@ class PutObjectOp: virtual public DBOp {
        Flags, VersionedEpoch, ObjCategory, Etag, Owner, OwnerDisplayName, \
        StorageClass, Appendable, ContentType, IndexHashSource, ObjSize, \
        AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \
-       ShadowObj, HasData, IsOLH, OLHTag, PGVer, ZoneShortID, \
+       ShadowObj, HasData, IsVersioned, VersionNum, PGVer, ZoneShortID, \
        ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
        ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
        TailPlacementRuleName, TailPlacementStorageClass, \
-       ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData )     \
+       ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \
+       HeadData, Versions)     \
       VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
-          {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
+          {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
           {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {})";
 
   public:
@@ -1010,7 +1016,7 @@ class PutObjectOp: virtual public DBOp {
           params.op.obj.accounted_size, params.op.obj.mtime,
           params.op.obj.epoch, params.op.obj.obj_tag, params.op.obj.tail_tag,
           params.op.obj.write_tag, params.op.obj.fake_tag, params.op.obj.shadow_obj,
-          params.op.obj.has_data, params.op.obj.is_olh, params.op.obj.olh_tag,
+          params.op.obj.has_data, params.op.obj.is_versioned, params.op.obj.version_num,
           params.op.obj.pg_ver, params.op.obj.zone_short_id,
           params.op.obj.obj_version, params.op.obj.obj_version_tag,
           params.op.obj.obj_attrs, params.op.obj.head_size,
@@ -1022,7 +1028,8 @@ class PutObjectOp: virtual public DBOp {
           params.op.obj.tail_placement_storage_class,
           params.op.obj.manifest_part_objs,
           params.op.obj.manifest_part_rules, params.op.obj.omap,
-          params.op.obj.is_multipart, params.op.obj.mp_parts, params.op.obj.head_data);
+          params.op.obj.is_multipart, params.op.obj.mp_parts,
+          params.op.obj.head_data, params.op.obj.versions);
     }
 };
 
@@ -1050,11 +1057,12 @@ class GetObjectOp: virtual public DBOp {
       Flags, VersionedEpoch, ObjCategory, Etag, Owner, OwnerDisplayName, \
       StorageClass, Appendable, ContentType, IndexHashSource, ObjSize, \
       AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \
-      ShadowObj, HasData, IsOLH, OLHTag, PGVer, ZoneShortID, \
+      ShadowObj, HasData, IsVersioned, VersionNum, PGVer, ZoneShortID, \
       ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
       ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
       TailPlacementRuleName, TailPlacementStorageClass, \
-      ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData from '{}' \
+      ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \
+      HeadData, Versions from '{}' \
       where BucketName = {} and ObjName = {} and ObjInstance = {}";
 
   public:
@@ -1079,7 +1087,7 @@ class ListBucketObjectsOp: virtual public DBOp {
       Flags, VersionedEpoch, ObjCategory, Etag, Owner, OwnerDisplayName, \
       StorageClass, Appendable, ContentType, IndexHashSource, ObjSize, \
       AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \
-      ShadowObj, HasData, IsOLH, OLHTag, PGVer, ZoneShortID, \
+      ShadowObj, HasData, IsVersioned, VersionNum, PGVer, ZoneShortID, \
       ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
       ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
       TailPlacementRuleName, TailPlacementStorageClass, \
@@ -1099,6 +1107,37 @@ class ListBucketObjectsOp: virtual public DBOp {
     }
 };
 
+#define MAX_VERSIONED_OBJECTS 20
+class ListVersionedObjectsOp: virtual public DBOp {
+  private:
+    // once we have stats also stored, may have to update this query to join
+    // these two tables.
+    static constexpr std::string_view Query =
+      "SELECT  \
+      ObjName, ObjInstance, ObjNS, BucketName, ACLs, IndexVer, Tag, \
+      Flags, VersionedEpoch, ObjCategory, Etag, Owner, OwnerDisplayName, \
+      StorageClass, Appendable, ContentType, IndexHashSource, ObjSize, \
+      AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \
+      ShadowObj, HasData, IsVersioned, VersionNum, PGVer, ZoneShortID, \
+      ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
+      ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
+      TailPlacementRuleName, TailPlacementStorageClass, \
+      ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, \
+      HeadData, Versions from '{}' \
+      where BucketName = {} and ObjName = {} ORDER BY VersionNum DESC LIMIT {}";
+  public:
+    virtual ~ListVersionedObjectsOp() {}
+
+    static std::string Schema(DBOpPrepareParams &params) {
+      /* XXX: Include obj_id, delim */
+      return fmt::format(Query,
+          params.object_table,
+          params.op.bucket.bucket_name,
+          params.op.obj.obj_name,
+          params.op.list_max_count);
+    }
+};
+
 class UpdateObjectOp: virtual public DBOp {
   private:
     // Updates Omap
@@ -1118,13 +1157,13 @@ class UpdateObjectOp: virtual public DBOp {
        StorageClass = {}, Appendable = {}, ContentType = {}, \
        IndexHashSource = {}, ObjSize = {}, AccountedSize = {}, Mtime = {}, \
        Epoch = {}, ObjTag = {}, TailTag = {}, WriteTag = {}, FakeTag = {}, \
-       ShadowObj = {}, HasData = {}, IsOLH = {}, OLHTag = {}, PGVer = {}, \
+       ShadowObj = {}, HasData = {}, IsVersioned = {}, VersionNum = {}, PGVer = {}, \
        ZoneShortID = {}, ObjVersion = {}, ObjVersionTag = {}, ObjAttrs = {}, \
        HeadSize = {}, MaxHeadSize = {}, ObjID = {}, TailInstance = {}, \
        HeadPlacementRuleName = {}, HeadPlacementRuleStorageClass = {}, \
        TailPlacementRuleName = {}, TailPlacementStorageClass = {}, \
        ManifestPartObjs = {}, ManifestPartRules = {}, Omap = {}, \
-       IsMultipart = {}, MPPartsList = {}, HeadData = {} \
+       IsMultipart = {}, MPPartsList = {}, HeadData = {}, Versions = {} \
        WHERE ObjName = {} and ObjInstance = {} and BucketName = {}";
 
   public:
@@ -1167,7 +1206,7 @@ class UpdateObjectOp: virtual public DBOp {
           params.op.obj.accounted_size, params.op.obj.mtime,
           params.op.obj.epoch, params.op.obj.obj_tag, params.op.obj.tail_tag,
           params.op.obj.write_tag, params.op.obj.fake_tag, params.op.obj.shadow_obj,
-          params.op.obj.has_data, params.op.obj.is_olh, params.op.obj.olh_tag,
+          params.op.obj.has_data, params.op.obj.is_versioned, params.op.obj.version_num,
           params.op.obj.pg_ver, params.op.obj.zone_short_id,
           params.op.obj.obj_version, params.op.obj.obj_version_tag,
           params.op.obj.obj_attrs, params.op.obj.head_size,
@@ -1179,7 +1218,8 @@ class UpdateObjectOp: virtual public DBOp {
           params.op.obj.tail_placement_storage_class,
           params.op.obj.manifest_part_objs,
           params.op.obj.manifest_part_rules, params.op.obj.omap,
-          params.op.obj.is_multipart, params.op.obj.mp_parts, params.op.obj.head_data,
+          params.op.obj.is_multipart, params.op.obj.mp_parts,
+          params.op.obj.head_data, params.op.obj.versions,
           params.op.obj.obj_name, params.op.obj.obj_instance,
           params.op.bucket.bucket_name);
       }
@@ -1233,6 +1273,7 @@ class UpdateObjectDataOp: virtual public DBOp {
           params.op.obj.obj_id);
     }
 };
+
 class GetObjectDataOp: virtual public DBOp {
   private:
     static constexpr std::string_view Query =
@@ -1853,6 +1894,7 @@ class DB {
             bool assume_noent, bool modify_tail);
         int write_meta(const DoutPrefixProvider *dpp, uint64_t size,
            uint64_t accounted_size, std::map<std::string, bufferlist>& attrs);
+        int write_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& params);
       };
 
       struct Delete {
@@ -1887,19 +1929,19 @@ class DB {
         explicit Delete(DB::Object *_target) : target(_target) {}
 
         int delete_obj(const DoutPrefixProvider *dpp);
+        int delete_obj_impl(const DoutPrefixProvider *dpp, DBOpParams& del_params);
+        int delete_versioned_obj(const DoutPrefixProvider *dpp, DBOpParams& del_params);
       };
 
       /* XXX: the parameters may be subject to change. All we need is bucket name
        * & obj name,instance - keys */
+      int get_object_impl(const DoutPrefixProvider *dpp, DBOpParams& params);
       int get_obj_state(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
                         const rgw_obj& obj,
                         bool follow_olh, RGWObjState **state);
-      int get_olh_target_state(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
-          RGWObjState* olh_state, RGWObjState** target);
-      int follow_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState *state,
-          const rgw_obj& olh_obj, rgw_obj *target);
-
       int get_state(const DoutPrefixProvider *dpp, RGWObjState **pstate, bool follow_olh);
+      int list_versioned_objects(const DoutPrefixProvider *dpp,
+                                 std::list<rgw_bucket_dir_entry>& list_entries);
 
       DB *get_store() { return store; }
       rgw_obj& get_obj() { return obj; }
@@ -1927,6 +1969,9 @@ class DB {
           const RGWBucketInfo& bucket_info, const rgw_obj& obj,
           off_t ofs, off_t end, uint64_t max_chunk_size,
           iterate_obj_cb cb, void *arg);
+      int update_obj_next_version(const DoutPrefixProvider *dpp,
+                                  rgw_bucket_dir_entry &obj_entry,
+                                  bool promote, uint64_t& version_num);
     };
     int get_obj_iterate_cb(const DoutPrefixProvider *dpp,
         const raw_obj& read_obj, off_t obj_ofs,
index 649c644bebfa1f34db609fb0a3ae201311c5f10f..f0aa39aac8b951a59773e7af8140362b03d3c360 100644 (file)
@@ -42,12 +42,7 @@ using namespace std;
 
 #define SQL_BIND_TEXT(dpp, stmt, index, str, sdb)                      \
   do {                                                         \
-    if (strcmp(str, "null") == 0) {          \
-      rc = sqlite3_bind_text(stmt, index, "", -1, SQLITE_TRANSIENT);   \
-    } else {                                                       \
       rc = sqlite3_bind_text(stmt, index, str, -1, SQLITE_TRANSIENT);  \
-    }                                   \
-    \
     if (rc != SQLITE_OK) {                                             \
       ldpp_dout(dpp, 0)<<"sqlite bind text failed for index("          \
       <<index<<"), str("<<str<<") in stmt("    \
@@ -309,8 +304,8 @@ enum GetObject {
   FakeTag,
   ShadowObj,
   HasData,
-  IsOLH,
-  OLHTag,
+  IsVersioned,
+  VersionNum,
   PGVer,
   ZoneShortID,
   ObjVersion,
@@ -329,7 +324,8 @@ enum GetObject {
   Omap,
   IsMultipart,
   MPPartsList,
-  HeadData
+  HeadData,
+  Versions
 };
 
 enum GetObjectData {
@@ -497,8 +493,8 @@ static int list_object(const DoutPrefixProvider *dpp, DBOpInfo &op, sqlite3_stmt
   op.obj.state.fake_tag = sqlite3_column_int(stmt, FakeTag);
   op.obj.state.shadow_obj = (const char*)sqlite3_column_text(stmt, ShadowObj);
   op.obj.state.has_data = sqlite3_column_int(stmt, HasData); 
-  op.obj.state.is_olh = sqlite3_column_int(stmt, IsOLH); 
-  SQL_DECODE_BLOB_PARAM(dpp, stmt, OLHTag, op.obj.state.olh_tag, sdb);
+  op.obj.is_versioned = sqlite3_column_int(stmt, IsVersioned); 
+  op.obj.version_num = sqlite3_column_int(stmt, VersionNum); 
   op.obj.state.pg_ver = sqlite3_column_int(stmt, PGVer); 
   op.obj.state.zone_short_id = sqlite3_column_int(stmt, ZoneShortID); 
   op.obj.state.objv_tracker.read_version.ver = sqlite3_column_int(stmt, ObjVersion); 
@@ -1046,6 +1042,7 @@ int SQLObjectOp::InitializeObjectOps(string db_name, const DoutPrefixProvider *d
   GetObject = make_shared<SQLGetObject>(sdb, db_name, cct);
   UpdateObject = make_shared<SQLUpdateObject>(sdb, db_name, cct);
   ListBucketObjects = make_shared<SQLListBucketObjects>(sdb, db_name, cct);
+  ListVersionedObjects = make_shared<SQLListVersionedObjects>(sdb, db_name, cct);
   PutObjectData = make_shared<SQLPutObjectData>(sdb, db_name, cct);
   UpdateObjectData = make_shared<SQLUpdateObjectData>(sdb, db_name, cct);
   GetObjectData = make_shared<SQLGetObjectData>(sdb, db_name, cct);
@@ -1270,14 +1267,8 @@ int SQLGetUser::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params)
       SQL_BIND_TEXT(dpp, ak_stmt, index, access_key.c_str(), sdb);
     }
   } else if (params->op.query_str == "user_id") { 
-    SQL_BIND_INDEX(dpp, userid_stmt, index, p_params.op.user.tenant, sdb);
-    SQL_BIND_TEXT(dpp, userid_stmt, index, params->op.user.uinfo.user_id.tenant.c_str(), sdb);
-
     SQL_BIND_INDEX(dpp, userid_stmt, index, p_params.op.user.user_id, sdb);
     SQL_BIND_TEXT(dpp, userid_stmt, index, params->op.user.uinfo.user_id.id.c_str(), sdb);
-
-    SQL_BIND_INDEX(dpp, userid_stmt, index, p_params.op.user.ns, sdb);
-    SQL_BIND_TEXT(dpp, userid_stmt, index, params->op.user.uinfo.user_id.ns.c_str(), sdb);
   } else { // by default by userid
     SQL_BIND_INDEX(dpp, stmt, index, p_params.op.user.user_id, sdb);
     SQL_BIND_TEXT(dpp, stmt, index, params->op.user.uinfo.user_id.id.c_str(), sdb);
@@ -1853,11 +1844,11 @@ int SQLPutObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params)
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.has_data, sdb);
   SQL_BIND_INT(dpp, stmt, index, params->op.obj.state.has_data, sdb);
 
-  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.is_olh, sdb);
-  SQL_BIND_INT(dpp, stmt, index, params->op.obj.state.is_olh, sdb);
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.is_versioned, sdb);
+  SQL_BIND_INT(dpp, stmt, index, params->op.obj.is_versioned, sdb);
 
-  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.olh_tag, sdb);
-  SQL_ENCODE_BLOB_PARAM(dpp, stmt, index, params->op.obj.state.olh_tag, sdb);
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.version_num, sdb);
+  SQL_BIND_INT(dpp, stmt, index, params->op.obj.version_num, sdb);
 
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.pg_ver, sdb);
   SQL_BIND_INT(dpp, stmt, index, params->op.obj.state.pg_ver, sdb);
@@ -1916,7 +1907,6 @@ int SQLPutObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params)
   SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.head_data, sdb);
   SQL_ENCODE_BLOB_PARAM(dpp, stmt, index, params->op.obj.head_data, sdb);
 
-
 out:
   return rc;
 }
@@ -2167,11 +2157,11 @@ int SQLUpdateObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *para
     SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.has_data, sdb);
     SQL_BIND_INT(dpp, *stmt, index, params->op.obj.state.has_data, sdb);
 
-    SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.is_olh, sdb);
-    SQL_BIND_INT(dpp, *stmt, index, params->op.obj.state.is_olh, sdb);
+    SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.is_versioned, sdb);
+    SQL_BIND_INT(dpp, *stmt, index, params->op.obj.is_versioned, sdb);
 
-    SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.olh_tag, sdb);
-    SQL_ENCODE_BLOB_PARAM(dpp, *stmt, index, params->op.obj.state.olh_tag, sdb);
+    SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.version_num, sdb);
+    SQL_BIND_INT(dpp, *stmt, index, params->op.obj.version_num, sdb);
 
     SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.pg_ver, sdb);
     SQL_BIND_INT(dpp, *stmt, index, params->op.obj.state.pg_ver, sdb);
@@ -2308,6 +2298,52 @@ out:
   return ret;
 }
 
+int SQLListVersionedObjects::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *params)
+{
+  int ret = -1;
+  struct DBOpPrepareParams p_params = PrepareParams;
+
+  if (!*sdb) {
+    ldpp_dout(dpp, 0)<<"In SQLListVersionedObjects - no db" << dendl;
+    goto out;
+  }
+
+  InitPrepareParams(dpp, p_params, params);
+
+  SQL_PREPARE(dpp, p_params, sdb, stmt, ret, "PrepareListVersionedObjects");
+
+out:
+  return ret;
+}
+
+int SQLListVersionedObjects::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params)
+{
+  int index = -1;
+  int rc = 0;
+  struct DBOpPrepareParams p_params = PrepareParams;
+
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.bucket.bucket_name, sdb);
+  SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
+
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_name, sdb);
+  SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.name.c_str(), sdb);
+
+  SQL_BIND_INDEX(dpp, stmt, index, p_params.op.list_max_count, sdb);
+  SQL_BIND_INT(dpp, stmt, index, params->op.list_max_count, sdb);
+
+out:
+  return rc;
+}
+
+int SQLListVersionedObjects::Execute(const DoutPrefixProvider *dpp, struct DBOpParams *params)
+{
+  int ret = -1;
+
+  SQL_EXECUTE(dpp, params, stmt, list_object);
+out:
+  return ret;
+}
+
 int SQLPutObjectData::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *params)
 {
   int ret = -1;
index 836a8702a03e203bc2f2fc1e47aa67456b0b7c7d..444400b04eb59b455d3b7250e10ce337de9eb1d3 100644 (file)
@@ -327,6 +327,24 @@ class SQLListBucketObjects : public SQLiteDB, public ListBucketObjectsOp {
     int Bind(const DoutPrefixProvider *dpp, DBOpParams *params);
 };
 
+class SQLListVersionedObjects : public SQLiteDB, public ListVersionedObjectsOp {
+  private:
+    sqlite3 **sdb = NULL;
+    sqlite3_stmt *stmt = NULL; // Prepared statement
+
+  public:
+    SQLListVersionedObjects(void **db, std::string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
+    SQLListVersionedObjects(sqlite3 **sdbi, std::string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {}
+
+    ~SQLListVersionedObjects() {
+      if (stmt)
+        sqlite3_finalize(stmt);
+    }
+    int Prepare(const DoutPrefixProvider *dpp, DBOpParams *params);
+    int Execute(const DoutPrefixProvider *dpp, DBOpParams *params);
+    int Bind(const DoutPrefixProvider *dpp, DBOpParams *params);
+};
+
 class SQLPutObjectData : public SQLiteDB, public PutObjectDataOp {
   private:
     sqlite3 **sdb = NULL;
index f99383b62186f25238b6af999e9a882c7112b5a2..2e8c535f6487b517b6556c97e29f2be1b43c8619 100644 (file)
@@ -895,6 +895,209 @@ TEST_F(DBStoreTest, DeleteObj) {
   ASSERT_EQ(ret, -2);
 }
 
+TEST_F(DBStoreTest, WriteVersionedObject) {
+  struct DBOpParams params = GlobalParams;
+  int ret = -1;
+  std::string instances[] = {"inst1", "inst2", "inst3"};
+  bufferlist b1;
+
+  params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
+  params.op.obj.state.obj.key.name = "object1";
+
+  /* Write versioned objects */
+  DB::Object op_target(db, params.op.bucket.info, params.op.obj.state.obj);
+  DB::Object::Write write_op(&op_target);
+
+  /* Version1 */
+  params.op.obj.state.obj.key.instance = instances[0];
+  encode("HELLO WORLD", b1);
+  params.op.obj.head_data = b1;
+  params.op.obj.state.size = 12;
+  ret = write_op.write_versioned_obj(dpp, params);
+  ASSERT_EQ(ret, 0);
+
+  /* Version2 */
+  params.op.obj.state.obj.key.instance = instances[1];
+  b1.clear();
+  encode("HELLO WORLD ABC", b1);
+  params.op.obj.head_data = b1;
+  params.op.obj.state.size = 16;
+  ret = write_op.write_versioned_obj(dpp, params);
+  ASSERT_EQ(ret, 0);
+
+  /* Version3 */
+  params.op.obj.state.obj.key.instance = instances[2];
+  b1.clear();
+  encode("HELLO WORLD A", b1);
+  params.op.obj.head_data = b1;
+  params.op.obj.state.size = 14;
+  ret = write_op.write_versioned_obj(dpp, params);
+  ASSERT_EQ(ret, 0);
+}
+
+TEST_F(DBStoreTest, ListVersionedObject) {
+  struct DBOpParams params = GlobalParams;
+  int ret = -1;
+  std::string instances[] = {"inst1", "inst2", "inst3"};
+  int i = 0;
+
+  /* list versioned objects */
+  params.op.obj.state.obj.key.instance.clear();
+  params.op.list_max_count = MAX_VERSIONED_OBJECTS;
+  ret = db->ProcessOp(dpp, "ListVersionedObjects", &params);
+  ASSERT_EQ(ret, 0);
+
+  i = 2;
+  for (auto ent: params.op.obj.list_entries) {
+    string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false";
+    cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is:current = " << is_current << "\n";
+
+    if (i == 2) {
+      ASSERT_EQ(is_current, "true");
+    } else {
+      ASSERT_EQ(is_current, "false");
+    }
+
+    ASSERT_EQ(ent.key.instance, instances[i]);
+    i--;
+  }
+}
+
+TEST_F(DBStoreTest, ReadVersionedObject) {
+  struct DBOpParams params = GlobalParams;
+  int ret = -1;
+  std::string instances[] = {"inst1", "inst2", "inst3"};
+  std::string data;
+
+  /* read object.. should fetch latest version */
+  RGWObjState* s;
+  params = GlobalParams;
+  params.op.obj.state.obj.key.instance.clear();
+  DB::Object op_target2(db, params.op.bucket.info, params.op.obj.state.obj);
+  ret = op_target2.get_obj_state(dpp, params.op.bucket.info, params.op.obj.state.obj,
+                                 true, &s);
+  ASSERT_EQ(ret, 0);
+  ASSERT_EQ(s->obj.key.instance, instances[2]);
+  decode(data, s->data);
+  ASSERT_EQ(data, "HELLO WORLD A");
+  ASSERT_EQ(s->size, 14);
+
+  /* read a particular non-current version */
+  params.op.obj.state.obj.key.instance = instances[1];
+  DB::Object op_target3(db, params.op.bucket.info, params.op.obj.state.obj);
+  ret = op_target3.get_obj_state(dpp, params.op.bucket.info, params.op.obj.state.obj,
+                                 true, &s);
+  ASSERT_EQ(ret, 0);
+  decode(data, s->data);
+  ASSERT_EQ(data, "HELLO WORLD ABC");
+  ASSERT_EQ(s->size, 16);
+}
+
+TEST_F(DBStoreTest, DeleteVersionedObject) {
+  struct DBOpParams params = GlobalParams;
+  int ret = -1;
+  std::string instances[] = {"inst1", "inst2", "inst3"};
+  std::string data;
+  std::string dm_instance;
+  int i = 0;
+
+  /* Delete object..should create delete marker */
+  params.op.obj.state.obj.key.instance.clear();
+  DB::Object op_target(db, params.op.bucket.info, params.op.obj.state.obj);
+  DB::Object::Delete delete_op(&op_target);
+  delete_op.params.versioning_status |= BUCKET_VERSIONED;
+
+  ret = delete_op.delete_obj(dpp);
+  ASSERT_EQ(ret, 0);
+
+  /* list versioned objects */
+  params = GlobalParams;
+  params.op.obj.state.obj.key.instance.clear();
+  params.op.list_max_count = MAX_VERSIONED_OBJECTS;
+  ret = db->ProcessOp(dpp, "ListVersionedObjects", &params);
+
+  i = 3;
+  for (auto ent: params.op.obj.list_entries) {
+    string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false";
+    string is_delete_marker = (ent.flags & rgw_bucket_dir_entry::FLAG_DELETE_MARKER)? "true" : "false";
+    cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is_delete_marker = " << is_delete_marker << "\n";
+
+    if (i == 3) {
+      ASSERT_EQ(is_delete_marker, "true");
+      ASSERT_EQ(is_current, "true");
+      dm_instance = ent.key.instance;
+    } else {
+      ASSERT_EQ(is_current, "false");
+      ASSERT_EQ(is_delete_marker, "false");
+      ASSERT_EQ(ent.key.instance, instances[i]);
+    }
+
+    i--;
+  }
+
+  /* read object.. should return -ENOENT */
+  RGWObjState* s;
+  params = GlobalParams;
+  params.op.obj.state.obj.key.instance.clear();
+  DB::Object op_target2(db, params.op.bucket.info, params.op.obj.state.obj);
+  ret = op_target2.get_obj_state(dpp, params.op.bucket.info, params.op.obj.state.obj,
+                                 true, &s);
+  ASSERT_EQ(ret, -ENOENT);
+
+  /* Delete delete marker..should be able to read object now */ 
+  params.op.obj.state.obj.key.instance = dm_instance;
+  DB::Object op_target3(db, params.op.bucket.info, params.op.obj.state.obj);
+  DB::Object::Delete delete_op2(&op_target3);
+  delete_op2.params.versioning_status |= BUCKET_VERSIONED;
+
+  ret = delete_op2.delete_obj(dpp);
+  ASSERT_EQ(ret, 0);
+
+  /* read object.. should fetch latest version */
+  params = GlobalParams;
+  params.op.obj.state.obj.key.instance.clear();
+  DB::Object op_target4(db, params.op.bucket.info, params.op.obj.state.obj);
+  ret = op_target4.get_obj_state(dpp, params.op.bucket.info, params.op.obj.state.obj,
+                                 true, &s);
+  ASSERT_EQ(s->obj.key.instance, instances[2]);
+  decode(data, s->data);
+  ASSERT_EQ(data, "HELLO WORLD A");
+  ASSERT_EQ(s->size, 14);
+
+  /* delete latest version using version-id. Next version should get promoted */
+  params.op.obj.state.obj.key.instance = instances[2];
+  DB::Object op_target5(db, params.op.bucket.info, params.op.obj.state.obj);
+  DB::Object::Delete delete_op3(&op_target5);
+  delete_op3.params.versioning_status |= BUCKET_VERSIONED;
+
+  ret = delete_op3.delete_obj(dpp);
+  ASSERT_EQ(ret, 0);
+
+  /* list versioned objects..only two versions should be present
+   * with second version marked as CURRENT */
+  params = GlobalParams;
+  params.op.obj.state.obj.key.instance.clear();
+  params.op.list_max_count = MAX_VERSIONED_OBJECTS;
+  ret = db->ProcessOp(dpp, "ListVersionedObjects", &params);
+
+  i = 1;
+  for (auto ent: params.op.obj.list_entries) {
+    string is_current = (ent.flags & rgw_bucket_dir_entry::FLAG_CURRENT)? "true" : "false";
+    cout << "ent.name: " << ent.key.name << ". ent.instance: " << ent.key.instance << " is_current = " << is_current << "\n";
+
+    if (i == 1) {
+      ASSERT_EQ(is_current, "true");
+      dm_instance = ent.key.instance;
+    } else {
+      ASSERT_EQ(is_current, "false");
+      ASSERT_EQ(ent.key.instance, instances[i]);
+    }
+
+    i--;
+  }
+
+}
+
 TEST_F(DBStoreTest, ObjectOmapSetVal) {
   struct DBOpParams params = GlobalParams;
   int ret = -1;