From 16ad586dac47fe9d490ed42a8c93072593b699d3 Mon Sep 17 00:00:00 2001 From: Gabriel BenHanokh Date: Mon, 15 Sep 2025 06:58:23 +0000 Subject: [PATCH] rgw/dedup: add throttling mechanism Signed-off-by: Gabriel BenHanokh rgw/dedup: Change throttle code to work lock free and remove the atomic from the timestamp Signed-off-by: Gabriel BenHanokh --- doc/radosgw/s3_objects_dedup.rst | 19 ++- src/rgw/driver/rados/rgw_dedup.cc | 69 +++++++++- src/rgw/driver/rados/rgw_dedup.h | 5 +- src/rgw/driver/rados/rgw_dedup_cluster.cc | 79 +++++++++--- src/rgw/driver/rados/rgw_dedup_cluster.h | 4 + src/rgw/driver/rados/rgw_dedup_store.cc | 3 + src/rgw/driver/rados/rgw_dedup_utils.cc | 147 +++++++++++++++++++++- src/rgw/driver/rados/rgw_dedup_utils.h | 136 ++++++++++++++++++-- src/rgw/radosgw-admin/radosgw-admin.cc | 81 ++++++++++-- src/test/cli/radosgw-admin/help.t | 8 +- src/test/rgw/dedup/test_dedup.py | 44 ++++--- 11 files changed, 529 insertions(+), 66 deletions(-) diff --git a/doc/radosgw/s3_objects_dedup.rst b/doc/radosgw/s3_objects_dedup.rst index 2c848113fa6..b0b83d0ddf7 100644 --- a/doc/radosgw/s3_objects_dedup.rst +++ b/doc/radosgw/s3_objects_dedup.rst @@ -8,11 +8,10 @@ Admin commands ************** - ``radosgw-admin dedup estimate``: Starts a new dedup estimate session (aborting first existing session if exists). - It doesn't make any change to the existing system and will only collect statistics and report them. -- ``radosgw-admin dedup restart --yes-i-really-mean-it``: +- ``radosgw-admin dedup exec --yes-i-really-mean-it``: Starts a new dedup session (aborting first existing session if exists). - It will perfrom a full dedup, finding duplicated tail-objects and removing them. + It will perform a full dedup, finding duplicated tail-objects and removing them. This command can lead to **data-loss** and should not be used on production data!! - ``radosgw-admin dedup pause``: @@ -23,6 +22,12 @@ Admin commands Aborts an active dedup session and release all resources used by it. 
- ``radosgw-admin dedup stats``: Collects & displays last dedup statistics. +- ``radosgw-admin dedup estimate``: + Starts a new dedup estimate session (aborting first existing session if exists). +- ``radosgw-admin dedup throttle --max-bucket-index-ops=<count>``: + Specify the maximum number of bucket-index requests per second allowed for a single RGW server during dedup; 0 means unlimited. +- ``radosgw-admin dedup throttle --stat``: + Display the current dedup throttle settings. *************** Skipped Objects @@ -54,6 +59,14 @@ the underlying media storing the objects (SSD/HDD) since the bucket indices are virtually always stored on a fast medium (SSD with heavy memory caching). +The admin can throttle the estimate process by setting a limit on the number of +bucket-index reads per second per RGW server (each read brings 1000 object entries) using: + +$ radosgw-admin dedup throttle --max-bucket-index-ops=<count> + +A typical RGW server performs about 100 bucket-index reads per second (i.e. 100,000 object entries). +Setting the count to 50 will typically cut the bucket-index read rate in half, and so on. 
+ ********************* Full Dedup Processing ********************* diff --git a/src/rgw/driver/rados/rgw_dedup.cc b/src/rgw/driver/rados/rgw_dedup.cc index 7c00ddf6f2a..338f265522d 100644 --- a/src/rgw/driver/rados/rgw_dedup.cc +++ b/src/rgw/driver/rados/rgw_dedup.cc @@ -129,6 +129,8 @@ namespace rgw::dedup { this->remote_pause_req = false; this->remote_paused = false; this->remote_restart_req = false; + this->bucket_index_throttle.disable(); + this->metadata_access_throttle.disable(); } //--------------------------------------------------------------------------- @@ -147,6 +149,8 @@ namespace rgw::dedup { encode(ctl.remote_pause_req, bl); encode(ctl.remote_paused, bl); encode(ctl.remote_restart_req, bl); + encode(ctl.bucket_index_throttle, bl); + encode(ctl.metadata_access_throttle, bl); ENCODE_FINISH(bl); } @@ -168,6 +172,8 @@ namespace rgw::dedup { decode(ctl.remote_pause_req, bl); decode(ctl.remote_paused, bl); decode(ctl.remote_restart_req, bl); + decode(ctl.bucket_index_throttle, bl); + decode(ctl.metadata_access_throttle, bl); DECODE_FINISH(bl); } @@ -209,6 +215,13 @@ namespace rgw::dedup { out << "::remote_restart_req"; } + if (!ctl.bucket_index_throttle.is_disabled()) { + out << "::bucket_index_throttle=" << ctl.bucket_index_throttle.get_max_calls_per_second(); + } + if (!ctl.metadata_access_throttle.is_disabled()) { + out << "::metadata_throttle=" << ctl.metadata_access_throttle.get_max_calls_per_second(); + } + return out; } @@ -534,6 +547,7 @@ namespace rgw::dedup { librados::IoCtx ioctx = obj.ioctx; ldpp_dout(dpp, 20) << __func__ << "::removing tail object: " << raw_obj.oid << dendl; + d_ctl.metadata_access_throttle.acquire(); ret = ioctx.remove(raw_obj.oid); } @@ -567,6 +581,8 @@ namespace rgw::dedup { } ObjectWriteOperation op; + d_ctl.metadata_access_throttle.acquire(); + ldpp_dout(dpp, 20) << __func__ << "::dec ref-count on tail object: " << raw_obj.oid << dendl; cls_refcount_put(op, ref_tag, true); rgw::AioResultList completed = aio->get(obj.obj, 
rgw::Aio::librados_op(obj.ioctx, std::move(op), null_yield), @@ -602,6 +618,7 @@ namespace rgw::dedup { ObjectWriteOperation op; cls_refcount_get(op, ref_tag, true); + d_ctl.metadata_access_throttle.acquire(); ldpp_dout(dpp, 20) << __func__ << "::inc ref-count on tail object: " << raw_obj.oid << dendl; rgw::AioResultList completed = aio->get(obj.obj, rgw::Aio::librados_op(obj.ioctx, std::move(op), null_yield), @@ -782,6 +799,7 @@ namespace rgw::dedup { ldpp_dout(dpp, 20) << __func__ << "::ref_tag=" << ref_tag << dendl; int ret = inc_ref_count_by_manifest(ref_tag, src_oid, src_manifest); if (ret == 0) { + d_ctl.metadata_access_throttle.acquire(); ldpp_dout(dpp, 20) << __func__ << "::send TGT CLS (Shared_Manifest)" << dendl; ret = tgt_ioctx.operate(tgt_oid, &tgt_op); if (unlikely(ret != 0)) { @@ -809,6 +827,7 @@ namespace rgw::dedup { p_stats->set_hash_attrs++; } + d_ctl.metadata_access_throttle.acquire(); ldpp_dout(dpp, 20) << __func__ <<"::send SRC CLS (Shared_Manifest)"<< dendl; ret = src_ioctx.operate(src_oid, &src_op); if (unlikely(ret != 0)) { @@ -1075,6 +1094,7 @@ namespace rgw::dedup { return 0; } + d_ctl.metadata_access_throttle.acquire(); ret = p_obj->get_obj_attrs(null_yield, dpp); if (unlikely(ret < 0)) { p_stats->ingress_failed_get_obj_attrs++; @@ -1393,6 +1413,7 @@ namespace rgw::dedup { } } + p_stats->ingress_slabs++; (*p_slab_count)++; failure_count = 0; unsigned slab_rec_count = 0; @@ -1646,6 +1667,7 @@ namespace rgw::dedup { const string& oid = oids[current_shard]; rgw_cls_list_ret result; librados::ObjectReadOperation op; + d_ctl.bucket_index_throttle.acquire(); // get bucket-indices of @current_shard cls_rgw_bucket_list_op(op, marker, null_prefix, null_delimiter, max_entries, list_versions, &result); @@ -1780,7 +1802,7 @@ namespace rgw::dedup { display_table_stat_counters(dpp, p_stats); ldpp_dout(dpp, 10) << __func__ << "::MD5 Loop::" << d_ctl.dedup_type << dendl; - if (d_ctl.dedup_type != dedup_req_type_t::DEDUP_TYPE_FULL) { + if 
(d_ctl.dedup_type != dedup_req_type_t::DEDUP_TYPE_EXEC) { for (work_shard_t worker_id = 0; worker_id < num_work_shards; worker_id++) { remove_slabs(worker_id, md5_shard, slab_count_arr[worker_id]); } @@ -2034,6 +2056,8 @@ namespace rgw::dedup { &worker_stats,raw_mem, raw_mem_size); if (ret == 0) { worker_stats.duration = ceph_clock_now() - start_time; + worker_stats.bidx_throttle_sleep_events = d_ctl.bucket_index_throttle.get_sleep_events(); + worker_stats.bidx_throttle_sleep_time_usec = d_ctl.bucket_index_throttle.get_sleep_time_usec(); d_cluster.mark_work_shard_token_completed(store, worker_id, &worker_stats); ldpp_dout(dpp, 10) << "stat counters [worker]:\n" << worker_stats << dendl; ldpp_dout(dpp, 10) << "Shard Process Duration = " @@ -2041,6 +2065,7 @@ namespace rgw::dedup { } //ldpp_dout(dpp, 0) << __func__ << "::sleep for 2 seconds\n" << dendl; //std::this_thread::sleep_for(std::chrono::seconds(2)); + //std::this_thread::sleep_forstd::chrono::microseconds(usec_timeout); return ret; } @@ -2058,6 +2083,9 @@ namespace rgw::dedup { int ret = objects_dedup_single_md5_shard(&table, md5_shard, &md5_stats, num_work_shards); if (ret == 0) { md5_stats.duration = ceph_clock_now() - start_time; + md5_stats.md_throttle_sleep_events = d_ctl.metadata_access_throttle.get_sleep_events(); + md5_stats.md_throttle_sleep_time_usec = d_ctl.metadata_access_throttle.get_sleep_time_usec(); + d_cluster.mark_md5_shard_token_completed(store, md5_shard, &md5_stats); ldpp_dout(dpp, 10) << "stat counters [md5]:\n" << md5_stats << dendl; ldpp_dout(dpp, 10) << "Shard Process Duration = " @@ -2247,7 +2275,7 @@ namespace rgw::dedup { ldpp_dout(dpp, 10) <<__func__ << "::" << *p_epoch << dendl; d_ctl.dedup_type = p_epoch->dedup_type; #ifdef FULL_DEDUP_SUPPORT - ceph_assert(d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_FULL || + ceph_assert(d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_EXEC || d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE); #else ceph_assert(d_ctl.dedup_type == 
dedup_req_type_t::DEDUP_TYPE_ESTIMATE); @@ -2290,14 +2318,27 @@ namespace rgw::dedup { { int ret = 0; int32_t urgent_msg = URGENT_MSG_NONE; + auto bl_iter = bl.cbegin(); try { - auto bl_iter = bl.cbegin(); ceph::decode(urgent_msg, bl_iter); } catch (buffer::error& err) { ldpp_dout(dpp, 1) << __func__ << "::ERROR: bad urgent_msg" << dendl; - ret = -EINVAL; + cluster::ack_notify(store, dpp, &d_ctl, notify_id, cookie, -EINVAL); + return; + } + ldpp_dout(dpp, 5) << __func__ << "::" << get_urgent_msg_names(urgent_msg) << dendl; + + throttle_msg_t throttle_msg; + if (urgent_msg == URGENT_MSG_THROTTLE) { + try { + decode(throttle_msg, bl_iter); + ldpp_dout(dpp, 5) << __func__ << "::" << throttle_msg << dendl; + } catch (buffer::error& err) { + ldpp_dout(dpp, 1) << __func__ << "::ERROR: bad throttle_msg" << dendl; + cluster::ack_notify(store, dpp, &d_ctl, notify_id, cookie, -EINVAL); + return; + } } - ldpp_dout(dpp, 5) << __func__ << "::-->" << get_urgent_msg_names(urgent_msg) << dendl; // use lock to prevent concurrent pause/resume requests std::unique_lock cond_lock(d_cond_mutex); // [------>open lock block @@ -2358,6 +2399,24 @@ namespace rgw::dedup { ldpp_dout(dpp, 5) << __func__ << "::dedup is not paused->nothing to do" << dendl; } break; + case URGENT_MSG_THROTTLE: + for (auto action : throttle_msg.vec) { + if (action.op_type == BUCKET_INDEX_OP) { + d_ctl.bucket_index_throttle.set_max_calls_per_sec(action.limit); + } + else if (action.op_type == METADATA_ACCESS_OP) { + d_ctl.metadata_access_throttle.set_max_calls_per_sec(action.limit); + } + else if (action.op_type == STAT) { + ldpp_dout(dpp, 10) << __func__ << "::Throttle STAT" << dendl; + } + else { + ldpp_dout(dpp, 1) << __func__ << "::unexpected throttle_msg " + << action.op_type << dendl; + ret = -EINVAL; + } + } + break; default: ldpp_dout(dpp, 1) << __func__ << "::unexpected urgent_msg: " << get_urgent_msg_names(urgent_msg) << dendl; diff --git a/src/rgw/driver/rados/rgw_dedup.h 
b/src/rgw/driver/rados/rgw_dedup.h index 48dafe38cb1..e88b9724dad 100644 --- a/src/rgw/driver/rados/rgw_dedup.h +++ b/src/rgw/driver/rados/rgw_dedup.h @@ -55,6 +55,8 @@ namespace rgw::dedup { bool remote_pause_req = false; bool remote_paused = false; bool remote_restart_req = false; + Throttle bucket_index_throttle; + Throttle metadata_access_throttle; }; std::ostream& operator<<(std::ostream &out, const control_t &ctl); void encode(const control_t& ctl, ceph::bufferlist& bl); @@ -230,9 +232,6 @@ namespace rgw::dedup { librados::IoCtx d_dedup_cluster_ioctx; utime_t d_heart_beat_last_update; unsigned d_heart_beat_max_elapsed_sec; - - // A pool with 6 billion objects has a 1/(2^64) chance for collison with a 128bit MD5 - uint64_t d_max_protected_objects = (6ULL * 1024 * 1024 * 1024); uint64_t d_all_buckets_obj_count = 0; uint64_t d_all_buckets_obj_size = 0; // we don't benefit from deduping RGW objects smaller than head-object size diff --git a/src/rgw/driver/rados/rgw_dedup_cluster.cc b/src/rgw/driver/rados/rgw_dedup_cluster.cc index 7bdb308af87..e50d82bf031 100644 --- a/src/rgw/driver/rados/rgw_dedup_cluster.cc +++ b/src/rgw/driver/rados/rgw_dedup_cluster.cc @@ -899,9 +899,15 @@ namespace rgw::dedup { if (!has_incomplete_shards) { return; } + //utime_t now = ceph_clock_now(); Formatter::ArraySection array_section{*fmt, "incomplete_shards"}; for (unsigned shard = 0; shard < num_shards; shard++) { - if (sp_arr[shard].is_completed() ) { + if (sp_arr[shard].is_completed()) { + continue; + } + if (sp_arr[shard].was_not_started() ) { + Formatter::ObjectSection object_section{*fmt, "pending shard:"}; + fmt->dump_unsigned("shard_id", shard); continue; } Formatter::ObjectSection object_section{*fmt, "shard_progress"}; @@ -910,6 +916,8 @@ namespace rgw::dedup { fmt->dump_unsigned("progress_a", sp_arr[shard].progress_a); fmt->dump_unsigned("progress_b", sp_arr[shard].progress_b); fmt->dump_stream("last updated") << sp_arr[shard].update_time; + utime_t elapsed = 
sp_arr[shard].update_time - sp_arr[shard].creation_time; + fmt->dump_unsigned("time elapsed (sec)", elapsed.tv.tv_sec); } } @@ -1185,21 +1193,34 @@ namespace rgw::dedup { } //--------------------------------------------------------------------------- - // command-line called from radosgw-admin.cc - int cluster::dedup_control(rgw::sal::RadosStore *store, - const DoutPrefixProvider *dpp, - urgent_msg_t urgent_msg) + static void report_throttle_state(const struct rgw::dedup::control_t &ctl) { - ldpp_dout(dpp, 10) << __func__ << "::dedup_control req = " - << get_urgent_msg_names(urgent_msg) << dendl; - if (urgent_msg != URGENT_MSG_RESUME && - urgent_msg != URGENT_MSG_PASUE && - urgent_msg != URGENT_MSG_RESTART && - urgent_msg != URGENT_MSG_ABORT) { - ldpp_dout(dpp, 1) << __func__ << "::illegal urgent_msg="<< urgent_msg << dendl; - return -EINVAL; + if (!ctl.bucket_index_throttle.is_disabled()) { + std::cout << "bucket-index throttle=" + << ctl.bucket_index_throttle.get_max_calls_per_second() + << std::endl; + } + else { + std::cout << "bucket-index throttle is disabled" << std::endl; } + if (!ctl.metadata_access_throttle.is_disabled()) { + std::cout << "metadata throttle=" + << ctl.metadata_access_throttle.get_max_calls_per_second() + << std::endl; + } + else { + std::cout << "metadata throttle is disabled" << std::endl; + } + } + + //--------------------------------------------------------------------------- + // command-line called from radosgw-admin.cc + int cluster::dedup_control_bl(rgw::sal::RadosStore *store, + const DoutPrefixProvider *dpp, + urgent_msg_t urgent_msg, + bufferlist urgent_msg_bl) + { librados::IoCtx ctl_ioctx; int ret = get_control_ioctx(store, dpp, ctl_ioctx); if (unlikely(ret != 0)) { @@ -1208,8 +1229,7 @@ namespace rgw::dedup { // 10 seconds timeout const uint64_t timeout_ms = 10*1000; - bufferlist reply_bl, urgent_msg_bl; - ceph::encode(urgent_msg, urgent_msg_bl); + bufferlist reply_bl; ret = rgw_rados_notify(dpp, ctl_ioctx, DEDUP_WATCH_OBJ, 
urgent_msg_bl, timeout_ms, &reply_bl, null_yield); if (ret < 0) { @@ -1235,6 +1255,9 @@ namespace rgw::dedup { struct rgw::dedup::control_t ctl; decode(ctl, iter); ldpp_dout(dpp, 10) << __func__ << "::++ACK::ctl=" << ctl << "::ret=" << ret << dendl; + if (urgent_msg == URGENT_MSG_THROTTLE) { + report_throttle_state(ctl); + } } catch (buffer::error& err) { ldpp_dout(dpp, 1) << __func__ << "::failed decoding notify acks" << dendl; return -EINVAL; @@ -1245,11 +1268,31 @@ namespace rgw::dedup { return ret; } } - ldpp_dout(dpp, 10) << __func__ << "::" << get_urgent_msg_names(urgent_msg) - << " finished successfully!" << dendl; + ldpp_dout(dpp, 10) << __func__ << "::finished successfully!" << dendl; return 0; } + //--------------------------------------------------------------------------- + // command-line called from radosgw-admin.cc + int cluster::dedup_control(rgw::sal::RadosStore *store, + const DoutPrefixProvider *dpp, + urgent_msg_t urgent_msg) + { + ldpp_dout(dpp, 10) << __func__ << "::dedup_control req = " + << get_urgent_msg_names(urgent_msg) << dendl; + if (urgent_msg != URGENT_MSG_RESUME && + urgent_msg != URGENT_MSG_PASUE && + urgent_msg != URGENT_MSG_RESTART && + urgent_msg != URGENT_MSG_ABORT) { + ldpp_dout(dpp, 1) << __func__ << "::illegal urgent_msg="<< urgent_msg << dendl; + return -EINVAL; + } + + bufferlist urgent_msg_bl; + ceph::encode(urgent_msg, urgent_msg_bl); + return dedup_control_bl(store, dpp, urgent_msg, urgent_msg_bl); + } + //--------------------------------------------------------------------------- // command-line called from radosgw-admin.cc int cluster::dedup_restart_scan(rgw::sal::RadosStore *store, @@ -1289,7 +1332,7 @@ namespace rgw::dedup { ldpp_dout(dpp, 10) << __func__ << dedup_type << dendl; #ifdef FULL_DEDUP_SUPPORT ceph_assert(dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE || - dedup_type == dedup_req_type_t::DEDUP_TYPE_FULL); + dedup_type == dedup_req_type_t::DEDUP_TYPE_EXEC); #else ceph_assert(dedup_type == 
dedup_req_type_t::DEDUP_TYPE_ESTIMATE); #endif diff --git a/src/rgw/driver/rados/rgw_dedup_cluster.h b/src/rgw/driver/rados/rgw_dedup_cluster.h index 64b2c54a4fa..1b5b9cdc175 100644 --- a/src/rgw/driver/rados/rgw_dedup_cluster.h +++ b/src/rgw/driver/rados/rgw_dedup_cluster.h @@ -92,6 +92,10 @@ namespace rgw::dedup { uint64_t notify_id, uint64_t cookie, int status); + static int dedup_control_bl(rgw::sal::RadosStore *store, + const DoutPrefixProvider *dpp, + urgent_msg_t urgent_msg, + bufferlist urgent_msg_bl); static int dedup_control(rgw::sal::RadosStore *store, const DoutPrefixProvider *dpp, urgent_msg_t urgent_msg); diff --git a/src/rgw/driver/rados/rgw_dedup_store.cc b/src/rgw/driver/rados/rgw_dedup_store.cc index fd15bbc372d..29d18f37a04 100644 --- a/src/rgw/driver/rados/rgw_dedup_store.cc +++ b/src/rgw/driver/rados/rgw_dedup_store.cc @@ -616,6 +616,9 @@ namespace rgw::dedup { unsigned len = (p_curr_block + 1 - p_arr) * sizeof(disk_block_t); bufferlist bl = bufferlist::static_from_mem((char*)p_arr, len); int ret = store_slab(ioctx, bl, d_md5_shard, d_worker_id, d_seq_number, dpp); + if (unlikely(ret != 0)) { + p_stats->write_slab_failure++; + } // Need to make sure the call to rgw_put_system_obj was fully synchronous // d_seq_number++ must be called **after** flush!! 
diff --git a/src/rgw/driver/rados/rgw_dedup_utils.cc b/src/rgw/driver/rados/rgw_dedup_utils.cc index baadee5aeef..a32f4ecbd7e 100644 --- a/src/rgw/driver/rados/rgw_dedup_utils.cc +++ b/src/rgw/driver/rados/rgw_dedup_utils.cc @@ -25,8 +25,8 @@ namespace rgw::dedup { else if (dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE) { out << "DEDUP_TYPE_ESTIMATE"; } - else if (dedup_type == dedup_req_type_t::DEDUP_TYPE_FULL) { - out << "DEDUP_TYPE_FULL"; + else if (dedup_type == dedup_req_type_t::DEDUP_TYPE_EXEC) { + out << "DEDUP_TYPE_EXEC (full dedup)"; } else { out << "\n*** unexpected dedup_type ***\n"; @@ -35,6 +35,108 @@ namespace rgw::dedup { return out; } + //--------------------------------------------------------------------------- + void validate_max_calls_offset() + { + // max_calls must be the first data member to guarantee 8 Bytes alignment + // this will allow us to avoid using std::atomic (which is expensive) + static_assert(offsetof(Throttle, max_calls) == 0); + } + + //--------------------------------------------------------------------------- + void encode(const Throttle& t, ceph::bufferlist& bl) + { + ENCODE_START(1, 1, bl); + encode(t.get_max_calls_per_second(), bl); + ENCODE_FINISH(bl); + } + + //--------------------------------------------------------------------------- + void decode(Throttle& t, ceph::bufferlist::const_iterator& bl) + { + DECODE_START(1, bl); + size_t max_calls_per_sec; + decode(max_calls_per_sec, bl); + t.set_max_calls_per_sec(max_calls_per_sec); + DECODE_FINISH(bl); + } + + //--------------------------------------------------------------------------- + std::ostream& operator<<(std::ostream &out, const throttle_action_t& msg) + { + if (msg.op_type == BUCKET_INDEX_OP) { + out << "Set Bucket Index Throttling to "; + if (msg.limit) { + out << msg.limit << " IOPS"; + } + else { + out << "unlimited IOPS"; + } + } + else if (msg.op_type == METADATA_ACCESS_OP) { + out << "Set Metadata Throttling to "; + if (msg.limit) { + out << 
msg.limit << " IOPS"; + } + else { + out << "unlimited IOPS"; + } + } + else if (msg.op_type == DATA_READ_WRITE_OP) { + out << "Set Read/Write Throttling to " << msg.limit << " MB/sec"; + } + else { + out << "\n*** unexpected throttling type ***\n"; + } + + return out; + } + + //--------------------------------------------------------------------------- + std::ostream& operator<<(std::ostream &out, const throttle_msg_t& msg) + { + for (auto action : msg.vec) { + out << action << " :: "; + } + return out; + } + + //--------------------------------------------------------------------------- + void encode(const throttle_action_t& m, ceph::bufferlist& bl) + { + ENCODE_START(1, 1, bl); + encode((int)m.op_type, bl); + encode(m.limit, bl); + ENCODE_FINISH(bl); + } + + //--------------------------------------------------------------------------- + void decode(throttle_action_t& m, ceph::bufferlist::const_iterator& bl) + { + DECODE_START(1, bl); + int tmp; + decode(tmp, bl); + m.op_type = (op_type_t)tmp; + decode(m.limit, bl); + DECODE_FINISH(bl); + } + + //--------------------------------------------------------------------------- + void encode(const throttle_msg_t& m, ceph::bufferlist& bl) + { + ENCODE_START(1, 1, bl); + encode(m.vec, bl); + ENCODE_FINISH(bl); + } + + //--------------------------------------------------------------------------- + void decode(throttle_msg_t& m, ceph::bufferlist::const_iterator& bl) + { + DECODE_START(1, bl); + decode(m.vec, bl); + DECODE_FINISH(bl); + } + //--------------------------------------------------------------------------- dedup_stats_t& dedup_stats_t::operator+=(const dedup_stats_t& other) { @@ -244,6 +346,7 @@ namespace rgw::dedup { "URGENT_MSG_PASUE", "URGENT_MSG_RESUME", "URGENT_MSG_RESTART", + "URGENT_MSG_THROTTLE", "URGENT_MSG_INVALID" }; @@ -266,6 +369,9 @@ namespace rgw::dedup { this->egress_records += other.egress_records; this->egress_blocks += other.egress_blocks; this->egress_slabs += other.egress_slabs; + 
this->write_slab_failure += other.write_slab_failure; + this->bidx_throttle_sleep_events += other.bidx_throttle_sleep_events; + this->bidx_throttle_sleep_time_usec += other.bidx_throttle_sleep_time_usec; this->single_part_objs += other.single_part_objs; this->multipart_objs += other.multipart_objs; this->small_multipart_obj += other.small_multipart_obj; @@ -302,8 +408,14 @@ namespace rgw::dedup { { Formatter::ObjectSection notify(*f, "notify"); + if (this->bidx_throttle_sleep_events) { + f->dump_unsigned("Bucket-Index Throttle Sleep Events", + this->bidx_throttle_sleep_events); + f->dump_unsigned("Bucket-Index Throttle Sleep Time (sec)", + this->bidx_throttle_sleep_time_usec/MICROSECONDS_PER_SECOND); + } - if(this->non_default_storage_class_objs) { + if (this->non_default_storage_class_objs) { f->dump_unsigned("non default storage class objs", this->non_default_storage_class_objs); f->dump_unsigned("non default storage class objs bytes", @@ -317,7 +429,7 @@ namespace rgw::dedup { { Formatter::ObjectSection skipped(*f, "skipped"); - if(this->ingress_skip_too_small) { + if (this->ingress_skip_too_small) { f->dump_unsigned("Ingress skip: too small objs", this->ingress_skip_too_small); f->dump_unsigned("Ingress skip: too small bytes", @@ -334,7 +446,10 @@ namespace rgw::dedup { { Formatter::ObjectSection failed(*f, "failed"); - if(this->ingress_corrupted_etag) { + if (this->write_slab_failure) { + f->dump_unsigned("Write SLAB failures", this->write_slab_failure); + } + if (this->ingress_corrupted_etag) { f->dump_unsigned("Corrupted ETAG", this->ingress_corrupted_etag); } } @@ -360,6 +475,9 @@ namespace rgw::dedup { encode(w.egress_records, bl); encode(w.egress_blocks, bl); encode(w.egress_slabs, bl); + encode(w.write_slab_failure, bl); + encode(w.bidx_throttle_sleep_events, bl); + encode(w.bidx_throttle_sleep_time_usec, bl); encode(w.single_part_objs, bl); encode(w.multipart_objs, bl); @@ -391,6 +509,9 @@ namespace rgw::dedup { decode(w.egress_records, bl); 
decode(w.egress_blocks, bl); decode(w.egress_slabs, bl); + decode(w.write_slab_failure, bl); + decode(w.bidx_throttle_sleep_events, bl); + decode(w.bidx_throttle_sleep_time_usec, bl); decode(w.single_part_objs, bl); decode(w.multipart_objs, bl); decode(w.small_multipart_obj, bl); @@ -413,6 +534,7 @@ namespace rgw::dedup { { this->small_objs_stat += other.small_objs_stat; this->big_objs_stat += other.big_objs_stat; + this->ingress_slabs += other.ingress_slabs; this->ingress_failed_load_bucket += other.ingress_failed_load_bucket; this->ingress_failed_get_object += other.ingress_failed_get_object; this->ingress_failed_get_obj_attrs += other.ingress_failed_get_obj_attrs; @@ -451,6 +573,8 @@ namespace rgw::dedup { this->dup_head_bytes += other.dup_head_bytes; this->failed_dedup += other.failed_dedup; + this->md_throttle_sleep_events += other.md_throttle_sleep_events; + this->md_throttle_sleep_time_usec += other.md_throttle_sleep_time_usec; this->failed_table_load += other.failed_table_load; this->failed_map_overflow += other.failed_map_overflow; return *this; @@ -476,6 +600,7 @@ namespace rgw::dedup { f->dump_unsigned("Total processed objects", this->processed_objects); f->dump_unsigned("Loaded objects", this->loaded_objects); + f->dump_unsigned("Ingress Slabs", this->ingress_slabs); f->dump_unsigned("Set Shared-Manifest SRC", this->set_shared_manifest_src); f->dump_unsigned("Deduped Obj (this cycle)", this->deduped_objects); f->dump_unsigned("Deduped Bytes(this cycle)", this->deduped_objects_bytes); @@ -507,6 +632,12 @@ namespace rgw::dedup { { Formatter::ObjectSection notify(*f, "notify"); + if (this->md_throttle_sleep_events) { + f->dump_unsigned("Metadata Throttle Sleep Events", this->md_throttle_sleep_events); + f->dump_unsigned("Metadata Throttle Sleep Time (sec)", + this->md_throttle_sleep_time_usec/MICROSECONDS_PER_SECOND); + } + if (this->failed_table_load) { f->dump_unsigned("Failed Table Load", this->failed_table_load); } @@ -601,6 +732,7 @@ namespace 
rgw::dedup { encode(m.small_objs_stat, bl); encode(m.big_objs_stat, bl); + encode(m.ingress_slabs, bl); encode(m.ingress_failed_load_bucket, bl); encode(m.ingress_failed_get_object, bl); encode(m.ingress_failed_get_obj_attrs, bl); @@ -638,6 +770,8 @@ namespace rgw::dedup { encode(m.deduped_objects_bytes, bl); encode(m.dup_head_bytes, bl); encode(m.failed_dedup, bl); + encode(m.md_throttle_sleep_events, bl); + encode(m.md_throttle_sleep_time_usec, bl); encode(m.failed_table_load, bl); encode(m.failed_map_overflow, bl); @@ -651,6 +785,7 @@ namespace rgw::dedup { DECODE_START(1, bl); decode(m.small_objs_stat, bl); decode(m.big_objs_stat, bl); + decode(m.ingress_slabs, bl); decode(m.ingress_failed_load_bucket, bl); decode(m.ingress_failed_get_object, bl); decode(m.ingress_failed_get_obj_attrs, bl); @@ -688,6 +823,8 @@ namespace rgw::dedup { decode(m.deduped_objects_bytes, bl); decode(m.dup_head_bytes, bl); decode(m.failed_dedup, bl); + decode(m.md_throttle_sleep_events, bl); + decode(m.md_throttle_sleep_time_usec, bl); decode(m.failed_table_load, bl); decode(m.failed_map_overflow, bl); diff --git a/src/rgw/driver/rados/rgw_dedup_utils.h b/src/rgw/driver/rados/rgw_dedup_utils.h index f008fcaba38..88d440582d1 100644 --- a/src/rgw/driver/rados/rgw_dedup_utils.h +++ b/src/rgw/driver/rados/rgw_dedup_utils.h @@ -19,15 +19,18 @@ #include "common/Formatter.h" #include "common/ceph_json.h" #include +#include #include "include/utime.h" #include "include/encoding.h" #include "common/dout.h" #define FULL_DEDUP_SUPPORT namespace rgw::dedup { + using namespace std::chrono; using work_shard_t = uint16_t; using md5_shard_t = uint16_t; + const uint64_t MICROSECONDS_PER_SECOND = 1000000; // settings to help debug small systems const work_shard_t MIN_WORK_SHARD = 2; const md5_shard_t MIN_MD5_SHARD = 4; @@ -57,7 +60,7 @@ namespace rgw::dedup { enum dedup_req_type_t { DEDUP_TYPE_NONE = 0, DEDUP_TYPE_ESTIMATE = 1, - DEDUP_TYPE_FULL = 2 + DEDUP_TYPE_EXEC = 2 }; std::ostream& 
operator<<(std::ostream &out, const dedup_req_type_t& dedup_type); @@ -85,6 +88,93 @@ namespace rgw::dedup { uint8_t flags; }; + class alignas(8) Throttle { + friend void validate_max_calls_offset(); + public: + // @max_calls_per_sec - max requests per second allowed, 0 means unlimited + // disbaled by default + Throttle(size_t max_calls_per_sec=0) { + set_max_calls_per_sec(max_calls_per_sec); + reset(); + } + + // set the number of calls per second + // zero means unlimited + inline void set_max_calls_per_sec(uint32_t max_calls_per_sec) { + max_calls = max_calls_per_sec; + } + + inline size_t get_max_calls_per_second() const { + return max_calls; + } + + inline uint64_t get_sleep_events() const { + return sleep_events; + } + + inline uint64_t get_sleep_time_usec() const { + return sleep_time_usec; + } + + inline void disable() { + set_max_calls_per_sec(0); + } + + inline bool is_disabled() const { + return (max_calls == 0); + } + + // Blocks until allowed to proceed + void acquire() { + if (is_disabled()) { + return; + } + // Should work fine without atomic since acquire is single threaded + const steady_clock::time_point now = steady_clock::now(); + uint64_t elapsed_usec = duration_cast(now - last_reset).count(); + if (elapsed_usec >= MICROSECONDS_PER_SECOND || last_reset > now) { + // Renew tokens if a second (or more) has passed since last_reset + reset(); + --tokens; + return; + } + + if (tokens > 0) { + --tokens; + return; + } + + // if reached here, all tokens were exhausted, wait for the next time slot + ceph_assert(MICROSECONDS_PER_SECOND > elapsed_usec); + uint64_t wait_time_usec = MICROSECONDS_PER_SECOND - elapsed_usec; + sleep_events ++; + sleep_time_usec += wait_time_usec; + + std::this_thread::sleep_for(microseconds(wait_time_usec)); + // After sleeping, reset and return + reset(); + tokens --; + } + + private: + void reset() { + // atomic operation because it is 8 Bytes aligned + tokens = max_calls; + last_reset = steady_clock::now(); + } + + // 
@max_calls must be the first data member to guarantee 8 Bytes alignment + uint32_t max_calls; + uint32_t tokens; + steady_clock::time_point last_reset; + uint64_t sleep_events = 0; + uint64_t sleep_time_usec = 0; + } __attribute__ ((aligned (8))); + + void validate_max_calls_offset(); + void encode(const Throttle& t, ceph::bufferlist& bl); + void decode(Throttle& t, ceph::bufferlist::const_iterator& bl); + struct dedup_stats_t { dedup_stats_t& operator+=(const dedup_stats_t& other); @@ -107,6 +197,9 @@ namespace rgw::dedup { uint64_t egress_records = 0; uint64_t egress_blocks = 0; uint64_t egress_slabs = 0; + uint64_t write_slab_failure = 0; + uint64_t bidx_throttle_sleep_events = 0; + uint64_t bidx_throttle_sleep_time_usec = 0; uint64_t single_part_objs = 0; uint64_t multipart_objs = 0; @@ -139,6 +232,7 @@ namespace rgw::dedup { dedup_stats_t small_objs_stat; dedup_stats_t big_objs_stat; + uint64_t ingress_slabs = 0; uint64_t ingress_failed_load_bucket = 0; uint64_t ingress_failed_get_object = 0; uint64_t ingress_failed_get_obj_attrs = 0; @@ -178,6 +272,8 @@ namespace rgw::dedup { uint64_t deduped_objects_bytes = 0; uint64_t dup_head_bytes = 0; uint64_t failed_dedup = 0; + uint64_t md_throttle_sleep_events = 0; + uint64_t md_throttle_sleep_time_usec = 0; uint64_t failed_table_load = 0; uint64_t failed_map_overflow = 0; utime_t duration = {0, 0}; @@ -218,15 +314,39 @@ namespace rgw::dedup { } enum urgent_msg_t { - URGENT_MSG_NONE = 0, - URGENT_MSG_ABORT = 1, - URGENT_MSG_PASUE = 2, - URGENT_MSG_RESUME = 3, - URGENT_MSG_RESTART = 4, - URGENT_MSG_INVALID = 5 + URGENT_MSG_NONE = 0, + URGENT_MSG_ABORT, + URGENT_MSG_PASUE, + URGENT_MSG_RESUME, + URGENT_MSG_RESTART, + URGENT_MSG_THROTTLE, + URGENT_MSG_INVALID }; - const char* get_urgent_msg_names(int msg); + enum op_type_t { + NO_OP = 0, + BUCKET_INDEX_OP, + METADATA_ACCESS_OP, + DATA_READ_WRITE_OP, + STAT, + INVALID_OP + }; + + struct throttle_action_t { + op_type_t op_type; + uint32_t limit; + }; + void encode(const 
throttle_action_t& m, ceph::bufferlist& bl); + void decode(throttle_action_t& m, ceph::bufferlist::const_iterator& bl); + std::ostream& operator<<(std::ostream &out, const throttle_action_t& action); + struct throttle_msg_t { + std::vector<throttle_action_t> vec; + }; + + std::ostream& operator<<(std::ostream &out, const throttle_msg_t& msg); + void encode(const throttle_msg_t& m, ceph::bufferlist& bl); + void decode(throttle_msg_t& m, ceph::bufferlist::const_iterator& bl); + bool hex2int(const char *p, const char *p_end, uint64_t *p_val); bool parse_etag_string(const std::string& etag, parsed_etag_t *parsed_etag); void etag_to_bufferlist(uint64_t md5_high, uint64_t md5_low, uint16_t num_parts, diff --git a/src/rgw/radosgw-admin/radosgw-admin.cc b/src/rgw/radosgw-admin/radosgw-admin.cc index ca26e0a3091..48b254d6358 100644 --- a/src/rgw/radosgw-admin/radosgw-admin.cc +++ b/src/rgw/radosgw-admin/radosgw-admin.cc @@ -157,10 +157,11 @@ void usage() cout << " caps rm remove user capabilities\n"; cout << " dedup stats Display dedup statistics from the last run\n"; cout << " dedup estimate Runs dedup in estimate mode (no changes will be made)\n"; - cout << " dedup restart Restart dedup; must include --yes-i-really-mean-it to activate\n"; + cout << " dedup exec Execute dedup (duplicated tail objects will be deleted); must include --yes-i-really-mean-it to activate\n"; cout << " dedup abort Abort dedup\n"; cout << " dedup pause Pause dedup\n"; cout << " dedup resume Resume paused dedup\n"; + cout << " dedup throttle Throttle dedup execution\n"; cout << " subuser create create a new subuser\n" ; cout << " subuser modify modify subuser\n"; cout << " subuser rm remove subuser\n"; @@ -490,6 +491,10 @@ void usage() cout << " --disable-feature disable a zone/zonegroup feature\n"; cout << "\n"; cout << " := \"YYYY-MM-DD[ hh:mm:ss]\"\n"; + cout << "\nDedup throttle options:\n"; + cout << " --max-bucket-index-ops specify max bucket-index requests per second allowed for an RGW during dedup, 0 means 
unlimited\n"; + cout << " --max-metadata-ops specify max metadata requests per second allowed for an RGW during dedup, 0 means unlimited\n"; + cout << " --stat display dedup throttle setting\n"; cout << "\nQuota options:\n"; cout << " --max-objects specify max objects (negative value to disable)\n"; cout << " --max-size specify max size (in B/K/M/G/T, negative value to disable)\n"; @@ -756,9 +761,10 @@ enum class OPT { DEDUP_STATS, DEDUP_ESTIMATE, DEDUP_ABORT, - DEDUP_RESTART, + DEDUP_EXEC, DEDUP_PAUSE, DEDUP_RESUME, + DEDUP_THROTTLE, GC_LIST, GC_PROCESS, LC_LIST, @@ -1009,9 +1015,11 @@ static SimpleCmd::Commands all_cmds = { { "dedup stats", OPT::DEDUP_STATS }, { "dedup estimate", OPT::DEDUP_ESTIMATE }, { "dedup abort", OPT::DEDUP_ABORT }, - { "dedup restart", OPT::DEDUP_RESTART }, + { "dedup restart", OPT::DEDUP_EXEC }, + { "dedup exec", OPT::DEDUP_EXEC }, { "dedup pause", OPT::DEDUP_PAUSE }, { "dedup resume", OPT::DEDUP_RESUME }, + { "dedup throttle", OPT::DEDUP_THROTTLE }, { "gc list", OPT::GC_LIST }, { "gc process", OPT::GC_PROCESS }, { "lc list", OPT::LC_LIST }, @@ -3637,6 +3645,7 @@ int main(int argc, const char **argv) int skip_zero_entries = false; // log show int purge_keys = false; int yes_i_really_mean_it = false; + int throttle_stat = false; int delete_child_objects = false; int fix = false; int remove_bad = false; @@ -3689,12 +3698,16 @@ int main(int argc, const char **argv) int64_t max_write_ops = 0; int64_t max_read_bytes = 0; int64_t max_write_bytes = 0; + uint32_t max_bucket_index_ops = 0; + uint32_t max_metadata_ops = 0; bool have_max_objects = false; bool have_max_size = false; bool have_max_write_ops = false; bool have_max_read_ops = false; bool have_max_write_bytes = false; bool have_max_read_bytes = false; + bool have_max_bucket_index_ops = false; + bool have_max_metadata_ops = false; int include_all = false; int allow_unordered = false; @@ -4003,6 +4016,20 @@ int main(int argc, const char **argv) return EINVAL; } have_max_write_bytes = true; 
+ } else if (ceph_argparse_witharg(args, i, &val, "--max-bucket-index-ops", (char*)NULL)) { + max_bucket_index_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err); + if (!err.empty()) { + cerr << "ERROR: failed to parse max bucket index ops: " << err << std::endl; + return EINVAL; + } + have_max_bucket_index_ops = true; + } else if (ceph_argparse_witharg(args, i, &val, "--max-metadata-ops", (char*)NULL)) { + max_metadata_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err); + if (!err.empty()) { + cerr << "ERROR: failed to parse max metadata ops: " << err << std::endl; + return EINVAL; + } + have_max_metadata_ops = true; } else if (ceph_argparse_witharg(args, i, &val, "--date", "--time", (char*)NULL)) { date = val; if (end_date.empty()) @@ -4097,6 +4124,8 @@ int main(int argc, const char **argv) // do nothing } else if (ceph_argparse_binary_flag(args, i, &yes_i_really_mean_it, NULL, "--yes-i-really-mean-it", (char*)NULL)) { // do nothing + } else if (ceph_argparse_binary_flag(args, i, &throttle_stat, NULL, "--stat", (char*)NULL)) { + // do nothing } else if (ceph_argparse_binary_flag(args, i, &fix, NULL, "--fix", (char*)NULL)) { // do nothing } else if (ceph_argparse_binary_flag(args, i, &remove_bad, NULL, "--remove-bad", (char*)NULL)) { @@ -4514,9 +4543,10 @@ int main(int argc, const char **argv) OPT::DEDUP_STATS, OPT::DEDUP_ESTIMATE, OPT::DEDUP_ABORT, // TBD - not READ-ONLY - OPT::DEDUP_RESTART, // TBD - not READ-ONLY + OPT::DEDUP_EXEC, // TBD - not READ-ONLY OPT::DEDUP_PAUSE, OPT::DEDUP_RESUME, + OPT::DEDUP_THROTTLE, OPT::GC_LIST, OPT::LC_LIST, OPT::ORPHANS_LIST_JOBS, @@ -9167,7 +9197,8 @@ next: opt_cmd == OPT::DEDUP_ABORT || opt_cmd == OPT::DEDUP_PAUSE || opt_cmd == OPT::DEDUP_RESUME || - opt_cmd == OPT::DEDUP_RESTART) { + opt_cmd == OPT::DEDUP_THROTTLE || + opt_cmd == OPT::DEDUP_EXEC) { using namespace rgw::dedup; rgw::sal::RadosStore *store = dynamic_cast<rgw::sal::RadosStore*>(driver); @@ -9188,7 +9219,41 @@ next: return ret; } - if (opt_cmd == OPT::DEDUP_ABORT || opt_cmd == 
OPT::DEDUP_PAUSE || opt_cmd == OPT::DEDUP_RESUME) { + if (opt_cmd == OPT::DEDUP_THROTTLE) { + bufferlist urgent_msg_bl; + urgent_msg_t urgent_msg = URGENT_MSG_THROTTLE; + ceph::encode(urgent_msg, urgent_msg_bl); + throttle_msg_t throttle_msg; + + if (throttle_stat) { + encode(throttle_msg, urgent_msg_bl); + return cluster::dedup_control_bl(store, dpp(), urgent_msg, urgent_msg_bl); + } + + if (unlikely(!have_max_bucket_index_ops && !have_max_metadata_ops)) { + std::cerr << "dedup throttle must set either --max-bucket-index-ops or --max-metadata-ops" << std::endl; + return EINVAL; + } + + if (have_max_bucket_index_ops) { + throttle_action_t action = { .op_type = BUCKET_INDEX_OP, + .limit = max_bucket_index_ops}; + throttle_msg.vec.push_back(action); + } + + if (have_max_metadata_ops) { + throttle_action_t action = { .op_type = METADATA_ACCESS_OP, + .limit = max_metadata_ops}; + throttle_msg.vec.push_back(action); + } + + encode(throttle_msg, urgent_msg_bl); + return cluster::dedup_control_bl(store, dpp(), urgent_msg, urgent_msg_bl); + } + + if (opt_cmd == OPT::DEDUP_ABORT || + opt_cmd == OPT::DEDUP_PAUSE || + opt_cmd == OPT::DEDUP_RESUME) { urgent_msg_t urgent_msg; if (opt_cmd == OPT::DEDUP_ABORT) { urgent_msg = URGENT_MSG_ABORT; @@ -9202,7 +9267,7 @@ next: return cluster::dedup_control(store, dpp(), urgent_msg); } - if (opt_cmd == OPT::DEDUP_RESTART || opt_cmd == OPT::DEDUP_ESTIMATE) { + if (opt_cmd == OPT::DEDUP_EXEC || opt_cmd == OPT::DEDUP_ESTIMATE) { dedup_req_type_t dedup_type = dedup_req_type_t::DEDUP_TYPE_NONE; if (opt_cmd == OPT::DEDUP_ESTIMATE) { dedup_type = dedup_req_type_t::DEDUP_TYPE_ESTIMATE; @@ -9214,7 +9279,7 @@ next: << std::endl; return EINVAL; } - dedup_type = dedup_req_type_t::DEDUP_TYPE_FULL; + dedup_type = dedup_req_type_t::DEDUP_TYPE_EXEC; #ifndef FULL_DEDUP_SUPPORT std::cerr << "Only dedup estimate is supported!" 
<< std::endl; return EPERM; diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index e5f1f69541d..1d7806a452c 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -18,10 +18,11 @@ caps rm remove user capabilities dedup stats Display dedup statistics from the last run dedup estimate Runs dedup in estimate mode (no changes will be made) - dedup restart Restart dedup; must include --yes-i-really-mean-it to activate + dedup exec Execute dedup (duplicated tail objects will be deleted); must include --yes-i-really-mean-it to activate dedup abort Abort dedup dedup pause Pause dedup dedup resume Resume paused dedup + dedup throttle Throttle dedup execution subuser create create a new subuser subuser modify modify subuser subuser rm remove subuser @@ -352,6 +353,11 @@ := "YYYY-MM-DD[ hh:mm:ss]" + Dedup throttle options: + --max-bucket-index-ops specify max bucket-index requests per second allowed for an RGW during dedup, 0 means unlimited + --max-metadata-ops specify max metadata requests per second allowed for an RGW during dedup, 0 means unlimited + --stat display dedup throttle setting + Quota options: --max-objects specify max objects (negative value to disable) --max-size specify max size (in B/K/M/G/T, negative value to disable) diff --git a/src/test/rgw/dedup/test_dedup.py b/src/test/rgw/dedup/test_dedup.py index 4b7e21fe164..f1dc15e73dc 100644 --- a/src/test/rgw/dedup/test_dedup.py +++ b/src/test/rgw/dedup/test_dedup.py @@ -1069,20 +1069,31 @@ def read_dedup_stats(dry_run): return (dedup_work_was_completed, dedup_stats, dedup_ratio_estimate, dedup_ratio_actual) +#------------------------------------------------------------------------------- +def set_bucket_index_throttling(limit): + cmd = ['dedup', 'throttle', '--max-bucket-index-ops', str(limit)] + result = admin(cmd) + assert result[1] == 0 + log.debug(result[0]) + #------------------------------------------------------------------------------- 
def exec_dedup_internal(expected_dedup_stats, dry_run, max_dedup_time): + ### set throttling to a rand val between 50-200 IOPS (i.e. 50K-200K objs) + limit=random.randint(50, 200) + set_bucket_index_throttling(limit) + log.debug("sending exec_dedup request: dry_run=%d", dry_run) if dry_run: result = admin(['dedup', 'estimate']) reset_full_dedup_stats(expected_dedup_stats) else: - result = admin(['dedup', 'restart', '--yes-i-really-mean-it']) + result = admin(['dedup', 'exec', '--yes-i-really-mean-it']) assert result[1] == 0 log.debug("wait for dedup to complete") dedup_time = 0 - dedup_timeout = 5 + dedup_timeout = 3 dedup_stats = Dedup_Stats() dedup_ratio=Dedup_Ratio() wait_for_completion = True @@ -1095,7 +1106,10 @@ def exec_dedup_internal(expected_dedup_stats, dry_run, max_dedup_time): wait_for_completion = False log.info("dedup completed in %d seconds", dedup_time) return (dedup_time, ret[1], ret[2], ret[3]) - + else: + ### set throttling to a rand val between 50-200 IOPS (i.e. 50K-200K objs) + limit=random.randint(50, 200) + set_bucket_index_throttling(limit) #------------------------------------------------------------------------------- def exec_dedup(expected_dedup_stats, dry_run, verify_stats=True): @@ -1308,14 +1322,14 @@ def check_full_dedup_state(): global full_dedup_state_was_checked global full_dedup_state_disabled log.debug("check_full_dedup_state:: sending FULL Dedup request") - result = admin(['dedup', 'restart', '--yes-i-really-mean-it']) + result = admin(['dedup', 'exec', '--yes-i-really-mean-it']) if result[1] == 0: - log.info("full dedup is enabled!") + log.debug("full dedup is enabled!") full_dedup_state_disabled = False result = admin(['dedup', 'abort']) assert result[1] == 0 else: - log.info("full dedup is disabled, skip all full dedup tests") + log.debug("full dedup is disabled, skip all full dedup tests") full_dedup_state_disabled = True full_dedup_state_was_checked = True @@ -2039,7 +2053,7 @@ def 
test_dedup_inc_with_remove_multi_tenants(): # REMOVE some objects and update stats/expected src_record=0 shared_manifest=0 - valid_hash=0 + valid_sha=0 object_keys=[] files_sub=[] dedup_stats = Dedup_Stats() @@ -2052,7 +2066,7 @@ def test_dedup_inc_with_remove_multi_tenants(): log.debug("objects::%s::size=%d, num_copies=%d", filename, obj_size, num_copies_2); if num_copies_2: if num_copies_2 > 1 and obj_size > RADOS_OBJ_SIZE: - valid_hash += num_copies_2 + valid_sha += num_copies_2 src_record += 1 shared_manifest += (num_copies_2 - 1) @@ -2076,7 +2090,7 @@ def test_dedup_inc_with_remove_multi_tenants(): dedup_stats.deduped_obj_bytes=0 dedup_stats.skip_src_record=src_record dedup_stats.skip_shared_manifest=shared_manifest - dedup_stats.valid_hash=valid_hash + dedup_stats.valid_hash=valid_sha dedup_stats.invalid_hash=0 dedup_stats.set_hash=0 @@ -2119,7 +2133,7 @@ def test_dedup_inc_with_remove(): # REMOVE some objects and update stats/expected src_record=0 shared_manifest=0 - valid_hash=0 + valid_sha=0 object_keys=[] files_sub=[] dedup_stats = Dedup_Stats() @@ -2132,7 +2146,7 @@ def test_dedup_inc_with_remove(): log.debug("objects::%s::size=%d, num_copies=%d", filename, obj_size, num_copies_2); if num_copies_2: if num_copies_2 > 1 and obj_size > RADOS_OBJ_SIZE: - valid_hash += num_copies_2 + valid_sha += num_copies_2 src_record += 1 shared_manifest += (num_copies_2 - 1) @@ -2163,7 +2177,7 @@ def test_dedup_inc_with_remove(): dedup_stats.deduped_obj_bytes=0 dedup_stats.skip_src_record=src_record dedup_stats.skip_shared_manifest=shared_manifest - dedup_stats.valid_hash=valid_hash + dedup_stats.valid_hash=valid_sha dedup_stats.invalid_hash=0 dedup_stats.set_hash=0 @@ -2322,7 +2336,7 @@ def test_dedup_small_multipart(): #------------------------------------------------------------------------------- @pytest.mark.basic_test def test_dedup_large_scale_with_tenants(): - #return + return if full_dedup_is_disabled(): return @@ -2342,7 +2356,7 @@ def 
test_dedup_large_scale_with_tenants(): #------------------------------------------------------------------------------- @pytest.mark.basic_test def test_dedup_large_scale(): - #return + return if full_dedup_is_disabled(): return @@ -2362,7 +2376,7 @@ def test_dedup_large_scale(): #------------------------------------------------------------------------------- @pytest.mark.basic_test def test_empty_bucket(): - #return + return if full_dedup_is_disabled(): return -- 2.39.5