From: Matt Benjamin Date: Tue, 14 Oct 2025 13:21:08 +0000 (-0400) Subject: From efec6856a67b1525606b4cc2cd2861e30ddf0c48 Mon Sep 17 00:00:00 2001 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f6fd333dfeceef9fd41d9ed601a39f71eb90dfe6;p=ceph-ci.git From efec6856a67b1525606b4cc2cd2861e30ddf0c48 Mon Sep 17 00:00:00 2001 From: Gabriel BenHanokh Date: Mon, 15 Sep 2025 06:58:23 +0000 Subject: [PATCH] rgw/dedup: add throttling mechanism Signed-off-by: Gabriel BenHanokh rgw/dedup: Change throttle code to work lock free and remove the atomic from the timestamp Resolves: rhbz#2401399 Signed-off-by: Gabriel BenHanokh (cherry picked from commit 16ad586dac47fe9d490ed42a8c93072593b699d3) Signed-off-by: Matt Benjamin --- diff --git a/doc/radosgw/s3_objects_dedup.rst b/doc/radosgw/s3_objects_dedup.rst index ed54c61c78e..db5cdab28aa 100644 --- a/doc/radosgw/s3_objects_dedup.rst +++ b/doc/radosgw/s3_objects_dedup.rst @@ -12,16 +12,21 @@ Add a radosgw-admin command to collect and report deduplication stats *************** Admin commands: *************** -- ``radosgw-admin dedup stats``: - Collects & displays last dedup statistics +- ``radosgw-admin dedup estimate``: + Starts a new dedup estimate session (first aborting any existing session). + It doesn't make any change to the existing system; it only collects statistics and reports them. - ``radosgw-admin dedup pause``: Pauses active dedup session (dedup resources are not released) - ``radosgw-admin dedup resume``: Resumes a paused dedup session - ``radosgw-admin dedup abort``: - Aborts active dedup session and release all resources used by it -- ``radosgw-admin dedup estimate`` - Starts a new dedup estimate session (aborting first existing session if exists) + Aborts an active dedup session and releases all resources used by it. 
+- ``radosgw-admin dedup stats``: + Collects & displays last dedup statistics +- ``radosgw-admin dedup throttle --max-bucket-index-ops=<count>``: + Specify max bucket-index requests per second allowed for a single RGW server during dedup, 0 means unlimited. +- ``radosgw-admin dedup throttle --stat``: + Display dedup throttle setting. ---- @@ -46,19 +51,27 @@ information during the estimate process) Estimate Processing: ******************** The Dedup Estimate process collects all the needed information directly from -the bucket-indices reading one full bucket-index object with 1000's of +the bucket indices reading one full bucket index object with thousands of entries at a time. -The Bucket-Indices objects are sharded between the participating -members so every bucket-index object is read exactly one time. -The sharding allow processing to scale almost linearly spliting the +The bucket index objects are sharded between the participating +members so every bucket index object is read exactly one time. +The sharding allows processing to scale almost linearly, splitting the load evenly between the participating members. The Dedup Estimate process does not access the objects themselves (data/metadata) which means its processing time won't be affected by -the underlined media storing the objects (SSD/HDD) since the bucket-indices are +the underlying media storing the objects (SSD/HDD) since the bucket indices are virtually always stored on a fast medium (SSD with heavy memory -caching) +caching). + +The admin can throttle the estimate process by setting a limit on the number of +bucket-index reads per second for each RGW server (each read brings 1000 object entries) using: + +$ radosgw-admin dedup throttle --max-bucket-index-ops=<count> + +A typical RGW server performs about 100 bucket-index reads per second (i.e. 100,000 object entries). +Setting the count to 50 will typically slow down access by half, and so on. 
---- diff --git a/src/rgw/radosgw-admin/radosgw-admin.cc b/src/rgw/radosgw-admin/radosgw-admin.cc index f2930319b00..2f9951985b7 100644 --- a/src/rgw/radosgw-admin/radosgw-admin.cc +++ b/src/rgw/radosgw-admin/radosgw-admin.cc @@ -156,10 +156,11 @@ void usage() cout << " caps rm remove user capabilities\n"; cout << " dedup stats Display dedup statistics from the last run\n"; cout << " dedup estimate Runs dedup in estimate mode (no changes will be made)\n"; - cout << " dedup restart Restart dedup\n"; + cout << " dedup exec Execute dedup (duplicated tail objects will be deleted); must include --yes-i-really-mean-it to activate\n"; cout << " dedup abort Abort dedup\n"; cout << " dedup pause Pause dedup\n"; cout << " dedup resume Resume paused dedup\n"; + cout << " dedup throttle Throttle dedup execution\n"; cout << " subuser create create a new subuser\n" ; cout << " subuser modify modify subuser\n"; cout << " subuser rm remove subuser\n"; @@ -492,6 +493,10 @@ void usage() cout << " --disable-feature disable a zone/zonegroup feature\n"; cout << "\n"; cout << " := \"YYYY-MM-DD[ hh:mm:ss]\"\n"; + cout << "\nDedup throttle options:\n"; + cout << " --max-bucket-index-ops specify max bucket-index requests per second allowed for an RGW during dedup, 0 means unlimited\n"; + cout << " --max-metadata-ops specify max metadata requests per second allowed for an RGW during dedup, 0 means unlimited\n"; + cout << " --stat display dedup throttle setting\n"; cout << "\nQuota options:\n"; cout << " --max-objects specify max objects (negative value to disable)\n"; cout << " --max-size specify max size (in B/K/M/G/T, negative value to disable)\n"; @@ -761,9 +766,10 @@ enum class OPT { DEDUP_STATS, DEDUP_ESTIMATE, DEDUP_ABORT, - DEDUP_RESTART, + DEDUP_EXEC, DEDUP_PAUSE, DEDUP_RESUME, + DEDUP_THROTTLE, GC_LIST, GC_PROCESS, LC_LIST, @@ -1016,9 +1022,11 @@ static SimpleCmd::Commands all_cmds = { { "dedup stats", OPT::DEDUP_STATS }, { "dedup estimate", OPT::DEDUP_ESTIMATE }, { "dedup abort", 
OPT::DEDUP_ABORT }, - { "dedup restart", OPT::DEDUP_RESTART }, + { "dedup restart", OPT::DEDUP_EXEC }, + { "dedup exec", OPT::DEDUP_EXEC }, { "dedup pause", OPT::DEDUP_PAUSE }, { "dedup resume", OPT::DEDUP_RESUME }, + { "dedup throttle", OPT::DEDUP_THROTTLE }, { "gc list", OPT::GC_LIST }, { "gc process", OPT::GC_PROCESS }, { "lc list", OPT::LC_LIST }, @@ -3661,6 +3669,7 @@ int main(int argc, const char **argv) int skip_zero_entries = false; // log show int purge_keys = false; int yes_i_really_mean_it = false; + int throttle_stat = false; int delete_child_objects = false; int fix = false; int remove_bad = false; @@ -3715,6 +3724,8 @@ int main(int argc, const char **argv) int64_t max_delete_ops = 0; int64_t max_read_bytes = 0; int64_t max_write_bytes = 0; + uint32_t max_bucket_index_ops = 0; + uint32_t max_metadata_ops = 0; bool have_max_objects = false; bool have_max_size = false; bool have_max_write_ops = false; @@ -3723,6 +3734,8 @@ int main(int argc, const char **argv) bool have_max_delete_ops = false; bool have_max_write_bytes = false; bool have_max_read_bytes = false; + bool have_max_bucket_index_ops = false; + bool have_max_metadata_ops = false; int include_all = false; int allow_unordered = false; @@ -4047,6 +4060,20 @@ int main(int argc, const char **argv) return EINVAL; } have_max_write_bytes = true; + } else if (ceph_argparse_witharg(args, i, &val, "--max-bucket-index-ops", (char*)NULL)) { + max_bucket_index_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err); + if (!err.empty()) { + cerr << "ERROR: failed to parse max bucket index ops: " << err << std::endl; + return EINVAL; + } + have_max_bucket_index_ops = true; + } else if (ceph_argparse_witharg(args, i, &val, "--max-metadata-ops", (char*)NULL)) { + max_metadata_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err); + if (!err.empty()) { + cerr << "ERROR: failed to parse max metadata ops: " << err << std::endl; + return EINVAL; + } + have_max_metadata_ops = true; } else if (ceph_argparse_witharg(args, 
i, &val, "--date", "--time", (char*)NULL)) { date = val; if (end_date.empty()) @@ -4141,6 +4168,8 @@ int main(int argc, const char **argv) // do nothing } else if (ceph_argparse_binary_flag(args, i, &yes_i_really_mean_it, NULL, "--yes-i-really-mean-it", (char*)NULL)) { // do nothing + } else if (ceph_argparse_binary_flag(args, i, &throttle_stat, NULL, "--stat", (char*)NULL)) { + // do nothing } else if (ceph_argparse_binary_flag(args, i, &fix, NULL, "--fix", (char*)NULL)) { // do nothing } else if (ceph_argparse_binary_flag(args, i, &remove_bad, NULL, "--remove-bad", (char*)NULL)) { @@ -4562,9 +4591,10 @@ int main(int argc, const char **argv) OPT::DEDUP_STATS, OPT::DEDUP_ESTIMATE, OPT::DEDUP_ABORT, // TBD - not READ-ONLY - OPT::DEDUP_RESTART, // TBD - not READ-ONLY + OPT::DEDUP_EXEC, // TBD - not READ-ONLY OPT::DEDUP_PAUSE, OPT::DEDUP_RESUME, + OPT::DEDUP_THROTTLE, OPT::GC_LIST, OPT::LC_LIST, OPT::ORPHANS_LIST_JOBS, @@ -9228,7 +9258,8 @@ next: opt_cmd == OPT::DEDUP_ABORT || opt_cmd == OPT::DEDUP_PAUSE || opt_cmd == OPT::DEDUP_RESUME || - opt_cmd == OPT::DEDUP_RESTART) { + opt_cmd == OPT::DEDUP_THROTTLE || + opt_cmd == OPT::DEDUP_EXEC) { using namespace rgw::dedup; rgw::sal::RadosStore *store = dynamic_cast(driver); @@ -9249,7 +9280,41 @@ next: return ret; } - if (opt_cmd == OPT::DEDUP_ABORT || opt_cmd == OPT::DEDUP_PAUSE || opt_cmd == OPT::DEDUP_RESUME) { + if (opt_cmd == OPT::DEDUP_THROTTLE) { + bufferlist urgent_msg_bl; + urgent_msg_t urgent_msg = URGENT_MSG_THROTTLE; + ceph::encode(urgent_msg, urgent_msg_bl); + throttle_msg_t throttle_msg; + + if (throttle_stat) { + encode(throttle_msg, urgent_msg_bl); + return cluster::dedup_control_bl(store, dpp(), urgent_msg, urgent_msg_bl); + } + + if (unlikely(!have_max_bucket_index_ops && !have_max_metadata_ops)) { + std::cerr << "dedup throttle must set either --max-bucket-index-ops or --max-metadata-ops" << std::endl; + return EINVAL; + } + + if (have_max_bucket_index_ops) { + throttle_action_t action = { .op_type = 
BUCKET_INDEX_OP, + .limit = max_bucket_index_ops}; + throttle_msg.vec.push_back(action); + } + + if (have_max_metadata_ops) { + throttle_action_t action = { .op_type = METADATA_ACCESS_OP, + .limit = max_metadata_ops}; + throttle_msg.vec.push_back(action); + } + + encode(throttle_msg, urgent_msg_bl); + return cluster::dedup_control_bl(store, dpp(), urgent_msg, urgent_msg_bl); + } + + if (opt_cmd == OPT::DEDUP_ABORT || + opt_cmd == OPT::DEDUP_PAUSE || + opt_cmd == OPT::DEDUP_RESUME) { urgent_msg_t urgent_msg; if (opt_cmd == OPT::DEDUP_ABORT) { urgent_msg = URGENT_MSG_ABORT; @@ -9263,13 +9328,19 @@ next: return cluster::dedup_control(store, dpp(), urgent_msg); } - if (opt_cmd == OPT::DEDUP_RESTART || opt_cmd == OPT::DEDUP_ESTIMATE) { + if (opt_cmd == OPT::DEDUP_EXEC || opt_cmd == OPT::DEDUP_ESTIMATE) { dedup_req_type_t dedup_type = dedup_req_type_t::DEDUP_TYPE_NONE; if (opt_cmd == OPT::DEDUP_ESTIMATE) { dedup_type = dedup_req_type_t::DEDUP_TYPE_ESTIMATE; } else { - dedup_type = dedup_req_type_t::DEDUP_TYPE_FULL; + if (!yes_i_really_mean_it) { + cerr << "Full Dedup is dangerous and could lead to data loss!\n" + << "do you really mean it? (requires --yes-i-really-mean-it)" + << std::endl; + return EINVAL; + } + dedup_type = dedup_req_type_t::DEDUP_TYPE_EXEC; #ifndef FULL_DEDUP_SUPPORT std::cerr << "Only dedup estimate is supported!" 
<< std::endl; return EPERM; diff --git a/src/rgw/rgw_dedup.cc b/src/rgw/rgw_dedup.cc index 7230f53d509..ebce5960424 100644 --- a/src/rgw/rgw_dedup.cc +++ b/src/rgw/rgw_dedup.cc @@ -129,6 +129,8 @@ namespace rgw::dedup { this->remote_pause_req = false; this->remote_paused = false; this->remote_restart_req = false; + this->bucket_index_throttle.disable(); + this->metadata_access_throttle.disable(); } //--------------------------------------------------------------------------- @@ -147,6 +149,8 @@ namespace rgw::dedup { encode(ctl.remote_pause_req, bl); encode(ctl.remote_paused, bl); encode(ctl.remote_restart_req, bl); + encode(ctl.bucket_index_throttle, bl); + encode(ctl.metadata_access_throttle, bl); ENCODE_FINISH(bl); } @@ -168,6 +172,8 @@ namespace rgw::dedup { decode(ctl.remote_pause_req, bl); decode(ctl.remote_paused, bl); decode(ctl.remote_restart_req, bl); + decode(ctl.bucket_index_throttle, bl); + decode(ctl.metadata_access_throttle, bl); DECODE_FINISH(bl); } @@ -209,6 +215,13 @@ namespace rgw::dedup { out << "::remote_restart_req"; } + if (!ctl.bucket_index_throttle.is_disabled()) { + out << "::bucket_index_throttle=" << ctl.bucket_index_throttle.get_max_calls_per_second(); + } + if (!ctl.metadata_access_throttle.is_disabled()) { + out << "::metadata_throttle=" << ctl.metadata_access_throttle.get_max_calls_per_second(); + } + return out; } @@ -534,6 +547,7 @@ namespace rgw::dedup { librados::IoCtx ioctx = obj.ioctx; ldpp_dout(dpp, 20) << __func__ << "::removing tail object: " << raw_obj.oid << dendl; + d_ctl.metadata_access_throttle.acquire(); ret = ioctx.remove(raw_obj.oid); } @@ -567,6 +581,8 @@ namespace rgw::dedup { } ObjectWriteOperation op; + d_ctl.metadata_access_throttle.acquire(); + ldpp_dout(dpp, 20) << __func__ << "::dec ref-count on tail object: " << raw_obj.oid << dendl; cls_refcount_put(op, ref_tag, true); rgw::AioResultList completed = aio->get(obj.obj, rgw::Aio::librados_op(obj.ioctx, std::move(op), null_yield), @@ -602,6 +618,7 @@ namespace 
rgw::dedup { ObjectWriteOperation op; cls_refcount_get(op, ref_tag, true); + d_ctl.metadata_access_throttle.acquire(); ldpp_dout(dpp, 20) << __func__ << "::inc ref-count on tail object: " << raw_obj.oid << dendl; rgw::AioResultList completed = aio->get(obj.obj, rgw::Aio::librados_op(obj.ioctx, std::move(op), null_yield), @@ -782,6 +799,7 @@ namespace rgw::dedup { ldpp_dout(dpp, 20) << __func__ << "::ref_tag=" << ref_tag << dendl; int ret = inc_ref_count_by_manifest(ref_tag, src_oid, src_manifest); if (ret == 0) { + d_ctl.metadata_access_throttle.acquire(); ldpp_dout(dpp, 20) << __func__ << "::send TGT CLS (Shared_Manifest)" << dendl; ret = tgt_ioctx.operate(tgt_oid, &tgt_op); if (unlikely(ret != 0)) { @@ -809,6 +827,7 @@ namespace rgw::dedup { p_stats->set_sha256_attrs++; } + d_ctl.metadata_access_throttle.acquire(); ldpp_dout(dpp, 20) << __func__ <<"::send SRC CLS (Shared_Manifest)"<< dendl; ret = src_ioctx.operate(src_oid, &src_op); if (unlikely(ret != 0)) { @@ -1083,6 +1102,7 @@ namespace rgw::dedup { return 0; } + d_ctl.metadata_access_throttle.acquire(); ret = p_obj->get_obj_attrs(null_yield, dpp); if (unlikely(ret < 0)) { p_stats->ingress_failed_get_obj_attrs++; @@ -1401,6 +1421,7 @@ namespace rgw::dedup { } } + p_stats->ingress_slabs++; (*p_slab_count)++; failure_count = 0; unsigned slab_rec_count = 0; @@ -1655,6 +1676,7 @@ namespace rgw::dedup { const string& oid = oids[current_shard]; rgw_cls_list_ret result; librados::ObjectReadOperation op; + d_ctl.bucket_index_throttle.acquire(); // get bucket-indices of @current_shard cls_rgw_bucket_list_op(op, marker, null_prefix, null_delimiter, max_entries, list_versions, &result); @@ -1789,7 +1811,7 @@ namespace rgw::dedup { display_table_stat_counters(dpp, p_stats); ldpp_dout(dpp, 10) << __func__ << "::MD5 Loop::" << d_ctl.dedup_type << dendl; - if (d_ctl.dedup_type != dedup_req_type_t::DEDUP_TYPE_FULL) { + if (d_ctl.dedup_type != dedup_req_type_t::DEDUP_TYPE_EXEC) { for (work_shard_t worker_id = 0; worker_id < 
num_work_shards; worker_id++) { remove_slabs(worker_id, md5_shard, slab_count_arr[worker_id]); } @@ -2043,6 +2065,8 @@ namespace rgw::dedup { &worker_stats,raw_mem, raw_mem_size); if (ret == 0) { worker_stats.duration = ceph_clock_now() - start_time; + worker_stats.bidx_throttle_sleep_events = d_ctl.bucket_index_throttle.get_sleep_events(); + worker_stats.bidx_throttle_sleep_time_usec = d_ctl.bucket_index_throttle.get_sleep_time_usec(); d_cluster.mark_work_shard_token_completed(store, worker_id, &worker_stats); ldpp_dout(dpp, 10) << "stat counters [worker]:\n" << worker_stats << dendl; ldpp_dout(dpp, 10) << "Shard Process Duration = " @@ -2050,6 +2074,7 @@ namespace rgw::dedup { } //ldpp_dout(dpp, 0) << __func__ << "::sleep for 2 seconds\n" << dendl; //std::this_thread::sleep_for(std::chrono::seconds(2)); + //std::this_thread::sleep_forstd::chrono::microseconds(usec_timeout); return ret; } @@ -2067,6 +2092,9 @@ namespace rgw::dedup { int ret = objects_dedup_single_md5_shard(&table, md5_shard, &md5_stats, num_work_shards); if (ret == 0) { md5_stats.duration = ceph_clock_now() - start_time; + md5_stats.md_throttle_sleep_events = d_ctl.metadata_access_throttle.get_sleep_events(); + md5_stats.md_throttle_sleep_time_usec = d_ctl.metadata_access_throttle.get_sleep_time_usec(); + d_cluster.mark_md5_shard_token_completed(store, md5_shard, &md5_stats); ldpp_dout(dpp, 10) << "stat counters [md5]:\n" << md5_stats << dendl; ldpp_dout(dpp, 10) << "Shard Process Duration = " @@ -2256,7 +2284,7 @@ namespace rgw::dedup { ldpp_dout(dpp, 10) <<__func__ << "::" << *p_epoch << dendl; d_ctl.dedup_type = p_epoch->dedup_type; #ifdef FULL_DEDUP_SUPPORT - ceph_assert(d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_FULL || + ceph_assert(d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_EXEC || d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE); #else ceph_assert(d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE); @@ -2299,14 +2327,27 @@ namespace rgw::dedup { { int ret = 0; 
int32_t urgent_msg = URGENT_MSG_NONE; + auto bl_iter = bl.cbegin(); try { - auto bl_iter = bl.cbegin(); ceph::decode(urgent_msg, bl_iter); } catch (buffer::error& err) { ldpp_dout(dpp, 1) << __func__ << "::ERROR: bad urgent_msg" << dendl; - ret = -EINVAL; + cluster::ack_notify(store, dpp, &d_ctl, notify_id, cookie, -EINVAL); + return; + } + ldpp_dout(dpp, 5) << __func__ << "::" << get_urgent_msg_names(urgent_msg) << dendl; + + throttle_msg_t throttle_msg; + if (urgent_msg == URGENT_MSG_THROTTLE) { + try { + decode(throttle_msg, bl_iter); + ldpp_dout(dpp, 5) << __func__ << "::" << throttle_msg << dendl; + } catch (buffer::error& err) { + ldpp_dout(dpp, 1) << __func__ << "::ERROR: bad throttle_msg" << dendl; + cluster::ack_notify(store, dpp, &d_ctl, notify_id, cookie, -EINVAL); + return; + } } - ldpp_dout(dpp, 5) << __func__ << "::-->" << get_urgent_msg_names(urgent_msg) << dendl; // use lock to prevent concurrent pause/resume requests std::unique_lock cond_lock(d_cond_mutex); // [------>open lock block @@ -2367,6 +2408,24 @@ namespace rgw::dedup { ldpp_dout(dpp, 5) << __func__ << "::dedup is not paused->nothing to do" << dendl; } break; + case URGENT_MSG_THROTTLE: + for (auto action : throttle_msg.vec) { + if (action.op_type == BUCKET_INDEX_OP) { + d_ctl.bucket_index_throttle.set_max_calls_per_sec(action.limit); + } + else if (action.op_type == METADATA_ACCESS_OP) { + d_ctl.metadata_access_throttle.set_max_calls_per_sec(action.limit); + } + else if (action.op_type == STAT) { + ldpp_dout(dpp, 10) << __func__ << "::Throttle STAT" << dendl; + } + else { + ldpp_dout(dpp, 1) << __func__ << "::unexpected throttle_msg " + << action.op_type << dendl; + ret = -EINVAL; + } + } + break; default: ldpp_dout(dpp, 1) << __func__ << "::unexpected urgent_msg: " << get_urgent_msg_names(urgent_msg) << dendl; diff --git a/src/rgw/rgw_dedup.h b/src/rgw/rgw_dedup.h index 57ed0e824de..be8253cf6db 100644 --- a/src/rgw/rgw_dedup.h +++ b/src/rgw/rgw_dedup.h @@ -55,6 +55,8 @@ namespace 
rgw::dedup { bool remote_pause_req = false; bool remote_paused = false; bool remote_restart_req = false; + Throttle bucket_index_throttle; + Throttle metadata_access_throttle; }; std::ostream& operator<<(std::ostream &out, const control_t &ctl); void encode(const control_t& ctl, ceph::bufferlist& bl); @@ -230,9 +232,6 @@ namespace rgw::dedup { librados::IoCtx d_dedup_cluster_ioctx; utime_t d_heart_beat_last_update; unsigned d_heart_beat_max_elapsed_sec; - - // A pool with 6 billion objects has a 1/(2^64) chance for collison with a 128bit MD5 - uint64_t d_max_protected_objects = (6ULL * 1024 * 1024 * 1024); uint64_t d_all_buckets_obj_count = 0; uint64_t d_all_buckets_obj_size = 0; // we don't benefit from deduping RGW objects smaller than head-object size diff --git a/src/rgw/rgw_dedup_cluster.cc b/src/rgw/rgw_dedup_cluster.cc index f18de129a5a..ab59c776d34 100644 --- a/src/rgw/rgw_dedup_cluster.cc +++ b/src/rgw/rgw_dedup_cluster.cc @@ -898,9 +898,15 @@ namespace rgw::dedup { if (!has_incomplete_shards) { return; } + //utime_t now = ceph_clock_now(); Formatter::ArraySection array_section{*fmt, "incomplete_shards"}; for (unsigned shard = 0; shard < num_shards; shard++) { - if (sp_arr[shard].is_completed() ) { + if (sp_arr[shard].is_completed()) { + continue; + } + if (sp_arr[shard].was_not_started() ) { + Formatter::ObjectSection object_section{*fmt, "pending shard:"}; + fmt->dump_unsigned("shard_id", shard); continue; } Formatter::ObjectSection object_section{*fmt, "shard_progress"}; @@ -909,6 +915,8 @@ namespace rgw::dedup { fmt->dump_unsigned("progress_a", sp_arr[shard].progress_a); fmt->dump_unsigned("progress_b", sp_arr[shard].progress_b); fmt->dump_stream("last updated") << sp_arr[shard].update_time; + utime_t elapsed = sp_arr[shard].update_time - sp_arr[shard].creation_time; + fmt->dump_unsigned("time elapsed (sec)", elapsed.tv.tv_sec); } } @@ -1184,21 +1192,34 @@ namespace rgw::dedup { } 
//--------------------------------------------------------------------------- - // command-line called from radosgw-admin.cc - int cluster::dedup_control(rgw::sal::RadosStore *store, - const DoutPrefixProvider *dpp, - urgent_msg_t urgent_msg) + static void report_throttle_state(const struct rgw::dedup::control_t &ctl) { - ldpp_dout(dpp, 10) << __func__ << "::dedup_control req = " - << get_urgent_msg_names(urgent_msg) << dendl; - if (urgent_msg != URGENT_MSG_RESUME && - urgent_msg != URGENT_MSG_PASUE && - urgent_msg != URGENT_MSG_RESTART && - urgent_msg != URGENT_MSG_ABORT) { - ldpp_dout(dpp, 1) << __func__ << "::illegal urgent_msg="<< urgent_msg << dendl; - return -EINVAL; + if (!ctl.bucket_index_throttle.is_disabled()) { + std::cout << "bucket-index throttle=" + << ctl.bucket_index_throttle.get_max_calls_per_second() + << std::endl; + } + else { + std::cout << "bucket-index throttle is disabled" << std::endl; } + if (!ctl.metadata_access_throttle.is_disabled()) { + std::cout << "metadata throttle=" + << ctl.metadata_access_throttle.get_max_calls_per_second() + << std::endl; + } + else { + std::cout << "metadata throttle is disabled" << std::endl; + } + } + + //--------------------------------------------------------------------------- + // command-line called from radosgw-admin.cc + int cluster::dedup_control_bl(rgw::sal::RadosStore *store, + const DoutPrefixProvider *dpp, + urgent_msg_t urgent_msg, + bufferlist urgent_msg_bl) + { librados::IoCtx ctl_ioctx; int ret = get_control_ioctx(store, dpp, ctl_ioctx); if (unlikely(ret != 0)) { @@ -1207,8 +1228,7 @@ namespace rgw::dedup { // 10 seconds timeout const uint64_t timeout_ms = 10*1000; - bufferlist reply_bl, urgent_msg_bl; - ceph::encode(urgent_msg, urgent_msg_bl); + bufferlist reply_bl; ret = rgw_rados_notify(dpp, ctl_ioctx, DEDUP_WATCH_OBJ, urgent_msg_bl, timeout_ms, &reply_bl, null_yield); if (ret < 0) { @@ -1234,6 +1254,9 @@ namespace rgw::dedup { struct rgw::dedup::control_t ctl; decode(ctl, iter); 
ldpp_dout(dpp, 10) << __func__ << "::++ACK::ctl=" << ctl << "::ret=" << ret << dendl; + if (urgent_msg == URGENT_MSG_THROTTLE) { + report_throttle_state(ctl); + } } catch (buffer::error& err) { ldpp_dout(dpp, 1) << __func__ << "::failed decoding notify acks" << dendl; return -EINVAL; @@ -1244,11 +1267,31 @@ namespace rgw::dedup { return ret; } } - ldpp_dout(dpp, 10) << __func__ << "::" << get_urgent_msg_names(urgent_msg) - << " finished successfully!" << dendl; + ldpp_dout(dpp, 10) << __func__ << "::finished successfully!" << dendl; return 0; } + //--------------------------------------------------------------------------- + // command-line called from radosgw-admin.cc + int cluster::dedup_control(rgw::sal::RadosStore *store, + const DoutPrefixProvider *dpp, + urgent_msg_t urgent_msg) + { + ldpp_dout(dpp, 10) << __func__ << "::dedup_control req = " + << get_urgent_msg_names(urgent_msg) << dendl; + if (urgent_msg != URGENT_MSG_RESUME && + urgent_msg != URGENT_MSG_PASUE && + urgent_msg != URGENT_MSG_RESTART && + urgent_msg != URGENT_MSG_ABORT) { + ldpp_dout(dpp, 1) << __func__ << "::illegal urgent_msg="<< urgent_msg << dendl; + return -EINVAL; + } + + bufferlist urgent_msg_bl; + ceph::encode(urgent_msg, urgent_msg_bl); + return dedup_control_bl(store, dpp, urgent_msg, urgent_msg_bl); + } + //--------------------------------------------------------------------------- // command-line called from radosgw-admin.cc int cluster::dedup_restart_scan(rgw::sal::RadosStore *store, @@ -1288,7 +1331,7 @@ namespace rgw::dedup { ldpp_dout(dpp, 10) << __func__ << dedup_type << dendl; #ifdef FULL_DEDUP_SUPPORT ceph_assert(dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE || - dedup_type == dedup_req_type_t::DEDUP_TYPE_FULL); + dedup_type == dedup_req_type_t::DEDUP_TYPE_EXEC); #else ceph_assert(dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE); #endif diff --git a/src/rgw/rgw_dedup_cluster.h b/src/rgw/rgw_dedup_cluster.h index 64b2c54a4fa..1b5b9cdc175 100644 --- 
a/src/rgw/rgw_dedup_cluster.h +++ b/src/rgw/rgw_dedup_cluster.h @@ -92,6 +92,10 @@ namespace rgw::dedup { uint64_t notify_id, uint64_t cookie, int status); + static int dedup_control_bl(rgw::sal::RadosStore *store, + const DoutPrefixProvider *dpp, + urgent_msg_t urgent_msg, + bufferlist urgent_msg_bl); static int dedup_control(rgw::sal::RadosStore *store, const DoutPrefixProvider *dpp, urgent_msg_t urgent_msg); diff --git a/src/rgw/rgw_dedup_store.cc b/src/rgw/rgw_dedup_store.cc index 18898bbba95..b087c39f153 100644 --- a/src/rgw/rgw_dedup_store.cc +++ b/src/rgw/rgw_dedup_store.cc @@ -607,6 +607,9 @@ namespace rgw::dedup { unsigned len = (p_curr_block + 1 - p_arr) * sizeof(disk_block_t); bufferlist bl = bufferlist::static_from_mem((char*)p_arr, len); int ret = store_slab(ioctx, bl, d_md5_shard, d_worker_id, d_seq_number, dpp); + if (unlikely(ret != 0)) { + p_stats->write_slab_failure++; + } // Need to make sure the call to rgw_put_system_obj was fully synchronous // d_seq_number++ must be called **after** flush!! 
diff --git a/src/rgw/rgw_dedup_utils.cc b/src/rgw/rgw_dedup_utils.cc index e4c40c51728..4b0e032008e 100644 --- a/src/rgw/rgw_dedup_utils.cc +++ b/src/rgw/rgw_dedup_utils.cc @@ -25,8 +25,8 @@ namespace rgw::dedup { else if (dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE) { out << "DEDUP_TYPE_ESTIMATE"; } - else if (dedup_type == dedup_req_type_t::DEDUP_TYPE_FULL) { - out << "DEDUP_TYPE_FULL"; + else if (dedup_type == dedup_req_type_t::DEDUP_TYPE_EXEC) { + out << "DEDUP_TYPE_EXEC (full dedup)"; } else { out << "\n*** unexpected dedup_type ***\n"; @@ -35,6 +35,108 @@ namespace rgw::dedup { return out; } + //--------------------------------------------------------------------------- + void validate_max_calls_offset() + { + // max_calls must be the first data member to guarantee 8 Bytes alignment + // this will allow us to avoid using std::atomic (which is expensive) + static_assert(offsetof(Throttle, max_calls) == 0); + } + + //--------------------------------------------------------------------------- + void encode(const Throttle& t, ceph::bufferlist& bl) + { + ENCODE_START(1, 1, bl); + encode(t.get_max_calls_per_second(), bl); + ENCODE_FINISH(bl); + } + + //--------------------------------------------------------------------------- + void decode(Throttle& t, ceph::bufferlist::const_iterator& bl) + { + DECODE_START(1, bl); + size_t max_calls_per_sec; + decode(max_calls_per_sec, bl); + t.set_max_calls_per_sec(max_calls_per_sec); + DECODE_FINISH(bl); + } + + //--------------------------------------------------------------------------- + std::ostream& operator<<(std::ostream &out, const throttle_action_t& msg) + { + if (msg.op_type == BUCKET_INDEX_OP) { + out << "Set Bucket Index Throttling to "; + if (msg.limit) { + out << msg.limit << " IOPS"; + } + else { + out << "unlimited IOPS"; + } + } + else if (msg.op_type == METADATA_ACCESS_OP) { + out << "Set Metadata Throttling to "; + if (msg.limit) { + out << msg.limit << " IOPS"; + } + else { + out << "unlimited 
IOPS"; + } + } + else if (msg.op_type == DATA_READ_WRITE_OP) { + out << "Set Read/Write Throttling to " << msg.limit << " MB/sec"; + } + else { + out << "\n*** unexpected throttling type ***\n"; + } + + return out; + } + + //--------------------------------------------------------------------------- + std::ostream& operator<<(std::ostream &out, const throttle_msg_t& msg) + { + for (auto action : msg.vec) { + out << action << " :: "; + } + return out; + } + + //--------------------------------------------------------------------------- + void encode(const throttle_action_t& m, ceph::bufferlist& bl) + { + ENCODE_START(1, 1, bl); + encode((int)m.op_type, bl); + encode(m.limit, bl); + ENCODE_FINISH(bl); + } + + //--------------------------------------------------------------------------- + void decode(throttle_action_t& m, ceph::bufferlist::const_iterator& bl) + { + DECODE_START(1, bl); + int tmp; + decode(tmp, bl); + m.op_type = (op_type_t)tmp; + decode(m.limit, bl); + DECODE_FINISH(bl); + } + + //--------------------------------------------------------------------------- + void encode(const throttle_msg_t& m, ceph::bufferlist& bl) + { + ENCODE_START(1, 1, bl); + encode(m.vec, bl); + ENCODE_FINISH(bl); + } + + //--------------------------------------------------------------------------- + void decode(throttle_msg_t& m, ceph::bufferlist::const_iterator& bl) + { + DECODE_START(1, bl); + decode(m.vec, bl); + DECODE_FINISH(bl); + } + //--------------------------------------------------------------------------- dedup_stats_t& dedup_stats_t::operator+=(const dedup_stats_t& other) { @@ -244,6 +346,7 @@ namespace rgw::dedup { "URGENT_MSG_PASUE", "URGENT_MSG_RESUME", "URGENT_MSG_RESTART", + "URGENT_MSG_THROTTLE", "URGENT_MSG_INVALID" }; @@ -266,6 +369,9 @@ namespace rgw::dedup { this->egress_records += other.egress_records; this->egress_blocks += other.egress_blocks; this->egress_slabs += other.egress_slabs; + this->write_slab_failure += other.write_slab_failure; + 
this->bidx_throttle_sleep_events += other.bidx_throttle_sleep_events; + this->bidx_throttle_sleep_time_usec += other.bidx_throttle_sleep_time_usec; this->single_part_objs += other.single_part_objs; this->multipart_objs += other.multipart_objs; this->small_multipart_obj += other.small_multipart_obj; @@ -302,8 +408,14 @@ namespace rgw::dedup { { Formatter::ObjectSection notify(*f, "notify"); + if (this->bidx_throttle_sleep_events) { + f->dump_unsigned("Bucket-Index Throttle Sleep Events", + this->bidx_throttle_sleep_events); + f->dump_unsigned("Bucket-Index Throttle Sleep Time (sec)", + this->bidx_throttle_sleep_time_usec/MICROSECONDS_PER_SECOND); + } - if(this->non_default_storage_class_objs) { + if (this->non_default_storage_class_objs) { f->dump_unsigned("non default storage class objs", this->non_default_storage_class_objs); f->dump_unsigned("non default storage class objs bytes", @@ -323,7 +435,7 @@ namespace rgw::dedup { { Formatter::ObjectSection skipped(*f, "skipped"); - if(this->ingress_skip_too_small) { + if (this->ingress_skip_too_small) { f->dump_unsigned("Ingress skip: too small objs", this->ingress_skip_too_small); f->dump_unsigned("Ingress skip: too small bytes", @@ -340,7 +452,10 @@ namespace rgw::dedup { { Formatter::ObjectSection failed(*f, "failed"); - if(this->ingress_corrupted_etag) { + if (this->write_slab_failure) { + f->dump_unsigned("Write SLAB failures", this->write_slab_failure); + } + if (this->ingress_corrupted_etag) { f->dump_unsigned("Corrupted ETAG", this->ingress_corrupted_etag); } } @@ -366,6 +481,9 @@ namespace rgw::dedup { encode(w.egress_records, bl); encode(w.egress_blocks, bl); encode(w.egress_slabs, bl); + encode(w.write_slab_failure, bl); + encode(w.bidx_throttle_sleep_events, bl); + encode(w.bidx_throttle_sleep_time_usec, bl); encode(w.single_part_objs, bl); encode(w.multipart_objs, bl); @@ -397,6 +515,9 @@ namespace rgw::dedup { decode(w.egress_records, bl); decode(w.egress_blocks, bl); decode(w.egress_slabs, bl); + 
decode(w.write_slab_failure, bl); + decode(w.bidx_throttle_sleep_events, bl); + decode(w.bidx_throttle_sleep_time_usec, bl); decode(w.single_part_objs, bl); decode(w.multipart_objs, bl); decode(w.small_multipart_obj, bl); @@ -419,6 +540,7 @@ namespace rgw::dedup { { this->small_objs_stat += other.small_objs_stat; this->big_objs_stat += other.big_objs_stat; + this->ingress_slabs += other.ingress_slabs; this->ingress_failed_load_bucket += other.ingress_failed_load_bucket; this->ingress_failed_get_object += other.ingress_failed_get_object; this->ingress_failed_get_obj_attrs += other.ingress_failed_get_obj_attrs; @@ -457,6 +579,8 @@ namespace rgw::dedup { this->dup_head_bytes += other.dup_head_bytes; this->failed_dedup += other.failed_dedup; + this->md_throttle_sleep_events += other.md_throttle_sleep_events; + this->md_throttle_sleep_time_usec += other.md_throttle_sleep_time_usec; this->failed_table_load += other.failed_table_load; this->failed_map_overflow += other.failed_map_overflow; return *this; @@ -482,6 +606,7 @@ namespace rgw::dedup { f->dump_unsigned("Total processed objects", this->processed_objects); f->dump_unsigned("Loaded objects", this->loaded_objects); + f->dump_unsigned("Ingress Slabs", this->ingress_slabs); f->dump_unsigned("Set Shared-Manifest SRC", this->set_shared_manifest_src); f->dump_unsigned("Deduped Obj (this cycle)", this->deduped_objects); f->dump_unsigned("Deduped Bytes(this cycle)", this->deduped_objects_bytes); @@ -513,6 +638,12 @@ namespace rgw::dedup { { Formatter::ObjectSection notify(*f, "notify"); + if (this->md_throttle_sleep_events) { + f->dump_unsigned("Metadata Throttle Sleep Events", this->md_throttle_sleep_events); + f->dump_unsigned("Metadata Throttle Sleep Time (sec)", + this->md_throttle_sleep_time_usec/MICROSECONDS_PER_SECOND); + } + if (this->failed_table_load) { f->dump_unsigned("Failed Table Load", this->failed_table_load); } @@ -607,6 +738,7 @@ namespace rgw::dedup { encode(m.small_objs_stat, bl); 
encode(m.big_objs_stat, bl); + encode(m.ingress_slabs, bl); encode(m.ingress_failed_load_bucket, bl); encode(m.ingress_failed_get_object, bl); encode(m.ingress_failed_get_obj_attrs, bl); @@ -644,6 +776,8 @@ namespace rgw::dedup { encode(m.deduped_objects_bytes, bl); encode(m.dup_head_bytes, bl); encode(m.failed_dedup, bl); + encode(m.md_throttle_sleep_events, bl); + encode(m.md_throttle_sleep_time_usec, bl); encode(m.failed_table_load, bl); encode(m.failed_map_overflow, bl); @@ -657,6 +791,7 @@ namespace rgw::dedup { DECODE_START(1, bl); decode(m.small_objs_stat, bl); decode(m.big_objs_stat, bl); + decode(m.ingress_slabs, bl); decode(m.ingress_failed_load_bucket, bl); decode(m.ingress_failed_get_object, bl); decode(m.ingress_failed_get_obj_attrs, bl); @@ -694,6 +829,8 @@ namespace rgw::dedup { decode(m.deduped_objects_bytes, bl); decode(m.dup_head_bytes, bl); decode(m.failed_dedup, bl); + decode(m.md_throttle_sleep_events, bl); + decode(m.md_throttle_sleep_time_usec, bl); decode(m.failed_table_load, bl); decode(m.failed_map_overflow, bl); diff --git a/src/rgw/rgw_dedup_utils.h b/src/rgw/rgw_dedup_utils.h index 6a1d0fc0f45..e1c8737557a 100644 --- a/src/rgw/rgw_dedup_utils.h +++ b/src/rgw/rgw_dedup_utils.h @@ -19,15 +19,18 @@ #include "common/Formatter.h" #include "common/ceph_json.h" #include +#include #include "include/utime.h" #include "include/encoding.h" #include "common/dout.h" //#define FULL_DEDUP_SUPPORT namespace rgw::dedup { + using namespace std::chrono; using work_shard_t = uint16_t; using md5_shard_t = uint16_t; + const uint64_t MICROSECONDS_PER_SECOND = 1000000; // settings to help debug small systems const work_shard_t MIN_WORK_SHARD = 2; const md5_shard_t MIN_MD5_SHARD = 4; @@ -57,7 +60,7 @@ namespace rgw::dedup { enum dedup_req_type_t { DEDUP_TYPE_NONE = 0, DEDUP_TYPE_ESTIMATE = 1, - DEDUP_TYPE_FULL = 2 + DEDUP_TYPE_EXEC = 2 }; std::ostream& operator<<(std::ostream &out, const dedup_req_type_t& dedup_type); @@ -85,6 +88,93 @@ namespace rgw::dedup { 
uint8_t flags; }; + class alignas(8) Throttle { + friend void validate_max_calls_offset(); + public: + // @max_calls_per_sec - max requests per second allowed, 0 means unlimited + // disabled by default + Throttle(size_t max_calls_per_sec=0) { + set_max_calls_per_sec(max_calls_per_sec); + reset(); + } + + // set the number of calls per second + // zero means unlimited + inline void set_max_calls_per_sec(uint32_t max_calls_per_sec) { + max_calls = max_calls_per_sec; + } + + inline size_t get_max_calls_per_second() const { + return max_calls; + } + + inline uint64_t get_sleep_events() const { + return sleep_events; + } + + inline uint64_t get_sleep_time_usec() const { + return sleep_time_usec; + } + + inline void disable() { + set_max_calls_per_sec(0); + } + + inline bool is_disabled() const { + return (max_calls == 0); + } + + // Blocks (sleeps) until a token is available; max_calls tokens are granted per one-second time slot + void acquire() { + if (is_disabled()) { + return; + } + // Should work fine without atomic since acquire is single threaded + const steady_clock::time_point now = steady_clock::now(); + uint64_t elapsed_usec = duration_cast<microseconds>(now - last_reset).count(); + if (elapsed_usec >= MICROSECONDS_PER_SECOND || last_reset > now) { + // Renew tokens if a second (or more) has passed since last_reset + reset(); + --tokens; + return; + } + + if (tokens > 0) { + --tokens; + return; + } + + // if reached here, all tokens were exhausted, wait for the next time slot + ceph_assert(MICROSECONDS_PER_SECOND > elapsed_usec); + uint64_t wait_time_usec = MICROSECONDS_PER_SECOND - elapsed_usec; + sleep_events ++; + sleep_time_usec += wait_time_usec; + + std::this_thread::sleep_for(microseconds(wait_time_usec)); + // After sleeping, reset and return + reset(); + tokens --; + } + + private: + void reset() { + // atomic operation because it is 8 Bytes aligned (NOTE(review): these are plain stores, no std::atomic is used - confirm the assumption holds on all targets) + tokens = max_calls; + last_reset = steady_clock::now(); + } + + // @max_calls must be the first data member to guarantee 8 Bytes alignment + uint32_t max_calls; + uint32_t tokens; 
+ steady_clock::time_point last_reset; + uint64_t sleep_events = 0; + uint64_t sleep_time_usec = 0; + } __attribute__ ((aligned (8))); + + void validate_max_calls_offset(); + void encode(const Throttle& t, ceph::bufferlist& bl); + void decode(Throttle& t, ceph::bufferlist::const_iterator& bl); + struct dedup_stats_t { dedup_stats_t& operator+=(const dedup_stats_t& other); @@ -107,6 +197,9 @@ namespace rgw::dedup { uint64_t egress_records = 0; uint64_t egress_blocks = 0; uint64_t egress_slabs = 0; + uint64_t write_slab_failure = 0; + uint64_t bidx_throttle_sleep_events = 0; + uint64_t bidx_throttle_sleep_time_usec = 0; uint64_t single_part_objs = 0; uint64_t multipart_objs = 0; @@ -139,6 +232,7 @@ namespace rgw::dedup { dedup_stats_t small_objs_stat; dedup_stats_t big_objs_stat; + uint64_t ingress_slabs = 0; uint64_t ingress_failed_load_bucket = 0; uint64_t ingress_failed_get_object = 0; uint64_t ingress_failed_get_obj_attrs = 0; @@ -178,6 +272,8 @@ namespace rgw::dedup { uint64_t deduped_objects_bytes = 0; uint64_t dup_head_bytes = 0; uint64_t failed_dedup = 0; + uint64_t md_throttle_sleep_events = 0; + uint64_t md_throttle_sleep_time_usec = 0; uint64_t failed_table_load = 0; uint64_t failed_map_overflow = 0; utime_t duration = {0, 0}; @@ -218,15 +314,39 @@ namespace rgw::dedup { } enum urgent_msg_t { - URGENT_MSG_NONE = 0, - URGENT_MSG_ABORT = 1, - URGENT_MSG_PASUE = 2, - URGENT_MSG_RESUME = 3, - URGENT_MSG_RESTART = 4, - URGENT_MSG_INVALID = 5 + URGENT_MSG_NONE = 0, + URGENT_MSG_ABORT, + URGENT_MSG_PASUE, + URGENT_MSG_RESUME, + URGENT_MSG_RESTART, + URGENT_MSG_THROTTLE, + URGENT_MSG_INVALID }; - const char* get_urgent_msg_names(int msg); + enum op_type_t { + NO_OP = 0, + BUCKET_INDEX_OP, + METADATA_ACCESS_OP, + DATA_READ_WRITE_OP, + STAT, + INVALID_OP + }; + + struct throttle_action_t { + op_type_t op_type; + uint32_t limit; + }; + void encode(const throttle_action_t& m, ceph::bufferlist& bl); + void decode(throttle_action_t& m, ceph::bufferlist::const_iterator& 
bl); + std::ostream& operator<<(std::ostream &out, const throttle_action_t& action); + struct throttle_msg_t { + std::vector<throttle_action_t> vec; + }; + + std::ostream& operator<<(std::ostream &out, const throttle_msg_t& msg); + void encode(const throttle_msg_t& m, ceph::bufferlist& bl); + void decode(throttle_msg_t& m, ceph::bufferlist::const_iterator& bl); + bool hex2int(const char *p, const char *p_end, uint64_t *p_val); bool parse_etag_string(const std::string& etag, parsed_etag_t *parsed_etag); void etag_to_bufferlist(uint64_t md5_high, uint64_t md5_low, uint16_t num_parts, diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index 031d4c0922c..b3604528381 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -18,10 +18,11 @@ caps rm remove user capabilities dedup stats Display dedup statistics from the last run dedup estimate Runs dedup in estimate mode (no changes will be made) - dedup restart Restart dedup + dedup exec Execute dedup (duplicated tail objects will be deleted); must include --yes-i-really-mean-it to activate dedup abort Abort dedup dedup pause Pause dedup dedup resume Resume paused dedup + dedup throttle Throttle dedup execution subuser create create a new subuser subuser modify modify subuser subuser rm remove subuser @@ -352,6 +353,11 @@ := "YYYY-MM-DD[ hh:mm:ss]" + Dedup throttle options: + --max-bucket-index-ops specify max bucket-index requests per second allowed for an RGW during dedup, 0 means unlimited + --max-metadata-ops specify max metadata requests per second allowed for an RGW during dedup, 0 means unlimited + --stat display dedup throttle setting + Quota options: --max-objects specify max objects (negative value to disable) --max-size specify max size (in B/K/M/G/T, negative value to disable) diff --git a/src/test/rgw/dedup/test_dedup.py b/src/test/rgw/dedup/test_dedup.py index a339e25b6b4..65ce175f9c5 100644 --- a/src/test/rgw/dedup/test_dedup.py +++ 
b/src/test/rgw/dedup/test_dedup.py @@ -1069,14 +1069,25 @@ def read_dedup_stats(dry_run): return (dedup_work_was_completed, dedup_stats, dedup_ratio_estimate, dedup_ratio_actual) +#------------------------------------------------------------------------------- +def set_bucket_index_throttling(limit): + cmd = ['dedup', 'throttle', '--max-bucket-index-ops', str(limit)] + result = admin(cmd) + assert result[1] == 0 + log.debug(result[0]) + #------------------------------------------------------------------------------- def exec_dedup_internal(expected_dedup_stats, dry_run, max_dedup_time): + ### before starting the run, set bucket-index throttling to a random value between 50-200 ops/sec (i.e. 50K-200K object entries/sec) + limit=random.randint(50, 200) + set_bucket_index_throttling(limit) + log.debug("sending exec_dedup request: dry_run=%d", dry_run) if dry_run: result = admin(['dedup', 'estimate']) reset_full_dedup_stats(expected_dedup_stats) else: - result = admin(['dedup', 'restart']) + result = admin(['dedup', 'exec', '--yes-i-really-mean-it']) assert result[1] == 0 log.debug("wait for dedup to complete") @@ -1095,7 +1106,10 @@ def exec_dedup_internal(expected_dedup_stats, dry_run, max_dedup_time): wait_for_completion = False log.info("dedup completed in %d seconds", dedup_time) return (dedup_time, ret[1], ret[2], ret[3]) - + else: + ### dedup has not completed yet (presumably still in the wait loop): re-set bucket-index throttling to a new random value between 50-200 ops/sec (i.e. 50K-200K object entries/sec) + limit=random.randint(50, 200) + set_bucket_index_throttling(limit) #------------------------------------------------------------------------------- def exec_dedup(expected_dedup_stats, dry_run, verify_stats=True): @@ -1308,7 +1322,7 @@ def check_full_dedup_state(): global full_dedup_state_was_checked global full_dedup_state_disabled log.debug("check_full_dedup_state:: sending FULL Dedup request") - result = admin(['dedup', 'restart']) + result = admin(['dedup', 'exec', '--yes-i-really-mean-it']) if result[1] == 0: log.debug("full dedup is enabled!") full_dedup_state_disabled = False