return cls_cxx_map_set_val(hctx, key, &bl);
}
+static int read_olh(cls_method_context_t hctx,cls_rgw_obj_key& obj_key, struct rgw_bucket_olh_entry *olh_data_entry, string *index_key, bool *found)
+{
+ cls_rgw_obj_key olh_key;
+ olh_key.name = obj_key.name;
+
+ encode_olh_data_key(olh_key, index_key);
+ int ret = read_index_entry(hctx, *index_key, olh_data_entry);
+ if (ret < 0 && ret != -ENOENT) {
+ CLS_LOG(0, "ERROR: read_index_entry() olh_key=%s ret=%d", olh_key.name.c_str(), ret);
+ return ret;
+ }
+ *found = (ret != -ENOENT);
+ return 0;
+}
+
+static int read_key_list_entry(cls_method_context_t hctx, cls_rgw_obj_key& key, rgw_bucket_dir_entry *entry, string *idx)
+{
+ encode_list_index_key(hctx, key, idx);
+
+ int ret = read_index_entry(hctx, *idx, entry);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: read_index_entry() reading previous instance %s ret=%d", idx->c_str(), ret);
+ return ret;
+ }
+
+ return 0;
+}
+
+static void update_olh_log(struct rgw_bucket_olh_entry& olh_data_entry, OLHLogOp op, const string& op_tag,
+ cls_rgw_obj_key& key, bool delete_marker)
+{
+ rgw_bucket_olh_log_entry& log_entry = olh_data_entry.pending_log[olh_data_entry.epoch];
+ log_entry.epoch = olh_data_entry.epoch;
+ log_entry.op = op;
+ log_entry.op_tag = op_tag;
+ log_entry.key = key;
+ log_entry.delete_marker = delete_marker;
+}
+
+/*
+ * write object instance entry, and if needed also the list entry
+ */
+static int write_obj_entries(cls_method_context_t hctx, struct rgw_bucket_dir_entry& instance_entry, const string& instance_idx)
+{
+ /* write the instance entry */
+ int ret = write_entry(hctx, instance_entry, instance_idx);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: write_entry() instance_key=%s ret=%d", instance_idx.c_str(), ret);
+ return ret;
+ }
+ string instance_list_idx;
+ get_list_index_key(instance_entry, &instance_list_idx);
+
+ if (instance_idx != instance_list_idx) {
+ /* write a new list entry for the object instance */
+ ret = write_entry(hctx, instance_entry, instance_list_idx);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: write_entry() instance_list_idx=%s ret=%d", instance_list_idx.c_str(), ret);
+ return ret;
+ }
+ }
+ return 0;
+}
+
/*
* link an object version to an olh, update the relevant index entries. It will also handle the
* deletion marker case. We have a few entries that we need to take care of. For object 'foo',
*/
static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
+ string olh_data_idx;
+ string instance_idx;
+ struct rgw_bucket_olh_entry olh_data_entry;
+
// decode request
rgw_cls_link_olh_op op;
bufferlist::iterator iter = in->begin();
return -EINVAL;
}
- cls_rgw_obj_key olh_key;
- olh_key.name = op.key.name;
-
- string instance_key;
struct rgw_bucket_dir_entry instance_entry;
- encode_obj_index_key(op.key, &instance_key);
+ /* read instance entry */
+ encode_obj_index_key(op.key, &instance_idx);
if (!op.delete_marker) {
- int ret = read_index_entry(hctx, instance_key, &instance_entry);
+ int ret = read_index_entry(hctx, instance_idx, &instance_entry);
if (ret < 0) {
- CLS_LOG(0, "ERROR: read_index_entry() key=%s ret=%d", instance_key.c_str(), ret);
+ CLS_LOG(0, "ERROR: read_index_entry() key=%s ret=%d", instance_idx.c_str(), ret);
return ret;
}
} else {
instance_entry.flags = RGW_BUCKET_DIRENT_FLAG_DELETE_MARKER;
}
- struct rgw_bucket_olh_entry olh_data_entry;
- string olh_data_key;
- encode_olh_data_key(olh_key, &olh_data_key);
- int ret = read_index_entry(hctx, olh_data_key, &olh_data_entry);
- if (ret < 0 && ret != -ENOENT) {
- CLS_LOG(0, "ERROR: read_index_entry() olh_key=%s ret=%d", olh_key.name.c_str(), ret);
+ /* read olh */
+ bool olh_found;
+ int ret = read_olh(hctx, op.key, &olh_data_entry, &olh_data_idx, &olh_found);
+ if (ret < 0) {
return ret;
}
- if (ret != -ENOENT) {
- /* previous instance is no longer the latest, need to update */
- string prev_key;
- encode_list_index_key(hctx, olh_data_entry.key, &prev_key);
-
+ if (olh_found) {
+ /* found olh, previous instance is no longer the latest, need to update */
+ string prev_idx;
struct rgw_bucket_dir_entry prev_instance_entry;
- ret = read_index_entry(hctx, prev_key, &prev_instance_entry);
+ ret = read_key_list_entry(hctx, olh_data_entry.key, &prev_instance_entry, &prev_idx);
if (ret < 0) {
- CLS_LOG(0, "ERROR: read_index_entry() reading previous instance %s ret=%d", prev_key.c_str(), ret);
+ CLS_LOG(0, "ERROR: read_key_list_entry() reading previous instance %s ret=%d", prev_idx.c_str(), ret);
return ret;
}
prev_instance_entry.flags &= ~RGW_BUCKET_DIRENT_FLAG_CURRENT;
- ret = write_entry(hctx, prev_instance_entry, prev_key);
+ ret = write_entry(hctx, prev_instance_entry, prev_idx);
if (ret < 0) {
- CLS_LOG(0, "ERROR: write_entry() prev_key=%s ret=%d", prev_key.c_str(), ret);
+ CLS_LOG(0, "ERROR: write_entry() prev_idx=%s ret=%d", prev_idx.c_str(), ret);
return ret;
}
}
/* might need to remove the plain object listing key */
- if (!op.key.instance.empty()) {
- string plain_key;
+ if (!op.key.instance.empty()) {
struct rgw_bucket_dir_entry plain_entry;
- encode_obj_index_key(olh_key, &plain_key);
- ret = read_index_entry(hctx, plain_key, &plain_entry);
+ string plain_idx;
+ cls_rgw_obj_key no_instance_key(op.key.name);
+
+ ret = read_key_list_entry(hctx, no_instance_key, &plain_entry, &plain_idx);
if (ret >= 0) {
-#warning handle overwrite of non-olh object
- ret = cls_cxx_map_remove_key(hctx, plain_key);
- }
+#warning handle overwrite of non-olh object, need to update log
+ ret = cls_cxx_map_remove_key(hctx, plain_idx);
}
+ }
olh_data_entry.epoch++;
olh_data_entry.delete_marker = op.delete_marker;
olh_data_entry.key = op.key;
if (instance_entry.ver.epoch > 0) {
- string list_key;
+ string list_idx;
/* this instance has a previous list entry, remove that entry */
- get_list_index_key(instance_entry, &list_key);
- ret = cls_cxx_map_remove_key(hctx, list_key);
+ get_list_index_key(instance_entry, &list_idx);
+ ret = cls_cxx_map_remove_key(hctx, list_idx);
if (ret < 0) {
- CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() list_key=%s ret=%d", list_key.c_str(), ret);
+ CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() list_idx=%s ret=%d", list_idx.c_str(), ret);
return ret;
}
}
instance_entry.ver.epoch = olh_data_entry.epoch;
instance_entry.flags |= (RGW_BUCKET_DIRENT_FLAG_VER | RGW_BUCKET_DIRENT_FLAG_CURRENT);
- string instance_list_key;
-
- /* write the instance entry */
- ret = write_entry(hctx, instance_entry, instance_key);
+ /* write the instance and list entries */
+ ret = write_obj_entries(hctx, instance_entry, instance_idx);
if (ret < 0) {
- CLS_LOG(0, "ERROR: write_entry() instance_key=%s ret=%d", instance_key.c_str(), ret);
+ CLS_LOG(0, "ERROR: write_obj_entries() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
return ret;
}
- if (instance_key != instance_list_key) {
- /* write a new list entry for the object instance */
- get_list_index_key(instance_entry, &instance_list_key);
- ret = write_entry(hctx, instance_entry, instance_list_key);
- if (ret < 0) {
- CLS_LOG(0, "ERROR: write_entry() instance_list_key=%s ret=%d", instance_list_key.c_str(), ret);
- return ret;
- }
- }
-
/* update the olh log */
- rgw_bucket_olh_log_entry& log_entry = olh_data_entry.pending_log[olh_data_entry.epoch];
- log_entry.epoch = olh_data_entry.epoch;
- log_entry.op = CLS_RGW_OLH_OP_LINK_OLH;
- log_entry.op_tag = op.op_tag;
- log_entry.key = op.key;
- log_entry.delete_marker = op.delete_marker;
+ update_olh_log(olh_data_entry, CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker);
/* write the olh data entry */
- ret = write_entry(hctx, olh_data_entry, olh_data_key);
+ ret = write_entry(hctx, olh_data_entry, olh_data_idx);
if (ret < 0) {
- CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_key.c_str(), ret);
+ CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_idx.c_str(), ret);
return ret;
}