*index_key = entry.key.name;
string ver_str;
- decreasing_str(entry.ver.epoch, &ver_str);
+ decreasing_str(entry.versioned_epoch, &ver_str);
string instance_delim("\0i", 2);
string ver_delim("\0v", 2);
index_key->append(entry.key.instance);
}
-static void encode_obj_versioned_data_key(const cls_rgw_obj_key& key, string *index_key)
+static void encode_obj_versioned_data_key(const cls_rgw_obj_key& key, string *index_key, bool append_delete_marker_suffix = false)
{
*index_key = BI_PREFIX_CHAR;
index_key->append(bucket_index_prefixes[BI_BUCKET_OBJ_INSTANCE_INDEX]);
string delim("\0i", 2);
index_key->append(delim);
index_key->append(key.instance);
+ if (append_delete_marker_suffix) {
+ string dm("\0d", 2);
+ index_key->append(dm);
+ }
}
static void encode_obj_index_key(const cls_rgw_obj_key& key, string *index_key)
decode_list_index_key(kiter->first, &key, &ver);
if (!entry.is_valid()) {
+ CLS_LOG(20, "entry %s[%s] is not valid\n", key.name.c_str(), key.instance.c_str());
continue;
}
if (!op.list_versions && !entry.is_visible()) {
+ CLS_LOG(20, "entry %s[%s] is not visible\n", key.name.c_str(), key.instance.c_str());
continue;
}
m[kiter->first] = entry;
return write_bucket_header(hctx, &header);
}
+static int read_key_entry(cls_method_context_t hctx, cls_rgw_obj_key& key, string *idx, struct rgw_bucket_dir_entry *entry,
+ bool special_delete_marker_name = false);
+
int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
// decode request
op.op, op.key.name.c_str(), op.key.instance.c_str(), op.tag.c_str());
// get on-disk state
- bufferlist cur_value;
- string key;
- encode_obj_index_key(op.key, &key);
- int rc = cls_cxx_map_get_val(hctx, key, &cur_value);
- if (rc < 0 && rc != -ENOENT)
- return rc;
+ string idx;
struct rgw_bucket_dir_entry entry;
+ int rc = read_key_entry(hctx, op.key, &idx, &entry);
+ if (rc < 0 && rc != -ENOENT)
+ return rc;
bool noent = (rc == -ENOENT);
rc = 0;
- if (!noent) {
- try {
- bufferlist::iterator biter = cur_value.begin();
- ::decode(entry, biter);
- } catch (buffer::error& err) {
- CLS_LOG(1, "ERROR: rgw_bucket_prepare_op(): failed to decode entry\n");
- /* ignoring error */
-
- noent = true;
- }
- }
-
if (noent) { // no entry, initialize fields
entry.key = op.key;
entry.ver = rgw_bucket_entry_ver();
// write out new key to disk
bufferlist info_bl;
::encode(entry, info_bl);
- encode_obj_index_key(op.key, &key);
- rc = cls_cxx_map_set_val(hctx, key, &info_bl);
+ rc = cls_cxx_map_set_val(hctx, idx, &info_bl);
if (rc < 0)
return rc;
return 0;
}
-static int read_key_entry(cls_method_context_t hctx, cls_rgw_obj_key& key, string *idx, struct rgw_bucket_dir_entry *entry)
+static int read_key_entry(cls_method_context_t hctx, cls_rgw_obj_key& key, string *idx, struct rgw_bucket_dir_entry *entry,
+ bool special_delete_marker_name)
{
encode_obj_index_key(key, idx);
int rc = read_index_entry(hctx, *idx, entry);
if (key.instance.empty() &&
entry->flags & RGW_BUCKET_DIRENT_FLAG_VER_MARKER) {
+ /* we only do it where key.instance is empty. In this case the delete marker will have a
+ * separate entry in the index to avoid collisions with the actual object, as it's mutable
+ */
+ if (special_delete_marker_name) {
+ encode_obj_versioned_data_key(key, idx, true);
+ rc = read_index_entry(hctx, *idx, entry);
+ if (rc == 0) {
+ return 0;
+ }
+ }
encode_obj_versioned_data_key(key, idx);
rc = read_index_entry(hctx, *idx, entry);
if (rc < 0) {
+ *entry = rgw_bucket_dir_entry(); /* need to reset entry because we initialized it earlier */
return rc;
}
}
struct rgw_bucket_dir_entry entry;
bool ondisk = true;
- string key;
- rc = read_key_entry(hctx, op.key, &key, &entry);
+ string idx;
+ rc = read_key_entry(hctx, op.key, &idx, &entry);
if (rc == -ENOENT) {
entry.key = op.key;
entry.ver = op.ver;
}
entry.index_ver = header.ver;
+ entry.flags = 0; /* resetting entry flags, entry might have been previously a delete marker */
if (op.tag.size()) {
map<string, struct rgw_bucket_pending_info>::iterator pinter = entry.pending_map.find(op.tag);
if (op.tag.size()) {
bufferlist new_key_bl;
::encode(entry, new_key_bl);
- return cls_cxx_map_set_val(hctx, key, &new_key_bl);
+ return cls_cxx_map_set_val(hctx, idx, &new_key_bl);
} else {
return 0;
}
case CLS_RGW_OP_DEL:
if (ondisk) {
if (!entry.pending_map.size()) {
- int ret = cls_cxx_map_remove_key(hctx, key);
+ int ret = cls_cxx_map_remove_key(hctx, idx);
if (ret < 0)
return ret;
} else {
entry.exists = false;
bufferlist new_key_bl;
::encode(entry, new_key_bl);
- int ret = cls_cxx_map_set_val(hctx, key, &new_key_bl);
+ int ret = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
if (ret < 0)
return ret;
}
stats.total_size_rounded += get_rounded_size(meta.accounted_size);
bufferlist new_key_bl;
::encode(entry, new_key_bl);
- int ret = cls_cxx_map_set_val(hctx, key, &new_key_bl);
+ int ret = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
if (ret < 0)
return ret;
}
BIVerObjEntry(cls_method_context_t& _hctx, const cls_rgw_obj_key& _key) : hctx(_hctx), key(_key), initialized(false) {
}
- int init() {
- int ret = read_key_entry(hctx, key, &instance_idx, &instance_entry);
+ int init(bool check_delete_marker = true) {
+ int ret = read_key_entry(hctx, key, &instance_idx, &instance_entry,
+ check_delete_marker && key.instance.empty()); /* this is potentially a delete marker, for null objects we
+ keep separate instance entry for the delete markers */
+
if (ret < 0) {
CLS_LOG(0, "ERROR: read_key_entry() idx=%s ret=%d", instance_idx.c_str(), ret);
return ret;
instance_entry.key = key;
instance_entry.flags = RGW_BUCKET_DIRENT_FLAG_DELETE_MARKER;
instance_entry.meta = meta;
+ instance_entry.tag = "delete-marker";
initialized = true;
}
return 0;
}
- int unlink_instance_entry() {
- /* this instance has a previous list entry, remove that entry */
- CLS_LOG(20, "unlink_instance_entry() instance_idx=%s", escape_str(instance_idx).c_str());
+ int unlink() {
+ /* remove the instance entry */
+ CLS_LOG(20, "unlink() idx=%s", escape_str(instance_idx).c_str());
int ret = cls_cxx_map_remove_key(hctx, instance_idx);
if (ret < 0) {
CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
return ret;
}
}
- instance_entry.flags |= flags_set;
instance_entry.flags &= ~flags_reset;
+ instance_entry.flags |= flags_set;
/* write the instance and list entries */
- encode_obj_versioned_data_key(key, &instance_idx);
+ bool special_delete_marker_key = (instance_entry.is_delete_marker() && instance_entry.key.instance.empty());
+ encode_obj_versioned_data_key(key, &instance_idx, special_delete_marker_key);
int ret = write_obj_entries(hctx, instance_entry, instance_idx);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_obj_entries() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
}
int set_current(uint64_t epoch) {
- if (instance_entry.ver.epoch > 0) {
- CLS_LOG(20, "%s(): instance_entry.ver.epoch=%d epoch=%d", __func__, (int)instance_entry.ver.epoch, (int)epoch);
+ if (instance_entry.versioned_epoch > 0) {
+ CLS_LOG(20, "%s(): instance_entry.versioned_epoch=%d epoch=%d", __func__, (int)instance_entry.versioned_epoch, (int)epoch);
/* this instance has a previous list entry, remove that entry */
int ret = unlink_list_entry();
if (ret < 0) {
}
}
- instance_entry.ver.epoch = epoch;
+ instance_entry.versioned_epoch = epoch;
return write_entries(RGW_BUCKET_DIRENT_FLAG_VER | RGW_BUCKET_DIRENT_FLAG_CURRENT, 0);
}
return ret;
}
- entry.ver.epoch = epoch;
+ entry.versioned_epoch = epoch;
entry.flags |= RGW_BUCKET_DIRENT_FLAG_VER;
if (demote_current) {
BIOLHEntry olh(hctx, op.key);
/* read instance entry */
- int ret = obj.init();
+ int ret = obj.init(op.delete_marker);
bool existed = (ret == 0);
if (ret == -ENOENT && op.delete_marker) {
ret = 0;
return ret;
}
- bool removing = (existed && !obj.is_delete_marker() && op.delete_marker);
+ bool removing;
+
+ /*
+ * Special handling for null instance object / delete-marker. For these objects we're going to
+ * have separate instances for a data object vs. delete-marker to avoid collisions. We now check
+ * if we got to overwrite a previous entry, and in that case we'll remove its list entry.
+ */
+ if (op.key.instance.empty()) {
+ BIVerObjEntry other_obj(hctx, op.key);
+ ret = other_obj.init(!op.delete_marker); /* try reading the other null versioned entry */
+ existed = (ret >= 0 && !other_obj.is_delete_marker());
+ if (ret >= 0 && other_obj.is_delete_marker() != op.delete_marker) {
+ ret = other_obj.unlink_list_entry();
+ if (ret < 0) {
+ return ret;
+ }
+ ret = other_obj.unlink();
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
+ removing = existed && op.delete_marker;
+ } else {
+ removing = (existed && !obj.is_delete_marker() && op.delete_marker);
+ }
if (op.delete_marker) {
/* a deletion marker, need to initialize entry as such */
return -ECANCELED;
}
if (olh.exists()) {
+ rgw_bucket_olh_entry& olh_entry = olh.get_entry();
/* found olh, previous instance is no longer the latest, need to update */
- BIVerObjEntry old_obj(hctx, olh.get_entry().key);
+ if (!(olh_entry.key == op.key)) {
+ BIVerObjEntry old_obj(hctx, olh_entry.key);
- ret = old_obj.demote_current();
- if (ret < 0) {
- CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
- return ret;
+ ret = old_obj.demote_current();
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
+ return ret;
+ }
}
}
} else {
if (!obj.is_delete_marker()) {
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
- } else {
- ret = obj.unlink_instance_entry();
- if (ret < 0) {
- return ret;
- }
}
ret = obj.unlink_list_entry();