}
// 0x802001_idx
-static void bi_reshard_log_key(cls_method_context_t hctx, string& key, string& idx)
+static void bi_reshard_log_key(cls_method_context_t hctx, string& key, const string& idx)
{
bi_reshard_log_prefix(key);
key.append(idx);
}
-static int reshard_log_index_operation(cls_method_context_t hctx, string& idx,
+static int reshard_log_index_operation(cls_method_context_t hctx, const string& idx,
const cls_rgw_obj_key& key, bufferlist* log_bl)
{
string reshard_log_idx;
} // rgw_bucket_complete_op
template <class T>
-static int write_entry(cls_method_context_t hctx, T& entry, const string& key)
+static int write_entry(cls_method_context_t hctx, T& entry, const string& key,
+ const bool is_resharding = false)
{
bufferlist bl;
encode(entry, bl);
- return cls_cxx_map_set_val(hctx, key, &bl);
+ int ret = cls_cxx_map_set_val(hctx, key, &bl);
+ if (ret < 0) {
+ return ret;
+ }
+ if (is_resharding) {
+ ret = reshard_log_index_operation(hctx, key, entry.key, &bl);
+ }
+ return ret;
}
static int read_olh(cls_method_context_t hctx,cls_rgw_obj_key& obj_key, rgw_bucket_olh_entry *olh_data_entry, string *index_key, bool *found)
log.push_back(log_entry);
}
-static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry, const string& instance_idx)
+static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry,
+ const string& instance_idx, bool is_resharding)
{
- CLS_LOG(20, "write_entry() instance=%s idx=%s flags=%d", escape_str(instance_entry.key.instance).c_str(), instance_idx.c_str(), instance_entry.flags);
+ CLS_LOG(20, "write_entry() instance=%s idx=%s flags=%d", escape_str(instance_entry.key.instance).c_str(),
+ instance_idx.c_str(), instance_entry.flags);
/* write the instance entry */
- int ret = write_entry(hctx, instance_entry, instance_idx);
+ int ret = write_entry(hctx, instance_entry, instance_idx, is_resharding);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry() instance_key=%s ret=%d", escape_str(instance_idx).c_str(), ret);
return ret;
/*
* write object instance entry, and if needed also the list entry
*/
-static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry, const string& instance_idx)
+static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry,
+ const string& instance_idx, bool is_resharding)
{
- int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx);
+ int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx, is_resharding);
if (ret < 0) {
return ret;
}
if (instance_idx != instance_list_idx) {
CLS_LOG(20, "write_entry() idx=%s flags=%d", escape_str(instance_list_idx).c_str(), instance_entry.flags);
/* write a new list entry for the object instance */
- ret = write_entry(hctx, instance_entry, instance_list_idx);
+ ret = write_entry(hctx, instance_entry, instance_list_idx, is_resharding);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry() instance=%s instance_list_idx=%s ret=%d", instance_entry.key.instance.c_str(), instance_list_idx.c_str(), ret);
return ret;
instance_entry.versioned_epoch = epoch;
}
- int unlink_list_entry() {
- string list_idx;
+ int unlink_list_entry(bool is_resharding) {
+ string list_idx, list_sub_ver;
/* this instance has a previous list entry, remove that entry */
get_list_index_key(instance_entry, &list_idx);
CLS_LOG(20, "unlink_list_entry() list_idx=%s", escape_str(list_idx).c_str());
CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() list_idx=%s ret=%d", list_idx.c_str(), ret);
return ret;
}
+ if (is_resharding) {
+ bufferlist empty;
+ return reshard_log_index_operation(hctx, list_idx, instance_entry.key, &empty);
+ }
return 0;
}
- int unlink() {
+ int unlink(bool is_resharding, const cls_rgw_obj_key& key) {
/* remove the instance entry */
CLS_LOG(20, "unlink() idx=%s", escape_str(instance_idx).c_str());
int ret = cls_cxx_map_remove_key(hctx, instance_idx);
CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
return ret;
}
+ if (is_resharding) {
+ bufferlist empty;
+ return reshard_log_index_operation(hctx, instance_idx, key, &empty);
+ }
return 0;
}
- int write_entries(uint64_t flags_set, uint64_t flags_reset) {
+ int write_entries(uint64_t flags_set, uint64_t flags_reset, bool is_resharding) {
if (!initialized) {
int ret = init();
if (ret < 0) {
/* write the instance and list entries */
bool special_delete_marker_key = (instance_entry.is_delete_marker() && instance_entry.key.instance.empty());
encode_obj_versioned_data_key(key, &instance_idx, special_delete_marker_key);
- int ret = write_obj_entries(hctx, instance_entry, instance_idx);
+ int ret = write_obj_entries(hctx, instance_entry, instance_idx, is_resharding);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_obj_entries() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
return ret;
return 0;
}
- int write(uint64_t epoch, bool current) {
+ int write(uint64_t epoch, bool current, bool is_resharding) {
if (instance_entry.versioned_epoch > 0) {
CLS_LOG(20, "%s: instance_entry.versioned_epoch=%d epoch=%d", __func__, (int)instance_entry.versioned_epoch, (int)epoch);
/* this instance has a previous list entry, remove that entry */
- int ret = unlink_list_entry();
+ int ret = unlink_list_entry(is_resharding);
if (ret < 0) {
return ret;
}
}
instance_entry.versioned_epoch = epoch;
- return write_entries(flags, 0);
+ return write_entries(flags, 0, is_resharding);
}
- int demote_current() {
- return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT);
+ int demote_current(bool is_resharding) {
+ return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT, is_resharding);
}
bool is_delete_marker() {
olh_data_entry.key = key;
}
- int write() {
+ int write(bool is_resharding) {
/* write the olh data entry */
- int ret = write_entry(hctx, olh_data_entry, olh_data_idx);
+ int ret = write_entry(hctx, olh_data_entry, olh_data_idx, is_resharding);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_idx.c_str(), ret);
return ret;
}
};
-static int write_version_marker(cls_method_context_t hctx, cls_rgw_obj_key& key)
+static int write_version_marker(cls_method_context_t hctx, cls_rgw_obj_key& key,
+ bool is_resharding)
{
rgw_bucket_dir_entry entry;
entry.key = key;
entry.flags = rgw_bucket_dir_entry::FLAG_VER_MARKER;
- int ret = write_entry(hctx, entry, key.name);
+ int ret = write_entry(hctx, entry, key.name, is_resharding);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry returned ret=%d", ret);
return ret;
* key. Their version is going to be empty though
*/
static int convert_plain_entry_to_versioned(cls_method_context_t hctx,
- cls_rgw_obj_key& key,
- bool demote_current,
- bool instance_only)
+ cls_rgw_obj_key& key,
+ bool demote_current,
+ bool instance_only,
+ bool is_resharding)
{
if (!key.instance.empty()) {
return -EINVAL;
encode_obj_versioned_data_key(key, &new_idx);
if (instance_only) {
- ret = write_obj_instance_entry(hctx, entry, new_idx);
+ ret = write_obj_instance_entry(hctx, entry, new_idx, is_resharding);
} else {
- ret = write_obj_entries(hctx, entry, new_idx);
+ ret = write_obj_entries(hctx, entry, new_idx, is_resharding);
}
if (ret < 0) {
CLS_LOG(0, "ERROR: write_obj_entries new_idx=%s returned %d",
}
}
- ret = write_version_marker(hctx, key);
+ ret = write_version_marker(hctx, key, is_resharding);
if (ret < 0) {
return ret;
}
return -EINVAL;
}
+ struct rgw_bucket_dir_header header;
+ int rc = read_bucket_header(hctx, &header);
+ if (rc < 0) {
+ CLS_LOG(1, "ERROR: %s(): failed to read header\n", __func__);
+ return rc;
+ }
+
/* read instance entry */
BIVerObjEntry obj(hctx, op.key);
int ret = obj.init(op.delete_marker);
* entry */
existed = (ret >= 0 && !other_obj.is_delete_marker());
if (ret >= 0 && other_obj.is_delete_marker() != op.delete_marker) {
- ret = other_obj.unlink_list_entry();
+ ret = other_obj.unlink_list_entry(header.resharding_in_logrecord());
if (ret < 0) {
return ret;
}
removing = existed && op.delete_marker;
if (!removing) {
- ret = other_obj.unlink();
+ ret = other_obj.unlink(header.resharding_in_logrecord(), op.key);
if (ret < 0) {
return ret;
}
const uint64_t prev_epoch = olh.get_epoch();
if (!olh.start_modify(op.olh_epoch)) {
- ret = obj.write(op.olh_epoch, false);
+ ret = obj.write(op.olh_epoch, false, header.resharding_in_logrecord());
if (ret < 0) {
return ret;
}
if (!(olh_entry.key == op.key)) {
BIVerObjEntry old_obj(hctx, olh_entry.key);
- ret = old_obj.demote_current();
+ ret = old_obj.demote_current(header.resharding_in_logrecord());
if (ret < 0) {
CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
return ret;
} else {
bool instance_only = (op.key.instance.empty() && op.delete_marker);
cls_rgw_obj_key key(op.key.name);
- ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only);
+ ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only,
+ header.resharding_in_logrecord());
if (ret < 0) {
CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
return ret;
}
olh.set_exists(true);
- ret = olh.write();
+ ret = olh.write(header.resharding_in_logrecord());
if (ret < 0) {
CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret);
return ret;
}
/* write the instance and list entries */
- ret = obj.write(olh.get_epoch(), promote);
+ ret = obj.write(olh.get_epoch(), promote, header.resharding_in_logrecord());
if (ret < 0) {
return ret;
}
return 0;
}
- rgw_bucket_dir_header header;
- ret = read_bucket_header(hctx, &header);
- if (ret < 0) {
- CLS_LOG(1, "ERROR: rgw_bucket_link_olh(): failed to read header\n");
- return ret;
- }
if (header.syncstopped) {
return 0;
}
cls_rgw_obj_key dest_key = op.key;
+ struct rgw_bucket_dir_header header;
+ int ret = read_bucket_header(hctx, &header);
+ if (ret < 0) {
+ CLS_LOG(1, "ERROR: rgw_bucket_unlink_instance(): failed to read header\n");
+ return ret;
+ }
+
BIVerObjEntry obj(hctx, dest_key);
BIOLHEntry olh(hctx, dest_key);
- int ret = obj.init();
+ ret = obj.init();
if (ret < 0) {
if (ret != -ENOENT) {
CLS_LOG(0, "ERROR: obj.init() returned ret=%d", ret);
if (!olh_found) {
bool instance_only = false;
cls_rgw_obj_key key(dest_key.name);
- ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only);
+ ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only,
+ header.resharding_in_logrecord());
if (ret < 0) {
CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
return ret;
}
if (!olh.start_modify(op.olh_epoch)) {
- ret = obj.unlink_list_entry();
+ ret = obj.unlink_list_entry(header.resharding_in_logrecord());
if (ret < 0) {
return ret;
}
}
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch);
- return olh.write();
+ return olh.write(header.resharding_in_logrecord());
}
rgw_bucket_olh_entry& olh_entry = olh.get_entry();
if (found) {
BIVerObjEntry next(hctx, next_key);
- ret = next.write(olh.get_epoch(), true);
+ ret = next.write(olh.get_epoch(), true, header.resharding_in_logrecord());
if (ret < 0) {
CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret);
return ret;
} else {
/* this is a delete marker, it's our responsibility to remove its
* instance entry */
- ret = obj.unlink();
+ ret = obj.unlink(header.resharding_in_logrecord(), op.key);
if (ret < 0) {
return ret;
}
}
- ret = obj.unlink_list_entry();
+ ret = obj.unlink_list_entry(header.resharding_in_logrecord());
if (ret < 0) {
return ret;
}
- ret = olh.write();
+ ret = olh.write(header.resharding_in_logrecord());
if (ret < 0) {
return ret;
}
return 0;
}
- rgw_bucket_dir_header header;
- ret = read_bucket_header(hctx, &header);
- if (ret < 0) {
- CLS_LOG(1, "ERROR: rgw_bucket_unlink_instance(): failed to read header\n");
- return ret;
- }
if (header.syncstopped) {
return 0;
}
log.erase(rm_iter);
}
+ struct rgw_bucket_dir_header header;
+ int rc = read_bucket_header(hctx, &header);
+ if (rc < 0) {
+ CLS_LOG(1, "ERROR: %s(): failed to read header\n", __func__);
+ return rc;
+ }
+
/* write the olh data entry */
- ret = write_entry(hctx, olh_data_entry, olh_data_key);
+ ret = write_entry(hctx, olh_data_entry, olh_data_key, header.resharding_in_logrecord());
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_key.c_str(), ret);
return ret;
return -EINVAL;
}
+ struct rgw_bucket_dir_header header;
+ int rc = read_bucket_header(hctx, &header);
+ if (rc < 0) {
+ CLS_LOG(1, "ERROR: %s(): failed to read header\n", __func__);
+ return rc;
+ }
+
/* read olh entry */
rgw_bucket_olh_entry olh_data_entry;
- string olh_data_key;
+ string olh_data_key, olh_sub_ver;
encode_olh_data_key(op.key, &olh_data_key);
int ret = read_index_entry(hctx, olh_data_key, &olh_data_entry);
if (ret < 0 && ret != -ENOENT) {
CLS_LOG(1, "NOTICE: %s: can't remove key %s ret=%d", __func__, olh_data_key.c_str(), ret);
return ret;
}
+ bufferlist empty;
+ ret = record_duplicate_entry(hctx, olh_data_key, olh_data_entry.key, &empty, header.resharding_in_logrecord());
+ if (ret < 0)
+ return ret;
rgw_bucket_dir_entry plain_entry;
return ret;
}
+ ret = record_duplicate_entry(hctx, op.key.name, plain_entry.key, &empty, header.resharding_in_logrecord());
+ if (ret < 0)
+ return ret;
+
return 0;
}
return ret;
}
}
+ if (header.resharding_in_logrecord()) {
+ bufferlist empty;
+ return reshard_log_index_operation(hctx, cur_change_key, cur_change.key, &empty);
+ }
break;
case CEPH_RGW_UPDATE:
CLS_LOG_BITX(bitx_inst, 10,
return ret;
}
}
+ if (header.resharding_in_logrecord()) {
+ return reshard_log_index_operation(hctx, cur_change_key, cur_change.key, &cur_state_bl);
+ }
break;
} // switch(op)
} // if (cur_disk.pending_map.empty())