From: liangmingyuan Date: Fri, 26 Jul 2024 08:23:32 +0000 (+0800) Subject: reshard: limiting the number of log to be recorded X-Git-Tag: v20.0.0~1122^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e2fb2c63ea245031bddf34a26e1cde592932857e;p=ceph.git reshard: limiting the number of log to be recorded When the bucket's index shards are already overloaded, avoid adding too many extra keys in the reshard log. Limiting the size of this reshard log to `rgw_reshardlog_threshold`, if an index write operation during the logrecord stage would exceed that limit, returning the ERR_BUSY_RESHARDING error early. Using the reshardlog_entries in `rgw_bucket_dir_header` to do this, when writting shards, adding the reshardlog_entries. But not need to add in deleting, because number of index entries reduce meanwhile. Signed-off-by: Mingyuan Liang --- diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index 9f4fd54a67d..8fc928e07d9 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -848,7 +848,7 @@ static std::string modify_op_str(uint8_t op) { static int record_duplicate_entry(cls_method_context_t hctx, string& idx, const cls_rgw_obj_key& key, bufferlist* log_bl, - bool resharding) { + bool resharding, uint32_t* reshardlog_entries = NULL) { if (resharding) { int rc = reshard_log_index_operation(hctx, idx, key, log_bl); if (rc < 0) { @@ -856,10 +856,20 @@ static int record_duplicate_entry(cls_method_context_t hctx, string& idx, escape_str(idx).c_str(), rc); return rc; } + if (reshardlog_entries) { + *reshardlog_entries += 1; + } } return 0; } +static int write_header_while_logrecord(cls_method_context_t hctx, + rgw_bucket_dir_header& header) { + if (header.resharding_in_logrecord()) + return write_bucket_header(hctx, &header); + return 0; +} + int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { const ConfigProxy& conf = cls_get_config(hctx); @@ -1067,7 +1077,8 @@ static int complete_remove_obj(cls_method_context_t hctx, unaccount_entry(header, entry); bufferlist empty; - ret = record_duplicate_entry(hctx, idx, key, &empty, header.resharding_in_logrecord()); + ret = record_duplicate_entry(hctx, idx, key, &empty, + header.resharding_in_logrecord()); if (ret < 0) return ret; @@ -1202,7 +1213,9 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist __func__, escape_str(idx).c_str(), rc); return rc; } - rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, header.resharding_in_logrecord()); + rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, + header.resharding_in_logrecord(), + &header.reshardlog_entries); if (rc < 0) return rc; } @@ -1251,7 +1264,9 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist __func__, escape_str(idx).c_str(), rc); return rc; } - rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, header.resharding_in_logrecord()); + rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, + header.resharding_in_logrecord(), + &header.reshardlog_entries); if (rc < 0) return rc; } @@ -1286,7 +1301,9 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist __func__, escape_str(idx).c_str(), rc); return rc; } - rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, header.resharding_in_logrecord()); + rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, + header.resharding_in_logrecord(), + &header.reshardlog_entries); if (rc < 0) return rc; } // CLS_RGW_OP_ADD @@ -1336,7 +1353,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist template static int write_entry(cls_method_context_t hctx, T& entry, const string& key, - const bool is_resharding = false) + uint32_t& reshardlog_entries, const bool is_resharding = false) { bufferlist bl; encode(entry, bl); @@ -1346,6 +1363,7 @@ static int write_entry(cls_method_context_t hctx, T& entry, const string& key, } if (is_resharding) { ret = reshard_log_index_operation(hctx, key, entry.key, &bl); + reshardlog_entries++; } return ret; } @@ -1381,12 +1399,13 @@ static void update_olh_log(rgw_bucket_olh_entry& olh_data_entry, OLHLogOp op, co } static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry, - const string& instance_idx, bool is_resharding) + const string& instance_idx, uint32_t& reshardlog_entries, + bool is_resharding) { CLS_LOG(20, "write_entry() instance=%s idx=%s flags=%d", escape_str(instance_entry.key.instance).c_str(), instance_idx.c_str(), instance_entry.flags); /* write the instance entry */ - int ret = write_entry(hctx, instance_entry, instance_idx, is_resharding); + int ret = write_entry(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry() instance_key=%s ret=%d", escape_str(instance_idx).c_str(), ret); return ret; @@ -1398,9 +1417,10 @@ static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_en * write object instance entry, and if needed also the list entry */ static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry, - const string& instance_idx, bool is_resharding) + const string& instance_idx, uint32_t& reshardlog_entries, + bool is_resharding) { - int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx, is_resharding); + int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding); if (ret < 0) { return ret; } @@ -1410,7 +1430,7 @@ static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& in if (instance_idx != instance_list_idx) { CLS_LOG(20, "write_entry() idx=%s flags=%d", escape_str(instance_list_idx).c_str(), instance_entry.flags); /* write a new list entry for the object instance */ - ret = write_entry(hctx, instance_entry, instance_list_idx, is_resharding); + ret = write_entry(hctx, instance_entry, instance_list_idx, reshardlog_entries, is_resharding); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry() instance=%s instance_list_idx=%s ret=%d", instance_entry.key.instance.c_str(), instance_list_idx.c_str(), ret); return ret; @@ -1498,7 +1518,8 @@ public: return 0; } - int write_entries(uint64_t flags_set, uint64_t flags_reset, bool is_resharding) { + int write_entries(uint64_t flags_set, uint64_t flags_reset, + uint32_t& reshardlog_entries, bool is_resharding) { if (!initialized) { int ret = init(); if (ret < 0) { @@ -1511,7 +1532,7 @@ public: /* write the instance and list entries */ bool special_delete_marker_key = (instance_entry.is_delete_marker() && instance_entry.key.instance.empty()); encode_obj_versioned_data_key(key, &instance_idx, special_delete_marker_key); - int ret = write_obj_entries(hctx, instance_entry, instance_idx, is_resharding); + int ret = write_obj_entries(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding); if (ret < 0) { CLS_LOG(0, "ERROR: write_obj_entries() instance_idx=%s ret=%d", instance_idx.c_str(), ret); return ret; @@ -1520,7 +1541,7 @@ public: return 0; } - int write(uint64_t epoch, bool current, bool is_resharding) { + int write(uint64_t epoch, bool current, uint32_t& reshardlog_entries, bool is_resharding) { if (instance_entry.versioned_epoch > 0) { CLS_LOG(20, "%s: instance_entry.versioned_epoch=%d epoch=%d", __func__, (int)instance_entry.versioned_epoch, (int)epoch); /* this instance has a previous list entry, remove that entry */ @@ -1536,11 +1557,11 @@ public: } instance_entry.versioned_epoch = epoch; - return write_entries(flags, 0, is_resharding); + return write_entries(flags, 0, reshardlog_entries, is_resharding); } - int demote_current(bool is_resharding) { - return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT, is_resharding); + int demote_current(uint32_t& reshardlog_entries, bool is_resharding) { + return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT, reshardlog_entries, is_resharding); } bool is_delete_marker() { @@ -1642,9 +1663,9 @@ public: olh_data_entry.key = key; } - int write(bool is_resharding) { + int write(uint32_t& reshardlog_entries, bool is_resharding) { /* write the olh data entry */ - int ret = write_entry(hctx, olh_data_entry, olh_data_idx, is_resharding); + int ret = write_entry(hctx, olh_data_entry, olh_data_idx, reshardlog_entries, is_resharding); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_idx.c_str(), ret); return ret; @@ -1679,12 +1700,12 @@ public: }; static int write_version_marker(cls_method_context_t hctx, cls_rgw_obj_key& key, - bool is_resharding) + uint32_t& reshardlog_entries, bool is_resharding) { rgw_bucket_dir_entry entry; entry.key = key; entry.flags = rgw_bucket_dir_entry::FLAG_VER_MARKER; - int ret = write_entry(hctx, entry, key.name, is_resharding); + int ret = write_entry(hctx, entry, key.name, reshardlog_entries, is_resharding); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry returned ret=%d", ret); return ret; @@ -1702,6 +1723,7 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx, cls_rgw_obj_key& key, bool demote_current, bool instance_only, + uint32_t& reshardlog_entries, bool is_resharding) { if (!key.instance.empty()) { @@ -1729,9 +1751,9 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx, encode_obj_versioned_data_key(key, &new_idx); if (instance_only) { - ret = write_obj_instance_entry(hctx, entry, new_idx, is_resharding); + ret = write_obj_instance_entry(hctx, entry, new_idx, reshardlog_entries, is_resharding); } else { - ret = write_obj_entries(hctx, entry, new_idx, is_resharding); + ret = write_obj_entries(hctx, entry, new_idx, reshardlog_entries, is_resharding); } if (ret < 0) { CLS_LOG(0, "ERROR: write_obj_entries new_idx=%s returned %d", @@ -1740,7 +1762,7 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx, } } - ret = write_version_marker(hctx, key, is_resharding); + ret = write_version_marker(hctx, key, reshardlog_entries, is_resharding); if (ret < 0) { return ret; } @@ -1894,14 +1916,14 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer const uint64_t prev_epoch = olh.get_epoch(); if (!olh.start_modify(op.olh_epoch)) { - ret = obj.write(op.olh_epoch, false, header.resharding_in_logrecord()); + ret = obj.write(op.olh_epoch, false, header.reshardlog_entries, header.resharding_in_logrecord()); if (ret < 0) { return ret; } if (removing) { olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch); } - return 0; + return write_header_while_logrecord(hctx, header); } // promote this version to current if it's a newer epoch, or if it matches the @@ -1926,7 +1948,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer if (!(olh_entry.key == op.key)) { BIVerObjEntry old_obj(hctx, olh_entry.key); - ret = old_obj.demote_current(header.resharding_in_logrecord()); + ret = old_obj.demote_current(header.reshardlog_entries, header.resharding_in_logrecord()); if (ret < 0) { CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret); return ret; @@ -1938,6 +1960,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer bool instance_only = (op.key.instance.empty() && op.delete_marker); cls_rgw_obj_key key(op.key.name); ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, + header.reshardlog_entries, header.resharding_in_logrecord()); if (ret < 0) { CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret); @@ -1960,24 +1983,25 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer } olh.set_exists(true); - ret = olh.write(header.resharding_in_logrecord()); + ret = olh.write(header.reshardlog_entries, header.resharding_in_logrecord()); if (ret < 0) { CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret); return ret; } /* write the instance and list entries */ - ret = obj.write(olh.get_epoch(), promote, header.resharding_in_logrecord()); + ret = obj.write(olh.get_epoch(), promote, header.reshardlog_entries, + header.resharding_in_logrecord()); if (ret < 0) { return ret; } if (!op.log_op) { - return 0; + return write_header_while_logrecord(hctx, header); } if (header.syncstopped) { - return 0; + return write_header_while_logrecord(hctx, header); } rgw_bucket_dir_entry& entry = obj.get_dir_entry(); @@ -2051,6 +2075,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, bool instance_only = false; cls_rgw_obj_key key(dest_key.name); ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only, + header.reshardlog_entries, header.resharding_in_logrecord()); if (ret < 0) { CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret); @@ -2073,7 +2098,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, } olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch); - return olh.write(header.resharding_in_logrecord()); + return olh.write(header.reshardlog_entries, header.resharding_in_logrecord()); } rgw_bucket_olh_entry& olh_entry = olh.get_entry(); @@ -2093,7 +2118,8 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, if (found) { BIVerObjEntry next(hctx, next_key); - ret = next.write(olh.get_epoch(), true, header.resharding_in_logrecord()); + ret = next.write(olh.get_epoch(), true, header.reshardlog_entries, + header.resharding_in_logrecord()); if (ret < 0) { CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret); return ret; @@ -2131,17 +2157,17 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, return ret; } - ret = olh.write(header.resharding_in_logrecord()); + ret = olh.write(header.reshardlog_entries, header.resharding_in_logrecord()); if (ret < 0) { return ret; } if (!op.log_op) { - return 0; + return write_header_while_logrecord(hctx, header); } if (header.syncstopped) { - return 0; + return write_header_while_logrecord(hctx, header); } rgw_bucket_entry_ver ver; @@ -2263,7 +2289,9 @@ static int rgw_bucket_trim_olh_log(cls_method_context_t hctx, bufferlist *in, bu } /* write the olh data entry */ - ret = write_entry(hctx, olh_data_entry, olh_data_key, header.resharding_in_logrecord()); + ret = write_entry(hctx, olh_data_entry, olh_data_key, + header.reshardlog_entries, + header.resharding_in_logrecord()); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_key.c_str(), ret); return ret; @@ -3836,7 +3864,14 @@ static int rgw_reshard_log_trim_op(cls_method_context_t hctx, bufferlist *in, bu std::set keys; bool more = false; - int rc = cls_cxx_map_get_keys(hctx, key_begin, max_entries, &keys, &more); + rgw_bucket_dir_header header; + int rc = read_bucket_header(hctx, &header); + if (rc < 0) { + CLS_LOG(0, "ERROR: rgw_reshard_log_trim_op(): failed to read header\n"); + return rc; + } + + rc = cls_cxx_map_get_keys(hctx, key_begin, max_entries, &keys, &more); if (rc < 0) { CLS_LOG(1, "ERROR: cls_cxx_map_get_keys failed rc=%d", rc); return rc; @@ -3861,6 +3896,12 @@ static int rgw_reshard_log_trim_op(cls_method_context_t hctx, bufferlist *in, bu CLS_LOG(1, "ERROR: cls_cxx_map_remove_range failed rc=%d", rc); return rc; } + + header.reshardlog_entries = 0; + rc = write_bucket_header(hctx, &header); if (rc < 0) { + CLS_LOG(0, "ERROR: rgw_reshard_log_trim_op(): failed to write header\n"); + return rc; + } return 0; } @@ -4911,6 +4952,37 @@ static int rgw_set_bucket_resharding(cls_method_context_t hctx, bufferlist *in, return write_bucket_header(hctx, &header); } +static int rgw_set_bucket_resharding2(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + CLS_LOG(10, "entered %s", __func__); + cls_rgw_set_bucket_resharding_op op; + + auto in_iter = in->cbegin(); + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_rgw_set_bucket_resharding: failed to decode entry\n"); + return -EINVAL; + } + + rgw_bucket_dir_header header; + int rc = read_bucket_header(hctx, &header); + if (rc < 0) { + CLS_LOG(1, "ERROR: %s: failed to read header", __func__); + return rc; + } + + if (op.entry.reshard_status == cls_rgw_reshard_status::IN_LOGRECORD) { + if (header.reshardlog_entries != 0) { + CLS_LOG(1, "ERROR: %s: cannot set logrecord status on non-zero log record count", __func__); + return -EOPNOTSUPP; + } + } + header.new_instance.set_status(op.entry.reshard_status); + + return write_bucket_header(hctx, &header); +} + static int rgw_clear_bucket_resharding(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { CLS_LOG(10, "entered %s", __func__); @@ -4938,6 +5010,10 @@ static int rgw_clear_bucket_resharding(cls_method_context_t hctx, bufferlist *in static int rgw_guard_bucket_resharding(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { CLS_LOG(10, "entered %s", __func__); + + const ConfigProxy& conf = cls_get_config(hctx); + const uint32_t reshardlog_threshold = conf->rgw_reshardlog_threshold; + cls_rgw_guard_bucket_resharding_op op; auto in_iter = in->cbegin(); @@ -4955,7 +5031,8 @@ static int rgw_guard_bucket_resharding(cls_method_context_t hctx, bufferlist *in return rc; } - if (header.resharding_in_progress()) { + if (header.resharding_in_progress() || + (header.resharding_in_logrecord() && header.reshardlog_entries >= reshardlog_threshold)) { return op.ret_err; } @@ -5118,7 +5195,7 @@ CLS_INIT(rgw) cls_register_cxx_method(h_class, RGW_SET_BUCKET_RESHARDING, CLS_METHOD_RD | CLS_METHOD_WR, rgw_set_bucket_resharding, &h_rgw_set_bucket_resharding); cls_register_cxx_method(h_class, RGW_SET_BUCKET_RESHARDING2, CLS_METHOD_RD | CLS_METHOD_WR, - rgw_set_bucket_resharding, &h_rgw_set_bucket_resharding); + rgw_set_bucket_resharding2, &h_rgw_set_bucket_resharding); cls_register_cxx_method(h_class, RGW_CLEAR_BUCKET_RESHARDING, CLS_METHOD_RD | CLS_METHOD_WR, rgw_clear_bucket_resharding, &h_rgw_clear_bucket_resharding); cls_register_cxx_method(h_class, RGW_GUARD_BUCKET_RESHARDING, CLS_METHOD_RD , diff --git a/src/cls/rgw/cls_rgw_types.cc b/src/cls/rgw/cls_rgw_types.cc index ad8e7f16e2f..8b125640e86 100644 --- a/src/cls/rgw/cls_rgw_types.cc +++ b/src/cls/rgw/cls_rgw_types.cc @@ -697,6 +697,7 @@ void rgw_bucket_dir_header::dump(Formatter *f) const } f->close_section(); ::encode_json("new_instance", new_instance, f); + f->dump_int("reshardlog_entries", reshardlog_entries); } void rgw_bucket_dir::generate_test_instances(list& o) diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h index f3ef5ec6aec..8c90bee4c50 100644 --- a/src/cls/rgw/cls_rgw_types.h +++ b/src/cls/rgw/cls_rgw_types.h @@ -828,11 +828,13 @@ struct rgw_bucket_dir_header { std::string max_marker; cls_rgw_bucket_instance_entry new_instance; bool syncstopped; + uint32_t reshardlog_entries; - rgw_bucket_dir_header() : tag_timeout(0), ver(0), master_ver(0), syncstopped(false) {} + rgw_bucket_dir_header() : tag_timeout(0), ver(0), master_ver(0), syncstopped(false), + reshardlog_entries(0) {} void encode(ceph::buffer::list &bl) const { - ENCODE_START(7, 2, bl); + ENCODE_START(8, 2, bl); encode(stats, bl); encode(tag_timeout, bl); encode(ver, bl); @@ -840,10 +842,11 @@ struct rgw_bucket_dir_header { encode(max_marker, bl); encode(new_instance, bl); encode(syncstopped,bl); + encode(reshardlog_entries, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator &bl) { - DECODE_START_LEGACY_COMPAT_LEN(6, 2, 2, bl); + DECODE_START_LEGACY_COMPAT_LEN(8, 2, 2, bl); decode(stats, bl); if (struct_v > 2) { decode(tag_timeout, bl); @@ -867,6 +870,11 @@ struct rgw_bucket_dir_header { if (struct_v >= 7) { decode(syncstopped,bl); } + if (struct_v >= 8) { + decode(reshardlog_entries, bl); + } else { + reshardlog_entries = 0; + } DECODE_FINISH(bl); } void dump(ceph::Formatter *f) const; diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index d23ad6cfe1c..f62b4b45c67 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -2730,6 +2730,17 @@ options: - rgw see_also: - rgw_reshard_progress_judge_interval +- name: rgw_reshardlog_threshold + type: uint + level: dev + desc: threshold for a shard to record log before blocking writes + default: 30000 + with_legacy: true + services: + - rgw + - osd + see_also: + - rgw_reshard_progress_judge_interval - name: rgw_debug_inject_set_olh_err type: uint level: dev diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index aa5e646ed51..829e06a3b2c 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -7801,7 +7801,7 @@ int RGWRados::block_while_resharding(RGWRados::BucketShard *bs, return ret; } - if (!entry.resharding_in_progress()) { + if (!entry.resharding()) { ret = fetch_new_bucket_info("get_bucket_resharding_succeeded"); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: " << __func__ << diff --git a/src/rgw/driver/rados/rgw_reshard.cc b/src/rgw/driver/rados/rgw_reshard.cc index 84c76f2d52d..18aa48ad57f 100644 --- a/src/rgw/driver/rados/rgw_reshard.cc +++ b/src/rgw/driver/rados/rgw_reshard.cc @@ -1357,6 +1357,7 @@ int RGWBucketReshard::execute(int num_shards, return ret; } + auto current_num_shards = rgw::num_shards(bucket_info.layout.current_index); ret = commit_reshard(store, bucket_info, bucket_attrs, fault, dpp, y); if (ret < 0) { return ret; @@ -1364,7 +1365,7 @@ int RGWBucketReshard::execute(int num_shards, ldpp_dout(dpp, 1) << __func__ << " INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" from " << - rgw::num_shards(bucket_info.layout.current_index) << " shards to " << num_shards << + current_num_shards << " shards to " << num_shards << " shards completed successfully" << dendl; return 0; diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index 46bfcdef325..7c3e3f1f0a3 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -182,8 +182,8 @@ reshard cancel cancel resharding a bucket reshard stale-instances list list stale-instances from bucket resharding reshard stale-instances delete cleanup stale-instances from bucket resharding - reshardlog list list bucket reshard newest generation log - reshardlog purge trim all bucket resharding log + reshardlog list list bucket resharding log + reshardlog purge trim bucket resharding log sync error list list sync error sync error trim trim sync error mfa create create a new MFA TOTP token diff --git a/src/test/cls_rgw/test_cls_rgw.cc b/src/test/cls_rgw/test_cls_rgw.cc index 7963231f99c..e0c8eeb13f6 100644 --- a/src/test/cls_rgw/test_cls_rgw.cc +++ b/src/test/cls_rgw/test_cls_rgw.cc @@ -1418,3 +1418,55 @@ TEST_F(cls_rgw, reshardlog_list) ASSERT_FALSE(is_truncated); ASSERT_EQ(2u, entries.size()); } + +void reshardlog_entries(librados::IoCtx& ioctx, const std::string& oid, uint32_t num_entries) +{ + map results; + map oids; + oids[0] = oid; + ASSERT_EQ(0, CLSRGWIssueGetDirHeader(ioctx, oids, results, 8)()); + + uint32_t entries = 0; + map::iterator iter = results.begin(); + for (; iter != results.end(); ++iter) { + entries += (iter->second).dir.header.reshardlog_entries; + } + ASSERT_EQ(entries, num_entries); +} + +TEST_F(cls_rgw, reshardlog_num) +{ + string bucket_oid = str_int("reshard2", 0); + + ObjectWriteOperation op; + cls_rgw_bucket_init_index(op); + ASSERT_EQ(0, ioctx.operate(bucket_oid, &op)); + + cls_rgw_obj_key obj1 = str_int("obj1", 0); + string tag = str_int("tag-prepare", 0); + string loc = str_int("loc", 0); + index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, obj1, loc); + rgw_bucket_dir_entry_meta meta; + index_complete(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, 1, obj1, meta); + + // do not record logs + reshardlog_entries(ioctx, bucket_oid, 0u); + + // set reshard status to IN_LOGRECORD + cls_rgw_bucket_instance_entry entry; + entry.reshard_status = cls_rgw_reshard_status::IN_LOGRECORD; + set_reshard_status(ioctx, bucket_oid, entry); + + // record a log in prepare not add reshardlog_entry + cls_rgw_obj_key obj2 = str_int("obj2", 0); + index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, obj2, loc); + reshardlog_entries(ioctx, bucket_oid, 0u); + // record a log in complete add reshardlog_entry + index_complete(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, 1, obj2, meta); + reshardlog_entries(ioctx, bucket_oid, 1u); + + // record a log in deleting obj not add reshardlog_entry + index_prepare(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, obj1, loc); + index_complete(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, 1, obj1, meta); + reshardlog_entries(ioctx, bucket_oid, 1u); +}