From 196a73cbd4cff2fcb9133803fbaf1c892ac7106b Mon Sep 17 00:00:00 2001 From: liangmingyuan Date: Sat, 24 Aug 2024 17:03:13 +0800 Subject: [PATCH] cls/rgw: add a helper function for calls to cls_cxx_map_remove_key() Add some testing cases and do cleanup too. Signed-off-by: Mingyuan Liang --- doc/radosgw/dynamicresharding.rst | 6 +- qa/workunits/rgw/test_rgw_reshard.py | 16 +- src/cls/rgw/cls_rgw.cc | 286 +++++++++------------------ src/cls/rgw/cls_rgw_client.cc | 18 -- src/cls/rgw/cls_rgw_client.h | 11 -- src/doc/rgw/noblock-reshard.md | 2 +- src/rgw/driver/rados/rgw_rados.cc | 19 +- src/rgw/driver/rados/rgw_rados.h | 7 +- src/rgw/driver/rados/rgw_reshard.cc | 87 ++++---- src/test/cls_rgw/test_cls_rgw.cc | 6 +- 10 files changed, 160 insertions(+), 298 deletions(-) diff --git a/doc/radosgw/dynamicresharding.rst b/doc/radosgw/dynamicresharding.rst index 822251bbb485d..68c4162738f70 100644 --- a/doc/radosgw/dynamicresharding.rst +++ b/doc/radosgw/dynamicresharding.rst @@ -51,10 +51,8 @@ Configuration .. confval:: rgw_reshard_bucket_lock_duration .. confval:: rgw_reshard_thread_interval .. confval:: rgw_reshard_num_logs - -- ``rgw_reshard_progress_judge_interval``: interval of judging if bucket reshard failed in reshard log process state, default: 120 seconds - -- ``rgw_reshard_progress_judge_ratio``: ratio of reshard progress judge interval to randomly vary, default: 0.5 +.. confval:: rgw_reshard_progress_judge_interval +.. confval:: rgw_reshard_progress_judge_ratio Admin commands ============== diff --git a/qa/workunits/rgw/test_rgw_reshard.py b/qa/workunits/rgw/test_rgw_reshard.py index 468e3502bcb99..ba0aef5572c35 100755 --- a/qa/workunits/rgw/test_rgw_reshard.py +++ b/qa/workunits/rgw/test_rgw_reshard.py @@ -114,7 +114,7 @@ def test_bucket_reshard(conn, name, **fault): # try reshard with fault injection _, ret = run_bucket_reshard_cmd(name, num_shards_expected, check_retcode=False, **fault) - if fault.get('error_code') == errno.ECANCELED: + if fault.get('error_code') == errno.ECANCELED or fault.get('error_code') == errno.EOPNOTSUPP: assert(ret == 0) # expect ECANCELED to retry and succeed else: assert(ret != 0 and ret != errno.EBUSY) @@ -214,6 +214,13 @@ def main(): log.error("Resharding failed on bucket {}. 
Expected number of shards are not created\n".format(BUCKET_NAME)) # TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard' + log.debug('TEST: reshard bucket with EIO injected at init_index\n') + test_bucket_reshard(connection, 'error-at-init-index', error_at='init_index') + log.debug('TEST: reshard bucket with EOPNOTSUPP injected at init_index\n') + test_bucket_reshard(connection, 'error-at-init-index', error_at='init_index', error_code=errno.EOPNOTSUPP) + log.debug('TEST: reshard bucket with abort at init_index\n') + test_bucket_reshard(connection, 'abort-at-init-indext', abort_at='init_index') + log.debug('TEST: reshard bucket with EIO injected at set_target_layout\n') test_bucket_reshard(connection, 'error-at-set-target-layout', error_at='set_target_layout') log.debug('TEST: reshard bucket with ECANCELED injected at set_target_layout\n') @@ -221,6 +228,13 @@ def main(): log.debug('TEST: reshard bucket with abort at set_target_layout\n') test_bucket_reshard(connection, 'abort-at-set-target-layout', abort_at='set_target_layout') + log.debug('TEST: reshard bucket with EIO injected at trim_reshard_log_entries\n') + test_bucket_reshard(connection, 'error-at-trim-reshard-log-entries', error_at='trim_reshard_log_entries') + log.debug('TEST: reshard bucket with EOPNOTSUPP injected at trim_reshard_log_entries\n') + test_bucket_reshard(connection, 'error-at-trim-reshard-log-entries', error_at='trim_reshard_log_entries', error_code=errno.EOPNOTSUPP) + log.debug('TEST: reshard bucket with abort at trim_reshard_log_entries\n') + test_bucket_reshard(connection, 'abort-at-trim-reshard-log-entries', abort_at='trim_reshard_log_entries') + log.debug('TEST: reshard bucket with EIO injected at block_writes\n') test_bucket_reshard(connection, 'error-at-block-writes', error_at='block_writes') log.debug('TEST: reshard bucket with abort at block_writes\n') diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index 8fc928e07d912..5534951e76521 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -735,6 +735,39 @@ static int write_bucket_header(cls_method_context_t hctx, rgw_bucket_dir_header return cls_cxx_map_write_header(hctx, &header_bl); } +template +static int write_entry(cls_method_context_t hctx, T& entry, const string& key, + rgw_bucket_dir_header& header, bool count_entry = true) +{ + bufferlist bl; + encode(entry, bl); + int ret = cls_cxx_map_set_val(hctx, key, &bl); + if (ret < 0) { + return ret; + } + if (header.resharding_in_logrecord()) { + ret = reshard_log_index_operation(hctx, key, entry.key, &bl); + header.reshardlog_entries++; + } + return ret; +} + +static int remove_entry(cls_method_context_t hctx, const string& idx, + const cls_rgw_obj_key& key, + rgw_bucket_dir_header& header) +{ + int ret = cls_cxx_map_remove_key(hctx, idx); + if (ret < 0) { + CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() idx=%s ret=%d", idx.c_str(), ret); + return ret; + } + if (header.resharding_in_logrecord()) { + header.reshardlog_entries++; + bufferlist empty; + return reshard_log_index_operation(hctx, idx, key, &empty); + } + return 0; +} int rgw_bucket_update_stats(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { @@ -846,23 +879,6 @@ static std::string modify_op_str(uint8_t op) { return modify_op_str((RGWModifyOp) op); } -static int record_duplicate_entry(cls_method_context_t hctx, string& idx, - const cls_rgw_obj_key& key, bufferlist* log_bl, - bool resharding, uint32_t* reshardlog_entries = NULL) { - if (resharding) { - int rc = 
reshard_log_index_operation(hctx, idx, key, log_bl); - if (rc < 0) { - CLS_LOG(0, "record_duplicate_entry(): failed to update entry, name=%s, rc=%d", - escape_str(idx).c_str(), rc); - return rc; - } - if (reshardlog_entries) { - *reshardlog_entries += 1; - } - } - return 0; -} - static int write_header_while_logrecord(cls_method_context_t hctx, rgw_bucket_dir_header& header) { if (header.resharding_in_logrecord()) @@ -944,22 +960,16 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist entry.pending_map.insert(pair(op.tag, info)); // write out new key to disk - bufferlist info_bl; - encode(entry, info_bl); CLS_LOG_BITX(bitx_inst, 20, "INFO: %s: setting map entry at key=%s", __func__, escape_str(idx).c_str()); - rc = cls_cxx_map_set_val(hctx, idx, &info_bl); + rc = write_entry(hctx, entry, idx, header, false); if (rc < 0) { CLS_LOG_BITX(bitx_inst, 1, "ERROR: %s could not set value for key, key=%s, rc=%d", __func__, escape_str(idx).c_str(), rc); return rc; } - if (header.resharding_in_logrecord()) { - // write the duplicated index entry copy - return reshard_log_index_operation(hctx, idx, entry.key, &info_bl); - } CLS_LOG_BITX(bitx_inst, 10, "EXITING %s, returning 0", __func__); return 0; @@ -1076,13 +1086,7 @@ static int complete_remove_obj(cls_method_context_t hctx, int(entry.meta.category)); unaccount_entry(header, entry); - bufferlist empty; - ret = record_duplicate_entry(hctx, idx, key, &empty, - header.resharding_in_logrecord()); - if (ret < 0) - return ret; - - ret = cls_cxx_map_remove_key(hctx, idx); + ret = remove_entry(hctx, idx, key, header); if (ret < 0) { CLS_LOG(1, "%s: cls_cxx_map_remove_key failed with %d", __func__, ret); return ret; @@ -1187,7 +1191,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist CLS_LOG_BITX(bitx_inst, 20, "INFO: %s: removing map entry with key=%s", __func__, escape_str(idx).c_str()); - rc = cls_cxx_map_remove_key(hctx, idx); + rc = remove_entry(hctx, idx, entry.key, header); if (rc < 0) { CLS_LOG_BITX(bitx_inst, 1, "ERROR: %s: unable to remove map key, key=%s, rc=%d", @@ -1195,29 +1199,18 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist return rc; } - bufferlist empty; - rc = record_duplicate_entry(hctx, idx, entry.key, &empty, header.resharding_in_logrecord()); - if (rc < 0) - return rc; } else { // we removed this tag from pending_map so need to write the changes CLS_LOG_BITX(bitx_inst, 20, "INFO: %s: setting map entry at key=%s", __func__, escape_str(idx).c_str()); - bufferlist new_key_bl; - encode(entry, new_key_bl); - rc = cls_cxx_map_set_val(hctx, idx, &new_key_bl); + rc = write_entry(hctx, entry, idx, header); if (rc < 0) { CLS_LOG_BITX(bitx_inst, 1, "ERROR: %s: unable to set map val, key=%s, rc=%d", __func__, escape_str(idx).c_str(), rc); return rc; } - rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, - header.resharding_in_logrecord(), - &header.reshardlog_entries); - if (rc < 0) - return rc; } } } // CLS_RGW_OP_CANCEL @@ -1239,36 +1232,26 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist CLS_LOG_BITX(bitx_inst, 20, "INFO: %s: removing map entry with key=%s", __func__, escape_str(idx).c_str()); - rc = cls_cxx_map_remove_key(hctx, idx); + rc = remove_entry(hctx, idx, entry.key, header); if (rc < 0) { CLS_LOG_BITX(bitx_inst, 1, "ERROR: %s: unable to remove map key, key=%s, rc=%d", __func__, escape_str(idx).c_str(), rc); return rc; } - bufferlist empty; - rc = record_duplicate_entry(hctx, idx, 
entry.key, &empty, header.resharding_in_logrecord()); - if (rc < 0) - return rc; } else { entry.exists = false; - bufferlist new_key_bl; - encode(entry, new_key_bl); CLS_LOG_BITX(bitx_inst, 20, "INFO: %s: setting map entry at key=%s", __func__, escape_str(idx).c_str()); - rc = cls_cxx_map_set_val(hctx, idx, &new_key_bl); + + rc = write_entry(hctx, entry, idx, header); if (rc < 0) { CLS_LOG_BITX(bitx_inst, 1, "ERROR: %s: unable to set map val, key=%s, rc=%d", __func__, escape_str(idx).c_str(), rc); return rc; } - rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, - header.resharding_in_logrecord(), - &header.reshardlog_entries); - if (rc < 0) - return rc; } } // CLS_RGW_OP_DEL else if (op.op == CLS_RGW_OP_ADD) { @@ -1289,23 +1272,16 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist stats.total_size += meta.accounted_size; stats.total_size_rounded += cls_rgw_get_rounded_size(meta.accounted_size); stats.actual_size += meta.size; - bufferlist new_key_bl; - encode(entry, new_key_bl); CLS_LOG_BITX(bitx_inst, 20, "INFO: %s: setting map entry at key=%s", __func__, escape_str(idx).c_str()); - rc = cls_cxx_map_set_val(hctx, idx, &new_key_bl); + rc = write_entry(hctx, entry, idx, header); if (rc < 0) { CLS_LOG_BITX(bitx_inst, 1, "ERROR: %s: unable to set map value at key=%s, rc=%d", __func__, escape_str(idx).c_str(), rc); return rc; } - rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, - header.resharding_in_logrecord(), - &header.reshardlog_entries); - if (rc < 0) - return rc; } // CLS_RGW_OP_ADD if (log_op) { @@ -1351,23 +1327,6 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist return rc; } // rgw_bucket_complete_op -template -static int write_entry(cls_method_context_t hctx, T& entry, const string& key, - uint32_t& reshardlog_entries, const bool is_resharding = false) -{ - bufferlist bl; - encode(entry, bl); - int ret = cls_cxx_map_set_val(hctx, key, &bl); - if (ret < 0) { - return ret; - } - if (is_resharding) { - ret = reshard_log_index_operation(hctx, key, entry.key, &bl); - reshardlog_entries++; - } - return ret; -} - static int read_olh(cls_method_context_t hctx,cls_rgw_obj_key& obj_key, rgw_bucket_olh_entry *olh_data_entry, string *index_key, bool *found) { cls_rgw_obj_key olh_key; @@ -1399,13 +1358,12 @@ static void update_olh_log(rgw_bucket_olh_entry& olh_data_entry, OLHLogOp op, co } static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry, - const string& instance_idx, uint32_t& reshardlog_entries, - bool is_resharding) + const string& instance_idx, rgw_bucket_dir_header& header) { CLS_LOG(20, "write_entry() instance=%s idx=%s flags=%d", escape_str(instance_entry.key.instance).c_str(), instance_idx.c_str(), instance_entry.flags); /* write the instance entry */ - int ret = write_entry(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding); + int ret = write_entry(hctx, instance_entry, instance_idx, header); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry() instance_key=%s ret=%d", escape_str(instance_idx).c_str(), ret); return ret; @@ -1417,10 +1375,9 @@ static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_en * write object instance entry, and if needed also the list entry */ static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry, - const string& instance_idx, uint32_t& reshardlog_entries, - bool is_resharding) + const string& instance_idx, rgw_bucket_dir_header& header) { - int 
ret = write_obj_instance_entry(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding); + int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx, header); if (ret < 0) { return ret; } @@ -1430,7 +1387,7 @@ static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& in if (instance_idx != instance_list_idx) { CLS_LOG(20, "write_entry() idx=%s flags=%d", escape_str(instance_list_idx).c_str(), instance_entry.flags); /* write a new list entry for the object instance */ - ret = write_entry(hctx, instance_entry, instance_list_idx, reshardlog_entries, is_resharding); + ret = write_entry(hctx, instance_entry, instance_list_idx, header); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry() instance=%s instance_list_idx=%s ret=%d", instance_entry.key.instance.c_str(), instance_list_idx.c_str(), ret); return ret; @@ -1486,40 +1443,32 @@ public: instance_entry.versioned_epoch = epoch; } - int unlink_list_entry(bool is_resharding) { + int unlink_list_entry(rgw_bucket_dir_header& header) { string list_idx, list_sub_ver; /* this instance has a previous list entry, remove that entry */ get_list_index_key(instance_entry, &list_idx); CLS_LOG(20, "unlink_list_entry() list_idx=%s", escape_str(list_idx).c_str()); - int ret = cls_cxx_map_remove_key(hctx, list_idx); + int ret = remove_entry(hctx, list_idx, instance_entry.key, header); if (ret < 0) { - CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() list_idx=%s ret=%d", list_idx.c_str(), ret); + CLS_LOG(0, "ERROR: remove_entry() list_idx=%s ret=%d", list_idx.c_str(), ret); return ret; } - if (is_resharding) { - bufferlist empty; - return reshard_log_index_operation(hctx, list_idx, instance_entry.key, &empty); - } return 0; } - int unlink(bool is_resharding, const cls_rgw_obj_key& key) { + int unlink(rgw_bucket_dir_header& header, const cls_rgw_obj_key& key) { /* remove the instance entry */ CLS_LOG(20, "unlink() idx=%s", escape_str(instance_idx).c_str()); - int ret = cls_cxx_map_remove_key(hctx, instance_idx); + int ret = remove_entry(hctx, instance_idx, key, header); if (ret < 0) { - CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() instance_idx=%s ret=%d", instance_idx.c_str(), ret); + CLS_LOG(0, "ERROR: remove_entry() instance_idx=%s ret=%d", instance_idx.c_str(), ret); return ret; } - if (is_resharding) { - bufferlist empty; - return reshard_log_index_operation(hctx, instance_idx, key, &empty); - } return 0; } int write_entries(uint64_t flags_set, uint64_t flags_reset, - uint32_t& reshardlog_entries, bool is_resharding) { + rgw_bucket_dir_header& header) { if (!initialized) { int ret = init(); if (ret < 0) { @@ -1532,7 +1481,7 @@ public: /* write the instance and list entries */ bool special_delete_marker_key = (instance_entry.is_delete_marker() && instance_entry.key.instance.empty()); encode_obj_versioned_data_key(key, &instance_idx, special_delete_marker_key); - int ret = write_obj_entries(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding); + int ret = write_obj_entries(hctx, instance_entry, instance_idx, header); if (ret < 0) { CLS_LOG(0, "ERROR: write_obj_entries() instance_idx=%s ret=%d", instance_idx.c_str(), ret); return ret; @@ -1541,11 +1490,11 @@ public: return 0; } - int write(uint64_t epoch, bool current, uint32_t& reshardlog_entries, bool is_resharding) { + int write(uint64_t epoch, bool current, rgw_bucket_dir_header& header) { if (instance_entry.versioned_epoch > 0) { CLS_LOG(20, "%s: instance_entry.versioned_epoch=%d epoch=%d", __func__, (int)instance_entry.versioned_epoch, (int)epoch); /* 
this instance has a previous list entry, remove that entry */ - int ret = unlink_list_entry(is_resharding); + int ret = unlink_list_entry(header); if (ret < 0) { return ret; } @@ -1557,11 +1506,11 @@ public: } instance_entry.versioned_epoch = epoch; - return write_entries(flags, 0, reshardlog_entries, is_resharding); + return write_entries(flags, 0, header); } - int demote_current(uint32_t& reshardlog_entries, bool is_resharding) { - return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT, reshardlog_entries, is_resharding); + int demote_current(rgw_bucket_dir_header& header) { + return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT, header); } bool is_delete_marker() { @@ -1663,9 +1612,9 @@ public: olh_data_entry.key = key; } - int write(uint32_t& reshardlog_entries, bool is_resharding) { + int write(rgw_bucket_dir_header& header) { /* write the olh data entry */ - int ret = write_entry(hctx, olh_data_entry, olh_data_idx, reshardlog_entries, is_resharding); + int ret = write_entry(hctx, olh_data_entry, olh_data_idx, header); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_idx.c_str(), ret); return ret; @@ -1700,12 +1649,12 @@ public: }; static int write_version_marker(cls_method_context_t hctx, cls_rgw_obj_key& key, - uint32_t& reshardlog_entries, bool is_resharding) + rgw_bucket_dir_header& header) { rgw_bucket_dir_entry entry; entry.key = key; entry.flags = rgw_bucket_dir_entry::FLAG_VER_MARKER; - int ret = write_entry(hctx, entry, key.name, reshardlog_entries, is_resharding); + int ret = write_entry(hctx, entry, key.name, header); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry returned ret=%d", ret); return ret; @@ -1723,8 +1672,7 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx, cls_rgw_obj_key& key, bool demote_current, bool instance_only, - uint32_t& reshardlog_entries, - bool is_resharding) + rgw_bucket_dir_header& header) { if (!key.instance.empty()) { return -EINVAL; @@ -1751,9 +1699,9 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx, encode_obj_versioned_data_key(key, &new_idx); if (instance_only) { - ret = write_obj_instance_entry(hctx, entry, new_idx, reshardlog_entries, is_resharding); + ret = write_obj_instance_entry(hctx, entry, new_idx, header); } else { - ret = write_obj_entries(hctx, entry, new_idx, reshardlog_entries, is_resharding); + ret = write_obj_entries(hctx, entry, new_idx, header); } if (ret < 0) { CLS_LOG(0, "ERROR: write_obj_entries new_idx=%s returned %d", @@ -1762,7 +1710,7 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx, } } - ret = write_version_marker(hctx, key, reshardlog_entries, is_resharding); + ret = write_version_marker(hctx, key, header); if (ret < 0) { return ret; } @@ -1882,7 +1830,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer * entry */ existed = (ret >= 0 && !other_obj.is_delete_marker()); if (ret >= 0 && other_obj.is_delete_marker() != op.delete_marker) { - ret = other_obj.unlink_list_entry(header.resharding_in_logrecord()); + ret = other_obj.unlink_list_entry(header); if (ret < 0) { return ret; } @@ -1890,7 +1838,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer removing = existed && op.delete_marker; if (!removing) { - ret = other_obj.unlink(header.resharding_in_logrecord(), op.key); + ret = other_obj.unlink(header, op.key); if (ret < 0) { return ret; } @@ -1916,7 +1864,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer const 
uint64_t prev_epoch = olh.get_epoch(); if (!olh.start_modify(op.olh_epoch)) { - ret = obj.write(op.olh_epoch, false, header.reshardlog_entries, header.resharding_in_logrecord()); + ret = obj.write(op.olh_epoch, false, header); if (ret < 0) { return ret; } @@ -1948,7 +1896,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer if (!(olh_entry.key == op.key)) { BIVerObjEntry old_obj(hctx, olh_entry.key); - ret = old_obj.demote_current(header.reshardlog_entries, header.resharding_in_logrecord()); + ret = old_obj.demote_current(header); if (ret < 0) { CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret); return ret; @@ -1959,9 +1907,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer } else { bool instance_only = (op.key.instance.empty() && op.delete_marker); cls_rgw_obj_key key(op.key.name); - ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, - header.reshardlog_entries, - header.resharding_in_logrecord()); + ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header); if (ret < 0) { CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret); return ret; @@ -1983,15 +1929,14 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer } olh.set_exists(true); - ret = olh.write(header.reshardlog_entries, header.resharding_in_logrecord()); + ret = olh.write(header); if (ret < 0) { CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret); return ret; } /* write the instance and list entries */ - ret = obj.write(olh.get_epoch(), promote, header.reshardlog_entries, - header.resharding_in_logrecord()); + ret = obj.write(olh.get_epoch(), promote, header); if (ret < 0) { return ret; } @@ -2074,9 +2019,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, if (!olh_found) { bool instance_only = false; cls_rgw_obj_key key(dest_key.name); - ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only, - header.reshardlog_entries, - header.resharding_in_logrecord()); + ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only, header); if (ret < 0) { CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret); return ret; @@ -2088,7 +2031,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, } if (!olh.start_modify(op.olh_epoch)) { - ret = obj.unlink_list_entry(header.resharding_in_logrecord()); + ret = obj.unlink_list_entry(header); if (ret < 0) { return ret; } @@ -2098,7 +2041,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, } olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch); - return olh.write(header.reshardlog_entries, header.resharding_in_logrecord()); + return olh.write(header); } rgw_bucket_olh_entry& olh_entry = olh.get_entry(); @@ -2118,8 +2061,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, if (found) { BIVerObjEntry next(hctx, next_key); - ret = next.write(olh.get_epoch(), true, header.reshardlog_entries, - header.resharding_in_logrecord()); + ret = next.write(olh.get_epoch(), true, header); if (ret < 0) { CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret); return ret; @@ -2146,18 +2088,18 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, } else { /* this is a delete marker, it's our responsibility to remove its * instance entry */ - ret = obj.unlink(header.resharding_in_logrecord(), op.key); + ret = 
obj.unlink(header, op.key); if (ret < 0) { return ret; } } - ret = obj.unlink_list_entry(header.resharding_in_logrecord()); + ret = obj.unlink_list_entry(header); if (ret < 0) { return ret; } - ret = olh.write(header.reshardlog_entries, header.resharding_in_logrecord()); + ret = olh.write(header); if (ret < 0) { return ret; } @@ -2289,9 +2231,7 @@ static int rgw_bucket_trim_olh_log(cls_method_context_t hctx, bufferlist *in, bu } /* write the olh data entry */ - ret = write_entry(hctx, olh_data_entry, olh_data_key, - header.reshardlog_entries, - header.resharding_in_logrecord()); + ret = write_entry(hctx, olh_data_entry, olh_data_key, header); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_key.c_str(), ret); return ret; @@ -2340,15 +2280,11 @@ static int rgw_bucket_clear_olh(cls_method_context_t hctx, bufferlist *in, buffe return -ECANCELED; } - ret = cls_cxx_map_remove_key(hctx, olh_data_key); + ret = remove_entry(hctx, olh_data_key, olh_data_entry.key, header); if (ret < 0) { CLS_LOG(1, "NOTICE: %s: can't remove key %s ret=%d", __func__, olh_data_key.c_str(), ret); return ret; } - bufferlist empty; - ret = record_duplicate_entry(hctx, olh_data_key, olh_data_entry.key, &empty, header.resharding_in_logrecord()); - if (ret < 0) - return ret; rgw_bucket_dir_entry plain_entry; @@ -2368,16 +2304,12 @@ static int rgw_bucket_clear_olh(cls_method_context_t hctx, bufferlist *in, buffe return 0; } - ret = cls_cxx_map_remove_key(hctx, op.key.name); + ret = remove_entry(hctx, op.key.name, plain_entry.key, header); if (ret < 0) { CLS_LOG(1, "NOTICE: %s: can't remove key %s ret=%d", __func__, op.key.name.c_str(), ret); return ret; } - ret = record_duplicate_entry(hctx, op.key.name, plain_entry.key, &empty, header.resharding_in_logrecord()); - if (ret < 0) - return ret; - return 0; } @@ -2529,7 +2461,7 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, CLS_LOG_BITX(bitx_inst, 20, "INFO: %s: removing map entry with key=%s", __func__, escape_str(cur_change_key).c_str()); - ret = cls_cxx_map_remove_key(hctx, cur_change_key); + ret = remove_entry(hctx, cur_change_key, cur_change.key, header); if (ret < 0) { CLS_LOG_BITX(bitx_inst, 0, "ERROR: %s: unable to remove key, key=%s, error=%d", __func__, escape_str(cur_change_key).c_str(), ret); @@ -2544,10 +2476,6 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, return ret; } } - if (header.resharding_in_logrecord()) { - bufferlist empty; - return reshard_log_index_operation(hctx, cur_change_key, cur_change.key, &empty); - } break; case CEPH_RGW_UPDATE: CLS_LOG_BITX(bitx_inst, 10, @@ -2561,13 +2489,11 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, stats.actual_size += cur_change.meta.size; header_changed = true; cur_change.index_ver = header.ver; - bufferlist cur_state_bl; - encode(cur_change, cur_state_bl); CLS_LOG_BITX(bitx_inst, 20, "INFO: %s: setting map entry at key=%s", __func__, escape_str(cur_change.key.to_string()).c_str()); - ret = cls_cxx_map_set_val(hctx, cur_change_key, &cur_state_bl); + ret = write_entry(hctx, cur_change, cur_change_key, header); if (ret < 0) { CLS_LOG_BITX(bitx_inst, 0, "ERROR: %s: unable to set value for key, key=%s, error=%d", __func__, escape_str(cur_change_key).c_str(), ret); @@ -2581,9 +2507,6 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, return ret; } } - if (header.resharding_in_logrecord()) { - return reshard_log_index_operation(hctx, cur_change_key, cur_change.key, &cur_state_bl); - } break; } // switch(op) } // if (cur_disk.pending_map.empty()) @@ -2946,7 
+2869,6 @@ static int rgw_bi_put_op(cls_method_context_t hctx, bufferlist *in, bufferlist * int r = cls_cxx_map_remove_key(hctx, entry.idx); if (r < 0) { CLS_LOG(0, "ERROR: %s: cls_cxx_map_remove_key() returned r=%d", __func__, r); - return r; } } else { int r = cls_cxx_map_set_val(hctx, entry.idx, &entry.data); @@ -3898,7 +3820,8 @@ static int rgw_reshard_log_trim_op(cls_method_context_t hctx, bufferlist *in, bu } header.reshardlog_entries = 0; - rc = write_bucket_header(hctx, &header); if (rc < 0) { + rc = write_bucket_header(hctx, &header); + if (rc < 0) { CLS_LOG(0, "ERROR: rgw_reshard_log_trim_op(): failed to write header\n"); return rc; } @@ -4952,37 +4875,6 @@ static int rgw_set_bucket_resharding(cls_method_context_t hctx, bufferlist *in, return write_bucket_header(hctx, &header); } -static int rgw_set_bucket_resharding2(cls_method_context_t hctx, bufferlist *in, bufferlist *out) -{ - CLS_LOG(10, "entered %s", __func__); - cls_rgw_set_bucket_resharding_op op; - - auto in_iter = in->cbegin(); - try { - decode(op, in_iter); - } catch (ceph::buffer::error& err) { - CLS_LOG(1, "ERROR: cls_rgw_set_bucket_resharding: failed to decode entry\n"); - return -EINVAL; - } - - rgw_bucket_dir_header header; - int rc = read_bucket_header(hctx, &header); - if (rc < 0) { - CLS_LOG(1, "ERROR: %s: failed to read header", __func__); - return rc; - } - - if (op.entry.reshard_status == cls_rgw_reshard_status::IN_LOGRECORD) { - if (header.reshardlog_entries != 0) { - CLS_LOG(1, "ERROR: %s: cannot set logrecord status on non-zero log record count", __func__); - return -EOPNOTSUPP; - } - } - header.new_instance.set_status(op.entry.reshard_status); - - return write_bucket_header(hctx, &header); -} - static int rgw_clear_bucket_resharding(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { CLS_LOG(10, "entered %s", __func__); @@ -5194,8 +5086,6 @@ CLS_INIT(rgw) /* resharding attribute */ cls_register_cxx_method(h_class, RGW_SET_BUCKET_RESHARDING, CLS_METHOD_RD | CLS_METHOD_WR, rgw_set_bucket_resharding, &h_rgw_set_bucket_resharding); - cls_register_cxx_method(h_class, RGW_SET_BUCKET_RESHARDING2, CLS_METHOD_RD | CLS_METHOD_WR, - rgw_set_bucket_resharding2, &h_rgw_set_bucket_resharding); cls_register_cxx_method(h_class, RGW_CLEAR_BUCKET_RESHARDING, CLS_METHOD_RD | CLS_METHOD_WR, rgw_clear_bucket_resharding, &h_rgw_clear_bucket_resharding); cls_register_cxx_method(h_class, RGW_GUARD_BUCKET_RESHARDING, CLS_METHOD_RD , diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 400b7f768a9f0..25079a5fc1464 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -1289,26 +1289,8 @@ static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, return manager->aio_operate(io_ctx, shard_id, oid, &op); } -static bool issue_set_bucket_resharding2(librados::IoCtx& io_ctx, - const int shard_id, const string& oid, - const cls_rgw_bucket_instance_entry& entry, - BucketIndexAioManager *manager) { - bufferlist in; - cls_rgw_set_bucket_resharding_op call; - call.entry = entry; - encode(call, in); - librados::ObjectWriteOperation op; - op.assert_exists(); // the shard must exist; if not fail rather than recreate - op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING2, in); - return manager->aio_operate(io_ctx, shard_id, oid, &op); -} - int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id, const string& oid) { return issue_set_bucket_resharding(io_ctx, shard_id, oid, entry, &manager); } -int CLSRGWIssueSetBucketResharding2::issue_op(const int shard_id, const string& 
oid) -{ - return issue_set_bucket_resharding2(io_ctx, shard_id, oid, entry, &manager); -} diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index e43aa981a37cb..39b07ab50171c 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -591,17 +591,6 @@ public: virtual ~CLSRGWIssueSetBucketResharding() override {} }; -class CLSRGWIssueSetBucketResharding2 : public CLSRGWConcurrentIO { - cls_rgw_bucket_instance_entry entry; -protected: - int issue_op(int shard_id, const std::string& oid) override; -public: - CLSRGWIssueSetBucketResharding2(librados::IoCtx& ioc, std::map& _bucket_objs, - const cls_rgw_bucket_instance_entry& _entry, - uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), entry(_entry) {} - virtual ~CLSRGWIssueSetBucketResharding2() override {} -}; - class CLSRGWIssueResyncBucketBILog : public CLSRGWConcurrentIO { protected: int issue_op(int shard_id, const std::string& oid); diff --git a/src/doc/rgw/noblock-reshard.md b/src/doc/rgw/noblock-reshard.md index 0167952460273..806a27aa4a0d9 100644 --- a/src/doc/rgw/noblock-reshard.md +++ b/src/doc/rgw/noblock-reshard.md @@ -42,7 +42,7 @@ When a bucket reshard faild in the logrecord phase, the duplicated copys should * The privious release only has one reshard phase: the progress phase which will block client writes. Because our release contains this phase and the process is same too, that means it is superset of privious release. So when privious rgw initiates a reshard, it will execute as before. -* When a updated rgw initiates a reshard, it firstly enter the logrecord phase which privious releases do not realized. That means the nodes which do not upgraded will deal with client write operations without recording copys. It may leads to part of these index entries missed. So we forbit this scene by adding `cls_rgw_set_bucket_resharding2()` and `cls_rgw_bucket_init_index2()` control source and target versions, old osds would fail the request with -EOPNOTSUPP. so radosgw could start by trying that on all shards. if there are no errors, it can safely proceed with the new scheme. If any of the osds do return -EOPNOTSUPP there, then rgw fall back to the current resharding scheme where writes are blocked the whole time. +* When a updated rgw initiates a reshard, it firstly enter the logrecord phase which privious releases do not realized. That means the nodes which do not upgraded will deal with client write operations without recording copys. It may leads to part of these index entries missed. So we forbit this scene by adding `trim_reshard_log_entries()` and `cls_rgw_bucket_init_index2()` control source and target versions, old osds would fail the request with -EOPNOTSUPP. so radosgw could start by trying that on all shards. if there are no errors, it can safely proceed with the new scheme. If any of the osds do return -EOPNOTSUPP there, then rgw fall back to the current resharding scheme where writes are blocked the whole time. 
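
The probe-and-fallback decision described in that paragraph can be sketched in plain C++. This is illustrative only: `choose_reshard_mode()` and `shard_probe_fn` are made-up names standing in for the real calls in `rgw_reshard.cc` (`init_target_index()`, `trim_reshard_log_entries()`) that un-upgraded OSDs reject with `-EOPNOTSUPP`.

```cpp
// Illustrative sketch, not RGW code: models "try the new op on every shard,
// fall back to blocking reshard if any OSD answers -EOPNOTSUPP".
#include <cerrno>
#include <functional>
#include <iostream>
#include <vector>

enum class ReshardMode { Logrecord, Blocking };

static int choose_reshard_mode(const std::vector<int>& shard_ids,
                               const std::function<int(int)>& shard_probe_fn,
                               ReshardMode& mode)
{
  mode = ReshardMode::Logrecord;
  for (int shard : shard_ids) {
    int r = shard_probe_fn(shard);
    if (r == -EOPNOTSUPP) {
      // At least one OSD predates the logrecord scheme: block writes instead.
      mode = ReshardMode::Blocking;
      return 0;          // a downgrade, not an error
    }
    if (r < 0) {
      return r;          // real failure, abort this reshard attempt
    }
  }
  return 0;
}

int main() {
  // Pretend shard 2 lives on an un-upgraded OSD.
  auto probe = [](int shard) { return shard == 2 ? -EOPNOTSUPP : 0; };
  ReshardMode mode;
  if (choose_reshard_mode({0, 1, 2, 3}, probe, mode) == 0) {
    std::cout << (mode == ReshardMode::Logrecord ? "logrecord" : "blocking")
              << " reshard\n";
  }
  return 0;
}
```
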
## Future Prospects diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 829e06a3b2c0b..b1e74d3a634ad 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -5595,8 +5595,7 @@ int RGWRados::bucket_resync_encrypted_multipart(const DoutPrefixProvider* dpp, int RGWRados::bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, - const cls_rgw_bucket_instance_entry& entry, - bool judge_support_logrecord) + const cls_rgw_bucket_instance_entry& entry) { librados::IoCtx index_pool; map bucket_objs; @@ -5608,11 +5607,7 @@ int RGWRados::bucket_set_reshard(const DoutPrefixProvider *dpp, cpp_strerror(-r) << ")" << dendl; return r; } - - if (judge_support_logrecord) - r = CLSRGWIssueSetBucketResharding2(index_pool, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)(); - else - r = CLSRGWIssueSetBucketResharding(index_pool, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)(); + r = CLSRGWIssueSetBucketResharding(index_pool, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)(); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: " << __func__ << ": unable to issue set bucket resharding, r=" << r << " (" << @@ -6970,7 +6965,7 @@ int RGWRados::Bucket::UpdateIndex::guard_reshard(const DoutPrefixProvider *dpp, } if (target->bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord) { - store->reshard_failed_while_logrecord(target->bucket_info, y, dpp); + store->check_reshard_logrecord_status(target->bucket_info, y, dpp); } return 0; @@ -7667,13 +7662,13 @@ int RGWRados::guard_reshard(const DoutPrefixProvider *dpp, } if (bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord) { - reshard_failed_while_logrecord(bucket_info, y, dpp); + check_reshard_logrecord_status(bucket_info, y, dpp); } return 0; } -int RGWRados::reshard_failed_while_logrecord(RGWBucketInfo& bucket_info, optional_yield y, +int RGWRados::check_reshard_logrecord_status(RGWBucketInfo& bucket_info, optional_yield y, const DoutPrefixProvider *dpp) { real_time now = real_clock::now(); @@ -7694,12 +7689,12 @@ int RGWRados::reshard_failed_while_logrecord(RGWBucketInfo& bucket_info, optiona } if (bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord && now - bucket_info.layout.judge_reshard_lock_time >= make_timespan(reshard_progress_judge_interval)) - return reshard_failed_while_logrecord(bucket_info, bucket_attrs, y, dpp); + return recover_reshard_logrecord(bucket_info, bucket_attrs, y, dpp); } return 0; } -int RGWRados::reshard_failed_while_logrecord(RGWBucketInfo& bucket_info, +int RGWRados::recover_reshard_logrecord(RGWBucketInfo& bucket_info, map& bucket_attrs, optional_yield y, const DoutPrefixProvider *dpp) diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h index 713e8f351a489..6b833eabc35ee 100644 --- a/src/rgw/driver/rados/rgw_rados.h +++ b/src/rgw/driver/rados/rgw_rados.h @@ -1337,9 +1337,9 @@ public: RGWBucketInfo& bucket_info, std::function call, optional_yield y); /* clear the progress flag when reshard failed */ - int reshard_failed_while_logrecord(RGWBucketInfo& bucket_info, optional_yield y, + int check_reshard_logrecord_status(RGWBucketInfo& bucket_info, optional_yield y, const DoutPrefixProvider *dpp); - int reshard_failed_while_logrecord(RGWBucketInfo& bucket_info, + int recover_reshard_logrecord(RGWBucketInfo& bucket_info, std::map& bucket_attrs, optional_yield y, const DoutPrefixProvider *dpp); @@ -1589,8 +1589,7 @@ public: RGWFormatterFlusher& 
flusher); int bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, - const cls_rgw_bucket_instance_entry& entry, - bool judge_support_logrecord = false); + const cls_rgw_bucket_instance_entry& entry); int remove_objs_from_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const std::list& oid_list); diff --git a/src/rgw/driver/rados/rgw_reshard.cc b/src/rgw/driver/rados/rgw_reshard.cc index 18aa48ad57fdb..d30a402f7061e 100644 --- a/src/rgw/driver/rados/rgw_reshard.cc +++ b/src/rgw/driver/rados/rgw_reshard.cc @@ -237,8 +237,7 @@ public: return 0; } - int flush(bool process_log = false, RGWBucketReshard *br = nullptr, - const DoutPrefixProvider *dpp = nullptr) { + int flush(bool process_log = false) { if (entries.size() == 0) { return 0; } @@ -294,12 +293,6 @@ public: entries.clear(); stats.clear(); - if (br != nullptr) { - ret = br->renew_lock_if_needed(dpp); - if (ret < 0) { - return ret; - } - } return 0; } @@ -365,7 +358,7 @@ public: const DoutPrefixProvider *dpp = nullptr) { int ret = 0; for (auto& shard : target_shards) { - int r = shard.flush(process_log, br, dpp); + int r = shard.flush(process_log); if (r < 0) { derr << "ERROR: target_shards[" << shard.get_shard_id() << "].flush() returned error: " << cpp_strerror(-r) << dendl; ret = r; @@ -377,6 +370,13 @@ public: derr << "ERROR: target_shards[" << shard.get_shard_id() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl; ret = r; } + if (br != nullptr) { + r = br->renew_lock_if_needed(dpp); + } + if (r < 0) { + derr << "ERROR: br->renew_lock_if_needed() returned error: " << cpp_strerror(-r) << dendl; + ret = r; + } } return ret; } @@ -395,13 +395,12 @@ RGWBucketReshard::RGWBucketReshard(rgw::sal::RadosStore* _store, static int set_resharding_status(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* store, const RGWBucketInfo& bucket_info, - cls_rgw_reshard_status status, - bool judge_support_logrecord = false) + cls_rgw_reshard_status status) { cls_rgw_bucket_instance_entry instance_entry; instance_entry.set_status(status); - int ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry, judge_support_logrecord); + int ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry); if (ret < 0) { ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: " << cpp_strerror(-ret) << dendl; @@ -431,17 +430,22 @@ static int remove_old_reshard_instance(rgw::sal::RadosStore* store, static int init_target_index(rgw::sal::RadosStore* store, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& index, + ReshardFaultInjector& fault, bool& support_logrecord, const DoutPrefixProvider* dpp) { - int ret = store->svc()->bi->init_index(dpp, bucket_info, index, true); + + int ret = 0; + if (ret = fault.check("init_index"); + ret == 0) { // no fault injected, initialize index + ret = store->svc()->bi->init_index(dpp, bucket_info, index, true); + } if (ret == -EOPNOTSUPP) { ldpp_dout(dpp, 0) << "WARNING: " << "init_index() does not supported logrecord, " << "falling back to block reshard mode." 
<< dendl; support_logrecord = false; ret = store->svc()->bi->init_index(dpp, bucket_info, index, false); - } - if (ret < 0) { + } else if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to initialize " "target index shard objects: " << cpp_strerror(ret) << dendl; return ret; @@ -508,7 +512,7 @@ static int init_target_layout(rgw::sal::RadosStore* store, } // create the index shard objects - int ret = init_target_index(store, bucket_info, target, support_logrecord, dpp); + int ret = init_target_index(store, bucket_info, target, fault, support_logrecord, dpp); if (ret < 0) { return ret; } @@ -573,24 +577,6 @@ static int init_target_layout(rgw::sal::RadosStore* store, return ret; } - // trim the reshard log entries to guarantee that any existing log entries are cleared, - // if there are no reshard log entries, this is a no-op that costs little time - if (support_logrecord) { - ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, null_yield); - if (ret == -EOPNOTSUPP) { - // not an error, logrecord is not supported, change to block reshard - ldpp_dout(dpp, 0) << "WARNING: " << "trim_reshard_log_entries() does not supported" - << " logrecord, falling back to block reshard mode." << dendl; - bucket_info.layout.resharding = rgw::BucketReshardState::InProgress; - support_logrecord = false; - return 0; - } - if (ret < 0) { - ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to trim reshard log entries: " - << cpp_strerror(ret) << dendl; - return ret; - } - } return 0; } // init_target_layout @@ -612,7 +598,7 @@ static int revert_target_layout(rgw::sal::RadosStore* store, ret = 0; // non-fatal error } // trim the reshard log entries written in logrecord state - ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, null_yield); + ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, y); if (ret < 0) { ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to trim " "reshard log entries: " << cpp_strerror(ret) << dendl; @@ -695,26 +681,35 @@ static int init_reshard(rgw::sal::RadosStore* store, return ret; } + // trim the reshard log entries to guarantee that any existing log entries are cleared, + // if there are no reshard log entries, this is a no-op that costs little time if (support_logrecord) { - if (ret = fault.check("logrecord_writes"); - ret == 0) { // no fault injected, record log with writing to the current index shards - ret = set_resharding_status(dpp, store, bucket_info, - cls_rgw_reshard_status::IN_LOGRECORD, - true); + if (ret = fault.check("trim_reshard_log_entries"); + ret == 0) { // no fault injected, trim reshard log entries + ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, y); } if (ret == -EOPNOTSUPP) { - ldpp_dout(dpp, 0) << "WARNING: " << "set_resharding_status()" - << " doesn't support logrecords," - << " fallback to blocking mode." << dendl; + // not an error, logrecord is not supported, change to block reshard + ldpp_dout(dpp, 0) << "WARNING: " << "trim_reshard_log_entries() does not supported" + << " logrecord, falling back to block reshard mode." 
<< dendl; bucket_info.layout.resharding = rgw::BucketReshardState::InProgress; support_logrecord = false; + } else if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to trim reshard log entries: " + << cpp_strerror(ret) << dendl; + return ret; } } - if (!support_logrecord) { + if (support_logrecord) { + if (ret = fault.check("logrecord_writes"); + ret == 0) { // no fault injected, record log with writing to the current index shards + ret = set_resharding_status(dpp, store, bucket_info, + cls_rgw_reshard_status::IN_LOGRECORD); + } + } else { ret = set_resharding_status(dpp, store, bucket_info, - cls_rgw_reshard_status::IN_PROGRESS, - false); + cls_rgw_reshard_status::IN_PROGRESS); } if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to pause " diff --git a/src/test/cls_rgw/test_cls_rgw.cc b/src/test/cls_rgw/test_cls_rgw.cc index e0c8eeb13f6c8..0cc322e54b304 100644 --- a/src/test/cls_rgw/test_cls_rgw.cc +++ b/src/test/cls_rgw/test_cls_rgw.cc @@ -1346,7 +1346,7 @@ void set_reshard_status(librados::IoCtx& ioctx, const std::string& oid, { map bucket_objs; bucket_objs[0] = oid; - int r = CLSRGWIssueSetBucketResharding2(ioctx, bucket_objs, entry, 1)(); + int r = CLSRGWIssueSetBucketResharding(ioctx, bucket_objs, entry, 1)(); ASSERT_EQ(0, r); } @@ -1467,6 +1467,6 @@ TEST_F(cls_rgw, reshardlog_num) // record a log in deleting obj not add reshardlog_entry index_prepare(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, obj1, loc); - index_complete(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, 1, obj1, meta); - reshardlog_entries(ioctx, bucket_oid, 1u); + index_complete(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, 2, obj1, meta); + reshardlog_entries(ioctx, bucket_oid, 2u); } -- 2.39.5
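
Reviewer note, not part of the patch: the refactor above funnels every bucket-index write and delete through the new `write_entry()`/`remove_entry()` helpers, which duplicate the operation into the reshard log and bump `header.reshardlog_entries` whenever the header reports the logrecord phase. The toy model below shows only that control flow under stated assumptions — ordinary `std::map` containers stand in for the `cls_cxx_map_*` omap calls and for `reshard_log_index_operation()`; it is not the cls_rgw implementation.

```cpp
// Toy model of the helper pattern introduced in cls_rgw.cc (assumptions noted
// above): plain std::map containers replace the omap and the reshard log;
// only the logrecord bookkeeping mirrors the patch.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

struct DirHeader {
  bool in_logrecord = false;       // stands in for header.resharding_in_logrecord()
  uint32_t reshardlog_entries = 0;
};

struct Index {
  std::map<std::string, std::string> omap;        // main bucket index
  std::map<std::string, std::string> reshardlog;  // duplicates replayed onto new shards
};

static int write_entry(Index& idx, DirHeader& hdr,
                       const std::string& key, const std::string& val)
{
  idx.omap[key] = val;              // cls_cxx_map_set_val()
  if (hdr.in_logrecord) {
    idx.reshardlog[key] = val;      // reshard_log_index_operation() with the entry
    hdr.reshardlog_entries++;
  }
  return 0;
}

static int remove_entry(Index& idx, DirHeader& hdr, const std::string& key)
{
  idx.omap.erase(key);              // cls_cxx_map_remove_key()
  if (hdr.in_logrecord) {
    idx.reshardlog[key] = "";       // empty payload marks a deletion in the log
    hdr.reshardlog_entries++;
  }
  return 0;
}

int main() {
  Index idx;
  DirHeader hdr;
  hdr.in_logrecord = true;
  write_entry(idx, hdr, "obj1", "entry-v1");   // e.g. prepare/complete of an op
  remove_entry(idx, hdr, "obj1");              // later deletion of the same key
  std::cout << "reshardlog entries: " << hdr.reshardlog_entries << "\n";  // prints 2
  return 0;
}
```

That two-log-entries outcome for a write followed by a delete of the same key appears to be what the updated expectation in test_cls_rgw.cc (`reshardlog_entries(ioctx, bucket_oid, 2u)`) encodes.
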