]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cls_rgw: revise null object instance handling, versioned epoch
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 13 Nov 2014 22:12:53 +0000 (14:12 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 19 Jan 2015 23:57:52 +0000 (15:57 -0800)
Separate the versioned epoch from the object entry version, since in the
case of null instane entries the object version actually contains
meaningful data that can be overwritten.
Keep separate instance entries for null versioned object and delete
marker, since the two can coexist for a period of time. With suspended
versioning bucket, when deleting a null versioned object we create an
delete marker, and set the olh log to remove the object instance, which
will make the gateway send a removal request. Had we kept the same
entry it'll remove the wrong entry.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_types.cc
src/cls/rgw/cls_rgw_types.h

index ede6ce0c2701658bfd2f60b62a0552bff5fc55ac..ace835c0070710bec6bdf1924963e2c641b976c3 100644 (file)
@@ -254,7 +254,7 @@ static void get_list_index_key(struct rgw_bucket_dir_entry& entry, string *index
   *index_key = entry.key.name;
 
   string ver_str;
-  decreasing_str(entry.ver.epoch, &ver_str);
+  decreasing_str(entry.versioned_epoch, &ver_str);
   string instance_delim("\0i", 2);
   string ver_delim("\0v", 2);
 
@@ -264,7 +264,7 @@ static void get_list_index_key(struct rgw_bucket_dir_entry& entry, string *index
   index_key->append(entry.key.instance);
 }
 
-static void encode_obj_versioned_data_key(const cls_rgw_obj_key& key, string *index_key)
+static void encode_obj_versioned_data_key(const cls_rgw_obj_key& key, string *index_key, bool append_delete_marker_suffix = false)
 {
   *index_key = BI_PREFIX_CHAR;
   index_key->append(bucket_index_prefixes[BI_BUCKET_OBJ_INSTANCE_INDEX]);
@@ -272,6 +272,10 @@ static void encode_obj_versioned_data_key(const cls_rgw_obj_key& key, string *in
   string delim("\0i", 2);
   index_key->append(delim);
   index_key->append(key.instance);
+  if (append_delete_marker_suffix) {
+    string dm("\0d", 2);
+    index_key->append(dm);
+  }
 }
 
 static void encode_obj_index_key(const cls_rgw_obj_key& key, string *index_key)
@@ -439,10 +443,12 @@ int rgw_bucket_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     decode_list_index_key(kiter->first, &key, &ver);
 
     if (!entry.is_valid()) {
+      CLS_LOG(20, "entry %s[%s] is not valid\n", key.name.c_str(), key.instance.c_str());
       continue;
     }
 
     if (!op.list_versions && !entry.is_visible()) {
+      CLS_LOG(20, "entry %s[%s] is not visible\n", key.name.c_str(), key.instance.c_str());
       continue;
     }
     m[kiter->first] = entry;
@@ -606,6 +612,9 @@ int rgw_bucket_set_tag_timeout(cls_method_context_t hctx, bufferlist *in, buffer
   return write_bucket_header(hctx, &header);
 }
 
+static int read_key_entry(cls_method_context_t hctx, cls_rgw_obj_key& key, string *idx, struct rgw_bucket_dir_entry *entry,
+                          bool special_delete_marker_name = false);
+
 int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   // decode request
@@ -627,31 +636,17 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
           op.op, op.key.name.c_str(), op.key.instance.c_str(), op.tag.c_str());
 
   // get on-disk state
-  bufferlist cur_value;
-  string key;
-  encode_obj_index_key(op.key, &key);
-  int rc = cls_cxx_map_get_val(hctx, key, &cur_value);
-  if (rc < 0 && rc != -ENOENT)
-    return rc;
+  string idx;
 
   struct rgw_bucket_dir_entry entry;
+  int rc = read_key_entry(hctx, op.key, &idx, &entry);
+  if (rc < 0 && rc != -ENOENT)
+    return rc;
 
   bool noent = (rc == -ENOENT);
 
   rc = 0;
 
-  if (!noent) {
-    try {
-      bufferlist::iterator biter = cur_value.begin();
-      ::decode(entry, biter);
-    } catch (buffer::error& err) {
-      CLS_LOG(1, "ERROR: rgw_bucket_prepare_op(): failed to decode entry\n");
-      /* ignoring error */
-
-      noent = true;
-    }
-  }
-
   if (noent) { // no entry, initialize fields
     entry.key = op.key;
     entry.ver = rgw_bucket_entry_ver();
@@ -690,8 +685,7 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
   // write out new key to disk
   bufferlist info_bl;
   ::encode(entry, info_bl);
-  encode_obj_index_key(op.key, &key);
-  rc = cls_cxx_map_set_val(hctx, key, &info_bl);
+  rc = cls_cxx_map_set_val(hctx, idx, &info_bl);
   if (rc < 0)
     return rc;
 
@@ -741,7 +735,8 @@ static int read_index_entry(cls_method_context_t hctx, string& name, T *entry)
   return 0;
 }
 
-static int read_key_entry(cls_method_context_t hctx, cls_rgw_obj_key& key, string *idx, struct rgw_bucket_dir_entry *entry)
+static int read_key_entry(cls_method_context_t hctx, cls_rgw_obj_key& key, string *idx, struct rgw_bucket_dir_entry *entry,
+                          bool special_delete_marker_name)
 {
   encode_obj_index_key(key, idx);
   int rc = read_index_entry(hctx, *idx, entry);
@@ -751,9 +746,20 @@ static int read_key_entry(cls_method_context_t hctx, cls_rgw_obj_key& key, strin
 
   if (key.instance.empty() &&
       entry->flags & RGW_BUCKET_DIRENT_FLAG_VER_MARKER) {
+    /* we only do it where key.instance is empty. In this case the delete marker will have a
+     * separate entry in the index to avoid collisions with the actual object, as it's mutable
+     */
+    if (special_delete_marker_name) {
+      encode_obj_versioned_data_key(key, idx, true);
+      rc = read_index_entry(hctx, *idx, entry);
+      if (rc == 0) {
+        return 0;
+      }
+    }
     encode_obj_versioned_data_key(key, idx);
     rc = read_index_entry(hctx, *idx, entry);
     if (rc < 0) {
+      *entry = rgw_bucket_dir_entry(); /* need to reset entry because we initialized it earlier */
       return rc;
     }
   }
@@ -793,8 +799,8 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
   struct rgw_bucket_dir_entry entry;
   bool ondisk = true;
 
-  string key;
-  rc = read_key_entry(hctx, op.key, &key, &entry);
+  string idx;
+  rc = read_key_entry(hctx, op.key, &idx, &entry);
   if (rc == -ENOENT) {
     entry.key = op.key;
     entry.ver = op.ver;
@@ -806,6 +812,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
   }
 
   entry.index_ver = header.ver;
+  entry.flags = 0; /* resetting entry flags, entry might have been previously a delete marker */
 
   if (op.tag.size()) {
     map<string, struct rgw_bucket_pending_info>::iterator pinter = entry.pending_map.find(op.tag);
@@ -840,7 +847,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
     if (op.tag.size()) {
       bufferlist new_key_bl;
       ::encode(entry, new_key_bl);
-      return cls_cxx_map_set_val(hctx, key, &new_key_bl);
+      return cls_cxx_map_set_val(hctx, idx, &new_key_bl);
     } else {
       return 0;
     }
@@ -855,14 +862,14 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
   case CLS_RGW_OP_DEL:
     if (ondisk) {
       if (!entry.pending_map.size()) {
-       int ret = cls_cxx_map_remove_key(hctx, key);
+       int ret = cls_cxx_map_remove_key(hctx, idx);
        if (ret < 0)
          return ret;
       } else {
         entry.exists = false;
         bufferlist new_key_bl;
         ::encode(entry, new_key_bl);
-       int ret = cls_cxx_map_set_val(hctx, key, &new_key_bl);
+       int ret = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
        if (ret < 0)
          return ret;
       }
@@ -883,7 +890,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
       stats.total_size_rounded += get_rounded_size(meta.accounted_size);
       bufferlist new_key_bl;
       ::encode(entry, new_key_bl);
-      int ret = cls_cxx_map_set_val(hctx, key, &new_key_bl);
+      int ret = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
       if (ret < 0)
        return ret;
     }
@@ -1017,8 +1024,11 @@ public:
   BIVerObjEntry(cls_method_context_t& _hctx, const cls_rgw_obj_key& _key) : hctx(_hctx), key(_key), initialized(false) {
   }
 
-  int init() {
-    int ret = read_key_entry(hctx, key, &instance_idx, &instance_entry);
+  int init(bool check_delete_marker = true) {
+    int ret = read_key_entry(hctx, key, &instance_idx, &instance_entry,
+                             check_delete_marker && key.instance.empty()); /* this is potentially a delete marker, for null objects we
+                                                                              keep separate instance entry for the delete markers */
+
     if (ret < 0) {
       CLS_LOG(0, "ERROR: read_key_entry() idx=%s ret=%d", instance_idx.c_str(), ret);
       return ret;
@@ -1033,6 +1043,7 @@ public:
     instance_entry.key = key;
     instance_entry.flags = RGW_BUCKET_DIRENT_FLAG_DELETE_MARKER;
     instance_entry.meta = meta;
+    instance_entry.tag = "delete-marker";
 
     initialized = true;
   }
@@ -1051,9 +1062,9 @@ public:
     return 0;
   }
 
-  int unlink_instance_entry() {
-    /* this instance has a previous list entry, remove that entry */
-    CLS_LOG(20, "unlink_instance_entry() instance_idx=%s", escape_str(instance_idx).c_str());
+  int unlink() {
+    /* remove the instance entry */
+    CLS_LOG(20, "unlink() idx=%s", escape_str(instance_idx).c_str());
     int ret = cls_cxx_map_remove_key(hctx, instance_idx);
     if (ret < 0) {
       CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
@@ -1069,11 +1080,12 @@ public:
         return ret;
       }
     }
-    instance_entry.flags |= flags_set;
     instance_entry.flags &= ~flags_reset;
+    instance_entry.flags |= flags_set;
 
     /* write the instance and list entries */
-    encode_obj_versioned_data_key(key, &instance_idx);
+    bool special_delete_marker_key = (instance_entry.is_delete_marker() && instance_entry.key.instance.empty());
+    encode_obj_versioned_data_key(key, &instance_idx, special_delete_marker_key);
     int ret = write_obj_entries(hctx, instance_entry, instance_idx);
     if (ret < 0) {
       CLS_LOG(0, "ERROR: write_obj_entries() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
@@ -1084,8 +1096,8 @@ public:
   }
 
   int set_current(uint64_t epoch) {
-    if (instance_entry.ver.epoch > 0) {
-      CLS_LOG(20, "%s(): instance_entry.ver.epoch=%d epoch=%d", __func__, (int)instance_entry.ver.epoch, (int)epoch);
+    if (instance_entry.versioned_epoch > 0) {
+      CLS_LOG(20, "%s(): instance_entry.versioned_epoch=%d epoch=%d", __func__, (int)instance_entry.versioned_epoch, (int)epoch);
       /* this instance has a previous list entry, remove that entry */
       int ret = unlink_list_entry();
       if (ret < 0) {
@@ -1093,7 +1105,7 @@ public:
       }
     }
 
-    instance_entry.ver.epoch = epoch;
+    instance_entry.versioned_epoch = epoch;
     return write_entries(RGW_BUCKET_DIRENT_FLAG_VER | RGW_BUCKET_DIRENT_FLAG_CURRENT, 0);
   }
 
@@ -1234,7 +1246,7 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx, cls_rgw_o
       return ret;
     }
 
-    entry.ver.epoch = epoch;
+    entry.versioned_epoch = epoch;
     entry.flags |= RGW_BUCKET_DIRENT_FLAG_VER;
 
     if (demote_current) {
@@ -1293,7 +1305,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
   BIOLHEntry olh(hctx, op.key);
 
   /* read instance entry */
-  int ret = obj.init();
+  int ret = obj.init(op.delete_marker);
   bool existed = (ret == 0);
   if (ret == -ENOENT && op.delete_marker) {
     ret = 0;
@@ -1302,7 +1314,32 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
     return ret;
   }
 
-  bool removing = (existed && !obj.is_delete_marker() && op.delete_marker);
+  bool removing;
+
+  /*
+   * Special handling for null instance object / delete-marker. For these objects we're going to
+   * have separate instances for a data object vs. delete-marker to avoid collisions. We now check
+   * if we got to overwrite a previous entry, and in that case we'll remove its list entry.
+   */
+  if (op.key.instance.empty()) {
+    BIVerObjEntry other_obj(hctx, op.key);
+    ret = other_obj.init(!op.delete_marker); /* try reading the other null versioned entry */
+    existed = (ret >= 0 && !other_obj.is_delete_marker());
+    if (ret >= 0 && other_obj.is_delete_marker() != op.delete_marker) {
+      ret = other_obj.unlink_list_entry();
+      if (ret < 0) {
+        return ret;
+      }
+      ret = other_obj.unlink();
+      if (ret < 0) {
+        return ret;
+      }
+    }
+
+    removing = existed && op.delete_marker;
+  } else {
+    removing = (existed && !obj.is_delete_marker() && op.delete_marker);
+  }
 
   if (op.delete_marker) {
     /* a deletion marker, need to initialize entry as such */
@@ -1323,13 +1360,16 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
       return -ECANCELED;
     }
     if (olh.exists()) {
+      rgw_bucket_olh_entry& olh_entry = olh.get_entry();
       /* found olh, previous instance is no longer the latest, need to update */
-      BIVerObjEntry old_obj(hctx, olh.get_entry().key);
+      if (!(olh_entry.key == op.key)) {
+        BIVerObjEntry old_obj(hctx, olh_entry.key);
 
-      ret = old_obj.demote_current();
-      if (ret < 0) {
-        CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
-        return ret;
+        ret = old_obj.demote_current();
+        if (ret < 0) {
+          CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
+          return ret;
+        }
       }
     }
   } else {
@@ -1444,11 +1484,6 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
 
   if (!obj.is_delete_marker()) {
     olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
-  } else {
-    ret = obj.unlink_instance_entry();
-    if (ret < 0) {
-      return ret;
-    }
   }
 
   ret = obj.unlink_list_entry();
index 89d171d687c0624710de93e7c83bedd7129bee22..69732945b5476b776cf814ab1a06323264ee924d 100644 (file)
@@ -127,6 +127,7 @@ void rgw_bucket_dir_entry::dump(Formatter *f) const
   encode_json("tag", tag , f);
   encode_json("flags", (int)flags , f);
   encode_json("pending_map", pending_map, f);
+  encode_json("versioned_epoch", versioned_epoch , f);
 }
 
 void rgw_bucket_dir_entry::decode_json(JSONObj *obj) {
@@ -141,6 +142,7 @@ void rgw_bucket_dir_entry::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("flags", val , obj);
   flags = (uint16_t)val;
   JSONDecoder::decode_json("pending_map", pending_map, obj);
+  JSONDecoder::decode_json("versioned_epoch", versioned_epoch, obj);
 }
 
 static void dump_bi_entry(bufferlist bl, BIIndexType index_type, Formatter *formatter)
index 7438147dc9187d36a762c625c0f04764dc37304b..5884b41fe37f0c948c7e53aefda092c309279d4a 100644 (file)
@@ -265,12 +265,13 @@ struct rgw_bucket_dir_entry {
   uint64_t index_ver;
   string tag;
   uint16_t flags;
+  uint64_t versioned_epoch;
 
   rgw_bucket_dir_entry() :
-    exists(false), index_ver(0), flags(0) {}
+    exists(false), index_ver(0), flags(0), versioned_epoch(0) {}
 
   void encode(bufferlist &bl) const {
-    ENCODE_START(7, 3, bl);
+    ENCODE_START(8, 3, bl);
     ::encode(key.name, bl);
     ::encode(ver.epoch, bl);
     ::encode(exists, bl);
@@ -282,6 +283,7 @@ struct rgw_bucket_dir_entry {
     ::encode(tag, bl);
     ::encode(key.instance, bl);
     ::encode(flags, bl);
+    ::encode(versioned_epoch, bl);
     ENCODE_FINISH(bl);
   }
   void decode(bufferlist::iterator &bl) {
@@ -309,6 +311,9 @@ struct rgw_bucket_dir_entry {
     if (struct_v >= 7) {
       ::decode(flags, bl);
     }
+    if (struct_v >= 8) {
+      ::decode(versioned_epoch, bl);
+    }
     DECODE_FINISH(bl);
   }