]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/dedup: add throttling mechanism wip_dedup_throttle_rebase_Z1
authorGabriel BenHanokh <gbenhano@redhat.com>
Mon, 15 Sep 2025 06:58:23 +0000 (06:58 +0000)
committerGabriel BenHanokh <gbenhano@redhat.com>
Sat, 4 Oct 2025 06:03:31 +0000 (06:03 +0000)
Signed-off-by: Gabriel BenHanokh <gbenhano@redhat.com>
rgw/dedup: Change throttle code to work lock free and remove the atomic
from the timestamp

Signed-off-by: Gabriel BenHanokh <gbenhano@redhat.com>
doc/radosgw/s3_objects_dedup.rst
src/rgw/driver/rados/rgw_dedup.cc
src/rgw/driver/rados/rgw_dedup.h
src/rgw/driver/rados/rgw_dedup_cluster.cc
src/rgw/driver/rados/rgw_dedup_cluster.h
src/rgw/driver/rados/rgw_dedup_store.cc
src/rgw/driver/rados/rgw_dedup_utils.cc
src/rgw/driver/rados/rgw_dedup_utils.h
src/rgw/radosgw-admin/radosgw-admin.cc
src/test/cli/radosgw-admin/help.t
src/test/rgw/dedup/test_dedup.py

index 2c848113fa6a4d51dcd7bf699099fff6887483ed..b0b83d0ddf7d7c06c6952d96ddb2a7064c2a92a6 100644 (file)
@@ -8,11 +8,10 @@ Admin commands
 **************
 - ``radosgw-admin dedup estimate``:
    Starts a new dedup estimate session (aborting first existing session if exists).
-   
    It doesn't make any change to the existing system and will only collect statistics and report them.
-- ``radosgw-admin dedup restart --yes-i-really-mean-it``:
+- ``radosgw-admin dedup exec --yes-i-really-mean-it``:
    Starts a new dedup session (aborting first existing session if exists).
-   It will perfrom a full dedup, finding duplicated tail-objects and removing them.
+   It will perform a full dedup, finding duplicated tail-objects and removing them.
 
   This command can lead to **data-loss** and should not be used on production data!!
 - ``radosgw-admin dedup pause``:
@@ -23,6 +22,12 @@ Admin commands
    Aborts an active dedup session and release all resources used by it.
 - ``radosgw-admin dedup stats``:
    Collects & displays last dedup statistics.
+- ``radosgw-admin dedup estimate``:
+   Starts a new dedup estimate session (aborting first existing session if exists).
+- ``radosgw-admin dedup throttle --max-bucket-index-ops=<count>``:
+   Specify max bucket-index requests per second allowed for a single RGW server during dedup, 0 means unlimited.
+- ``radosgw-admin dedup throttle --stat``:
+   Display dedup throttle setting.
 
 ***************
 Skipped Objects
@@ -54,6 +59,14 @@ the underlying media storing the objects (SSD/HDD) since the bucket indices are
 virtually always stored on a fast medium (SSD with heavy memory
 caching).
 
+The admin can throttle the estimate process by setting a limit to the number of
+bucket-index reads per-second per an RGW server (each read brings 1000 object entries) using:
+
+$ radosgw-admin dedup throttle --max-bucket-index-ops=<count>
+
+A typical RGW server performs about 100 bucket-index reads per second (i.e. 100,000 object entries).
+Setting the count to 50 will typically slow down access by half and so on...
+
 *********************
 Full Dedup Processing
 *********************
index 7c00ddf6f2a737013cdf72c8e3f14697a059cb8d..338f265522de879e49481af0f279f8b25fdb4d79 100644 (file)
@@ -129,6 +129,8 @@ namespace rgw::dedup {
     this->remote_pause_req   = false;
     this->remote_paused      = false;
     this->remote_restart_req = false;
+    this->bucket_index_throttle.disable();
+    this->metadata_access_throttle.disable();
   }
 
   //---------------------------------------------------------------------------
