]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cls_rgw: keep null-versioned object as versioned object
authorYehuda Sadeh <yehuda@redhat.com>
Sat, 18 Oct 2014 00:08:19 +0000 (17:08 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 19 Jan 2015 23:57:46 +0000 (15:57 -0800)
when olh is created. Mark the object as such, so that we'll know to
handle it correctly. This is needed so that we could handle the 'null'
version as required.
Still doesn't work correctly.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_types.h

index c9f4aa683a764b0fe2b107b2f6b600c40ebaf33c..901ecc5f9f4c18abd4be24384942c03cd5ada705 100644 (file)
@@ -235,17 +235,25 @@ static void get_list_index_key(struct rgw_bucket_dir_entry& entry, string *index
 {
   *index_key = entry.key.name;
 
-  if (!entry.key.instance.empty()) {
-    string ver_str;
-    decreasing_str(entry.ver.epoch, &ver_str);
-    string instance_delim("\0i", 2);
-    string ver_delim("\0v", 2);
+  string ver_str;
+  decreasing_str(entry.ver.epoch, &ver_str);
+  string instance_delim("\0i", 2);
+  string ver_delim("\0v", 2);
 
-    index_key->append(ver_delim);
-    index_key->append(ver_str);
-    index_key->append(instance_delim);
-    index_key->append(entry.key.instance);
-  }
+  index_key->append(ver_delim);
+  index_key->append(ver_str);
+  index_key->append(instance_delim);
+  index_key->append(entry.key.instance);
+}
+
+static void encode_obj_versioned_data_key(const cls_rgw_obj_key& key, string *index_key)
+{
+  *index_key = BI_PREFIX_CHAR;
+  index_key->append(bucket_index_prefixes[BI_BUCKET_OBJ_INSTANCE_INDEX]);
+  index_key->append(key.name);
+  string delim("\0i", 2);
+  index_key->append(delim);
+  index_key->append(key.instance);
 }
 
 static void encode_obj_index_key(const cls_rgw_obj_key& key, string *index_key)
@@ -253,12 +261,7 @@ static void encode_obj_index_key(const cls_rgw_obj_key& key, string *index_key)
   if (key.instance.empty()) {
     *index_key = key.name;
   } else {
-    *index_key = BI_PREFIX_CHAR;
-    index_key->append(bucket_index_prefixes[BI_BUCKET_OBJ_INSTANCE_INDEX]);
-    index_key->append(key.name);
-    string delim("\0i", 2);
-    index_key->append(delim);
-    index_key->append(key.instance);
+    encode_obj_versioned_data_key(key, index_key);
   }
 }
 
@@ -452,6 +455,10 @@ int rgw_bucket_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     uint64_t ver;
     decode_list_index_key(kiter->first, &key, &ver);
 
+    if (!entry.is_valid()) {
+      continue;
+    }
+
     if (!op.list_versions && !entry.is_visible()) {
       continue;
     }
@@ -751,6 +758,26 @@ static int read_index_entry(cls_method_context_t hctx, string& name, T *entry)
   return 0;
 }
 
+static int read_key_entry(cls_method_context_t hctx, cls_rgw_obj_key& key, string *idx, struct rgw_bucket_dir_entry *entry)
+{
+  encode_obj_index_key(key, idx);
+  int rc = read_index_entry(hctx, *idx, entry);
+  if (rc < 0) {
+    return rc;
+  }
+
+  if (key.instance.empty() &&
+      entry->flags & RGW_BUCKET_DIRENT_FLAG_VER_MARKER) {
+    encode_obj_versioned_data_key(key, idx);
+    rc = read_index_entry(hctx, *idx, entry);
+    if (rc < 0) {
+      return rc;
+    }
+  }
+
+  return 0;
+}
+
 int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   // decode request
@@ -784,8 +811,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
   bool ondisk = true;
 
   string key;
-  encode_obj_index_key(op.key, &key);
-  rc = read_index_entry(hctx, key, &entry);
+  rc = read_key_entry(hctx, op.key, &key, &entry);
   if (rc == -ENOENT) {
     entry.key = op.key;
     entry.ver = op.ver;
@@ -896,8 +922,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
             remove_key.name.c_str(), remove_key.instance.c_str());
     struct rgw_bucket_dir_entry remove_entry;
     string k;
