From: liangmingyuan
Date: Tue, 26 Mar 2024 01:12:08 +0000 (+0800)
Subject: reshard: guarantee no duplicated index entries exist before starting
X-Git-Tag: v20.0.0~1122^2~3
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=72997836c2fba978b4a5f5cab90df228484b3d83;p=ceph.git

reshard: guarantee no duplicated index entries exist before starting reshard

Duplicated index entries may remain after a reshard fails, and they can
lead to redundant copies in the next reshard process. What's more, if a
duplicated entry is a delete operation and the same entry is written
again before the next reshard, the dst index entry may be wrongly
deleted. So duplicated index entries should be cleared after a reshard
fails and before a new reshard starts automatically. For convenience,
radosgw-admin can also list and purge reshard logs manually.

Signed-off-by: Mingyuan Liang
---

diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc
index 8f0190d421814..9f4fd54a67da1 100644
--- a/src/cls/rgw/cls_rgw.cc
+++ b/src/cls/rgw/cls_rgw.cc
@@ -3822,6 +3822,48 @@ static int rgw_bi_log_stop(cls_method_context_t hctx, bufferlist *in, bufferlist
   return write_bucket_header(hctx, &header);
 }
 
+static int rgw_reshard_log_trim_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  string key_begin(1, BI_PREFIX_CHAR);
+  key_begin.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX]);
+
+  string key_end;
+  key_end = BI_PREFIX_CHAR;
+  key_end.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX + 1]);
+
+  // list a single key to detect whether the range is empty
+  const size_t max_entries = 1;
+  std::set<std::string> keys;
+  bool more = false;
+
+  int rc = cls_cxx_map_get_keys(hctx, key_begin, max_entries, &keys, &more);
+  if (rc < 0) {
+    CLS_LOG(1, "ERROR: cls_cxx_map_get_keys failed rc=%d", rc);
+    return rc;
+  }
+
+  if (keys.empty()) {
+    CLS_LOG(20, "range is empty key_begin=%s", key_begin.c_str());
+    return -ENODATA;
+  }
+
+  const std::string& first_key = *keys.begin();
+  if (key_end < first_key) {
+    CLS_LOG(20, "listed key %s past key_end=%s", first_key.c_str(), key_end.c_str());
+    return -ENODATA;
+  }
+
+  CLS_LOG(20, "listed key %s, removing through %s",
+          first_key.c_str(), key_end.c_str());
+
+  rc = cls_cxx_map_remove_range(hctx, first_key, key_end);
+  if (rc < 0) {
+    CLS_LOG(1, "ERROR: cls_cxx_map_remove_range failed rc=%d", rc);
+    return rc;
+  }
+  return 0;
+}
+
 static void usage_record_prefix_by_time(uint64_t epoch, string& key)
 {
   char buf[32];
@@ -4975,6 +5017,7 @@ CLS_INIT(rgw)
   cls_method_handle_t h_rgw_bi_get_vals_op;
   cls_method_handle_t h_rgw_bi_put_op;
   cls_method_handle_t h_rgw_bi_list_op;
+  cls_method_handle_t h_rgw_reshard_log_trim_op;
   cls_method_handle_t h_rgw_bi_log_list_op;
   cls_method_handle_t h_rgw_bi_log_trim_op;
   cls_method_handle_t h_rgw_bi_log_resync_op;
@@ -5032,6 +5075,7 @@ CLS_INIT(rgw)
   cls_register_cxx_method(h_class, RGW_BI_GET_VALS, CLS_METHOD_RD, rgw_bi_get_vals_op, &h_rgw_bi_get_vals_op);
   cls_register_cxx_method(h_class, RGW_BI_PUT, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_op, &h_rgw_bi_put_op);
   cls_register_cxx_method(h_class, RGW_BI_LIST, CLS_METHOD_RD, rgw_bi_list_op, &h_rgw_bi_list_op);
+  cls_register_cxx_method(h_class, RGW_RESHARD_LOG_TRIM, CLS_METHOD_RD | CLS_METHOD_WR, rgw_reshard_log_trim_op, &h_rgw_reshard_log_trim_op);
  cls_register_cxx_method(h_class, RGW_BI_LOG_LIST, CLS_METHOD_RD, rgw_bi_log_list, &h_rgw_bi_log_list_op);
  cls_register_cxx_method(h_class, RGW_BI_LOG_TRIM, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_log_trim, &h_rgw_bi_log_trim_op);
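
The new cls method deletes every reshard-log key in a shard's omap with a
single range removal. All bucket-index tables share the BI_PREFIX_CHAR byte
and their keys sort lexicographically, so the reshard-log table is exactly the
half-open range from its own prefix up to the next table's prefix. Below is a
minimal standalone sketch of that range trick; the 0x80 prefix byte and the
table names are illustrative assumptions, not the exact cls_rgw constants:

    #include <iostream>
    #include <map>
    #include <string>

    int main() {
      const char BI_PREFIX = static_cast<char>(0x80); // assumed stand-in for BI_PREFIX_CHAR
      const std::string RESHARD_LOG = "1001_";        // hypothetical table prefix
      const std::string NEXT_TABLE  = "1002_";        // the next prefix bounds the range

      const std::string key_begin = std::string(1, BI_PREFIX) + RESHARD_LOG;
      const std::string key_end   = std::string(1, BI_PREFIX) + NEXT_TABLE;

      // omap keys sort lexicographically, like std::map<std::string, ...>
      std::map<std::string, int> omap = {
        {key_begin + "obj-a", 1},
        {key_begin + "obj-b", 2},
        {std::string(1, BI_PREFIX) + "0999_other", 3},
      };

      // erase the half-open range [key_begin, key_end), which is what
      // cls_cxx_map_remove_range() does server-side in one call
      omap.erase(omap.lower_bound(key_begin), omap.lower_bound(key_end));

      std::cout << omap.size() << " key(s) left\n"; // prints: 1 key(s) left
    }
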
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc
index c5ac99eada023..400b7f768a9f0 100644
--- a/src/cls/rgw/cls_rgw_client.cc
+++ b/src/cls/rgw/cls_rgw_client.cc
@@ -733,6 +733,19 @@ int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid)
   return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
 }
 
+static bool issue_reshard_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+                                   BucketIndexAioManager *manager) {
+  bufferlist in;
+  ObjectWriteOperation op;
+  op.exec(RGW_CLASS, RGW_RESHARD_LOG_TRIM, in);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
+}
+
+int CLSRGWIssueReshardLogTrim::issue_op(int shard_id, const string& oid)
+{
+  return issue_reshard_log_trim(io_ctx, oid, shard_id, &manager);
+}
+
 static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
                                         rgw_cls_check_index_ret *pdata) {
   bufferlist in;
diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h
index 86c40dc92787d..e43aa981a37cb 100644
--- a/src/cls/rgw/cls_rgw_client.h
+++ b/src/cls/rgw/cls_rgw_client.h
@@ -522,6 +522,23 @@ public:
   virtual ~CLSRGWIssueBILogTrim() override {}
 };
 
+class CLSRGWIssueReshardLogTrim : public CLSRGWConcurrentIO {
+protected:
+  int issue_op(int shard_id, const std::string& oid) override;
+  // Trim until -ENODATA is returned.
+  int valid_ret_code() override { return -ENODATA; }
+  bool need_multiple_rounds() override { return true; }
+  void add_object(int shard, const std::string& oid) override { objs_container[shard] = oid; }
+  void reset_container(std::map<int, std::string>& objs) override {
+    objs_container.swap(objs);
+    iter = objs_container.begin();
+    objs.clear();
+  }
+public:
+  CLSRGWIssueReshardLogTrim(librados::IoCtx& io_ctx, std::map<int, std::string>& _bucket_objs, uint32_t max_aio) :
+    CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
+};
+
 /**
  * Check the bucket index.
  *
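
CLSRGWIssueReshardLogTrim drives the cls op through CLSRGWConcurrentIO: a
shard that returns 0 may still hold entries, -ENODATA (valid_ret_code) marks
it finished, and need_multiple_rounds() keeps re-issuing the op until every
shard reports an empty range. A standalone, serialized sketch of that loop
follows; the real class issues up to max_aio operations concurrently, and
TrimOp here is a hypothetical stand-in for one RGW_RESHARD_LOG_TRIM round
trip:

    #include <cerrno>
    #include <functional>
    #include <iostream>
    #include <map>
    #include <string>

    // hypothetical stand-in for one RGW_RESHARD_LOG_TRIM round trip on a shard
    using TrimOp = std::function<int(const std::string& shard_oid)>;

    int trim_until_empty(std::map<int, std::string> shards, const TrimOp& trim) {
      while (!shards.empty()) {        // need_multiple_rounds(): loop over rounds
        for (auto it = shards.begin(); it != shards.end(); ) {
          const int r = trim(it->second);
          if (r == -ENODATA) {
            it = shards.erase(it);     // valid_ret_code(): this shard is done
          } else if (r < 0) {
            return r;                  // a real error aborts the whole trim
          } else {
            ++it;                      // entries may remain; retry next round
          }
        }
      }
      return 0;
    }

    int main() {
      std::map<int, int> rounds_left = {{0, 2}, {1, 1}};  // shard -> rounds of work
      int rc = trim_until_empty({{0, "shard.0"}, {1, "shard.1"}},
          [&](const std::string& oid) {
            const int shard = oid.back() - '0';
            return rounds_left[shard]-- > 0 ? 0 : -ENODATA;
          });
      std::cout << "trim rc=" << rc << "\n";  // prints: trim rc=0
    }
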
diff --git a/src/cls/rgw/cls_rgw_const.h b/src/cls/rgw/cls_rgw_const.h
index 4c8f20ffa9e89..9a4e368575d99 100644
--- a/src/cls/rgw/cls_rgw_const.h
+++ b/src/cls/rgw/cls_rgw_const.h
@@ -37,6 +37,8 @@ constexpr int RGWBIAdvanceAndRetryError = -EFBIG;
 #define RGW_BI_PUT "bi_put"
 #define RGW_BI_LIST "bi_list"
 
+#define RGW_RESHARD_LOG_TRIM "reshard_log_trim"
+
 #define RGW_BI_LOG_LIST "bi_log_list"
 #define RGW_BI_LOG_TRIM "bi_log_trim"
 #define RGW_DIR_SUGGEST_CHANGES "dir_suggest_changes"
diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc
index 4be7264d32f6b..aa5e646ed518c 100644
--- a/src/rgw/driver/rados/rgw_rados.cc
+++ b/src/rgw/driver/rados/rgw_rados.cc
@@ -7720,8 +7720,8 @@ int RGWRados::reshard_failed_while_logrecord(RGWBucketInfo& bucket_info,
   } else {
     ldpp_dout(dpp,20) << __func__ << ": reshard lock success, "
                       << "that means the reshard has failed for bucekt " << bucket_info.bucket.bucket_id << dendl;
-    // clear the RESHARD_IN_PROGRESS status after reshard failed, also set bucket instance
-    // status to CLS_RGW_RESHARD_NONE
+    // clear the RESHARD_IN_PROGRESS status after reshard failed, set bucket instance status
+    // to CLS_RGW_RESHARD_NONE, also clear the reshard log entries
     ret = RGWBucketReshard::clear_resharding(this->driver, bucket_info, bucket_attrs, dpp, y);
     reshard_lock.unlock();
     if (ret < 0) {
@@ -9351,6 +9351,18 @@ int RGWRados::bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs)
   return 0;
 }
 
+int RGWRados::trim_reshard_log_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y)
+{
+  librados::IoCtx index_pool;
+  map<int, string> bucket_objs;
+
+  int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
+  if (r < 0) {
+    return r;
+  }
+  return CLSRGWIssueReshardLogTrim(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+}
+
 int RGWRados::gc_operate(const DoutPrefixProvider *dpp, string& oid, librados::ObjectWriteOperation *op, optional_yield y)
 {
   return rgw_rados_operate(dpp, gc_pool_ctx, oid, op, y);
diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h
index a2c55b585d465..713e8f351a489 100644
--- a/src/rgw/driver/rados/rgw_rados.h
+++ b/src/rgw/driver/rados/rgw_rados.h
@@ -1540,6 +1540,8 @@ public:
               std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y);
   int bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs);
 
+  int trim_reshard_log_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y);
+
   int cls_obj_usage_log_add(const DoutPrefixProvider *dpp, const std::string& oid, rgw_usage_log_info& info, optional_yield y);
   int cls_obj_usage_log_read(const DoutPrefixProvider *dpp, const std::string& oid, const std::string& user, const std::string& bucket,
                              uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, std::string& read_iter,
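
RGWRados::trim_reshard_log_entries() uses the construct-and-call idiom shared
by the neighboring CLSRGWIssue* helpers: the concurrent-IO object is a
functor, so CLSRGWIssueReshardLogTrim(index_pool, bucket_objs, max_aio)()
builds the temporary and immediately runs operator(), which performs the whole
per-shard fan-out and returns an errno-style result. A trivial standalone
illustration, with IssueOp as a hypothetical stand-in:

    #include <iostream>
    #include <map>
    #include <string>

    struct IssueOp {  // hypothetical stand-in for CLSRGWIssueReshardLogTrim
      std::map<int, std::string> objs;   // shard id -> index object name
      unsigned max_aio;                  // bound on in-flight ops
      IssueOp(std::map<int, std::string> o, unsigned m)
          : objs(std::move(o)), max_aio(m) {}
      int operator()() {                 // runs the whole operation, errno-style
        std::cout << "issuing trim on " << objs.size()
                  << " shard(s), max_aio=" << max_aio << "\n";
        return 0;
      }
    };

    int main() {
      // temporary constructed and invoked in one expression, mirroring
      //   return CLSRGWIssueReshardLogTrim(index_pool, bucket_objs, max_aio)();
      return IssueOp({{0, "dir.0"}, {1, "dir.1"}}, 8)();
    }
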
diff --git a/src/rgw/driver/rados/rgw_reshard.cc b/src/rgw/driver/rados/rgw_reshard.cc
index 2625b97923339..84c76f2d52d24 100644
--- a/src/rgw/driver/rados/rgw_reshard.cc
+++ b/src/rgw/driver/rados/rgw_reshard.cc
@@ -436,7 +436,8 @@ static int init_target_index(rgw::sal::RadosStore* store,
 {
   int ret = store->svc()->bi->init_index(dpp, bucket_info, index, true);
   if (ret == -EOPNOTSUPP) {
-    ldpp_dout(dpp, 0) << "WARNING: " << "init_index() does not supported logrecord" << dendl;
+    ldpp_dout(dpp, 0) << "WARNING: " << "init_index() does not support logrecord, "
+                      << "falling back to block reshard mode." << dendl;
     support_logrecord = false;
     ret = store->svc()->bi->init_index(dpp, bucket_info, index, false);
   }
@@ -571,6 +572,25 @@ static int init_target_layout(rgw::sal::RadosStore* store,
       store->svc()->bi->clean_index(dpp, bucket_info, target);
     return ret;
   }
+
+  // trim the reshard log entries to guarantee that any existing log entries are cleared;
+  // if there are no reshard log entries, this is a no-op that costs little time
+  if (support_logrecord) {
+    ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, null_yield);
+    if (ret == -EOPNOTSUPP) {
+      // not an error: logrecord is not supported, fall back to block reshard
+      ldpp_dout(dpp, 0) << "WARNING: " << "trim_reshard_log_entries() does not support"
+                        << " logrecord, falling back to block reshard mode." << dendl;
+      bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
+      support_logrecord = false;
+      return 0;
+    }
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to trim reshard log entries: "
+                        << cpp_strerror(ret) << dendl;
+      return ret;
+    }
+  }
   return 0;
 } // init_target_layout
@@ -591,6 +611,13 @@ static int revert_target_layout(rgw::sal::RadosStore* store,
                       "target index with: " << cpp_strerror(ret) << dendl;
     ret = 0; // non-fatal error
   }
+  // trim the reshard log entries written in logrecord state
+  ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, null_yield);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to trim "
+        "reshard log entries: " << cpp_strerror(ret) << dendl;
+    ret = 0; // non-fatal error
+  }
 
   // retry in case of racing writes to the bucket instance metadata
   static constexpr auto max_retries = 10;
@@ -677,7 +704,9 @@ static int init_reshard(rgw::sal::RadosStore* store,
     }
     if (ret == -EOPNOTSUPP) {
       ldpp_dout(dpp, 0) << "WARNING: " << "set_resharding_status()"
-                        << " doesn't support logrecords" << dendl;
+                        << " doesn't support logrecords,"
+                        << " falling back to blocking mode." << dendl;
+      bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
       support_logrecord = false;
     }
   }
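
The three hunks above share one degradation policy: whenever a
logrecord-related call returns -EOPNOTSUPP (for example, OSDs whose objclass
predates the logrecord ops), the reshard flips to the older blocking
InProgress mode instead of failing. A standalone sketch of that policy, with
names that are illustrative rather than the patch's actual helpers:

    #include <cerrno>
    #include <iostream>

    enum class ReshardState { InLogrecord, InProgress };

    // 'ret' is the result of init_index(), trim_reshard_log_entries() or
    // set_resharding_status(); returns false only on a real (fatal) error.
    bool apply_logrecord_fallback(int ret, bool& support_logrecord,
                                  ReshardState& state) {
      if (ret == -EOPNOTSUPP) {              // cluster can't do logrecord
        support_logrecord = false;
        state = ReshardState::InProgress;    // degrade to blocking reshard
        return true;                         // not an error
      }
      return ret >= 0;
    }

    int main() {
      bool support_logrecord = true;
      ReshardState state = ReshardState::InLogrecord;
      std::cout << apply_logrecord_fallback(-EOPNOTSUPP, support_logrecord, state)
                << " support=" << support_logrecord << "\n";  // prints: 1 support=0
    }
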
@@ -1251,15 +1280,24 @@ int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& curr
     if (ret < 0) {
       return ret;
     }
-  }
-
-  // block the client op and complete the resharding
-  ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
-  int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
-                            formatter, bucket_info.layout.resharding, dpp, y);
-  if (ret < 0) {
-    ldpp_dout(dpp, 0) << __func__ << ": failed in progress state of reshard ret = " << ret << dendl;
-    return ret;
+
+    // block the client op and complete the resharding
+    ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
+    ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
+                          formatter, bucket_info.layout.resharding, dpp, y);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << __func__ << ": failed in progress state of reshard ret = " << ret << dendl;
+      return ret;
+    }
+  } else {
+    // layout is set to InProgress, but this pass is processed in InLogrecord mode
+    ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
+    int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
+                              formatter, rgw::BucketReshardState::InLogrecord, dpp, y);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << __func__ << ": failed in logrecord state of reshard ret = " << ret << dendl;
+      return ret;
+    }
   }
   return 0;
 } // RGWBucketReshard::do_reshard
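
The do_reshard() change keeps the two-pass flow when logrecord is supported: a
first pass copies the index while clients keep writing (their changes are
recorded in the reshard log), then a second pass blocks client ops and drains
what the log accumulated. Without logrecord support the layout is marked
InProgress (clients blocked), but the single pass is still processed
InLogrecord-style, i.e. as a full copy, which is what the else branch
expresses. A compact standalone sketch of that control flow, with
reshard_process_sim as a hypothetical stand-in for reshard_process():

    #include <iostream>

    enum class ReshardState { InLogrecord, InProgress };

    int reshard_process_sim(ReshardState mode) {
      std::cout << (mode == ReshardState::InLogrecord
                        ? "full copy of index entries\n"
                        : "drain reshard log, clients blocked\n");
      return 0;
    }

    int do_reshard_sim(bool support_logrecord) {
      if (support_logrecord) {
        // pass 1: copy while clients write; pass 2: block and drain the log
        if (int r = reshard_process_sim(ReshardState::InLogrecord); r < 0) return r;
        return reshard_process_sim(ReshardState::InProgress);
      }
      // no logrecord: one blocking pass that still does the full copy
      return reshard_process_sim(ReshardState::InLogrecord);
    }

    int main() { return do_reshard_sim(true); }
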
index to purge" << std::endl; + return EINVAL; + } + + int max_shards = rgw::num_shards(index); + + formatter->open_array_section("entries"); + int i = (specified_shard_id ? shard_id : 0); + for (; i < max_shards; i++) { + formatter->open_object_section("shard"); + encode_json("shard_id", i, formatter.get()); + formatter->open_array_section("single shard entries"); + RGWRados::BucketShard bs(static_cast(driver)->getRados()); + int ret = bs.init(dpp(), bucket->get_info(), index, i, null_yield); + if (ret < 0) { + cerr << "ERROR: bs.init(bucket=" << bucket << ", shard=" << i << "): " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + marker.clear(); + do { + entries.clear(); + ret = static_cast(driver)->getRados()->bi_list(bs, "", marker, max_entries, + &entries, &is_truncated, + true, null_yield); + if (ret < 0) { + cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + list::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + rgw_cls_bi_entry& entry = *iter; + formatter->dump_string("idx", entry.idx); + marker = entry.idx; + } + formatter->flush(cout); + } while (is_truncated); + formatter->close_section(); + formatter->close_section(); + formatter->flush(cout); + + if (specified_shard_id) + break; + } + formatter->close_section(); + formatter->flush(cout); + } + + if (opt_cmd == OPT::RESHARDLOG_PURGE) { + if (bucket_name.empty()) { + cerr << "ERROR: bucket not specified" << std::endl; + return EINVAL; + } + int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket); + if (ret < 0) { + cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + ret = static_cast(driver)->getRados()->trim_reshard_log_entries(dpp(), bucket->get_info(), null_yield); + if (ret < 0) { + cerr << "ERROR: trim_reshard_log_entries(): " << cpp_strerror(-ret) << std::endl; + return -ret; + } + } + if (opt_cmd == OPT::PUBSUB_NOTIFICATION_LIST) { if (bucket_name.empty()) { cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;