@@ -147,6 +149,8 @@ namespace rgw::dedup {
     encode(ctl.remote_pause_req, bl);
     encode(ctl.remote_paused, bl);
     encode(ctl.remote_restart_req, bl);
+    encode(ctl.bucket_index_throttle, bl);
+    encode(ctl.metadata_access_throttle, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -168,6 +172,8 @@ namespace rgw::dedup {
     decode(ctl.remote_pause_req, bl);
     decode(ctl.remote_paused, bl);
     decode(ctl.remote_restart_req, bl);
+    decode(ctl.bucket_index_throttle, bl);
+    decode(ctl.metadata_access_throttle, bl);
     DECODE_FINISH(bl);
   }
 
@@ -209,6 +215,13 @@ namespace rgw::dedup {
       out << "::remote_restart_req";
     }
 
+    if (!ctl.bucket_index_throttle.is_disabled()) {
+      out << "::bucket_index_throttle=" << ctl.bucket_index_throttle.get_max_calls_per_second();
+    }
+    if (!ctl.metadata_access_throttle.is_disabled()) {
+      out << "::metadata_throttle=" << ctl.metadata_access_throttle.get_max_calls_per_second();
+    }
+
     return out;
   }
 
@@ -534,6 +547,7 @@ namespace rgw::dedup {
       librados::IoCtx ioctx = obj.ioctx;
       ldpp_dout(dpp, 20) << __func__ << "::removing tail object: " << raw_obj.oid
                          << dendl;
+      d_ctl.metadata_access_throttle.acquire();
       ret = ioctx.remove(raw_obj.oid);
     }
 
@@ -567,6 +581,8 @@ namespace rgw::dedup {
       }
 
       ObjectWriteOperation op;
+      d_ctl.metadata_access_throttle.acquire();
+      ldpp_dout(dpp, 20) << __func__ << "::dec ref-count on tail object: " << raw_obj.oid << dendl;
       cls_refcount_put(op, ref_tag, true);
       rgw::AioResultList completed = aio->get(obj.obj,
                                               rgw::Aio::librados_op(obj.ioctx, std::move(op), null_yield),
@@ -602,6 +618,7 @@ namespace rgw::dedup {
 
       ObjectWriteOperation op;
       cls_refcount_get(op, ref_tag, true);
+      d_ctl.metadata_access_throttle.acquire();
       ldpp_dout(dpp, 20) << __func__ << "::inc ref-count on tail object: " << raw_obj.oid << dendl;
       rgw::AioResultList completed = aio->get(obj.obj,
                                               rgw::Aio::librados_op(obj.ioctx, std::move(op), null_yield),
@@ -782,6 +799,7 @@ namespace rgw::dedup {
     ldpp_dout(dpp, 20) << __func__ << "::ref_tag=" << ref_tag << dendl;
     int ret = inc_ref_count_by_manifest(ref_tag, src_oid, src_manifest);
     if (ret == 0) {
+      d_ctl.metadata_access_throttle.acquire();
       ldpp_dout(dpp, 20) << __func__ << "::send TGT CLS (Shared_Manifest)" << dendl;
       ret = tgt_ioctx.operate(tgt_oid, &tgt_op);
       if (unlikely(ret != 0)) {
@@ -809,6 +827,7 @@ namespace rgw::dedup {
           p_stats->set_hash_attrs++;
         }
 
+        d_ctl.metadata_access_throttle.acquire();
         ldpp_dout(dpp, 20) << __func__ <<"::send SRC CLS (Shared_Manifest)"<< dendl;
         ret = src_ioctx.operate(src_oid, &src_op);
         if (unlikely(ret != 0)) {
@@ -1075,6 +1094,7 @@ namespace rgw::dedup {
       return 0;
     }
 
+    d_ctl.metadata_access_throttle.acquire();
     ret = p_obj->get_obj_attrs(null_yield, dpp);
     if (unlikely(ret < 0)) {
       p_stats->ingress_failed_get_obj_attrs++;
@@ -1393,6 +1413,7 @@ namespace rgw::dedup {
         }
       }
 
+      p_stats->ingress_slabs++;
       (*p_slab_count)++;
       failure_count = 0;
       unsigned slab_rec_count = 0;
@@ -1646,6 +1667,7 @@ namespace rgw::dedup {
       const string& oid = oids[current_shard];
       rgw_cls_list_ret result;
       librados::ObjectReadOperation op;
+      d_ctl.bucket_index_throttle.acquire();
       // get bucket-indices of @current_shard
       cls_rgw_bucket_list_op(op, marker, null_prefix, null_delimiter, max_entries,
                              list_versions, &result);
@@ -1780,7 +1802,7 @@ namespace rgw::dedup {
     display_table_stat_counters(dpp, p_stats);
 
     ldpp_dout(dpp, 10) << __func__ << "::MD5 Loop::" << d_ctl.dedup_type << dendl;
-    if (d_ctl.dedup_type != dedup_req_type_t::DEDUP_TYPE_FULL) {
+    if (d_ctl.dedup_type != dedup_req_type_t::DEDUP_TYPE_EXEC) {
       for (work_shard_t worker_id = 0; worker_id < num_work_shards; worker_id++) {
         remove_slabs(worker_id, md5_shard, slab_count_arr[worker_id]);
       }
@@ -2034,6 +2056,8 @@ namespace rgw::dedup {
                                                 &worker_stats,raw_mem, raw_mem_size);
     if (ret == 0) {
       worker_stats.duration = ceph_clock_now() - start_time;
+      worker_stats.bidx_throttle_sleep_events = d_ctl.bucket_index_throttle.get_sleep_events();
+      worker_stats.bidx_throttle_sleep_time_usec = d_ctl.bucket_index_throttle.get_sleep_time_usec();
       d_cluster.mark_work_shard_token_completed(store, worker_id, &worker_stats);
       ldpp_dout(dpp, 10) << "stat counters [worker]:\n" << worker_stats << dendl;
       ldpp_dout(dpp, 10) << "Shard Process Duration   = "
@@ -2041,6 +2065,7 @@ namespace rgw::dedup {
     }
     //ldpp_dout(dpp, 0) << __func__ << "::sleep for 2 seconds\n" << dendl;
     //std::this_thread::sleep_for(std::chrono::seconds(2));
+    //std::this_thread::sleep_forstd::chrono::microseconds(usec_timeout);
     return ret;
   }
 
@@ -2058,6 +2083,9 @@ namespace rgw::dedup {
     int ret = objects_dedup_single_md5_shard(&table, md5_shard, &md5_stats, num_work_shards);
     if (ret == 0) {
       md5_stats.duration = ceph_clock_now() - start_time;
+      md5_stats.md_throttle_sleep_events = d_ctl.metadata_access_throttle.get_sleep_events();
+      md5_stats.md_throttle_sleep_time_usec = d_ctl.metadata_access_throttle.get_sleep_time_usec();
+
       d_cluster.mark_md5_shard_token_completed(store, md5_shard, &md5_stats);
       ldpp_dout(dpp, 10) << "stat counters [md5]:\n" << md5_stats << dendl;
       ldpp_dout(dpp, 10) << "Shard Process Duration   = "
@@ -2247,7 +2275,7 @@ namespace rgw::dedup {
     ldpp_dout(dpp, 10) <<__func__ << "::" << *p_epoch << dendl;
     d_ctl.dedup_type = p_epoch->dedup_type;
 #ifdef FULL_DEDUP_SUPPORT
-    ceph_assert(d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_FULL ||
+    ceph_assert(d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_EXEC ||
                 d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE);
 #else
     ceph_assert(d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE);
@@ -2290,14 +2318,27 @@ namespace rgw::dedup {
   {
     int ret = 0;
     int32_t urgent_msg = URGENT_MSG_NONE;
+    auto bl_iter = bl.cbegin();
     try {
-      auto bl_iter = bl.cbegin();
       ceph::decode(urgent_msg, bl_iter);
     } catch (buffer::error& err) {
       ldpp_dout(dpp, 1) << __func__ << "::ERROR: bad urgent_msg" << dendl;
-      ret = -EINVAL;
+      cluster::ack_notify(store, dpp, &d_ctl, notify_id, cookie, -EINVAL);
+      return;
+    }
+    ldpp_dout(dpp, 5) << __func__ << "::" << get_urgent_msg_names(urgent_msg) << dendl;
+
+    throttle_msg_t throttle_msg;
+    if (urgent_msg == URGENT_MSG_THROTTLE) {
+      try {
+        decode(throttle_msg, bl_iter);
+        ldpp_dout(dpp, 5) << __func__ << "::" << throttle_msg << dendl;
+      } catch (buffer::error& err) {
+        ldpp_dout(dpp, 1) << __func__ << "::ERROR: bad throttle_msg" << dendl;
+        cluster::ack_notify(store, dpp, &d_ctl, notify_id, cookie, -EINVAL);
+        return;
+      }
     }
-    ldpp_dout(dpp, 5) << __func__ << "::-->" << get_urgent_msg_names(urgent_msg) << dendl;
 
     // use lock to prevent concurrent pause/resume requests
     std::unique_lock cond_lock(d_cond_mutex); // [------>open lock block
@@ -2358,6 +2399,24 @@ namespace rgw::dedup {
         ldpp_dout(dpp, 5) << __func__ << "::dedup is not paused->nothing to do" << dendl;
       }
       break;
+    case URGENT_MSG_THROTTLE:
+      for (auto action : throttle_msg.vec) {
+        if (action.op_type == BUCKET_INDEX_OP) {
+          d_ctl.bucket_index_throttle.set_max_calls_per_sec(action.limit);
+        }
+        else if (action.op_type == METADATA_ACCESS_OP) {
+          d_ctl.metadata_access_throttle.set_max_calls_per_sec(action.limit);
+        }
+        else if (action.op_type == STAT) {
+          ldpp_dout(dpp, 10) << __func__ << "::Throttle STAT" << dendl;
+        }
+        else {
+          ldpp_dout(dpp, 1) << __func__ << "::unexpected throttle_msg "
+                            << action.op_type << dendl;
+          ret = -EINVAL;
+        }
+      }
+      break;
     default:
       ldpp_dout(dpp, 1) << __func__ << "::unexpected urgent_msg: "
                         << get_urgent_msg_names(urgent_msg) << dendl;
index 48dafe38cb1ecde7194c0d0c45f5ebc9de5b8825..e88b9724dad243b8f79e3c110ce6e05e3ba58af1 100644 (file)
@@ -55,6 +55,8 @@ namespace rgw::dedup {
     bool remote_pause_req   = false;
     bool remote_paused      = false;
     bool remote_restart_req = false;
+    Throttle bucket_index_throttle;
+    Throttle metadata_access_throttle;
   };
   std::ostream& operator<<(std::ostream &out, const control_t &ctl);
   void encode(const control_t& ctl, ceph::bufferlist& bl);
@@ -230,9 +232,6 @@ namespace rgw::dedup {
     librados::IoCtx d_dedup_cluster_ioctx;
     utime_t  d_heart_beat_last_update;
     unsigned d_heart_beat_max_elapsed_sec;
-
-    // A pool with 6 billion objects has a  1/(2^64) chance for collison with a 128bit MD5
-    uint64_t d_max_protected_objects   = (6ULL * 1024 * 1024 * 1024);
     uint64_t d_all_buckets_obj_count   = 0;
     uint64_t d_all_buckets_obj_size    = 0;
     // we don't benefit from deduping RGW objects smaller than head-object size
index 7bdb308af87ccc2ef700a129bbe8d9c141f65a69..e50d82bf031702ac8e22e9ef7ca063734cbb5b72 100644 (file)
@@ -899,9 +899,15 @@ namespace rgw::dedup {
     if (!has_incomplete_shards) {
       return;
     }
+    //utime_t now = ceph_clock_now();
     Formatter::ArraySection array_section{*fmt, "incomplete_shards"};
     for (unsigned shard = 0; shard < num_shards; shard++) {
-      if (sp_arr[shard].is_completed() ) {
+      if (sp_arr[shard].is_completed()) {
+        continue;
+      }
+      if (sp_arr[shard].was_not_started() ) {
+        Formatter::ObjectSection object_section{*fmt, "pending shard:"};
+        fmt->dump_unsigned("shard_id", shard);
         continue;
       }
       Formatter::ObjectSection object_section{*fmt, "shard_progress"};
@@ -910,6 +916,8 @@ namespace rgw::dedup {
       fmt->dump_unsigned("progress_a", sp_arr[shard].progress_a);
       fmt->dump_unsigned("progress_b", sp_arr[shard].progress_b);
       fmt->dump_stream("last updated") << sp_arr[shard].update_time;
+      utime_t elapsed = sp_arr[shard].update_time - sp_arr[shard].creation_time;
+      fmt->dump_unsigned("time elapsed (sec)", elapsed.tv.tv_sec);
     }
   }
 
@@ -1185,21 +1193,34 @@ namespace rgw::dedup {
   }
 
   //---------------------------------------------------------------------------
-  // command-line called from radosgw-admin.cc
-  int cluster::dedup_control(rgw::sal::RadosStore *store,
-                             const DoutPrefixProvider *dpp,
-                             urgent_msg_t urgent_msg)
+  static void report_throttle_state(const struct rgw::dedup::control_t &ctl)
   {
-    ldpp_dout(dpp, 10) << __func__ << "::dedup_control req = "
-                       << get_urgent_msg_names(urgent_msg) << dendl;
-    if (urgent_msg != URGENT_MSG_RESUME  &&
-        urgent_msg != URGENT_MSG_PASUE   &&
-        urgent_msg != URGENT_MSG_RESTART &&
-        urgent_msg != URGENT_MSG_ABORT) {
-      ldpp_dout(dpp, 1) << __func__ << "::illegal urgent_msg="<< urgent_msg << dendl;
-      return -EINVAL;
+    if (!ctl.bucket_index_throttle.is_disabled()) {
+      std::cout << "bucket-index throttle="
+                << ctl.bucket_index_throttle.get_max_calls_per_second()
+                << std::endl;
+    }
+    else {
+      std::cout << "bucket-index throttle is disabled" << std::endl;
     }
 
+    if (!ctl.metadata_access_throttle.is_disabled()) {
+      std::cout << "metadata throttle="
+                << ctl.metadata_access_throttle.get_max_calls_per_second()
+                << std::endl;
+    }
+    else {
+      std::cout << "metadata throttle is disabled" << std::endl;
+    }
+  }
+
+  //---------------------------------------------------------------------------
+  // command-line called from radosgw-admin.cc
+  int cluster::dedup_control_bl(rgw::sal::RadosStore *store,
+                                const DoutPrefixProvider *dpp,
+                                urgent_msg_t urgent_msg,
+                                bufferlist urgent_msg_bl)
+  {
     librados::IoCtx ctl_ioctx;
     int ret = get_control_ioctx(store, dpp, ctl_ioctx);
     if (unlikely(ret != 0)) {
@@ -1208,8 +1229,7 @@ namespace rgw::dedup {
 
     // 10 seconds timeout
     const uint64_t timeout_ms = 10*1000;
-    bufferlist reply_bl, urgent_msg_bl;
-    ceph::encode(urgent_msg, urgent_msg_bl);
+    bufferlist reply_bl;
     ret = rgw_rados_notify(dpp, ctl_ioctx, DEDUP_WATCH_OBJ, urgent_msg_bl,
                            timeout_ms, &reply_bl, null_yield);
     if (ret < 0) {
@@ -1235,6 +1255,9 @@ namespace rgw::dedup {
         struct rgw::dedup::control_t ctl;
         decode(ctl, iter);
         ldpp_dout(dpp, 10) << __func__ << "::++ACK::ctl=" << ctl << "::ret=" << ret << dendl;
+        if (urgent_msg == URGENT_MSG_THROTTLE) {
+          report_throttle_state(ctl);
+        }
       } catch (buffer::error& err) {
         ldpp_dout(dpp, 1) << __func__ << "::failed decoding notify acks" << dendl;
         return -EINVAL;
@@ -1245,11 +1268,31 @@ namespace rgw::dedup {
         return ret;
       }
     }
-    ldpp_dout(dpp, 10) << __func__ << "::" << get_urgent_msg_names(urgent_msg)
-                       << " finished successfully!" << dendl;
+    ldpp_dout(dpp, 10) << __func__ << "::finished successfully!" << dendl;
     return 0;
   }
 
+  //---------------------------------------------------------------------------
+  // command-line called from radosgw-admin.cc
+  int cluster::dedup_control(rgw::sal::RadosStore *store,
+                             const DoutPrefixProvider *dpp,
+                             urgent_msg_t urgent_msg)
+  {
+    ldpp_dout(dpp, 10) << __func__ << "::dedup_control req = "
+                       << get_urgent_msg_names(urgent_msg) << dendl;
+    if (urgent_msg != URGENT_MSG_RESUME  &&
+        urgent_msg != URGENT_MSG_PASUE   &&
+        urgent_msg != URGENT_MSG_RESTART &&
+        urgent_msg != URGENT_MSG_ABORT) {
+      ldpp_dout(dpp, 1) << __func__ << "::illegal urgent_msg="<< urgent_msg << dendl;
+      return -EINVAL;
+    }
+
+    bufferlist urgent_msg_bl;
+    ceph::encode(urgent_msg, urgent_msg_bl);
+    return dedup_control_bl(store, dpp, urgent_msg, urgent_msg_bl);
+  }
+
   //---------------------------------------------------------------------------
   // command-line called from radosgw-admin.cc
   int cluster::dedup_restart_scan(rgw::sal::RadosStore *store,
@@ -1289,7 +1332,7 @@ namespace rgw::dedup {
     ldpp_dout(dpp, 10) << __func__ << dedup_type << dendl;
 #ifdef FULL_DEDUP_SUPPORT
     ceph_assert(dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE ||
-                dedup_type == dedup_req_type_t::DEDUP_TYPE_FULL);
+                dedup_type == dedup_req_type_t::DEDUP_TYPE_EXEC);
 #else
     ceph_assert(dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE);
 #endif
index 64b2c54a4fa28a1e2f69f2c62d8ca212ec892a79..1b5b9cdc175684f793728d047e3c8f2ee5725228 100644 (file)
@@ -92,6 +92,10 @@ namespace rgw::dedup {
                             uint64_t notify_id,
                             uint64_t cookie,
                             int status);
+    static int   dedup_control_bl(rgw::sal::RadosStore *store,
+                                  const DoutPrefixProvider *dpp,
+                                  urgent_msg_t urgent_msg,
+                                  bufferlist urgent_msg_bl);
     static int   dedup_control(rgw::sal::RadosStore *store,
                                const DoutPrefixProvider *dpp,
                                urgent_msg_t urgent_msg);
index fd15bbc372d8a8fbd7d0599538f0d284f66c5f91..29d18f37a0404e55e08fb8f64a29c7d548ec05b4 100644 (file)
@@ -616,6 +616,9 @@ namespace rgw::dedup {
     unsigned len = (p_curr_block + 1 - p_arr) * sizeof(disk_block_t);
     bufferlist bl = bufferlist::static_from_mem((char*)p_arr, len);
     int ret = store_slab(ioctx, bl, d_md5_shard, d_worker_id, d_seq_number, dpp);
+    if (unlikely(ret != 0)) {
+      p_stats->write_slab_failure++;
+    }
     // Need to make sure the call to rgw_put_system_obj was fully synchronous
 
     // d_seq_number++ must be called **after** flush!!
index baadee5aeef5437133fe6214983b789c197fbe36..a32f4ecbd7e8af5b15cde4c2a91b4b10229a89c5 100644 (file)
@@ -25,8 +25,8 @@ namespace rgw::dedup {
     else if (dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE) {
       out << "DEDUP_TYPE_ESTIMATE";
     }
-    else if (dedup_type == dedup_req_type_t::DEDUP_TYPE_FULL) {
-      out << "DEDUP_TYPE_FULL";
+    else if (dedup_type == dedup_req_type_t::DEDUP_TYPE_EXEC) {
+      out << "DEDUP_TYPE_EXEC (full dedup)";
     }
     else {
       out << "\n*** unexpected dedup_type ***\n";
@@ -35,6 +35,108 @@ namespace rgw::dedup {
     return out;
   }
 
+  //---------------------------------------------------------------------------
+  void validate_max_calls_offset()
+  {
+    // max_calls must be the first data member to guarantee 8 Bytes alignment
+    // this will allow us to avoid using std::atomic (which is expensive)
+    static_assert(offsetof(Throttle, max_calls) == 0);
+  }
+
+  //---------------------------------------------------------------------------
+  void encode(const Throttle& t, ceph::bufferlist& bl)
+  {
+    ENCODE_START(1, 1, bl);
+    encode(t.get_max_calls_per_second(), bl);
+    ENCODE_FINISH(bl);
+  }
+
+  //---------------------------------------------------------------------------
+  void decode(Throttle& t, ceph::bufferlist::const_iterator& bl)
+  {
+    DECODE_START(1, bl);
+    size_t max_calls_per_sec;
+    decode(max_calls_per_sec, bl);
+    t.set_max_calls_per_sec(max_calls_per_sec);
+    DECODE_FINISH(bl);
+  }
+
+  //---------------------------------------------------------------------------
+  std::ostream& operator<<(std::ostream &out, const throttle_action_t& msg)
+  {
+    if (msg.op_type == BUCKET_INDEX_OP) {
+      out << "Set Bucket Index Throttling to ";
+      if (msg.limit) {
+        out << msg.limit << " IOPS";
+      }
+      else {
+        out << "unlimited IOPS";
+      }
+    }
+    else if (msg.op_type == METADATA_ACCESS_OP) {
+      out << "Set Metadata Throttling to ";
+      if (msg.limit) {
+        out << msg.limit << " IOPS";
+      }
+      else {
+        out << "unlimited IOPS";
+      }
+    }
+    else if (msg.op_type == DATA_READ_WRITE_OP) {
+      out << "Set Read/Write Throttling to " << msg.limit << " MB/sec";
+    }
+    else {
+      out << "\n*** unexpected throttling type ***\n";
+    }
+
+    return out;
+  }
+
+  //---------------------------------------------------------------------------
+  std::ostream& operator<<(std::ostream &out, const throttle_msg_t& msg)
+  {
+    for (auto action : msg.vec) {
+      out << action << " :: ";
+    }
+    return out;
+  }
+
+  //---------------------------------------------------------------------------
+  void encode(const throttle_action_t& m, ceph::bufferlist& bl)
+  {
+    ENCODE_START(1, 1, bl);
+    encode((int)m.op_type, bl);
+    encode(m.limit, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  //---------------------------------------------------------------------------
+  void decode(throttle_action_t& m, ceph::bufferlist::const_iterator& bl)
+  {
+    DECODE_START(1, bl);
+    int tmp;
+    decode(tmp, bl);
+    m.op_type = (op_type_t)tmp;
+    decode(m.limit, bl);
+    DECODE_FINISH(bl);
+  }
+
+  //---------------------------------------------------------------------------
+  void encode(const throttle_msg_t& m, ceph::bufferlist& bl)
+  {
+    ENCODE_START(1, 1, bl);
+    encode(m.vec, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  //---------------------------------------------------------------------------
+  void decode(throttle_msg_t& m, ceph::bufferlist::const_iterator& bl)
+  {
+    DECODE_START(1, bl);
+    decode(m.vec, bl);
+    DECODE_FINISH(bl);
+  }
+
   //---------------------------------------------------------------------------
   dedup_stats_t& dedup_stats_t::operator+=(const dedup_stats_t& other)
   {
@@ -244,6 +346,7 @@ namespace rgw::dedup {
     "URGENT_MSG_PASUE",
     "URGENT_MSG_RESUME",
     "URGENT_MSG_RESTART",
+    "URGENT_MSG_THROTTLE",
     "URGENT_MSG_INVALID"
   };
 
@@ -266,6 +369,9 @@ namespace rgw::dedup {
     this->egress_records += other.egress_records;
     this->egress_blocks += other.egress_blocks;
     this->egress_slabs += other.egress_slabs;
+    this->write_slab_failure += other.write_slab_failure;
+    this->bidx_throttle_sleep_events += other.bidx_throttle_sleep_events;
+    this->bidx_throttle_sleep_time_usec += other.bidx_throttle_sleep_time_usec;
     this->single_part_objs += other.single_part_objs;
     this->multipart_objs += other.multipart_objs;
     this->small_multipart_obj += other.small_multipart_obj;
@@ -302,8 +408,14 @@ namespace rgw::dedup {
 
     {
       Formatter::ObjectSection notify(*f, "notify");
+      if (this->bidx_throttle_sleep_events) {
+        f->dump_unsigned("Bucket-Index Throttle Sleep Events",
+                         this->bidx_throttle_sleep_events);
+        f->dump_unsigned("Bucket-Index Throttle Sleep Time (sec)",
+                         this->bidx_throttle_sleep_time_usec/MICROSECONDS_PER_SECOND);
+      }
 
-      if(this->non_default_storage_class_objs) {
+      if (this->non_default_storage_class_objs) {
         f->dump_unsigned("non default storage class objs",
                          this->non_default_storage_class_objs);
         f->dump_unsigned("non default storage class objs bytes",
@@ -317,7 +429,7 @@ namespace rgw::dedup {
 
     {
       Formatter::ObjectSection skipped(*f, "skipped");
-      if(this->ingress_skip_too_small) {
+      if (this->ingress_skip_too_small) {
         f->dump_unsigned("Ingress skip: too small objs",
                          this->ingress_skip_too_small);
         f->dump_unsigned("Ingress skip: too small bytes",
@@ -334,7 +446,10 @@ namespace rgw::dedup {
 
     {
       Formatter::ObjectSection failed(*f, "failed");
-      if(this->ingress_corrupted_etag) {
+      if (this->write_slab_failure) {
+        f->dump_unsigned("Write SLAB failures", this->write_slab_failure);
+      }
+      if (this->ingress_corrupted_etag) {
         f->dump_unsigned("Corrupted ETAG", this->ingress_corrupted_etag);
       }
     }
@@ -360,6 +475,9 @@ namespace rgw::dedup {
     encode(w.egress_records, bl);
     encode(w.egress_blocks, bl);
     encode(w.egress_slabs, bl);
+    encode(w.write_slab_failure, bl);
+    encode(w.bidx_throttle_sleep_events, bl);
+    encode(w.bidx_throttle_sleep_time_usec, bl);
 
     encode(w.single_part_objs, bl);
     encode(w.multipart_objs, bl);
@@ -391,6 +509,9 @@ namespace rgw::dedup {
     decode(w.egress_records, bl);
     decode(w.egress_blocks, bl);
     decode(w.egress_slabs, bl);
+    decode(w.write_slab_failure, bl);
+    decode(w.bidx_throttle_sleep_events, bl);
+    decode(w.bidx_throttle_sleep_time_usec, bl);
     decode(w.single_part_objs, bl);
     decode(w.multipart_objs, bl);
     decode(w.small_multipart_obj, bl);
@@ -413,6 +534,7 @@ namespace rgw::dedup {
   {
     this->small_objs_stat               += other.small_objs_stat;
     this->big_objs_stat                 += other.big_objs_stat;
+    this->ingress_slabs                 += other.ingress_slabs;
     this->ingress_failed_load_bucket    += other.ingress_failed_load_bucket;
     this->ingress_failed_get_object     += other.ingress_failed_get_object;
     this->ingress_failed_get_obj_attrs  += other.ingress_failed_get_obj_attrs;
@@ -451,6 +573,8 @@ namespace rgw::dedup {
     this->dup_head_bytes          += other.dup_head_bytes;
 
     this->failed_dedup            += other.failed_dedup;
+    this->md_throttle_sleep_events    += other.md_throttle_sleep_events;
+    this->md_throttle_sleep_time_usec += other.md_throttle_sleep_time_usec;
     this->failed_table_load       += other.failed_table_load;
     this->failed_map_overflow     += other.failed_map_overflow;
     return *this;
@@ -476,6 +600,7 @@ namespace rgw::dedup {
 
       f->dump_unsigned("Total processed objects", this->processed_objects);
       f->dump_unsigned("Loaded objects", this->loaded_objects);
+      f->dump_unsigned("Ingress Slabs", this->ingress_slabs);
       f->dump_unsigned("Set Shared-Manifest SRC", this->set_shared_manifest_src);
       f->dump_unsigned("Deduped Obj (this cycle)", this->deduped_objects);
       f->dump_unsigned("Deduped Bytes(this cycle)", this->deduped_objects_bytes);
@@ -507,6 +632,12 @@ namespace rgw::dedup {
 
     {
       Formatter::ObjectSection notify(*f, "notify");
+      if (this->md_throttle_sleep_events) {
+        f->dump_unsigned("Metadata Throttle Sleep Events", this->md_throttle_sleep_events);
+        f->dump_unsigned("Metadata Throttle Sleep Time (sec)",
+                         this->md_throttle_sleep_time_usec/MICROSECONDS_PER_SECOND);
+      }
+
       if (this->failed_table_load) {
         f->dump_unsigned("Failed Table Load", this->failed_table_load);
       }
@@ -601,6 +732,7 @@ namespace rgw::dedup {
 
     encode(m.small_objs_stat, bl);
     encode(m.big_objs_stat, bl);
+    encode(m.ingress_slabs, bl);
     encode(m.ingress_failed_load_bucket, bl);
     encode(m.ingress_failed_get_object, bl);
     encode(m.ingress_failed_get_obj_attrs, bl);
@@ -638,6 +770,8 @@ namespace rgw::dedup {
     encode(m.deduped_objects_bytes, bl);
     encode(m.dup_head_bytes, bl);
     encode(m.failed_dedup, bl);
+    encode(m.md_throttle_sleep_events, bl);
+    encode(m.md_throttle_sleep_time_usec, bl);
     encode(m.failed_table_load, bl);
     encode(m.failed_map_overflow, bl);
 
@@ -651,6 +785,7 @@ namespace rgw::dedup {
     DECODE_START(1, bl);
     decode(m.small_objs_stat, bl);
     decode(m.big_objs_stat, bl);
+    decode(m.ingress_slabs, bl);
     decode(m.ingress_failed_load_bucket, bl);
     decode(m.ingress_failed_get_object, bl);
     decode(m.ingress_failed_get_obj_attrs, bl);
@@ -688,6 +823,8 @@ namespace rgw::dedup {
     decode(m.deduped_objects_bytes, bl);
     decode(m.dup_head_bytes, bl);
     decode(m.failed_dedup, bl);
+    decode(m.md_throttle_sleep_events, bl);
+    decode(m.md_throttle_sleep_time_usec, bl);
     decode(m.failed_table_load, bl);
     decode(m.failed_map_overflow, bl);
 
index f008fcaba38b5f4a7246e0633a7ba4eec92db795..88d440582d11733fc37fa51bdc863b3051977808 100644 (file)
 #include "common/Formatter.h"
 #include "common/ceph_json.h"
 #include <time.h>
+#include <chrono>
 #include "include/utime.h"
 #include "include/encoding.h"
 #include "common/dout.h"
 
 #define FULL_DEDUP_SUPPORT
 namespace rgw::dedup {
+  using namespace std::chrono;
   using work_shard_t   = uint16_t;
   using md5_shard_t    = uint16_t;
 
+  const uint64_t MICROSECONDS_PER_SECOND = 1000000;
   // settings to help debug small systems
   const work_shard_t MIN_WORK_SHARD = 2;
   const md5_shard_t  MIN_MD5_SHARD  = 4;
@@ -57,7 +60,7 @@ namespace rgw::dedup {
   enum dedup_req_type_t {
     DEDUP_TYPE_NONE     = 0,
     DEDUP_TYPE_ESTIMATE = 1,
-    DEDUP_TYPE_FULL     = 2
+    DEDUP_TYPE_EXEC     = 2
   };
 
   std::ostream& operator<<(std::ostream &out, const dedup_req_type_t& dedup_type);
@@ -85,6 +88,93 @@ namespace rgw::dedup {
     uint8_t flags;
   };
 
+  class alignas(8) Throttle {
+    friend void validate_max_calls_offset();
+  public:
+    // @max_calls_per_sec - max requests per second allowed, 0 means unlimited
+    // disbaled by default
+    Throttle(size_t max_calls_per_sec=0) {
+      set_max_calls_per_sec(max_calls_per_sec);
+      reset();
+    }
+
+    // set the number of calls per second
+    // zero means unlimited
+    inline void set_max_calls_per_sec(uint32_t max_calls_per_sec) {
+      max_calls = max_calls_per_sec;
+    }
+
+    inline size_t get_max_calls_per_second() const {
+      return max_calls;
+    }
+
+    inline uint64_t get_sleep_events() const {
+      return sleep_events;
+    }
+
+    inline uint64_t get_sleep_time_usec() const {
+      return sleep_time_usec;
+    }
+
+    inline void disable() {
+      set_max_calls_per_sec(0);
+    }
+
+    inline bool is_disabled() const {
+      return (max_calls == 0);
+    }
+
+    // Blocks until allowed to proceed
+    void acquire() {
+      if (is_disabled()) {
+        return;
+      }
+      // Should work fine without atomic since acquire is single threaded
+      const steady_clock::time_point now = steady_clock::now();
+      uint64_t elapsed_usec = duration_cast<microseconds>(now - last_reset).count();
+      if (elapsed_usec >= MICROSECONDS_PER_SECOND || last_reset > now) {
+        // Renew tokens if a second (or more) has passed since last_reset
+        reset();
+        --tokens;
+        return;
+      }
+
+      if (tokens > 0) {
+        --tokens;
+        return;
+      }
+
+      // if reached here, all tokens were exhausted, wait for the next time slot
+      ceph_assert(MICROSECONDS_PER_SECOND > elapsed_usec);
+      uint64_t wait_time_usec = MICROSECONDS_PER_SECOND - elapsed_usec;
+      sleep_events ++;
+      sleep_time_usec += wait_time_usec;
+
+      std::this_thread::sleep_for(microseconds(wait_time_usec));
+      // After sleeping, reset and return
+      reset();
+      tokens --;
+    }
+
+  private:
+    void reset() {
+      // atomic operation because it is 8 Bytes aligned
+      tokens = max_calls;
+      last_reset = steady_clock::now();
+    }
+
+    // @max_calls must be the first data member to guarantee 8 Bytes alignment
+    uint32_t max_calls;
+    uint32_t tokens;
+    steady_clock::time_point last_reset;
+    uint64_t sleep_events = 0;
+    uint64_t sleep_time_usec = 0;
+  } __attribute__ ((aligned (8)));
+
+  void validate_max_calls_offset();
+  void encode(const Throttle& t, ceph::bufferlist& bl);
+  void decode(Throttle& t, ceph::bufferlist::const_iterator& bl);
+
   struct dedup_stats_t {
     dedup_stats_t& operator+=(const dedup_stats_t& other);
 
@@ -107,6 +197,9 @@ namespace rgw::dedup {
     uint64_t egress_records = 0;
     uint64_t egress_blocks = 0;
     uint64_t egress_slabs = 0;
+    uint64_t write_slab_failure = 0;
+    uint64_t bidx_throttle_sleep_events = 0;
+    uint64_t bidx_throttle_sleep_time_usec = 0;
 
     uint64_t single_part_objs = 0;
     uint64_t multipart_objs = 0;
@@ -139,6 +232,7 @@ namespace rgw::dedup {
 
     dedup_stats_t small_objs_stat;
     dedup_stats_t big_objs_stat;
+    uint64_t ingress_slabs = 0;
     uint64_t ingress_failed_load_bucket = 0;
     uint64_t ingress_failed_get_object = 0;
     uint64_t ingress_failed_get_obj_attrs = 0;
@@ -178,6 +272,8 @@ namespace rgw::dedup {
     uint64_t deduped_objects_bytes = 0;
     uint64_t dup_head_bytes = 0;
     uint64_t failed_dedup = 0;
+    uint64_t md_throttle_sleep_events = 0;
+    uint64_t md_throttle_sleep_time_usec = 0;
     uint64_t failed_table_load = 0;
     uint64_t failed_map_overflow = 0;
     utime_t  duration = {0, 0};
@@ -218,15 +314,39 @@ namespace rgw::dedup {
   }
 
   enum urgent_msg_t {
-    URGENT_MSG_NONE    = 0,
-    URGENT_MSG_ABORT   = 1,
-    URGENT_MSG_PASUE   = 2,
-    URGENT_MSG_RESUME  = 3,
-    URGENT_MSG_RESTART = 4,
-    URGENT_MSG_INVALID = 5
+    URGENT_MSG_NONE = 0,
+    URGENT_MSG_ABORT,
+    URGENT_MSG_PASUE,
+    URGENT_MSG_RESUME,
+    URGENT_MSG_RESTART,
+    URGENT_MSG_THROTTLE,
+    URGENT_MSG_INVALID
   };
-
   const char* get_urgent_msg_names(int msg);
+  enum op_type_t {
+    NO_OP = 0,
+    BUCKET_INDEX_OP,
+    METADATA_ACCESS_OP,
+    DATA_READ_WRITE_OP,
+    STAT,
+    INVALID_OP
+  };
+
+  struct throttle_action_t {
+    op_type_t op_type;
+    uint32_t  limit;
+  };
+  void encode(const throttle_action_t& m, ceph::bufferlist& bl);
+  void decode(throttle_action_t& m, ceph::bufferlist::const_iterator& bl);
+  std::ostream& operator<<(std::ostream &out, const throttle_action_t& action);
+  struct throttle_msg_t {
+    std::vector<throttle_action_t> vec;
+  };
+
+  std::ostream& operator<<(std::ostream &out, const throttle_msg_t& msg);
+  void encode(const throttle_msg_t& m, ceph::bufferlist& bl);
+  void decode(throttle_msg_t& m, ceph::bufferlist::const_iterator& bl);
+
   bool hex2int(const char *p, const char *p_end, uint64_t *p_val);
   bool parse_etag_string(const std::string& etag, parsed_etag_t *parsed_etag);
   void etag_to_bufferlist(uint64_t md5_high, uint64_t md5_low, uint16_t num_parts,
index ca26e0a3091d01fa295cde321a8f2fa6e4124f0a..48b254d6358f209cf22162253602d4f9de9fac2c 100644 (file)
@@ -157,10 +157,11 @@ void usage()
   cout << "  caps rm                          remove user capabilities\n";
   cout << "  dedup stats                      Display dedup statistics from the last run\n";
   cout << "  dedup estimate                   Runs dedup in estimate mode (no changes will be made)\n";
-  cout << "  dedup restart                    Restart dedup; must include --yes-i-really-mean-it to activate\n";
+  cout << "  dedup exec                       Execute dedup (duplicated tail objects will be deleted); must include --yes-i-really-mean-it to activate\n";
   cout << "  dedup abort                      Abort dedup\n";
   cout << "  dedup pause                      Pause dedup\n";
   cout << "  dedup resume                     Resume paused dedup\n";
+  cout << "  dedup throttle                   Throttle dedup execution\n";
   cout << "  subuser create                   create a new subuser\n" ;
   cout << "  subuser modify                   modify subuser\n";
   cout << "  subuser rm                       remove subuser\n";
@@ -490,6 +491,10 @@ void usage()
   cout << "   --disable-feature                 disable a zone/zonegroup feature\n";
   cout << "\n";
   cout << "<date> := \"YYYY-MM-DD[ hh:mm:ss]\"\n";
+  cout << "\nDedup throttle options:\n";
+  cout << "   --max-bucket-index-ops        specify max bucket-index requests per second allowed for an RGW during dedup, 0 means unlimited\n";
+  cout << "   --max-metadata-ops            specify max metadata requests per second allowed for an RGW during dedup, 0 means unlimited\n";
+  cout << "   --stat                        display dedup throttle setting\n";
   cout << "\nQuota options:\n";
   cout << "   --max-objects                 specify max objects (negative value to disable)\n";
   cout << "   --max-size                    specify max size (in B/K/M/G/T, negative value to disable)\n";
@@ -756,9 +761,10 @@ enum class OPT {
   DEDUP_STATS,
   DEDUP_ESTIMATE,
   DEDUP_ABORT,
-  DEDUP_RESTART,
+  DEDUP_EXEC,
   DEDUP_PAUSE,
   DEDUP_RESUME,
+  DEDUP_THROTTLE,
   GC_LIST,
   GC_PROCESS,
   LC_LIST,
@@ -1009,9 +1015,11 @@ static SimpleCmd::Commands all_cmds = {
   { "dedup stats", OPT::DEDUP_STATS },
   { "dedup estimate", OPT::DEDUP_ESTIMATE },
   { "dedup abort", OPT::DEDUP_ABORT },
-  { "dedup restart", OPT::DEDUP_RESTART },
+  { "dedup restart", OPT::DEDUP_EXEC },
+  { "dedup exec", OPT::DEDUP_EXEC },
   { "dedup pause", OPT::DEDUP_PAUSE },
   { "dedup resume", OPT::DEDUP_RESUME },
+  { "dedup throttle", OPT::DEDUP_THROTTLE },
   { "gc list", OPT::GC_LIST },
   { "gc process", OPT::GC_PROCESS },
   { "lc list", OPT::LC_LIST },
@@ -3637,6 +3645,7 @@ int main(int argc, const char **argv)
   int skip_zero_entries = false;  // log show
   int purge_keys = false;
   int yes_i_really_mean_it = false;
+  int throttle_stat = false;
   int delete_child_objects = false;
   int fix = false;
   int remove_bad = false;
@@ -3689,12 +3698,16 @@ int main(int argc, const char **argv)
   int64_t max_write_ops = 0;
   int64_t max_read_bytes = 0;
   int64_t max_write_bytes = 0;
+  uint32_t max_bucket_index_ops = 0;
+  uint32_t max_metadata_ops = 0;
   bool have_max_objects = false;
   bool have_max_size = false;
   bool have_max_write_ops = false;
   bool have_max_read_ops = false;
   bool have_max_write_bytes = false;
   bool have_max_read_bytes = false;
+  bool have_max_bucket_index_ops = false;
+  bool have_max_metadata_ops = false;
   int include_all = false;
   int allow_unordered = false;
 
@@ -4003,6 +4016,20 @@ int main(int argc, const char **argv)
         return EINVAL;
       }
       have_max_write_bytes = true;
+    } else if (ceph_argparse_witharg(args, i, &val, "--max-bucket-index-ops", (char*)NULL)) {
+      max_bucket_index_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err);
+      if (!err.empty()) {
+       cerr << "ERROR: failed to parse max bucket index ops: " << err << std::endl;
+       return EINVAL;
+      }
+      have_max_bucket_index_ops = true;
+    } else if (ceph_argparse_witharg(args, i, &val, "--max-metadata-ops", (char*)NULL)) {
+      max_metadata_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err);
+      if (!err.empty()) {
+       cerr << "ERROR: failed to parse max metadata ops: " << err << std::endl;
+       return EINVAL;
+      }
+      have_max_metadata_ops = true;
     } else if (ceph_argparse_witharg(args, i, &val, "--date", "--time", (char*)NULL)) {
       date = val;
       if (end_date.empty())
@@ -4097,6 +4124,8 @@ int main(int argc, const char **argv)
       // do nothing
     } else if (ceph_argparse_binary_flag(args, i, &yes_i_really_mean_it, NULL, "--yes-i-really-mean-it", (char*)NULL)) {
       // do nothing
+    } else if (ceph_argparse_binary_flag(args, i, &throttle_stat, NULL, "--stat", (char*)NULL)) {
+      // do nothing
     } else if (ceph_argparse_binary_flag(args, i, &fix, NULL, "--fix", (char*)NULL)) {
       // do nothing
     } else if (ceph_argparse_binary_flag(args, i, &remove_bad, NULL, "--remove-bad", (char*)NULL)) {
@@ -4514,9 +4543,10 @@ int main(int argc, const char **argv)
                         OPT::DEDUP_STATS,
                         OPT::DEDUP_ESTIMATE,
                         OPT::DEDUP_ABORT,     // TBD - not READ-ONLY
-                        OPT::DEDUP_RESTART,   // TBD - not READ-ONLY
+                        OPT::DEDUP_EXEC,   // TBD - not READ-ONLY
                         OPT::DEDUP_PAUSE,
                         OPT::DEDUP_RESUME,
+                        OPT::DEDUP_THROTTLE,
                         OPT::GC_LIST,
                         OPT::LC_LIST,
                         OPT::ORPHANS_LIST_JOBS,
@@ -9167,7 +9197,8 @@ next:
       opt_cmd == OPT::DEDUP_ABORT    ||
       opt_cmd == OPT::DEDUP_PAUSE    ||
       opt_cmd == OPT::DEDUP_RESUME   ||
-      opt_cmd == OPT::DEDUP_RESTART) {
+      opt_cmd == OPT::DEDUP_THROTTLE ||
+      opt_cmd == OPT::DEDUP_EXEC) {
 
     using namespace rgw::dedup;
     rgw::sal::RadosStore *store = dynamic_cast<rgw::sal::RadosStore*>(driver);
@@ -9188,7 +9219,41 @@ next:
       return ret;
     }
 
-    if (opt_cmd == OPT::DEDUP_ABORT || opt_cmd == OPT::DEDUP_PAUSE || opt_cmd == OPT::DEDUP_RESUME) {
+    if (opt_cmd == OPT::DEDUP_THROTTLE) {
+      bufferlist urgent_msg_bl;
+      urgent_msg_t urgent_msg = URGENT_MSG_THROTTLE;
+      ceph::encode(urgent_msg, urgent_msg_bl);
+      throttle_msg_t throttle_msg;
+
+      if (throttle_stat) {
+       encode(throttle_msg, urgent_msg_bl);
+       return cluster::dedup_control_bl(store, dpp(), urgent_msg, urgent_msg_bl);
+      }
+
+      if (unlikely(!have_max_bucket_index_ops && !have_max_metadata_ops)) {
+       std::cerr << "dedup throttle must set either --max-bucket-index-ops or --max-metadata-ops" << std::endl;
+       return EINVAL;
+      }
+
+      if (have_max_bucket_index_ops) {
+       throttle_action_t action = { .op_type = BUCKET_INDEX_OP,
+                                    .limit = max_bucket_index_ops};
+       throttle_msg.vec.push_back(action);
+      }
+
+      if (have_max_metadata_ops) {
+       throttle_action_t action = { .op_type = METADATA_ACCESS_OP,
+                                    .limit = max_metadata_ops};
+       throttle_msg.vec.push_back(action);
+      }
+
+      encode(throttle_msg, urgent_msg_bl);
+      return cluster::dedup_control_bl(store, dpp(), urgent_msg, urgent_msg_bl);
+    }
+
+    if (opt_cmd == OPT::DEDUP_ABORT  ||
+       opt_cmd == OPT::DEDUP_PAUSE  ||
+       opt_cmd == OPT::DEDUP_RESUME) {
       urgent_msg_t urgent_msg;
       if (opt_cmd == OPT::DEDUP_ABORT) {
        urgent_msg = URGENT_MSG_ABORT;
@@ -9202,7 +9267,7 @@ next:
       return cluster::dedup_control(store, dpp(), urgent_msg);
     }
 
-    if (opt_cmd == OPT::DEDUP_RESTART || opt_cmd == OPT::DEDUP_ESTIMATE) {
+    if (opt_cmd == OPT::DEDUP_EXEC || opt_cmd == OPT::DEDUP_ESTIMATE) {
       dedup_req_type_t dedup_type = dedup_req_type_t::DEDUP_TYPE_NONE;
       if (opt_cmd == OPT::DEDUP_ESTIMATE) {
        dedup_type = dedup_req_type_t::DEDUP_TYPE_ESTIMATE;
@@ -9214,7 +9279,7 @@ next:
               << std::endl;
          return EINVAL;
        }
-       dedup_type = dedup_req_type_t::DEDUP_TYPE_FULL;
+       dedup_type = dedup_req_type_t::DEDUP_TYPE_EXEC;
 #ifndef FULL_DEDUP_SUPPORT
        std::cerr << "Only dedup estimate is supported!" << std::endl;
        return EPERM;
index e5f1f69541d2a86e9db8bad88aa44f78ef7cc10d..1d7806a452c3a2603069b7b9277dadebfdf9797a 100644 (file)
     caps rm                          remove user capabilities
     dedup stats                      Display dedup statistics from the last run
     dedup estimate                   Runs dedup in estimate mode (no changes will be made)
-    dedup restart                    Restart dedup; must include --yes-i-really-mean-it to activate
+    dedup exec                       Execute dedup (duplicated tail objects will be deleted); must include --yes-i-really-mean-it to activate
     dedup abort                      Abort dedup
     dedup pause                      Pause dedup
     dedup resume                     Resume paused dedup
+    dedup throttle                   Throttle dedup execution
     subuser create                   create a new subuser
     subuser modify                   modify subuser
     subuser rm                       remove subuser
   
   <date> := "YYYY-MM-DD[ hh:mm:ss]"
   
+  Dedup throttle options:
+     --max-bucket-index-ops        specify max bucket-index requests per second allowed for an RGW during dedup, 0 means unlimited
+     --max-metadata-ops            specify max metadata requests per second allowed for an RGW during dedup, 0 means unlimited
+     --stat                        display dedup throttle setting
+  
   Quota options:
      --max-objects                 specify max objects (negative value to disable)
      --max-size                    specify max size (in B/K/M/G/T, negative value to disable)
index 4b7e21fe1644d02318536618e22face5cfdf1bd7..f1dc15e73dc52604c0103a3108ef962b954b3cd2 100644 (file)
@@ -1069,20 +1069,31 @@ def read_dedup_stats(dry_run):
     return (dedup_work_was_completed, dedup_stats, dedup_ratio_estimate, dedup_ratio_actual)
 
 
+#-------------------------------------------------------------------------------
+def set_bucket_index_throttling(limit):
+    cmd = ['dedup', 'throttle', '--max-bucket-index-ops', str(limit)]
+    result = admin(cmd)
+    assert result[1] == 0
+    log.debug(result[0])
+
 #-------------------------------------------------------------------------------
 def exec_dedup_internal(expected_dedup_stats, dry_run, max_dedup_time):
+    ### set throttling to a rand val between 50-200 IOPS (i.e. 50K-200K objs)
+    limit=random.randint(50, 200)
+    set_bucket_index_throttling(limit)
+
     log.debug("sending exec_dedup request: dry_run=%d", dry_run)
     if dry_run:
         result = admin(['dedup', 'estimate'])
         reset_full_dedup_stats(expected_dedup_stats)
     else:
-        result = admin(['dedup', 'restart', '--yes-i-really-mean-it'])
+        result = admin(['dedup', 'exec', '--yes-i-really-mean-it'])
 
     assert result[1] == 0
     log.debug("wait for dedup to complete")
 
     dedup_time = 0
-    dedup_timeout = 5
+    dedup_timeout = 3
     dedup_stats = Dedup_Stats()
     dedup_ratio=Dedup_Ratio()
     wait_for_completion = True
@@ -1095,7 +1106,10 @@ def exec_dedup_internal(expected_dedup_stats, dry_run, max_dedup_time):
             wait_for_completion = False
             log.info("dedup completed in %d seconds", dedup_time)
             return (dedup_time, ret[1], ret[2], ret[3])
-
+        else:
+            ### set throttling to a rand val between 50-200 IOPS (i.e. 50K-200K objs)
+            limit=random.randint(50, 200)
+            set_bucket_index_throttling(limit)
 
 #-------------------------------------------------------------------------------
 def exec_dedup(expected_dedup_stats, dry_run, verify_stats=True):
@@ -1308,14 +1322,14 @@ def check_full_dedup_state():
     global full_dedup_state_was_checked
     global full_dedup_state_disabled
     log.debug("check_full_dedup_state:: sending FULL Dedup request")
-    result = admin(['dedup', 'restart', '--yes-i-really-mean-it'])
+    result = admin(['dedup', 'exec', '--yes-i-really-mean-it'])
     if result[1] == 0:
-        log.info("full dedup is enabled!")
+        log.debug("full dedup is enabled!")
         full_dedup_state_disabled = False
         result = admin(['dedup', 'abort'])
         assert result[1] == 0
     else:
-        log.info("full dedup is disabled, skip all full dedup tests")
+        log.debug("full dedup is disabled, skip all full dedup tests")
         full_dedup_state_disabled = True
 
     full_dedup_state_was_checked = True
@@ -2039,7 +2053,7 @@ def test_dedup_inc_with_remove_multi_tenants():
         # REMOVE some objects and update stats/expected
         src_record=0
         shared_manifest=0
-        valid_hash=0
+        valid_sha=0
         object_keys=[]
         files_sub=[]
         dedup_stats = Dedup_Stats()
@@ -2052,7 +2066,7 @@ def test_dedup_inc_with_remove_multi_tenants():
             log.debug("objects::%s::size=%d, num_copies=%d", filename, obj_size, num_copies_2);
             if num_copies_2:
                 if num_copies_2 > 1 and obj_size > RADOS_OBJ_SIZE:
-                    valid_hash += num_copies_2
+                    valid_sha += num_copies_2
                     src_record += 1
                     shared_manifest += (num_copies_2 - 1)
 
@@ -2076,7 +2090,7 @@ def test_dedup_inc_with_remove_multi_tenants():
         dedup_stats.deduped_obj_bytes=0
         dedup_stats.skip_src_record=src_record
         dedup_stats.skip_shared_manifest=shared_manifest
-        dedup_stats.valid_hash=valid_hash
+        dedup_stats.valid_hash=valid_sha
         dedup_stats.invalid_hash=0
         dedup_stats.set_hash=0
 
@@ -2119,7 +2133,7 @@ def test_dedup_inc_with_remove():
         # REMOVE some objects and update stats/expected
         src_record=0
         shared_manifest=0
-        valid_hash=0
+        valid_sha=0
         object_keys=[]
         files_sub=[]
         dedup_stats = Dedup_Stats()
@@ -2132,7 +2146,7 @@ def test_dedup_inc_with_remove():
             log.debug("objects::%s::size=%d, num_copies=%d", filename, obj_size, num_copies_2);
             if num_copies_2:
                 if num_copies_2 > 1 and obj_size > RADOS_OBJ_SIZE:
-                    valid_hash += num_copies_2
+                    valid_sha += num_copies_2
                     src_record += 1
                     shared_manifest += (num_copies_2 - 1)
 
@@ -2163,7 +2177,7 @@ def test_dedup_inc_with_remove():
         dedup_stats.deduped_obj_bytes=0
         dedup_stats.skip_src_record=src_record
         dedup_stats.skip_shared_manifest=shared_manifest
-        dedup_stats.valid_hash=valid_hash
+        dedup_stats.valid_hash=valid_sha
         dedup_stats.invalid_hash=0
         dedup_stats.set_hash=0
 
@@ -2322,7 +2336,7 @@ def test_dedup_small_multipart():
 #-------------------------------------------------------------------------------
 @pytest.mark.basic_test
 def test_dedup_large_scale_with_tenants():
-    #return
+    return
 
     if full_dedup_is_disabled():
         return
@@ -2342,7 +2356,7 @@ def test_dedup_large_scale_with_tenants():
 #-------------------------------------------------------------------------------
 @pytest.mark.basic_test
 def test_dedup_large_scale():
-    #return
+    return
 
     if full_dedup_is_disabled():
         return
@@ -2362,7 +2376,7 @@ def test_dedup_large_scale():
 #-------------------------------------------------------------------------------
 @pytest.mark.basic_test
 def test_empty_bucket():
-    #return
+    return
 
     if full_dedup_is_disabled():
         return