static int record_duplicate_entry(cls_method_context_t hctx, string& idx,
const cls_rgw_obj_key& key, bufferlist* log_bl,
- bool resharding) {
+ bool resharding, uint32_t* reshardlog_entries = NULL) {
if (resharding) {
int rc = reshard_log_index_operation(hctx, idx, key, log_bl);
if (rc < 0) {
escape_str(idx).c_str(), rc);
return rc;
}
+ if (reshardlog_entries) {
+ *reshardlog_entries += 1;
+ }
}
return 0;
}
+static int write_header_while_logrecord(cls_method_context_t hctx,
+ rgw_bucket_dir_header& header) {
+ if (header.resharding_in_logrecord())
+ return write_bucket_header(hctx, &header);
+ return 0;
+}
+
int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
const ConfigProxy& conf = cls_get_config(hctx);
unaccount_entry(header, entry);
bufferlist empty;
- ret = record_duplicate_entry(hctx, idx, key, &empty, header.resharding_in_logrecord());
+ ret = record_duplicate_entry(hctx, idx, key, &empty,
+ header.resharding_in_logrecord());
if (ret < 0)
return ret;
__func__, escape_str(idx).c_str(), rc);
return rc;
}
- rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, header.resharding_in_logrecord());
+ rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl,
+ header.resharding_in_logrecord(),
+ &header.reshardlog_entries);
if (rc < 0)
return rc;
}
__func__, escape_str(idx).c_str(), rc);
return rc;
}
- rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, header.resharding_in_logrecord());
+ rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl,
+ header.resharding_in_logrecord(),
+ &header.reshardlog_entries);
if (rc < 0)
return rc;
}
__func__, escape_str(idx).c_str(), rc);
return rc;
}
- rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, header.resharding_in_logrecord());
+ rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl,
+ header.resharding_in_logrecord(),
+ &header.reshardlog_entries);
if (rc < 0)
return rc;
} // CLS_RGW_OP_ADD
template <class T>
static int write_entry(cls_method_context_t hctx, T& entry, const string& key,
- const bool is_resharding = false)
+ uint32_t& reshardlog_entries, const bool is_resharding = false)
{
bufferlist bl;
encode(entry, bl);
}
if (is_resharding) {
ret = reshard_log_index_operation(hctx, key, entry.key, &bl);
+ reshardlog_entries++;
}
return ret;
}
}
static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry,
- const string& instance_idx, bool is_resharding)
+ const string& instance_idx, uint32_t& reshardlog_entries,
+ bool is_resharding)
{
CLS_LOG(20, "write_entry() instance=%s idx=%s flags=%d", escape_str(instance_entry.key.instance).c_str(),
instance_idx.c_str(), instance_entry.flags);
/* write the instance entry */
- int ret = write_entry(hctx, instance_entry, instance_idx, is_resharding);
+ int ret = write_entry(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry() instance_key=%s ret=%d", escape_str(instance_idx).c_str(), ret);
return ret;
* write object instance entry, and if needed also the list entry
*/
static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry,
- const string& instance_idx, bool is_resharding)
+ const string& instance_idx, uint32_t& reshardlog_entries,
+ bool is_resharding)
{
- int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx, is_resharding);
+ int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding);
if (ret < 0) {
return ret;
}
if (instance_idx != instance_list_idx) {
CLS_LOG(20, "write_entry() idx=%s flags=%d", escape_str(instance_list_idx).c_str(), instance_entry.flags);
/* write a new list entry for the object instance */
- ret = write_entry(hctx, instance_entry, instance_list_idx, is_resharding);
+ ret = write_entry(hctx, instance_entry, instance_list_idx, reshardlog_entries, is_resharding);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry() instance=%s instance_list_idx=%s ret=%d", instance_entry.key.instance.c_str(), instance_list_idx.c_str(), ret);
return ret;
return 0;
}
- int write_entries(uint64_t flags_set, uint64_t flags_reset, bool is_resharding) {
+ int write_entries(uint64_t flags_set, uint64_t flags_reset,
+ uint32_t& reshardlog_entries, bool is_resharding) {
if (!initialized) {
int ret = init();
if (ret < 0) {
/* write the instance and list entries */
bool special_delete_marker_key = (instance_entry.is_delete_marker() && instance_entry.key.instance.empty());
encode_obj_versioned_data_key(key, &instance_idx, special_delete_marker_key);
- int ret = write_obj_entries(hctx, instance_entry, instance_idx, is_resharding);
+ int ret = write_obj_entries(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_obj_entries() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
return ret;
return 0;
}
- int write(uint64_t epoch, bool current, bool is_resharding) {
+ int write(uint64_t epoch, bool current, uint32_t& reshardlog_entries, bool is_resharding) {
if (instance_entry.versioned_epoch > 0) {
CLS_LOG(20, "%s: instance_entry.versioned_epoch=%d epoch=%d", __func__, (int)instance_entry.versioned_epoch, (int)epoch);
/* this instance has a previous list entry, remove that entry */
}
instance_entry.versioned_epoch = epoch;
- return write_entries(flags, 0, is_resharding);
+ return write_entries(flags, 0, reshardlog_entries, is_resharding);
}
- int demote_current(bool is_resharding) {
- return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT, is_resharding);
+ int demote_current(uint32_t& reshardlog_entries, bool is_resharding) {
+ return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT, reshardlog_entries, is_resharding);
}
bool is_delete_marker() {
olh_data_entry.key = key;
}
- int write(bool is_resharding) {
+ int write(uint32_t& reshardlog_entries, bool is_resharding) {
/* write the olh data entry */
- int ret = write_entry(hctx, olh_data_entry, olh_data_idx, is_resharding);
+ int ret = write_entry(hctx, olh_data_entry, olh_data_idx, reshardlog_entries, is_resharding);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_idx.c_str(), ret);
return ret;
};
static int write_version_marker(cls_method_context_t hctx, cls_rgw_obj_key& key,
- bool is_resharding)
+ uint32_t& reshardlog_entries, bool is_resharding)
{
rgw_bucket_dir_entry entry;
entry.key = key;
entry.flags = rgw_bucket_dir_entry::FLAG_VER_MARKER;
- int ret = write_entry(hctx, entry, key.name, is_resharding);
+ int ret = write_entry(hctx, entry, key.name, reshardlog_entries, is_resharding);
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry returned ret=%d", ret);
return ret;
cls_rgw_obj_key& key,
bool demote_current,
bool instance_only,
+ uint32_t& reshardlog_entries,
bool is_resharding)
{
if (!key.instance.empty()) {
encode_obj_versioned_data_key(key, &new_idx);
if (instance_only) {
- ret = write_obj_instance_entry(hctx, entry, new_idx, is_resharding);
+ ret = write_obj_instance_entry(hctx, entry, new_idx, reshardlog_entries, is_resharding);
} else {
- ret = write_obj_entries(hctx, entry, new_idx, is_resharding);
+ ret = write_obj_entries(hctx, entry, new_idx, reshardlog_entries, is_resharding);
}
if (ret < 0) {
CLS_LOG(0, "ERROR: write_obj_entries new_idx=%s returned %d",
}
}
- ret = write_version_marker(hctx, key, is_resharding);
+ ret = write_version_marker(hctx, key, reshardlog_entries, is_resharding);
if (ret < 0) {
return ret;
}
const uint64_t prev_epoch = olh.get_epoch();
if (!olh.start_modify(op.olh_epoch)) {
- ret = obj.write(op.olh_epoch, false, header.resharding_in_logrecord());
+ ret = obj.write(op.olh_epoch, false, header.reshardlog_entries, header.resharding_in_logrecord());
if (ret < 0) {
return ret;
}
if (removing) {
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch);
}
- return 0;
+ return write_header_while_logrecord(hctx, header);
}
// promote this version to current if it's a newer epoch, or if it matches the
if (!(olh_entry.key == op.key)) {
BIVerObjEntry old_obj(hctx, olh_entry.key);
- ret = old_obj.demote_current(header.resharding_in_logrecord());
+ ret = old_obj.demote_current(header.reshardlog_entries, header.resharding_in_logrecord());
if (ret < 0) {
CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
return ret;
bool instance_only = (op.key.instance.empty() && op.delete_marker);
cls_rgw_obj_key key(op.key.name);
ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only,
+ header.reshardlog_entries,
header.resharding_in_logrecord());
if (ret < 0) {
CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
}
olh.set_exists(true);
- ret = olh.write(header.resharding_in_logrecord());
+ ret = olh.write(header.reshardlog_entries, header.resharding_in_logrecord());
if (ret < 0) {
CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret);
return ret;
}
/* write the instance and list entries */
- ret = obj.write(olh.get_epoch(), promote, header.resharding_in_logrecord());
+ ret = obj.write(olh.get_epoch(), promote, header.reshardlog_entries,
+ header.resharding_in_logrecord());
if (ret < 0) {
return ret;
}
if (!op.log_op) {
- return 0;
+ return write_header_while_logrecord(hctx, header);
}
if (header.syncstopped) {
- return 0;
+ return write_header_while_logrecord(hctx, header);
}
rgw_bucket_dir_entry& entry = obj.get_dir_entry();
bool instance_only = false;
cls_rgw_obj_key key(dest_key.name);
ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only,
+ header.reshardlog_entries,
header.resharding_in_logrecord());
if (ret < 0) {
CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
}
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch);
- return olh.write(header.resharding_in_logrecord());
+ return olh.write(header.reshardlog_entries, header.resharding_in_logrecord());
}
rgw_bucket_olh_entry& olh_entry = olh.get_entry();
if (found) {
BIVerObjEntry next(hctx, next_key);
- ret = next.write(olh.get_epoch(), true, header.resharding_in_logrecord());
+ ret = next.write(olh.get_epoch(), true, header.reshardlog_entries,
+ header.resharding_in_logrecord());
if (ret < 0) {
CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret);
return ret;
return ret;
}
- ret = olh.write(header.resharding_in_logrecord());
+ ret = olh.write(header.reshardlog_entries, header.resharding_in_logrecord());
if (ret < 0) {
return ret;
}
if (!op.log_op) {
- return 0;
+ return write_header_while_logrecord(hctx, header);
}
if (header.syncstopped) {
- return 0;
+ return write_header_while_logrecord(hctx, header);
}
rgw_bucket_entry_ver ver;
}
/* write the olh data entry */
- ret = write_entry(hctx, olh_data_entry, olh_data_key, header.resharding_in_logrecord());
+ ret = write_entry(hctx, olh_data_entry, olh_data_key,
+ header.reshardlog_entries,
+ header.resharding_in_logrecord());
if (ret < 0) {
CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_key.c_str(), ret);
return ret;
std::set<std::string> keys;
bool more = false;
- int rc = cls_cxx_map_get_keys(hctx, key_begin, max_entries, &keys, &more);
+ rgw_bucket_dir_header header;
+ int rc = read_bucket_header(hctx, &header);
+ if (rc < 0) {
+ CLS_LOG(0, "ERROR: rgw_reshard_log_trim_op(): failed to read header\n");
+ return rc;
+ }
+
+ rc = cls_cxx_map_get_keys(hctx, key_begin, max_entries, &keys, &more);
if (rc < 0) {
CLS_LOG(1, "ERROR: cls_cxx_map_get_keys failed rc=%d", rc);
return rc;
CLS_LOG(1, "ERROR: cls_cxx_map_remove_range failed rc=%d", rc);
return rc;
}
+
+ header.reshardlog_entries = 0;
+ rc = write_bucket_header(hctx, &header); if (rc < 0) {
+ CLS_LOG(0, "ERROR: rgw_reshard_log_trim_op(): failed to write header\n");
+ return rc;
+ }
return 0;
}
return write_bucket_header(hctx, &header);
}
+static int rgw_set_bucket_resharding2(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ CLS_LOG(10, "entered %s", __func__);
+ cls_rgw_set_bucket_resharding_op op;
+
+ auto in_iter = in->cbegin();
+ try {
+ decode(op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_rgw_set_bucket_resharding: failed to decode entry\n");
+ return -EINVAL;
+ }
+
+ rgw_bucket_dir_header header;
+ int rc = read_bucket_header(hctx, &header);
+ if (rc < 0) {
+ CLS_LOG(1, "ERROR: %s: failed to read header", __func__);
+ return rc;
+ }
+
+ if (op.entry.reshard_status == cls_rgw_reshard_status::IN_LOGRECORD) {
+ if (header.reshardlog_entries != 0) {
+ CLS_LOG(1, "ERROR: %s: cannot set logrecord status on non-zero log record count", __func__);
+ return -EOPNOTSUPP;
+ }
+ }
+ header.new_instance.set_status(op.entry.reshard_status);
+
+ return write_bucket_header(hctx, &header);
+}
+
static int rgw_clear_bucket_resharding(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
CLS_LOG(10, "entered %s", __func__);
static int rgw_guard_bucket_resharding(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
CLS_LOG(10, "entered %s", __func__);
+
+ const ConfigProxy& conf = cls_get_config(hctx);
+ const uint32_t reshardlog_threshold = conf->rgw_reshardlog_threshold;
+
cls_rgw_guard_bucket_resharding_op op;
auto in_iter = in->cbegin();
return rc;
}
- if (header.resharding_in_progress()) {
+ if (header.resharding_in_progress() ||
+ (header.resharding_in_logrecord() && header.reshardlog_entries >= reshardlog_threshold)) {
return op.ret_err;
}
cls_register_cxx_method(h_class, RGW_SET_BUCKET_RESHARDING, CLS_METHOD_RD | CLS_METHOD_WR,
rgw_set_bucket_resharding, &h_rgw_set_bucket_resharding);
cls_register_cxx_method(h_class, RGW_SET_BUCKET_RESHARDING2, CLS_METHOD_RD | CLS_METHOD_WR,
- rgw_set_bucket_resharding, &h_rgw_set_bucket_resharding);
+ rgw_set_bucket_resharding2, &h_rgw_set_bucket_resharding);
cls_register_cxx_method(h_class, RGW_CLEAR_BUCKET_RESHARDING, CLS_METHOD_RD | CLS_METHOD_WR,
rgw_clear_bucket_resharding, &h_rgw_clear_bucket_resharding);
cls_register_cxx_method(h_class, RGW_GUARD_BUCKET_RESHARDING, CLS_METHOD_RD ,