From 8f68b3fdaec504ab476b6391062ce5c15747fdf8 Mon Sep 17 00:00:00 2001
From: liangmingyuan
Date: Fri, 22 Mar 2024 09:14:35 +0800
Subject: [PATCH] reshard: small fix and cleanup

At the end of each stage, finish() is called once to guarantee that all
remaining entries are flushed to the destination shards. This may take a
long time if the number of destination shards is large, especially in the
second stage, so the reshard_lock needs to be renewed during the flush.

Signed-off-by: Mingyuan Liang
---
 src/cls/rgw/cls_rgw.cc              |   1 -
 src/cls/rgw/cls_rgw_client.cc       |   7 +-
 src/cls/rgw/cls_rgw_client.h        |   2 +-
 src/rgw/driver/rados/rgw_rados.cc   |   2 +-
 src/rgw/driver/rados/rgw_rados.h    |   2 +-
 src/rgw/driver/rados/rgw_reshard.cc | 110 ++++++++++++++++++----------
 src/rgw/driver/rados/rgw_reshard.h  |   5 ++
 7 files changed, 84 insertions(+), 45 deletions(-)

diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc
index af520b6bb0e22..8f0190d421814 100644
--- a/src/cls/rgw/cls_rgw.cc
+++ b/src/cls/rgw/cls_rgw.cc
@@ -2914,7 +2914,6 @@ static int rgw_bi_put_op(cls_method_context_t hctx, bufferlist *in, bufferlist *
   }
 
   rgw_cls_bi_entry& entry = op.entry;
-
   if (entry.type == BIIndexType::ReshardDeleted) {
     int r = cls_cxx_map_remove_key(hctx, entry.idx);
     if (r < 0) {
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc
index a9b1a5bdb1c47..c5ac99eada023 100644
--- a/src/cls/rgw/cls_rgw_client.cc
+++ b/src/cls/rgw/cls_rgw_client.cc
@@ -471,12 +471,12 @@ int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
 }
 
 int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid,
-                        std::set<std::string> log_entries_wanted,
+                        std::set<std::string>& log_entries_wanted,
                         std::list<rgw_cls_bi_entry> *entries)
 {
   bufferlist in, out;
   struct rgw_cls_bi_get_vals_op call;
-  call.log_entries_wanted = log_entries_wanted;
+  call.log_entries_wanted = std::move(log_entries_wanted);
   encode(call, in);
   int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_GET_VALS, in, out);
   if (r < 0)
@@ -490,7 +490,8 @@ int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid,
     return -EIO;
   }
 
-  entries->swap(op_ret.entries);
+  if (entries)
+    entries->swap(op_ret.entries);
 
   return 0;
 }
diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h
index c5336030c07c2..86c40dc92787d 100644
--- a/src/cls/rgw/cls_rgw_client.h
+++ b/src/cls/rgw/cls_rgw_client.h
@@ -384,7 +384,7 @@ int cls_rgw_bi_get(librados::IoCtx& io_ctx, const std::string oid,
                    BIIndexType index_type, const cls_rgw_obj_key& key,
                    rgw_cls_bi_entry *entry);
 int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid,
-                        std::set<std::string> log_entries_wanted,
+                        std::set<std::string>& log_entries_wanted,
                         std::list<rgw_cls_bi_entry> *entries);
 int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, const rgw_cls_bi_entry& entry);
 void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry);
diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc
index 1df24538067f4..4be7264d32f6b 100644
--- a/src/rgw/driver/rados/rgw_rados.cc
+++ b/src/rgw/driver/rados/rgw_rados.cc
@@ -9239,7 +9239,7 @@ int RGWRados::bi_get(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_
   return cls_rgw_bi_get(ref.ioctx, ref.obj.oid, index_type, key, entry);
 }
 
