return cls_cxx_map_write_header(hctx, &header_bl);
}
+template <class T>
+static int write_entry(cls_method_context_t hctx, T& entry, const string& key,
+ rgw_bucket_dir_header& header, bool count_entry = true)
+{
+ bufferlist bl;
+ encode(entry, bl);
+ int ret = cls_cxx_map_set_val(hctx, key, &bl);
+ if (ret < 0) {
+ return ret;
+ }
+ if (header.resharding_in_logrecord()) {
+ ret = reshard_log_index_operation(hctx, key, entry.key, &bl);
+ header.reshardlog_entries++;
+ }
+ return ret;
+}
+
+static int remove_entry(cls_method_context_t hctx, const string& idx,
+ const cls_rgw_obj_key& key,
+ rgw_bucket_dir_header& header)
+{
+ int ret = cls_cxx_map_remove_key(hctx, idx);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() idx=%s ret=%d", idx.c_str(), ret);
+ return ret;
+ }
+ if (header.resharding_in_logrecord()) {
+ header.reshardlog_entries++;
+ bufferlist empty;
+ return reshard_log_index_operation(hctx, idx, key, &empty);
+ }
+ return 0;
+}
int rgw_bucket_update_stats(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
return modify_op_str((RGWModifyOp) op);
}
-static int record_duplicate_entry(cls_method_context_t hctx, string& idx,
- const cls_rgw_obj_key& key, bufferlist* log_bl,
- bool resharding, uint32_t* reshardlog_entries = NULL) {
- if (resharding) {
- int rc = reshard_log_index_operation(hctx, idx, key, log_bl);
- if (rc < 0) {
- CLS_LOG(0, "record_duplicate_entry(): failed to update entry, name=%s, rc=%d",
- escape_str(idx).c_str(), rc);
- return rc;
- }
- if (reshardlog_entries) {
- *reshardlog_entries += 1;
- }
- }
- return 0;
-}
-
static int write_header_while_logrecord(cls_method_context_t hctx,
rgw_bucket_dir_header& header) {
if (header.resharding_in_logrecord())
entry.pending_map.insert(pair<string, rgw_bucket_pending_info>(op.tag, info));
// write out new key to disk
- bufferlist info_bl;
- encode(entry, info_bl);
CLS_LOG_BITX(bitx_inst, 20,
"INFO: %s: setting map entry at key=%s",
__func__, escape_str(idx).c_str());
- rc = cls_cxx_map_set_val(hctx, idx, &info_bl);
+ rc = write_entry(hctx, entry, idx, header, false);
if (rc < 0) {
CLS_LOG_BITX(bitx_inst, 1,
"ERROR: %s could not set value for key, key=%s, rc=%d",
__func__, escape_str(idx).c_str(), rc);
return rc;
}
- if (header.resharding_in_logrecord()) {
- // write the duplicated index entry copy
- return reshard_log_index_operation(hctx, idx, entry.key, &info_bl);
- }
CLS_LOG_BITX(bitx_inst, 10, "EXITING %s, returning 0", __func__);
return 0;
int(entry.meta.category));
unaccount_entry(header, entry);
- bufferlist empty;
- ret = record_duplicate_entry(hctx, idx, key, &empty,
- header.resharding_in_logrecord());
- if (ret < 0)
- return ret;
-
- ret = cls_cxx_map_remove_key(hctx, idx);
+ ret = remove_entry(hctx, idx, key, header);
if (ret < 0) {
CLS_LOG(1, "%s: cls_cxx_map_remove_key failed with %d", __func__, ret);
return ret;
CLS_LOG_BITX(bitx_inst, 20,
"INFO: %s: removing map entry with key=%s",
__func__, escape_str(idx).c_str());
- rc = cls_cxx_map_remove_key(hctx, idx);
+ rc = remove_entry(hctx, idx, entry.key, header);
if (rc < 0) {
CLS_LOG_BITX(bitx_inst, 1,
"ERROR: %s: unable to remove map key, key=%s, rc=%d",
return rc;
}
- bufferlist empty;
- rc = record_duplicate_entry(hctx, idx, entry.key, &empty, header.resharding_in_logrecord());
- if (rc < 0)
- return rc;
} else {
// we removed this tag from pending_map so need to write the changes
CLS_LOG_BITX(bitx_inst, 20,
"INFO: %s: setting map entry at key=%s",
__func__, escape_str(idx).c_str());
- bufferlist new_key_bl;
- encode(entry, new_key_bl);
- rc = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
+ rc = write_entry(hctx, entry, idx, header);
if (rc < 0) {
CLS_LOG_BITX(bitx_inst, 1,
"ERROR: %s: unable to set map val, key=%s, rc=%d",
__func__, escape_str(idx).c_str(), rc);
return rc;
}
- rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl,
- header.resharding_in_logrecord(),
- &header.reshardlog_entries);
- if (rc < 0)
- return rc;
}
}
} // CLS_RGW_OP_CANCEL
CLS_LOG_BITX(bitx_inst, 20,
"INFO: %s: removing map entry with key=%s",
__func__, escape_str(idx).c_str());
- rc = cls_cxx_map_remove_key(hctx, idx);
+ rc = remove_entry(hctx, idx, entry.key, header);
if (rc < 0) {
CLS_LOG_BITX(bitx_inst, 1,
"ERROR: %s: unable to remove map key, key=%s, rc=%d",
__func__, escape_str(idx).c_str(), rc);
return rc;
}
- bufferlist empty;
- rc = record_duplicate_entry(hctx, idx, entry.key, &empty, header.resharding_in_logrecord());
- if (rc < 0)
- return rc;
} else {
entry.exists = false;
- bufferlist new_key_bl;
- encode(entry, new_key_bl);
CLS_LOG_BITX(bitx_inst, 20,
"INFO: %s: setting map entry at key=%s",
__func__, escape_str(idx).c_str());
- rc = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
+
+ rc = write_entry(hctx, entry, idx, header);
if (rc < 0) {
CLS_LOG_BITX(bitx_inst, 1,
"ERROR: %s: unable to set map val, key=%s, rc=%d",
__func__, escape_str(idx).c_str(), rc);
return rc;
}
- rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl,
- header.resharding_in_logrecord(),
- &header.reshardlog_entries);
- if (rc < 0)
- return rc;
}
} // CLS_RGW_OP_DEL
else if (op.op == CLS_RGW_OP_ADD) {
stats.total_size += meta.accounted_size;
stats.total_size_rounded += cls_rgw_get_rounded_size(meta.accounted_size);
stats.actual_size += meta.size;
- bufferlist new_key_bl;
- encode(entry, new_key_bl);
CLS_LOG_BITX(bitx_inst, 20,
"INFO: %s: setting map entry at key=%s",
__func__, escape_str(idx).c_str());
- rc = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
+ rc = write_entry(hctx, entry, idx, header);
if (rc < 0) {
CLS_LOG_BITX(bitx_inst, 1,
"ERROR: %s: unable to set map value at key=%s, rc=%d",
__func__, escape_str(idx).c_str(), rc);
return rc;
}
- rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl,
- header.resharding_in_logrecord(),
- &header.reshardlog_entries);
- if (rc < 0)
- return rc;
} // CLS_RGW_OP_ADD
if (log_op) {
return rc;
} // rgw_bucket_complete_op
-template <class T>
-static int write_entry(cls_method_context_t hctx, T& entry, const string& key,
- uint32_t& reshardlog_entries, const bool is_resharding = false)
-{
- bufferlist bl;
- encode(entry, bl);
- int ret = cls_cxx_map_set_val(hctx, key, &bl);
- if (ret < 0) {
- return ret;
- }
- if (is_resharding) {
- ret = reshard_log_index_operation(hctx, key, entry.key, &bl);
- reshardlog_entries++;
- }
- return ret;
-}
-
static int read_olh(cls_method_context_t hctx,cls_rgw_obj_key& obj_key, rgw_bucket_olh_entry *olh_data_entry, string *index_key, bool *found)
{
cls_rgw_obj_key olh_key;
}
static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry,
- const string& instance_idx, uint32_t& reshardlog_entries,
- bool is_resharding)
+ const string& instance_idx, rgw_bucket_dir_header& header)
{
CLS_LOG(20, "write_entry() instance=%s idx=%s flags=%d", escape_str(instance_entry.key.instance).c_str(),
instance_idx.c_str(), instance_entry.flags);
/* write the instance entry */
- int ret = write_entry(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding);
+ int ret = write_entry(hctx, instance_entry, instance_idx, header);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry() instance_key=%s ret=%d", escape_str(instance_idx).c_str(), ret);
return ret;
* write object instance entry, and if needed also the list entry
*/
static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry,
- const string& instance_idx, uint32_t& reshardlog_entries,
- bool is_resharding)
+ const string& instance_idx, rgw_bucket_dir_header& header)
{
- int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding);
+ int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx, header);
if (ret < 0) {
return ret;
}
if (instance_idx != instance_list_idx) {
CLS_LOG(20, "write_entry() idx=%s flags=%d", escape_str(instance_list_idx).c_str(), instance_entry.flags);
/* write a new list entry for the object instance */
- ret = write_entry(hctx, instance_entry, instance_list_idx, reshardlog_entries, is_resharding);
+ ret = write_entry(hctx, instance_entry, instance_list_idx, header);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry() instance=%s instance_list_idx=%s ret=%d", instance_entry.key.instance.c_str(), instance_list_idx.c_str(), ret);
return ret;
instance_entry.versioned_epoch = epoch;
}
- int unlink_list_entry(bool is_resharding) {
+ int unlink_list_entry(rgw_bucket_dir_header& header) {
string list_idx, list_sub_ver;
/* this instance has a previous list entry, remove that entry */
get_list_index_key(instance_entry, &list_idx);
CLS_LOG(20, "unlink_list_entry() list_idx=%s", escape_str(list_idx).c_str());
- int ret = cls_cxx_map_remove_key(hctx, list_idx);
+ int ret = remove_entry(hctx, list_idx, instance_entry.key, header);
if (ret < 0) {
- CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() list_idx=%s ret=%d", list_idx.c_str(), ret);
+ CLS_LOG(0, "ERROR: remove_entry() list_idx=%s ret=%d", list_idx.c_str(), ret);
return ret;
}
- if (is_resharding) {
- bufferlist empty;
- return reshard_log_index_operation(hctx, list_idx, instance_entry.key, &empty);
- }
return 0;
}
- int unlink(bool is_resharding, const cls_rgw_obj_key& key) {
+ int unlink(rgw_bucket_dir_header& header, const cls_rgw_obj_key& key) {
/* remove the instance entry */
CLS_LOG(20, "unlink() idx=%s", escape_str(instance_idx).c_str());
- int ret = cls_cxx_map_remove_key(hctx, instance_idx);
+ int ret = remove_entry(hctx, instance_idx, key, header);
if (ret < 0) {
- CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
+ CLS_LOG(0, "ERROR: remove_entry() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
return ret;
}
- if (is_resharding) {
- bufferlist empty;
- return reshard_log_index_operation(hctx, instance_idx, key, &empty);
- }
return 0;
}
int write_entries(uint64_t flags_set, uint64_t flags_reset,
- uint32_t& reshardlog_entries, bool is_resharding) {
+ rgw_bucket_dir_header& header) {
if (!initialized) {
int ret = init();
if (ret < 0) {
/* write the instance and list entries */
bool special_delete_marker_key = (instance_entry.is_delete_marker() && instance_entry.key.instance.empty());
encode_obj_versioned_data_key(key, &instance_idx, special_delete_marker_key);
- int ret = write_obj_entries(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding);
+ int ret = write_obj_entries(hctx, instance_entry, instance_idx, header);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_obj_entries() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
return ret;
return 0;
}
- int write(uint64_t epoch, bool current, uint32_t& reshardlog_entries, bool is_resharding) {
+ int write(uint64_t epoch, bool current, rgw_bucket_dir_header& header) {
if (instance_entry.versioned_epoch > 0) {
CLS_LOG(20, "%s: instance_entry.versioned_epoch=%d epoch=%d", __func__, (int)instance_entry.versioned_epoch, (int)epoch);
/* this instance has a previous list entry, remove that entry */
- int ret = unlink_list_entry(is_resharding);
+ int ret = unlink_list_entry(header);
if (ret < 0) {
return ret;
}
}
instance_entry.versioned_epoch = epoch;
- return write_entries(flags, 0, reshardlog_entries, is_resharding);
+ return write_entries(flags, 0, header);
}
- int demote_current(uint32_t& reshardlog_entries, bool is_resharding) {
- return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT, reshardlog_entries, is_resharding);
+ int demote_current(rgw_bucket_dir_header& header) {
+ return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT, header);
}
bool is_delete_marker() {
olh_data_entry.key = key;
}
- int write(uint32_t& reshardlog_entries, bool is_resharding) {
+ int write(rgw_bucket_dir_header& header) {
/* write the olh data entry */
- int ret = write_entry(hctx, olh_data_entry, olh_data_idx, reshardlog_entries, is_resharding);
+ int ret = write_entry(hctx, olh_data_entry, olh_data_idx, header);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_idx.c_str(), ret);
return ret;
};
static int write_version_marker(cls_method_context_t hctx, cls_rgw_obj_key& key,
- uint32_t& reshardlog_entries, bool is_resharding)
+ rgw_bucket_dir_header& header)
{
rgw_bucket_dir_entry entry;
entry.key = key;
entry.flags = rgw_bucket_dir_entry::FLAG_VER_MARKER;
- int ret = write_entry(hctx, entry, key.name, reshardlog_entries, is_resharding);
+ int ret = write_entry(hctx, entry, key.name, header);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry returned ret=%d", ret);
return ret;
cls_rgw_obj_key& key,
bool demote_current,
bool instance_only,
- uint32_t& reshardlog_entries,
- bool is_resharding)
+ rgw_bucket_dir_header& header)
{
if (!key.instance.empty()) {
return -EINVAL;
encode_obj_versioned_data_key(key, &new_idx);
if (instance_only) {
- ret = write_obj_instance_entry(hctx, entry, new_idx, reshardlog_entries, is_resharding);
+ ret = write_obj_instance_entry(hctx, entry, new_idx, header);
} else {
- ret = write_obj_entries(hctx, entry, new_idx, reshardlog_entries, is_resharding);
+ ret = write_obj_entries(hctx, entry, new_idx, header);
}
if (ret < 0) {
CLS_LOG(0, "ERROR: write_obj_entries new_idx=%s returned %d",
}
}
- ret = write_version_marker(hctx, key, reshardlog_entries, is_resharding);
+ ret = write_version_marker(hctx, key, header);
if (ret < 0) {
return ret;
}
* entry */
existed = (ret >= 0 && !other_obj.is_delete_marker());
if (ret >= 0 && other_obj.is_delete_marker() != op.delete_marker) {
- ret = other_obj.unlink_list_entry(header.resharding_in_logrecord());
+ ret = other_obj.unlink_list_entry(header);
if (ret < 0) {
return ret;
}
removing = existed && op.delete_marker;
if (!removing) {
- ret = other_obj.unlink(header.resharding_in_logrecord(), op.key);
+ ret = other_obj.unlink(header, op.key);
if (ret < 0) {
return ret;
}
const uint64_t prev_epoch = olh.get_epoch();
if (!olh.start_modify(op.olh_epoch)) {
- ret = obj.write(op.olh_epoch, false, header.reshardlog_entries, header.resharding_in_logrecord());
+ ret = obj.write(op.olh_epoch, false, header);
if (ret < 0) {
return ret;
}
if (!(olh_entry.key == op.key)) {
BIVerObjEntry old_obj(hctx, olh_entry.key);
- ret = old_obj.demote_current(header.reshardlog_entries, header.resharding_in_logrecord());
+ ret = old_obj.demote_current(header);
if (ret < 0) {
CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
return ret;
} else {
bool instance_only = (op.key.instance.empty() && op.delete_marker);
cls_rgw_obj_key key(op.key.name);
- ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only,
- header.reshardlog_entries,
- header.resharding_in_logrecord());
+ ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header);
if (ret < 0) {
CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
return ret;
}
olh.set_exists(true);
- ret = olh.write(header.reshardlog_entries, header.resharding_in_logrecord());
+ ret = olh.write(header);
if (ret < 0) {
CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret);
return ret;
}
/* write the instance and list entries */
- ret = obj.write(olh.get_epoch(), promote, header.reshardlog_entries,
- header.resharding_in_logrecord());
+ ret = obj.write(olh.get_epoch(), promote, header);
if (ret < 0) {
return ret;
}
if (!olh_found) {
bool instance_only = false;
cls_rgw_obj_key key(dest_key.name);
- ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only,
- header.reshardlog_entries,
- header.resharding_in_logrecord());
+ ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only, header);
if (ret < 0) {
CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
return ret;
}
if (!olh.start_modify(op.olh_epoch)) {
- ret = obj.unlink_list_entry(header.resharding_in_logrecord());
+ ret = obj.unlink_list_entry(header);
if (ret < 0) {
return ret;
}
}
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch);
- return olh.write(header.reshardlog_entries, header.resharding_in_logrecord());
+ return olh.write(header);
}
rgw_bucket_olh_entry& olh_entry = olh.get_entry();
if (found) {
BIVerObjEntry next(hctx, next_key);
- ret = next.write(olh.get_epoch(), true, header.reshardlog_entries,
- header.resharding_in_logrecord());
+ ret = next.write(olh.get_epoch(), true, header);
if (ret < 0) {
CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret);
return ret;
} else {
/* this is a delete marker, it's our responsibility to remove its
* instance entry */
- ret = obj.unlink(header.resharding_in_logrecord(), op.key);
+ ret = obj.unlink(header, op.key);
if (ret < 0) {
return ret;
}
}
- ret = obj.unlink_list_entry(header.resharding_in_logrecord());
+ ret = obj.unlink_list_entry(header);
if (ret < 0) {
return ret;
}
- ret = olh.write(header.reshardlog_entries, header.resharding_in_logrecord());
+ ret = olh.write(header);
if (ret < 0) {
return ret;
}
}
/* write the olh data entry */
- ret = write_entry(hctx, olh_data_entry, olh_data_key,
- header.reshardlog_entries,
- header.resharding_in_logrecord());
+ ret = write_entry(hctx, olh_data_entry, olh_data_key, header);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_key.c_str(), ret);
return ret;
return -ECANCELED;
}
- ret = cls_cxx_map_remove_key(hctx, olh_data_key);
+ ret = remove_entry(hctx, olh_data_key, olh_data_entry.key, header);
if (ret < 0) {
CLS_LOG(1, "NOTICE: %s: can't remove key %s ret=%d", __func__, olh_data_key.c_str(), ret);
return ret;
}
- bufferlist empty;
- ret = record_duplicate_entry(hctx, olh_data_key, olh_data_entry.key, &empty, header.resharding_in_logrecord());
- if (ret < 0)
- return ret;
rgw_bucket_dir_entry plain_entry;
return 0;
}
- ret = cls_cxx_map_remove_key(hctx, op.key.name);
+ ret = remove_entry(hctx, op.key.name, plain_entry.key, header);
if (ret < 0) {
CLS_LOG(1, "NOTICE: %s: can't remove key %s ret=%d", __func__, op.key.name.c_str(), ret);
return ret;
}
- ret = record_duplicate_entry(hctx, op.key.name, plain_entry.key, &empty, header.resharding_in_logrecord());
- if (ret < 0)
- return ret;
-
return 0;
}
CLS_LOG_BITX(bitx_inst, 20,
"INFO: %s: removing map entry with key=%s",
__func__, escape_str(cur_change_key).c_str());
- ret = cls_cxx_map_remove_key(hctx, cur_change_key);
+ ret = remove_entry(hctx, cur_change_key, cur_change.key, header);
if (ret < 0) {
CLS_LOG_BITX(bitx_inst, 0, "ERROR: %s: unable to remove key, key=%s, error=%d",
__func__, escape_str(cur_change_key).c_str(), ret);
return ret;
}
}
- if (header.resharding_in_logrecord()) {
- bufferlist empty;
- return reshard_log_index_operation(hctx, cur_change_key, cur_change.key, &empty);
- }
break;
case CEPH_RGW_UPDATE:
CLS_LOG_BITX(bitx_inst, 10,
stats.actual_size += cur_change.meta.size;
header_changed = true;
cur_change.index_ver = header.ver;
- bufferlist cur_state_bl;
- encode(cur_change, cur_state_bl);
CLS_LOG_BITX(bitx_inst, 20,
"INFO: %s: setting map entry at key=%s",
__func__, escape_str(cur_change.key.to_string()).c_str());
- ret = cls_cxx_map_set_val(hctx, cur_change_key, &cur_state_bl);
+ ret = write_entry(hctx, cur_change, cur_change_key, header);
if (ret < 0) {
CLS_LOG_BITX(bitx_inst, 0, "ERROR: %s: unable to set value for key, key=%s, error=%d",
__func__, escape_str(cur_change_key).c_str(), ret);
return ret;
}
}
- if (header.resharding_in_logrecord()) {
- return reshard_log_index_operation(hctx, cur_change_key, cur_change.key, &cur_state_bl);
- }
break;
} // switch(op)
} // if (cur_disk.pending_map.empty())
int r = cls_cxx_map_remove_key(hctx, entry.idx);
if (r < 0) {
CLS_LOG(0, "ERROR: %s: cls_cxx_map_remove_key() returned r=%d", __func__, r);
- return r;
}
} else {
int r = cls_cxx_map_set_val(hctx, entry.idx, &entry.data);
}
header.reshardlog_entries = 0;
- rc = write_bucket_header(hctx, &header); if (rc < 0) {
+ rc = write_bucket_header(hctx, &header);
+ if (rc < 0) {
CLS_LOG(0, "ERROR: rgw_reshard_log_trim_op(): failed to write header\n");
return rc;
}
return write_bucket_header(hctx, &header);
}
-static int rgw_set_bucket_resharding2(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- CLS_LOG(10, "entered %s", __func__);
- cls_rgw_set_bucket_resharding_op op;
-
- auto in_iter = in->cbegin();
- try {
- decode(op, in_iter);
- } catch (ceph::buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_rgw_set_bucket_resharding: failed to decode entry\n");
- return -EINVAL;
- }
-
- rgw_bucket_dir_header header;
- int rc = read_bucket_header(hctx, &header);
- if (rc < 0) {
- CLS_LOG(1, "ERROR: %s: failed to read header", __func__);
- return rc;
- }
-
- if (op.entry.reshard_status == cls_rgw_reshard_status::IN_LOGRECORD) {
- if (header.reshardlog_entries != 0) {
- CLS_LOG(1, "ERROR: %s: cannot set logrecord status on non-zero log record count", __func__);
- return -EOPNOTSUPP;
- }
- }
- header.new_instance.set_status(op.entry.reshard_status);
-
- return write_bucket_header(hctx, &header);
-}
-
static int rgw_clear_bucket_resharding(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
CLS_LOG(10, "entered %s", __func__);
/* resharding attribute */
cls_register_cxx_method(h_class, RGW_SET_BUCKET_RESHARDING, CLS_METHOD_RD | CLS_METHOD_WR,
rgw_set_bucket_resharding, &h_rgw_set_bucket_resharding);
- cls_register_cxx_method(h_class, RGW_SET_BUCKET_RESHARDING2, CLS_METHOD_RD | CLS_METHOD_WR,
- rgw_set_bucket_resharding2, &h_rgw_set_bucket_resharding);
cls_register_cxx_method(h_class, RGW_CLEAR_BUCKET_RESHARDING, CLS_METHOD_RD | CLS_METHOD_WR,
rgw_clear_bucket_resharding, &h_rgw_clear_bucket_resharding);
cls_register_cxx_method(h_class, RGW_GUARD_BUCKET_RESHARDING, CLS_METHOD_RD ,
return 0;
}
- int flush(bool process_log = false, RGWBucketReshard *br = nullptr,
- const DoutPrefixProvider *dpp = nullptr) {
+ int flush(bool process_log = false) {
if (entries.size() == 0) {
return 0;
}
entries.clear();
stats.clear();
- if (br != nullptr) {
- ret = br->renew_lock_if_needed(dpp);
- if (ret < 0) {
- return ret;
- }
- }
return 0;
}
const DoutPrefixProvider *dpp = nullptr) {
int ret = 0;
for (auto& shard : target_shards) {
- int r = shard.flush(process_log, br, dpp);
+ int r = shard.flush(process_log);
if (r < 0) {
derr << "ERROR: target_shards[" << shard.get_shard_id() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
ret = r;
derr << "ERROR: target_shards[" << shard.get_shard_id() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl;
ret = r;
}
+ if (br != nullptr) {
+ r = br->renew_lock_if_needed(dpp);
+ }
+ if (r < 0) {
+ derr << "ERROR: br->renew_lock_if_needed() returned error: " << cpp_strerror(-r) << dendl;
+ ret = r;
+ }
}
return ret;
}
static int set_resharding_status(const DoutPrefixProvider *dpp,
rgw::sal::RadosStore* store,
const RGWBucketInfo& bucket_info,
- cls_rgw_reshard_status status,
- bool judge_support_logrecord = false)
+ cls_rgw_reshard_status status)
{
cls_rgw_bucket_instance_entry instance_entry;
instance_entry.set_status(status);
- int ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry, judge_support_logrecord);
+ int ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry);
if (ret < 0) {
ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
<< cpp_strerror(-ret) << dendl;
static int init_target_index(rgw::sal::RadosStore* store,
RGWBucketInfo& bucket_info,
const rgw::bucket_index_layout_generation& index,
+ ReshardFaultInjector& fault,
bool& support_logrecord,
const DoutPrefixProvider* dpp)
{
- int ret = store->svc()->bi->init_index(dpp, bucket_info, index, true);
+
+ int ret = 0;
+ if (ret = fault.check("init_index");
+ ret == 0) { // no fault injected, initialize index
+ ret = store->svc()->bi->init_index(dpp, bucket_info, index, true);
+ }
if (ret == -EOPNOTSUPP) {
ldpp_dout(dpp, 0) << "WARNING: " << "init_index() does not supported logrecord, "
<< "falling back to block reshard mode." << dendl;
support_logrecord = false;
ret = store->svc()->bi->init_index(dpp, bucket_info, index, false);
- }
- if (ret < 0) {
+ } else if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to initialize "
"target index shard objects: " << cpp_strerror(ret) << dendl;
return ret;
}
// create the index shard objects
- int ret = init_target_index(store, bucket_info, target, support_logrecord, dpp);
+ int ret = init_target_index(store, bucket_info, target, fault, support_logrecord, dpp);
if (ret < 0) {
return ret;
}
return ret;
}
- // trim the reshard log entries to guarantee that any existing log entries are cleared,
- // if there are no reshard log entries, this is a no-op that costs little time
- if (support_logrecord) {
- ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, null_yield);
- if (ret == -EOPNOTSUPP) {
- // not an error, logrecord is not supported, change to block reshard
- ldpp_dout(dpp, 0) << "WARNING: " << "trim_reshard_log_entries() does not supported"
- << " logrecord, falling back to block reshard mode." << dendl;
- bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
- support_logrecord = false;
- return 0;
- }
- if (ret < 0) {
- ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to trim reshard log entries: "
- << cpp_strerror(ret) << dendl;
- return ret;
- }
- }
return 0;
} // init_target_layout
ret = 0; // non-fatal error
}
// trim the reshard log entries written in logrecord state
- ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, null_yield);
+ ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, y);
if (ret < 0) {
ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to trim "
"reshard log entries: " << cpp_strerror(ret) << dendl;
return ret;
}
+ // trim the reshard log entries to guarantee that any existing log entries are cleared,
+ // if there are no reshard log entries, this is a no-op that costs little time
if (support_logrecord) {
- if (ret = fault.check("logrecord_writes");
- ret == 0) { // no fault injected, record log with writing to the current index shards
- ret = set_resharding_status(dpp, store, bucket_info,
- cls_rgw_reshard_status::IN_LOGRECORD,
- true);
+ if (ret = fault.check("trim_reshard_log_entries");
+ ret == 0) { // no fault injected, trim reshard log entries
+ ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, y);
}
if (ret == -EOPNOTSUPP) {
- ldpp_dout(dpp, 0) << "WARNING: " << "set_resharding_status()"
- << " doesn't support logrecords,"
- << " fallback to blocking mode." << dendl;
+ // not an error, logrecord is not supported, change to block reshard
+ ldpp_dout(dpp, 0) << "WARNING: " << "trim_reshard_log_entries() does not supported"
+ << " logrecord, falling back to block reshard mode." << dendl;
bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
support_logrecord = false;
+ } else if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to trim reshard log entries: "
+ << cpp_strerror(ret) << dendl;
+ return ret;
}
}
- if (!support_logrecord) {
+ if (support_logrecord) {
+ if (ret = fault.check("logrecord_writes");
+ ret == 0) { // no fault injected, record log with writing to the current index shards
+ ret = set_resharding_status(dpp, store, bucket_info,
+ cls_rgw_reshard_status::IN_LOGRECORD);
+ }
+ } else {
ret = set_resharding_status(dpp, store, bucket_info,
- cls_rgw_reshard_status::IN_PROGRESS,
- false);
+ cls_rgw_reshard_status::IN_PROGRESS);
}
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to pause "