{
*index_key = entry.key.name;
- if (!entry.key.instance.empty()) {
- string ver_str;
- decreasing_str(entry.ver.epoch, &ver_str);
- string instance_delim("\0i", 2);
- string ver_delim("\0v", 2);
+ string ver_str;
+ decreasing_str(entry.ver.epoch, &ver_str);
+ string instance_delim("\0i", 2);
+ string ver_delim("\0v", 2);
- index_key->append(ver_delim);
- index_key->append(ver_str);
- index_key->append(instance_delim);
- index_key->append(entry.key.instance);
- }
+ index_key->append(ver_delim);
+ index_key->append(ver_str);
+ index_key->append(instance_delim);
+ index_key->append(entry.key.instance);
+}
+
+static void encode_obj_versioned_data_key(const cls_rgw_obj_key& key, string *index_key)
+{
+ *index_key = BI_PREFIX_CHAR;
+ index_key->append(bucket_index_prefixes[BI_BUCKET_OBJ_INSTANCE_INDEX]);
+ index_key->append(key.name);
+ string delim("\0i", 2);
+ index_key->append(delim);
+ index_key->append(key.instance);
}
static void encode_obj_index_key(const cls_rgw_obj_key& key, string *index_key)
if (key.instance.empty()) {
*index_key = key.name;
} else {
- *index_key = BI_PREFIX_CHAR;
- index_key->append(bucket_index_prefixes[BI_BUCKET_OBJ_INSTANCE_INDEX]);
- index_key->append(key.name);
- string delim("\0i", 2);
- index_key->append(delim);
- index_key->append(key.instance);
+ encode_obj_versioned_data_key(key, index_key);
}
}
uint64_t ver;
decode_list_index_key(kiter->first, &key, &ver);
+ if (!entry.is_valid()) {
+ continue;
+ }
+
if (!op.list_versions && !entry.is_visible()) {
continue;
}
return 0;
}
+static int read_key_entry(cls_method_context_t hctx, cls_rgw_obj_key& key, string *idx, struct rgw_bucket_dir_entry *entry)
+{
+ encode_obj_index_key(key, idx);
+ int rc = read_index_entry(hctx, *idx, entry);
+ if (rc < 0) {
+ return rc;
+ }
+
+ if (key.instance.empty() &&
+ entry->flags & RGW_BUCKET_DIRENT_FLAG_VER_MARKER) {
+ encode_obj_versioned_data_key(key, idx);
+ rc = read_index_entry(hctx, *idx, entry);
+ if (rc < 0) {
+ return rc;
+ }
+ }
+
+ return 0;
+}
+
int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
// decode request
bool ondisk = true;
string key;
- encode_obj_index_key(op.key, &key);
- rc = read_index_entry(hctx, key, &entry);
+ rc = read_key_entry(hctx, op.key, &key, &entry);
if (rc == -ENOENT) {
entry.key = op.key;
entry.ver = op.ver;
remove_key.name.c_str(), remove_key.instance.c_str());
struct rgw_bucket_dir_entry remove_entry;
string k;
- encode_obj_index_key(remove_key, &k);
- int ret = read_index_entry(hctx, k, &remove_entry);
+ int ret = read_key_entry(hctx, remove_key, &k, &remove_entry);
if (ret < 0) {
CLS_LOG(1, "rgw_bucket_complete_op(): removing entries, read_index_entry name=%s instance=%s ret=%d\n",
remove_key.name.c_str(), remove_key.instance.c_str(), ret);
static int read_key_list_entry(cls_method_context_t hctx, cls_rgw_obj_key& key, rgw_bucket_dir_entry *entry, string *idx)
{
- encode_list_index_key(hctx, key, idx);
-
- int ret = read_index_entry(hctx, *idx, entry);
+ int ret = read_key_entry(hctx, key, idx, entry);
if (ret < 0) {
return ret;
}
public:
BIVerObjEntry(cls_method_context_t& _hctx, const cls_rgw_obj_key& _key) : hctx(_hctx), key(_key), initialized(false) {
- encode_obj_index_key(key, &instance_idx);
}
int init() {
- int ret = read_index_entry(hctx, instance_idx, &instance_entry);
+ int ret = read_key_entry(hctx, key, &instance_idx, &instance_entry);
if (ret < 0) {
- CLS_LOG(0, "ERROR: read_index_entry() key=%s ret=%d", instance_idx.c_str(), ret);
+ CLS_LOG(0, "ERROR: read_key_entry() idx=%s ret=%d", instance_idx.c_str(), ret);
return ret;
}
initialized = true;
instance_entry.flags &= ~flags_reset;
/* write the instance and list entries */
+ encode_obj_versioned_data_key(key, &instance_idx);
int ret = write_obj_entries(hctx, instance_entry, instance_idx);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_obj_entries() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
};
+/*
+ * plain entries are the ones who were created when bucket was not versioned,
+ * if we override these objects, we need to convert these to versioned entries -- ones that have
+ * both data entry, and listing key. Their version is going to be empty though
+ */
+static int convert_plain_entry_to_versioned(cls_method_context_t hctx, cls_rgw_obj_key& key, uint64_t epoch, bool demote_current)
+{
+ if (!key.instance.empty()) {
+ return -EINVAL;
+ }
+
+ struct rgw_bucket_dir_entry entry;
+
+ string orig_idx;
+ int ret = read_key_entry(hctx, key, &orig_idx, &entry);
+ if (ret != -ENOENT) {
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: read_key_entry() returned ret=%d", ret);
+ return ret;
+ }
+
+ entry.ver.epoch = epoch;
+ entry.flags |= RGW_BUCKET_DIRENT_FLAG_VER;
+
+ if (demote_current) {
+ entry.flags &= ~RGW_BUCKET_DIRENT_FLAG_CURRENT;
+ }
+
+ string new_idx;
+ encode_obj_versioned_data_key(key, &new_idx);
+
+ ret = write_obj_entries(hctx, entry, new_idx);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: write_obj_entries new_idx=%s returned %d", new_idx.c_str(), ret);
+ return ret;
+ }
+ }
+
+ entry.flags = RGW_BUCKET_DIRENT_FLAG_VER_MARKER;
+ ret = write_entry(hctx, entry, key.name);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: write_entry returned ret=%d", ret);
+ return ret;
+ }
+ return 0;
+}
+
/*
* link an object version to an olh, update the relevant index entries. It will also handle the
* deletion marker case. We have a few entries that we need to take care of. For object 'foo',
CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
return ret;
}
- }
-
- /* might need to remove the plain object listing key */
- if (!op.key.instance.empty()) {
- struct rgw_bucket_dir_entry plain_entry;
- string plain_idx;
- cls_rgw_obj_key no_instance_key(op.key.name);
-
- ret = read_key_list_entry(hctx, no_instance_key, &plain_entry, &plain_idx);
- if (ret >= 0) {
-#warning handle overwrite of non-olh object, need to update stats
- ret = cls_cxx_map_remove_key(hctx, plain_idx);
- if (ret < 0) {
- CLS_LOG(0, "ERROR: cls_Cxx_map_remove_key() ret=%d", ret);
- return ret;
- }
-
- if (!plain_entry.is_delete_marker()) {
- olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, plain_entry.key, false);
- }
+ } else {
+ cls_rgw_obj_key key(op.key.name);
+ ret = convert_plain_entry_to_versioned(hctx, key, olh.get_epoch(), true);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
+ return ret;
}
+ olh.inc_epoch();
}
/* update the olh log */