-int RGWRados::bi_get_vals(BucketShard& bs, set<string> log_entries_wanted,
+int RGWRados::bi_get_vals(BucketShard& bs, set<string>& log_entries_wanted,
                           list<rgw_cls_bi_entry> *entries, optional_yield y)
 {
   auto& ref = bs.bucket_obj;
diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h
index 4321cddf40ee3..a2c55b585d465 100644
--- a/src/rgw/driver/rados/rgw_rados.h
+++ b/src/rgw/driver/rados/rgw_rados.h
@@ -1522,7 +1522,7 @@ public:
   int bi_get_instance(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_dir_entry *dirent, optional_yield y);
   int bi_get_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_olh_entry *olh, optional_yield y);
   int bi_get(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry, optional_yield y);
-  int bi_get_vals(BucketShard& bs, std::set<std::string> log_entries_wanted, std::list<rgw_cls_bi_entry> *entries, optional_yield y);
+  int bi_get_vals(BucketShard& bs, std::set<std::string>& log_entries_wanted, std::list<rgw_cls_bi_entry> *entries, optional_yield y);
   void bi_put(librados::ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y);
   int bi_put(BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y);
   int bi_put(const DoutPrefixProvider *dpp, rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry, optional_yield y);
diff --git a/src/rgw/driver/rados/rgw_reshard.cc b/src/rgw/driver/rados/rgw_reshard.cc
index 3ff4915dd75c3..2625b97923339 100644
--- a/src/rgw/driver/rados/rgw_reshard.cc
+++ b/src/rgw/driver/rados/rgw_reshard.cc
@@ -237,7 +237,8 @@ public:
     return 0;
   }
 
-  int flush(bool process_log = false) {
+  int flush(bool process_log = false, RGWBucketReshard *br = nullptr,
+            const DoutPrefixProvider *dpp = nullptr) {
     if (entries.size() == 0) {
       return 0;
     }
@@ -292,6 +293,13 @@ public:
     }
     entries.clear();
     stats.clear();
+
+    if (br != nullptr) {
+      ret = br->renew_lock_if_needed(dpp);
+      if (ret < 0) {
+        return ret;
+      }
+    }
     return 0;
   }
 
@@ -353,10 +361,11 @@ public:
     return 0;
   }
 
-  int finish(bool process_log = false) {
+  int finish(bool process_log = false, RGWBucketReshard *br = nullptr,
+             const DoutPrefixProvider *dpp = nullptr) {
     int ret = 0;
     for (auto& shard : target_shards) {
-      int r = shard.flush(process_log);
+      int r = shard.flush(process_log, br, dpp);
       if (r < 0) {
         derr << "ERROR: target_shards[" << shard.get_shard_id() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
         ret = r;
@@ -934,7 +943,7 @@ int RGWBucketReshard::cancel(const DoutPrefixProvider* dpp, optional_yield y)
     return ret;
   }
 
-  if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress ||
+  if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress &&
       bucket_info.layout.resharding != rgw::BucketReshardState::InLogrecord) {
     ldpp_dout(dpp, -1) << "ERROR: bucket is not resharding" << dendl;
     ret = -EINVAL;
@@ -1032,13 +1041,55 @@ int RGWBucketReshardLock::renew(const Clock::time_point& now) {
   return 0;
 }
 
+int RGWBucketReshard::renew_lock_if_needed(const DoutPrefixProvider *dpp) {
+  int ret = 0;
+  Clock::time_point now = Clock::now();
+  if (reshard_lock.should_renew(now)) {
+    // assume outer locks have timespans at least the size of ours, so
+    // can call inside conditional
+    if (outer_reshard_lock) {
+      ret = outer_reshard_lock->renew(now);
+      if (ret < 0) {
+        return ret;
+      }
+    }
+    ret = reshard_lock.renew(now);
+    if (ret < 0) {
+      ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
+      return ret;
+    }
+  }
+  return 0;
+}
+
+int RGWBucketReshard::calc_target_shard(const RGWBucketInfo& bucket_info, const rgw_obj_key& key,
+                                        int& shard, const DoutPrefixProvider *dpp) {
+  int target_shard_id, ret;
+
+  rgw_obj obj(bucket_info.bucket, key);
+  RGWMPObj mp;
+  if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
+    // place the multipart .meta object on the same shard as its head object
+    obj.index_hash_source = mp.get_key();
+  }
+  ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal,
+                                               obj.get_hash_object(), &target_shard_id);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
+    return ret;
+  }
+  shard = (target_shard_id > 0 ? target_shard_id : 0);
+
+  return 0;
+}
+
 int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& current,
