From 5194bb6bdd304beb3e9bf509143ae970c9a28eb7 Mon Sep 17 00:00:00 2001 From: liangmingyuan Date: Mon, 18 Mar 2024 17:44:21 +0800 Subject: [PATCH] rgw/reshard: take into account the object stats of dest shards In the progress state, some index entries that have already been copyed to dest shards in logrecord state will be copyed again, we should subtract their stats in dest shards firstly. Signed-off-by: Mingyuan Liang --- src/cls/rgw/cls_rgw.cc | 83 ++++++++++++++++++++++++++++- src/cls/rgw/cls_rgw_client.cc | 29 +++++++++- src/cls/rgw/cls_rgw_client.h | 6 ++- src/cls/rgw/cls_rgw_const.h | 1 + src/cls/rgw/cls_rgw_ops.cc | 9 ++++ src/cls/rgw/cls_rgw_ops.h | 25 ++++++++- src/rgw/driver/rados/rgw_rados.cc | 11 ++++ src/rgw/driver/rados/rgw_rados.h | 1 + src/rgw/driver/rados/rgw_reshard.cc | 66 +++++++++++++++++------ 9 files changed, 210 insertions(+), 21 deletions(-) diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index 7bd60bf20c854..22d2e863f68aa 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -121,7 +121,7 @@ static bool bi_entry_gt(const string& first, const string& second) /** * return: Plain, Instance, OLH or Invalid */ -BIIndexType bi_type(const string& s, const string& prefix) +BIIndexType bi_type(const string& s, const string& prefix ="") { int ret = bi_entry_type(s.substr(prefix.size())); if (ret < 0) { @@ -768,6 +768,19 @@ int rgw_bucket_update_stats(cls_method_context_t hctx, bufferlist *in, bufferlis } } + for (auto& s : op.dec_stats) { + auto& dest = header.stats[s.first]; + if (op.absolute) { + CLS_LOG(0, "ERROR: %s: there can not be decribed stats when setting absolutly", __func__); + return -EINVAL; + } else { + dest.total_size -= s.second.total_size; + dest.total_size_rounded -= s.second.total_size_rounded; + dest.num_entries -= s.second.num_entries; + dest.actual_size -= s.second.actual_size; + } + } + return write_bucket_header(hctx, &header); } @@ -2821,6 +2834,72 @@ static int rgw_bi_get_op(cls_method_context_t hctx, bufferlist *in, bufferlist * return 0; } +/* gain bi_entry based on reshard log */ +static int rgw_bi_get_vals_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + // decode request + rgw_cls_bi_get_vals_op op; + auto bl_iter = in->cbegin(); + try { + decode(op, bl_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(0, "ERROR: %s: failed to decode request", __func__); + return -EINVAL; + } + + map keys; + int ret = cls_cxx_map_get_vals_by_keys(hctx, op.log_entries_wanted, &keys); + if (ret < 0) { + return ret; + } + + rgw_cls_bi_list_ret op_ret; + std::map::iterator iter; + for (iter = keys.begin(); iter != keys.end(); ++iter) { + + rgw_cls_bi_entry entry; + entry.idx = iter->first; + entry.type = bi_type(iter->first); + entry.data = iter->second; + + auto biter = entry.data.cbegin(); + + switch (entry.type) { + case BIIndexType::Plain: + case BIIndexType::Instance: { + rgw_bucket_dir_entry e; + try { + decode(e, biter); + } catch (ceph::buffer::error& err) { + CLS_LOG(0, "ERROR: %s: failed to decode buffer", __func__); + return -EIO; + } + break; + } + case BIIndexType::OLH: { + rgw_bucket_olh_entry e; + try { + decode(e, biter); + } catch (ceph::buffer::error& err) { + CLS_LOG(0, "ERROR: %s: failed to decode buffer (size=%d)", __func__, entry.data.length()); + return -EIO; + } + break; + } + default: + CLS_LOG(0, "%s: invalid entry type: %d", __func__, int(entry.type)); + return -EINVAL; + } + CLS_LOG(20, "%s: entry.idx=%s", __func__, escape_str(entry.idx).c_str()); + + op_ret.entries.push_back(entry); + } + + encode(op_ret, *out); + + return 0; +} + static int rgw_bi_put_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { CLS_LOG(10, "entered %s", __func__); @@ -4894,6 +4973,7 @@ CLS_INIT(rgw) cls_method_handle_t h_rgw_obj_check_attrs_prefix; cls_method_handle_t h_rgw_obj_check_mtime; cls_method_handle_t h_rgw_bi_get_op; + cls_method_handle_t h_rgw_bi_get_vals_op; cls_method_handle_t h_rgw_bi_put_op; cls_method_handle_t h_rgw_bi_list_op; cls_method_handle_t h_rgw_bi_log_list_op; @@ -4949,6 +5029,7 @@ CLS_INIT(rgw) cls_register_cxx_method(h_class, RGW_OBJ_CHECK_MTIME, CLS_METHOD_RD, rgw_obj_check_mtime, &h_rgw_obj_check_mtime); cls_register_cxx_method(h_class, RGW_BI_GET, CLS_METHOD_RD, rgw_bi_get_op, &h_rgw_bi_get_op); + cls_register_cxx_method(h_class, RGW_BI_GET_VALS, CLS_METHOD_RD, rgw_bi_get_vals_op, &h_rgw_bi_get_vals_op); cls_register_cxx_method(h_class, RGW_BI_PUT, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_op, &h_rgw_bi_put_op); cls_register_cxx_method(h_class, RGW_BI_LIST, CLS_METHOD_RD, rgw_bi_list_op, &h_rgw_bi_list_op); diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index bfdac5259aa6e..f8990dfe02ee0 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -253,11 +253,14 @@ int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id, const string& oid) void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o, bool absolute, - const map& stats) + const map& stats, + const map* dec_stats) { rgw_cls_bucket_update_stats_op call; call.absolute = absolute; call.stats = stats; + if (dec_stats != NULL) + call.dec_stats = *dec_stats; bufferlist in; encode(call, in); o.exec(RGW_CLASS, RGW_BUCKET_UPDATE_STATS, in); @@ -443,6 +446,30 @@ int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid, return 0; } +int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid, + std::set log_entries_wanted, + std::list *entries) +{ + bufferlist in, out; + struct rgw_cls_bi_get_vals_op call; + call.log_entries_wanted = log_entries_wanted; + encode(call, in); + int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_GET_VALS, in, out); + if (r < 0) + return r; + + struct rgw_cls_bi_list_ret op_ret; + auto iter = out.cbegin(); + try { + decode(op_ret, iter); + } catch (ceph::buffer::error& err) { + return -EIO; + } + + entries->swap(op_ret.entries); + + return 0; +} int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, const rgw_cls_bi_entry& entry) { diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 6f39858b10ba4..2d4fd46e74aff 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -346,7 +346,8 @@ public: void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o, bool absolute, - const std::map& stats); + const std::map& stats, + const std::map* dec_stats = nullptr); void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, const std::string& tag, const cls_rgw_obj_key& key, const std::string& locator, bool log_op, @@ -368,6 +369,9 @@ void cls_rgw_obj_check_mtime(librados::ObjectOperation& o, const ceph::real_time int cls_rgw_bi_get(librados::IoCtx& io_ctx, const std::string oid, BIIndexType index_type, const cls_rgw_obj_key& key, rgw_cls_bi_entry *entry); +int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid, + std::set log_entries_wanted, + std::list *entries); int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, const rgw_cls_bi_entry& entry); void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry); int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid, diff --git a/src/cls/rgw/cls_rgw_const.h b/src/cls/rgw/cls_rgw_const.h index a3398ebd0164a..d8c3f1dcb99e2 100644 --- a/src/cls/rgw/cls_rgw_const.h +++ b/src/cls/rgw/cls_rgw_const.h @@ -33,6 +33,7 @@ constexpr int RGWBIAdvanceAndRetryError = -EFBIG; #define RGW_OBJ_CHECK_MTIME "obj_check_mtime" #define RGW_BI_GET "bi_get" +#define RGW_BI_GET_VALS "bi_get_vals" #define RGW_BI_PUT "bi_put" #define RGW_BI_LIST "bi_list" diff --git a/src/cls/rgw/cls_rgw_ops.cc b/src/cls/rgw/cls_rgw_ops.cc index 15bcba33330da..d32448517ce3d 100644 --- a/src/cls/rgw/cls_rgw_ops.cc +++ b/src/cls/rgw/cls_rgw_ops.cc @@ -373,6 +373,10 @@ void rgw_cls_bucket_update_stats_op::generate_test_instances(listdec_stats[RGWObjCategory::None]; + dec_s.total_size = 1; + dec_s.total_size_rounded = 4096; + dec_s.num_entries = 1; o.push_back(r); o.push_back(new rgw_cls_bucket_update_stats_op); @@ -386,6 +390,11 @@ void rgw_cls_bucket_update_stats_op::dump(Formatter *f) const s[(int)entry.first] = entry.second; } encode_json("stats", s, f); + map dec_s; + for (auto& entry : dec_stats) { + dec_s[(int)entry.first] = entry.second; + } + encode_json("dec_stats", dec_s, f); } void cls_rgw_bi_log_list_op::dump(Formatter *f) const diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h index dd8a7779aeddb..3e36d96c50eb6 100644 --- a/src/cls/rgw/cls_rgw_ops.h +++ b/src/cls/rgw/cls_rgw_ops.h @@ -493,19 +493,23 @@ struct rgw_cls_bucket_update_stats_op { bool absolute{false}; std::map stats; + std::map dec_stats; rgw_cls_bucket_update_stats_op() {} void encode(ceph::buffer::list &bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(absolute, bl); encode(stats, bl); + encode(dec_stats, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator &bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); decode(absolute, bl); decode(stats, bl); + if (struct_v >= 2) + decode(dec_stats, bl); DECODE_FINISH(bl); } void dump(ceph::Formatter *f) const; @@ -727,6 +731,23 @@ struct rgw_cls_bi_get_ret { }; WRITE_CLASS_ENCODER(rgw_cls_bi_get_ret) +struct rgw_cls_bi_get_vals_op { + std::set log_entries_wanted; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(log_entries_wanted, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(log_entries_wanted, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(rgw_cls_bi_get_vals_op) + struct rgw_cls_bi_put_op { rgw_cls_bi_entry entry; diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 4939438a68b4f..d79d10238cbf9 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -9162,6 +9162,17 @@ int RGWRados::bi_get(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_ return cls_rgw_bi_get(ref.ioctx, ref.obj.oid, index_type, key, entry); } +int RGWRados::bi_get_vals(BucketShard& bs, set log_entries_wanted, + list *entries, optional_yield y) +{ + auto& ref = bs.bucket_obj; + int ret = cls_rgw_bi_get_vals(ref.ioctx, ref.obj.oid, log_entries_wanted, entries); + if (ret < 0) + return ret; + + return 0; +} + void RGWRados::bi_put(ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y) { auto& ref = bs.bucket_obj; diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h index 7d6299d118ded..67af40f771b4c 100644 --- a/src/rgw/driver/rados/rgw_rados.h +++ b/src/rgw/driver/rados/rgw_rados.h @@ -1515,6 +1515,7 @@ public: int bi_get_instance(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_dir_entry *dirent, optional_yield y); int bi_get_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_olh_entry *olh, optional_yield y); int bi_get(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry, optional_yield y); + int bi_get_vals(BucketShard& bs, std::set log_entries_wanted, std::list *entries, optional_yield y); void bi_put(librados::ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y); int bi_put(BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y); int bi_put(const DoutPrefixProvider *dpp, rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry, optional_yield y); diff --git a/src/rgw/driver/rados/rgw_reshard.cc b/src/rgw/driver/rados/rgw_reshard.cc index b75bb7bf8cd9d..fce156b69772f 100644 --- a/src/rgw/driver/rados/rgw_reshard.cc +++ b/src/rgw/driver/rados/rgw_reshard.cc @@ -217,7 +217,8 @@ public: } int add_entry(rgw_cls_bi_entry& entry, bool account, RGWObjCategory category, - const rgw_bucket_category_stats& entry_stats) { + const rgw_bucket_category_stats& entry_stats, + bool process_log = false) { entries.push_back(entry); if (account) { rgw_bucket_category_stats& target = stats[category]; @@ -227,7 +228,7 @@ public: target.actual_size += entry_stats.actual_size; } if (entries.size() >= reshard_shard_batch_size) { - int ret = flush(); + int ret = flush(process_log); if (ret < 0) { return ret; } @@ -236,16 +237,48 @@ public: return 0; } - int flush() { + int flush(bool process_log = false) { if (entries.size() == 0) { return 0; } librados::ObjectWriteOperation op; - for (auto& entry : entries) { - store->getRados()->bi_put(op, bs, entry, null_yield); + if (process_log) { + map dec_stats; + list dec_entries; + set dec_entry_names_wanted; + for (auto& entry : entries) { + store->getRados()->bi_put(op, bs, entry, null_yield); + dec_entry_names_wanted.emplace(entry.idx); + } + + // getting the index entry in target shard + int ret = store->getRados()->bi_get_vals(bs, dec_entry_names_wanted, &dec_entries, null_yield); + if(ret < 0) { + derr << "ERROR: bi_get_vals(): " << cpp_strerror(-ret) << dendl; + return ret; + } + + for (auto& dec_entry : dec_entries) { + cls_rgw_obj_key cls_key; + RGWObjCategory category; + rgw_bucket_category_stats accounted_stats; + bool account = dec_entry.get_info(&cls_key, &category, &accounted_stats); + if (account) { + auto& dest = dec_stats[category]; + dest.total_size += accounted_stats.total_size; + dest.total_size_rounded += accounted_stats.total_size_rounded; + dest.num_entries += accounted_stats.num_entries; + dest.actual_size += accounted_stats.actual_size; + } + } + cls_rgw_bucket_update_stats(op, false, stats, &dec_stats); + } else { + for (auto& entry : entries) { + store->getRados()->bi_put(op, bs, entry, null_yield); + } + cls_rgw_bucket_update_stats(op, false, stats); } - cls_rgw_bucket_update_stats(op, false, stats); librados::AioCompletion *c; int ret = get_completion(&c); @@ -307,9 +340,10 @@ public: int add_entry(int shard_index, rgw_cls_bi_entry& entry, bool account, RGWObjCategory category, - const rgw_bucket_category_stats& entry_stats) { + const rgw_bucket_category_stats& entry_stats, + bool process_log = false) { int ret = target_shards[shard_index].add_entry(entry, account, category, - entry_stats); + entry_stats, process_log); if (ret < 0) { derr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << dendl; @@ -319,10 +353,10 @@ public: return 0; } - int finish() { + int finish(bool process_log = false) { int ret = 0; for (auto& shard : target_shards) { - int r = shard.flush(); + int r = shard.flush(process_log); if (r < 0) { derr << "ERROR: target_shards[" << shard.get_shard_id() << "].flush() returned error: " << cpp_strerror(-r) << dendl; ret = r; @@ -980,15 +1014,15 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& list entries; string stage; - bool read_reshardlog; + bool process_log = false; switch (reshard_stage) { case rgw::BucketReshardState::InLogrecord: stage = "inventory"; - read_reshardlog = false; + process_log = false; break; case rgw::BucketReshardState::InProgress: stage = "inc"; - read_reshardlog = true; + process_log = true; break; default: ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " unknown reshard stage" << dendl; @@ -1016,7 +1050,7 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& int ret = store->getRados()->bi_list(dpp, bucket_info, i, null_object_filter, marker, max_op_entries, &entries, - &is_truncated, read_reshardlog, y); + &is_truncated, process_log, y); if (ret == -ENOENT) { ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to find shard " << i << ", skipping" << dendl; @@ -1069,7 +1103,7 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& int shard_index = (target_shard_id > 0 ? target_shard_id : 0); ret = target_shards_mgr.add_entry(shard_index, entry, account, - category, stats); + category, stats, process_log); if (ret < 0) { return ret; } @@ -1107,7 +1141,7 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& (*out) << " " << stage_entries << std::endl; } - int ret = target_shards_mgr.finish(); + int ret = target_shards_mgr.finish(process_log); if (ret < 0) { ldpp_dout(dpp, -1) << "ERROR: failed to reshard: " << ret << dendl; return -EIO; -- 2.39.5