-    encode_obj_index_key(remove_key, &k);
-    int ret = read_index_entry(hctx, k, &remove_entry);
+    int ret = read_key_entry(hctx, remove_key, &k, &remove_entry);
     if (ret < 0) {
       CLS_LOG(1, "rgw_bucket_complete_op(): removing entries, read_index_entry name=%s instance=%s ret=%d\n",
             remove_key.name.c_str(), remove_key.instance.c_str(), ret);
@@ -951,9 +976,7 @@ static int read_olh(cls_method_context_t hctx,cls_rgw_obj_key& obj_key, struct r
 
 static int read_key_list_entry(cls_method_context_t hctx, cls_rgw_obj_key& key, rgw_bucket_dir_entry *entry, string *idx)
 {
-  encode_list_index_key(hctx, key, idx);
-
-  int ret = read_index_entry(hctx, *idx, entry);
+  int ret = read_key_entry(hctx, key, idx, entry);
   if (ret < 0) {
     return ret;
   }
@@ -1011,13 +1034,12 @@ class BIVerObjEntry {
 
 public:
   BIVerObjEntry(cls_method_context_t& _hctx, const cls_rgw_obj_key& _key) : hctx(_hctx), key(_key), initialized(false) {
-    encode_obj_index_key(key, &instance_idx);
   }
 
   int init() {
-    int ret = read_index_entry(hctx, instance_idx, &instance_entry);
+    int ret = read_key_entry(hctx, key, &instance_idx, &instance_entry);
     if (ret < 0) {
-      CLS_LOG(0, "ERROR: read_index_entry() key=%s ret=%d", instance_idx.c_str(), ret);
+      CLS_LOG(0, "ERROR: read_key_entry() idx=%s ret=%d", instance_idx.c_str(), ret);
       return ret;
     }
     initialized = true;
@@ -1057,6 +1079,7 @@ public:
     instance_entry.flags &= ~flags_reset;
 
     /* write the instance and list entries */
+    encode_obj_versioned_data_key(key, &instance_idx);
     int ret = write_obj_entries(hctx, instance_entry, instance_idx);
     if (ret < 0) {
       CLS_LOG(0, "ERROR: write_obj_entries() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
@@ -1184,6 +1207,53 @@ public:
 
 };
 
+/*
+ * plain entries are the ones who were created when bucket was not versioned,
+ * if we override these objects, we need to convert these to versioned entries -- ones that have
+ * both data entry, and listing key. Their version is going to be empty though
+ */
+static int convert_plain_entry_to_versioned(cls_method_context_t hctx, cls_rgw_obj_key& key, uint64_t epoch, bool demote_current)
+{
+  if (!key.instance.empty()) {
+    return -EINVAL;
+  }
+
+  struct rgw_bucket_dir_entry entry;
+
+  string orig_idx;
+  int ret = read_key_entry(hctx, key, &orig_idx, &entry);
+  if (ret != -ENOENT) {
+    if (ret < 0) {
+      CLS_LOG(0, "ERROR: read_key_entry() returned ret=%d", ret);
+      return ret;
+    }
+
+    entry.ver.epoch = epoch;
+    entry.flags |= RGW_BUCKET_DIRENT_FLAG_VER;
+
+    if (demote_current) {
+      entry.flags &= ~RGW_BUCKET_DIRENT_FLAG_CURRENT;
+    }
+
+    string new_idx;
+    encode_obj_versioned_data_key(key, &new_idx);
+
+    ret = write_obj_entries(hctx, entry, new_idx);
+    if (ret < 0) {
+      CLS_LOG(0, "ERROR: write_obj_entries new_idx=%s returned %d", new_idx.c_str(), ret);
+      return ret;
+    }
+  }
+
+  entry.flags = RGW_BUCKET_DIRENT_FLAG_VER_MARKER;
+  ret = write_entry(hctx, entry, key.name);
+  if (ret < 0) {
+    CLS_LOG(0, "ERROR: write_entry returned ret=%d", ret);
+    return ret;
+  }
+  return 0;
+}
+
 /*
  * link an object version to an olh, update the relevant index entries. It will also handle the
  * deletion marker case. We have a few entries that we need to take care of. For object 'foo',
@@ -1242,27 +1312,14 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
       CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
       return ret;
     }
-  }
-
-  /* might need to remove the plain object listing key */
-  if (!op.key.instance.empty()) {
-    struct rgw_bucket_dir_entry plain_entry;
-    string plain_idx;
-    cls_rgw_obj_key no_instance_key(op.key.name);
-
-    ret = read_key_list_entry(hctx, no_instance_key, &plain_entry, &plain_idx);
-    if (ret >= 0) {
-#warning handle overwrite of non-olh object, need to update stats
-      ret = cls_cxx_map_remove_key(hctx, plain_idx);
-      if (ret < 0) {
-        CLS_LOG(0, "ERROR: cls_Cxx_map_remove_key() ret=%d", ret);
-        return ret;
-      }
-
-      if (!plain_entry.is_delete_marker()) {
-        olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, plain_entry.key, false);
-      }
+  } else {
+    cls_rgw_obj_key key(op.key.name);
+    ret = convert_plain_entry_to_versioned(hctx, key, olh.get_epoch(), true);
+    if (ret < 0) {
+      CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
+      return ret;
     }
+    olh.inc_epoch();
   }
 
   /* update the olh log */
index e7767c6242e638ac957fda3cb2136f2008856dfb..17f214f8788e4547d7ee555587ab062e9b59479d 100644 (file)
@@ -247,6 +247,7 @@ WRITE_CLASS_ENCODER(cls_rgw_obj_key)
 #define RGW_BUCKET_DIRENT_FLAG_VER           0x1    /* a versioned object instance */
 #define RGW_BUCKET_DIRENT_FLAG_CURRENT       0x2    /* the last object instance of a versioned object */
 #define RGW_BUCKET_DIRENT_FLAG_DELETE_MARKER 0x4    /* delete marker */
+#define RGW_BUCKET_DIRENT_FLAG_VER_MARKER    0x8    /* object is versioned, a placeholder for the plain entry */
 
 struct rgw_bucket_dir_entry {
   cls_rgw_obj_key key;
@@ -314,6 +315,7 @@ struct rgw_bucket_dir_entry {
   bool is_visible() {
     return is_current() && !is_delete_marker();
   }
+  bool is_valid() { return (flags & RGW_BUCKET_DIRENT_FLAG_VER_MARKER) == 0; }
 
   void dump(Formatter *f) const;
   static void generate_test_instances(list<rgw_bucket_dir_entry*>& o);