-                                     int& max_op_entries,
-                                     BucketReshardManager& target_shards_mgr,
-                                     bool verbose_json_out,
-                                     ostream *out,
-                                     Formatter *formatter, rgw::BucketReshardState reshard_stage,
-                                     const DoutPrefixProvider *dpp, optional_yield y)
+                                      int& max_op_entries,
+                                      BucketReshardManager& target_shards_mgr,
+                                      bool verbose_json_out,
+                                      ostream *out,
+                                      Formatter *formatter, rgw::BucketReshardState reshard_stage,
+                                      const DoutPrefixProvider *dpp, optional_yield y)
 {
   list<rgw_cls_bi_entry> entries;
 
@@ -1065,6 +1116,7 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
   uint64_t stage_entries = 0;
   stage.append(":");
   if (!verbose_json_out && out) {
+    (*out) << "start time: " << real_clock::now() << std::endl;
     (*out) << stage;
   }
 
@@ -1103,7 +1155,6 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
 
       marker = entry.idx;
 
-      int target_shard_id;
       cls_rgw_obj_key cls_key;
       RGWObjCategory category;
       rgw_bucket_category_stats stats;
@@ -1116,43 +1167,24 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
         ldpp_dout(dpp, 10) << "Dropping entry with empty name, idx=" << marker << dendl;
         continue;
       }
-      rgw_obj obj(bucket_info.bucket, key);
-      RGWMPObj mp;
-      if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
-        // place the multipart .meta object on the same shard as its head object
-        obj.index_hash_source = mp.get_key();
-      }
-      ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal,
-                                                   obj.get_hash_object(), &target_shard_id);
+
+      int shard_index;
+      ret = calc_target_shard(bucket_info, key, shard_index, dpp);
       if (ret < 0) {
-        ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
         return ret;
       }
 
-      int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
-
       ret = target_shards_mgr.add_entry(shard_index, entry, account,
                                         category, stats, process_log);
       if (ret < 0) {
        return ret;
      }
 
-      Clock::time_point now = Clock::now();
-      if (reshard_lock.should_renew(now)) {
-        // assume outer locks have timespans at least the size of ours, so
-        // can call inside conditional
-        if (outer_reshard_lock) {
-          ret = outer_reshard_lock->renew(now);
-          if (ret < 0) {
-            return ret;
-          }
-        }
-        ret = reshard_lock.renew(now);
-        if (ret < 0) {
-          ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
-          return ret;
-        }
+      ret = renew_lock_if_needed(dpp);
+      if (ret < 0) {
+        return ret;
       }
+
       if (verbose_json_out) {
         formatter->close_section();
         formatter->flush(*out);
@@ -1168,13 +1200,15 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
     formatter->flush(*out);
   } else if (out) {
     (*out) << " " << stage_entries << std::endl;
+    (*out) << "end time: " << real_clock::now() << std::endl;
   }
 
-  int ret = target_shards_mgr.finish(process_log);
+  int ret = target_shards_mgr.finish(process_log, this, dpp);
   if (ret < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed to reshard: " << ret << dendl;
     return -EIO;
   }
+
   return 0;
 }
 
diff --git a/src/rgw/driver/rados/rgw_reshard.h b/src/rgw/driver/rados/rgw_reshard.h
index ea5bb6e713d5a..3d056e50f468d 100644
--- a/src/rgw/driver/rados/rgw_reshard.h
+++ b/src/rgw/driver/rados/rgw_reshard.h
@@ -84,6 +84,9 @@ class RGWBucketReshard {
   // using an initializer_list as an array in contiguous memory
   // allocated in at once
   static const std::initializer_list<uint16_t> reshard_primes;
+
+  int calc_target_shard(const RGWBucketInfo& bucket_info, const rgw_obj_key& key,
+                        int& shard, const DoutPrefixProvider *dpp);
   int reshard_process(const rgw::bucket_index_layout_generation& current,
                       int& max_entries,
                       BucketReshardManager& target_shards_mgr,
@@ -91,6 +94,7 @@ class RGWBucketReshard {
                       std::ostream *out,
                       Formatter *formatter, rgw::BucketReshardState reshard_stage,
                       const DoutPrefixProvider *dpp, optional_yield y);
+
   int do_reshard(const rgw::bucket_index_layout_generation& current,
                  const rgw::bucket_index_layout_generation& target,
                  int max_entries, bool support_logrecord,
@@ -115,6 +119,7 @@ public:
               RGWReshard *reshard_log = nullptr);
   int get_status(const DoutPrefixProvider *dpp, std::list<cls_rgw_bucket_instance_entry> *status);
   int cancel(const DoutPrefixProvider* dpp, optional_yield y);
+  int renew_lock_if_needed(const DoutPrefixProvider *dpp);
 
   static int clear_resharding(rgw::sal::RadosStore* store,
                               RGWBucketInfo& bucket_info,
-- 
2.39.5