**************
- ``radosgw-admin dedup estimate``:
Starts a new dedup estimate session (aborting first existing session if exists).
-
It doesn't make any change to the existing system and will only collect statistics and report them.
-- ``radosgw-admin dedup restart --yes-i-really-mean-it``:
+- ``radosgw-admin dedup exec --yes-i-really-mean-it``:
Starts a new dedup session (aborting first existing session if exists).
- It will perfrom a full dedup, finding duplicated tail-objects and removing them.
+ It will perform a full dedup, finding duplicated tail-objects and removing them.
This command can lead to **data-loss** and should not be used on production data!!
- ``radosgw-admin dedup pause``:
Aborts an active dedup session and release all resources used by it.
- ``radosgw-admin dedup stats``:
Collects & displays last dedup statistics.
+- ``radosgw-admin dedup estimate``:
+ Starts a new dedup estimate session (aborting first existing session if exists).
+- ``radosgw-admin dedup throttle --max-bucket-index-ops=<count>``:
+ Specifies the maximum number of bucket-index requests per second allowed for a single RGW server during dedup; 0 means unlimited.
+- ``radosgw-admin dedup throttle --stat``:
+ Displays the current dedup throttle settings.
***************
Skipped Objects
virtually always stored on a fast medium (SSD with heavy memory
caching).
+The admin can throttle the estimate process by setting a limit on the number of
+bucket-index reads per second per RGW server (each read brings 1000 object entries) using:
+
+$ radosgw-admin dedup throttle --max-bucket-index-ops=<count>
+
+A typical RGW server performs about 100 bucket-index reads per second (i.e. 100,000 object entries).
+Setting the count to 50 will therefore typically halve the bucket-index read rate, and so on.
+
*********************
Full Dedup Processing
*********************
this->remote_pause_req = false;
this->remote_paused = false;
this->remote_restart_req = false;
+ this->bucket_index_throttle.disable();
+ this->metadata_access_throttle.disable();
}
//---------------------------------------------------------------------------
encode(ctl.remote_pause_req, bl);
encode(ctl.remote_paused, bl);
encode(ctl.remote_restart_req, bl);
+ encode(ctl.bucket_index_throttle, bl);
+ encode(ctl.metadata_access_throttle, bl);
ENCODE_FINISH(bl);
}
decode(ctl.remote_pause_req, bl);
decode(ctl.remote_paused, bl);
decode(ctl.remote_restart_req, bl);
+ decode(ctl.bucket_index_throttle, bl);
+ decode(ctl.metadata_access_throttle, bl);
DECODE_FINISH(bl);
}
out << "::remote_restart_req";
}
+ if (!ctl.bucket_index_throttle.is_disabled()) {
+ out << "::bucket_index_throttle=" << ctl.bucket_index_throttle.get_max_calls_per_second();
+ }
+ if (!ctl.metadata_access_throttle.is_disabled()) {
+ out << "::metadata_throttle=" << ctl.metadata_access_throttle.get_max_calls_per_second();
+ }
+
return out;
}
librados::IoCtx ioctx = obj.ioctx;
ldpp_dout(dpp, 20) << __func__ << "::removing tail object: " << raw_obj.oid
<< dendl;
+ d_ctl.metadata_access_throttle.acquire();
ret = ioctx.remove(raw_obj.oid);
}
}
ObjectWriteOperation op;
+ d_ctl.metadata_access_throttle.acquire();
+ ldpp_dout(dpp, 20) << __func__ << "::dec ref-count on tail object: " << raw_obj.oid << dendl;
cls_refcount_put(op, ref_tag, true);
rgw::AioResultList completed = aio->get(obj.obj,
rgw::Aio::librados_op(obj.ioctx, std::move(op), null_yield),
ObjectWriteOperation op;
cls_refcount_get(op, ref_tag, true);
+ d_ctl.metadata_access_throttle.acquire();
ldpp_dout(dpp, 20) << __func__ << "::inc ref-count on tail object: " << raw_obj.oid << dendl;
rgw::AioResultList completed = aio->get(obj.obj,
rgw::Aio::librados_op(obj.ioctx, std::move(op), null_yield),
ldpp_dout(dpp, 20) << __func__ << "::ref_tag=" << ref_tag << dendl;
int ret = inc_ref_count_by_manifest(ref_tag, src_oid, src_manifest);
if (ret == 0) {
+ d_ctl.metadata_access_throttle.acquire();
ldpp_dout(dpp, 20) << __func__ << "::send TGT CLS (Shared_Manifest)" << dendl;
ret = tgt_ioctx.operate(tgt_oid, &tgt_op);
if (unlikely(ret != 0)) {
p_stats->set_hash_attrs++;
}
+ d_ctl.metadata_access_throttle.acquire();
ldpp_dout(dpp, 20) << __func__ <<"::send SRC CLS (Shared_Manifest)"<< dendl;
ret = src_ioctx.operate(src_oid, &src_op);
if (unlikely(ret != 0)) {
return 0;
}
+ d_ctl.metadata_access_throttle.acquire();
ret = p_obj->get_obj_attrs(null_yield, dpp);
if (unlikely(ret < 0)) {
p_stats->ingress_failed_get_obj_attrs++;
}
}
+ p_stats->ingress_slabs++;
(*p_slab_count)++;
failure_count = 0;
unsigned slab_rec_count = 0;
const string& oid = oids[current_shard];
rgw_cls_list_ret result;
librados::ObjectReadOperation op;
+ d_ctl.bucket_index_throttle.acquire();
// get bucket-indices of @current_shard
cls_rgw_bucket_list_op(op, marker, null_prefix, null_delimiter, max_entries,
list_versions, &result);
display_table_stat_counters(dpp, p_stats);
ldpp_dout(dpp, 10) << __func__ << "::MD5 Loop::" << d_ctl.dedup_type << dendl;
- if (d_ctl.dedup_type != dedup_req_type_t::DEDUP_TYPE_FULL) {
+ if (d_ctl.dedup_type != dedup_req_type_t::DEDUP_TYPE_EXEC) {
for (work_shard_t worker_id = 0; worker_id < num_work_shards; worker_id++) {
remove_slabs(worker_id, md5_shard, slab_count_arr[worker_id]);
}
&worker_stats,raw_mem, raw_mem_size);
if (ret == 0) {
worker_stats.duration = ceph_clock_now() - start_time;
+ worker_stats.bidx_throttle_sleep_events = d_ctl.bucket_index_throttle.get_sleep_events();
+ worker_stats.bidx_throttle_sleep_time_usec = d_ctl.bucket_index_throttle.get_sleep_time_usec();
d_cluster.mark_work_shard_token_completed(store, worker_id, &worker_stats);
ldpp_dout(dpp, 10) << "stat counters [worker]:\n" << worker_stats << dendl;
ldpp_dout(dpp, 10) << "Shard Process Duration = "
}
//ldpp_dout(dpp, 0) << __func__ << "::sleep for 2 seconds\n" << dendl;
//std::this_thread::sleep_for(std::chrono::seconds(2));
+ //std::this_thread::sleep_for(std::chrono::microseconds(usec_timeout));
return ret;
}
int ret = objects_dedup_single_md5_shard(&table, md5_shard, &md5_stats, num_work_shards);
if (ret == 0) {
md5_stats.duration = ceph_clock_now() - start_time;
+ md5_stats.md_throttle_sleep_events = d_ctl.metadata_access_throttle.get_sleep_events();
+ md5_stats.md_throttle_sleep_time_usec = d_ctl.metadata_access_throttle.get_sleep_time_usec();
+
d_cluster.mark_md5_shard_token_completed(store, md5_shard, &md5_stats);
ldpp_dout(dpp, 10) << "stat counters [md5]:\n" << md5_stats << dendl;
ldpp_dout(dpp, 10) << "Shard Process Duration = "
ldpp_dout(dpp, 10) <<__func__ << "::" << *p_epoch << dendl;
d_ctl.dedup_type = p_epoch->dedup_type;
#ifdef FULL_DEDUP_SUPPORT
- ceph_assert(d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_FULL ||
+ ceph_assert(d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_EXEC ||
d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE);
#else
ceph_assert(d_ctl.dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE);
{
int ret = 0;
int32_t urgent_msg = URGENT_MSG_NONE;
+ auto bl_iter = bl.cbegin();
try {
- auto bl_iter = bl.cbegin();
ceph::decode(urgent_msg, bl_iter);
} catch (buffer::error& err) {
ldpp_dout(dpp, 1) << __func__ << "::ERROR: bad urgent_msg" << dendl;
- ret = -EINVAL;
+ cluster::ack_notify(store, dpp, &d_ctl, notify_id, cookie, -EINVAL);
+ return;
+ }
+ ldpp_dout(dpp, 5) << __func__ << "::" << get_urgent_msg_names(urgent_msg) << dendl;
+
+ throttle_msg_t throttle_msg;
+ if (urgent_msg == URGENT_MSG_THROTTLE) {
+ try {
+ decode(throttle_msg, bl_iter);
+ ldpp_dout(dpp, 5) << __func__ << "::" << throttle_msg << dendl;
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 1) << __func__ << "::ERROR: bad throttle_msg" << dendl;
+ cluster::ack_notify(store, dpp, &d_ctl, notify_id, cookie, -EINVAL);
+ return;
+ }
}
- ldpp_dout(dpp, 5) << __func__ << "::-->" << get_urgent_msg_names(urgent_msg) << dendl;
// use lock to prevent concurrent pause/resume requests
std::unique_lock cond_lock(d_cond_mutex); // [------>open lock block
ldpp_dout(dpp, 5) << __func__ << "::dedup is not paused->nothing to do" << dendl;
}
break;
+ case URGENT_MSG_THROTTLE:
+ for (auto action : throttle_msg.vec) {
+ if (action.op_type == BUCKET_INDEX_OP) {
+ d_ctl.bucket_index_throttle.set_max_calls_per_sec(action.limit);
+ }
+ else if (action.op_type == METADATA_ACCESS_OP) {
+ d_ctl.metadata_access_throttle.set_max_calls_per_sec(action.limit);
+ }
+ else if (action.op_type == STAT) {
+ ldpp_dout(dpp, 10) << __func__ << "::Throttle STAT" << dendl;
+ }
+ else {
+ ldpp_dout(dpp, 1) << __func__ << "::unexpected throttle_msg "
+ << action.op_type << dendl;
+ ret = -EINVAL;
+ }
+ }
+ break;
default:
ldpp_dout(dpp, 1) << __func__ << "::unexpected urgent_msg: "
<< get_urgent_msg_names(urgent_msg) << dendl;
bool remote_pause_req = false;
bool remote_paused = false;
bool remote_restart_req = false;
+ Throttle bucket_index_throttle;
+ Throttle metadata_access_throttle;
};
std::ostream& operator<<(std::ostream &out, const control_t &ctl);
void encode(const control_t& ctl, ceph::bufferlist& bl);
librados::IoCtx d_dedup_cluster_ioctx;
utime_t d_heart_beat_last_update;
unsigned d_heart_beat_max_elapsed_sec;
-
- // A pool with 6 billion objects has a 1/(2^64) chance for collison with a 128bit MD5
- uint64_t d_max_protected_objects = (6ULL * 1024 * 1024 * 1024);
uint64_t d_all_buckets_obj_count = 0;
uint64_t d_all_buckets_obj_size = 0;
// we don't benefit from deduping RGW objects smaller than head-object size
if (!has_incomplete_shards) {
return;
}
+ //utime_t now = ceph_clock_now();
Formatter::ArraySection array_section{*fmt, "incomplete_shards"};
for (unsigned shard = 0; shard < num_shards; shard++) {
- if (sp_arr[shard].is_completed() ) {
+ if (sp_arr[shard].is_completed()) {
+ continue;
+ }
+ if (sp_arr[shard].was_not_started() ) {
+ Formatter::ObjectSection object_section{*fmt, "pending shard:"};
+ fmt->dump_unsigned("shard_id", shard);
continue;
}
Formatter::ObjectSection object_section{*fmt, "shard_progress"};
fmt->dump_unsigned("progress_a", sp_arr[shard].progress_a);
fmt->dump_unsigned("progress_b", sp_arr[shard].progress_b);
fmt->dump_stream("last updated") << sp_arr[shard].update_time;
+ utime_t elapsed = sp_arr[shard].update_time - sp_arr[shard].creation_time;
+ fmt->dump_unsigned("time elapsed (sec)", elapsed.tv.tv_sec);
}
}
}
//---------------------------------------------------------------------------
- // command-line called from radosgw-admin.cc
- int cluster::dedup_control(rgw::sal::RadosStore *store,
- const DoutPrefixProvider *dpp,
- urgent_msg_t urgent_msg)
+ static void report_throttle_state(const struct rgw::dedup::control_t &ctl) // prints the bucket-index and metadata throttle settings held in @ctl to stdout
{
- ldpp_dout(dpp, 10) << __func__ << "::dedup_control req = "
- << get_urgent_msg_names(urgent_msg) << dendl;
- if (urgent_msg != URGENT_MSG_RESUME &&
- urgent_msg != URGENT_MSG_PASUE &&
- urgent_msg != URGENT_MSG_RESTART &&
- urgent_msg != URGENT_MSG_ABORT) {
- ldpp_dout(dpp, 1) << __func__ << "::illegal urgent_msg="<< urgent_msg << dendl;
- return -EINVAL;
+ if (!ctl.bucket_index_throttle.is_disabled()) { // a limit of 0 is reported as "disabled"
+ std::cout << "bucket-index throttle="
+ << ctl.bucket_index_throttle.get_max_calls_per_second()
+ << std::endl;
+ }
+ else {
+ std::cout << "bucket-index throttle is disabled" << std::endl;
}
+ if (!ctl.metadata_access_throttle.is_disabled()) { // same reporting rule for the metadata throttle
+ std::cout << "metadata throttle="
+ << ctl.metadata_access_throttle.get_max_calls_per_second()
+ << std::endl;
+ }
+ else {
+ std::cout << "metadata throttle is disabled" << std::endl;
+ }
+ }
+
+ //---------------------------------------------------------------------------
+ // command-line called from radosgw-admin.cc
+ int cluster::dedup_control_bl(rgw::sal::RadosStore *store,
+ const DoutPrefixProvider *dpp,
+ urgent_msg_t urgent_msg,
+ bufferlist urgent_msg_bl)
+ {
librados::IoCtx ctl_ioctx;
int ret = get_control_ioctx(store, dpp, ctl_ioctx);
if (unlikely(ret != 0)) {
// 10 seconds timeout
const uint64_t timeout_ms = 10*1000;
- bufferlist reply_bl, urgent_msg_bl;
- ceph::encode(urgent_msg, urgent_msg_bl);
+ bufferlist reply_bl;
ret = rgw_rados_notify(dpp, ctl_ioctx, DEDUP_WATCH_OBJ, urgent_msg_bl,
timeout_ms, &reply_bl, null_yield);
if (ret < 0) {
struct rgw::dedup::control_t ctl;
decode(ctl, iter);
ldpp_dout(dpp, 10) << __func__ << "::++ACK::ctl=" << ctl << "::ret=" << ret << dendl;
+ if (urgent_msg == URGENT_MSG_THROTTLE) {
+ report_throttle_state(ctl);
+ }
} catch (buffer::error& err) {
ldpp_dout(dpp, 1) << __func__ << "::failed decoding notify acks" << dendl;
return -EINVAL;
return ret;
}
}
- ldpp_dout(dpp, 10) << __func__ << "::" << get_urgent_msg_names(urgent_msg)
- << " finished successfully!" << dendl;
+ ldpp_dout(dpp, 10) << __func__ << "::finished successfully!" << dendl;
return 0;
}
+ //---------------------------------------------------------------------------
+ // command-line called from radosgw-admin.cc
+ int cluster::dedup_control(rgw::sal::RadosStore *store,
+                            const DoutPrefixProvider *dpp,
+                            urgent_msg_t urgent_msg)
+ {
+   // Sends a payload-free control request (abort/pause/resume/restart).
+   // Returns -EINVAL for any other message type, otherwise the result of
+   // dedup_control_bl().
+   ldpp_dout(dpp, 10) << __func__ << "::dedup_control req = "
+                      << get_urgent_msg_names(urgent_msg) << dendl;
+
+   switch (urgent_msg) {
+   case URGENT_MSG_RESUME:
+   case URGENT_MSG_PASUE:
+   case URGENT_MSG_RESTART:
+   case URGENT_MSG_ABORT:
+     // the only requests that carry nothing beyond the message code
+     break;
+   default:
+     ldpp_dout(dpp, 1) << __func__ << "::illegal urgent_msg="<< urgent_msg << dendl;
+     return -EINVAL;
+   }
+
+   // the notify payload is just the encoded message code
+   bufferlist payload_bl;
+   ceph::encode(urgent_msg, payload_bl);
+   return dedup_control_bl(store, dpp, urgent_msg, payload_bl);
+ }
+
//---------------------------------------------------------------------------
// command-line called from radosgw-admin.cc
int cluster::dedup_restart_scan(rgw::sal::RadosStore *store,
ldpp_dout(dpp, 10) << __func__ << dedup_type << dendl;
#ifdef FULL_DEDUP_SUPPORT
ceph_assert(dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE ||
- dedup_type == dedup_req_type_t::DEDUP_TYPE_FULL);
+ dedup_type == dedup_req_type_t::DEDUP_TYPE_EXEC);
#else
ceph_assert(dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE);
#endif
uint64_t notify_id,
uint64_t cookie,
int status);
+ static int dedup_control_bl(rgw::sal::RadosStore *store,
+ const DoutPrefixProvider *dpp,
+ urgent_msg_t urgent_msg,
+ bufferlist urgent_msg_bl);
static int dedup_control(rgw::sal::RadosStore *store,
const DoutPrefixProvider *dpp,
urgent_msg_t urgent_msg);
unsigned len = (p_curr_block + 1 - p_arr) * sizeof(disk_block_t);
bufferlist bl = bufferlist::static_from_mem((char*)p_arr, len);
int ret = store_slab(ioctx, bl, d_md5_shard, d_worker_id, d_seq_number, dpp);
+ if (unlikely(ret != 0)) {
+ p_stats->write_slab_failure++;
+ }
// Need to make sure the call to rgw_put_system_obj was fully synchronous
// d_seq_number++ must be called **after** flush!!
else if (dedup_type == dedup_req_type_t::DEDUP_TYPE_ESTIMATE) {
out << "DEDUP_TYPE_ESTIMATE";
}
- else if (dedup_type == dedup_req_type_t::DEDUP_TYPE_FULL) {
- out << "DEDUP_TYPE_FULL";
+ else if (dedup_type == dedup_req_type_t::DEDUP_TYPE_EXEC) {
+ out << "DEDUP_TYPE_EXEC (full dedup)";
}
else {
out << "\n*** unexpected dedup_type ***\n";
return out;
}
+ //---------------------------------------------------------------------------
+ void validate_max_calls_offset()
+ {
+   // Compile-time layout check only; calling this function does nothing.
+   // Throttle is declared 8-byte aligned, so pinning max_calls at offset 0
+   // keeps it on an 8-byte boundary. The class relies on that to access the
+   // field without std::atomic (which is expensive).
+   // NOTE(review): natural alignment avoids torn accesses on common hardware
+   // but is not a C++ memory-model guarantee - confirm this is acceptable.
+   static_assert(offsetof(Throttle, max_calls) == 0,
+                 "Throttle::max_calls must be the first data member");
+ }
+
+ //---------------------------------------------------------------------------
+ void encode(const Throttle& t, ceph::bufferlist& bl)
+ {
+   // Only the configured limit is serialized; the token and sleep counters
+   // of @t are not part of the encoding.
+   ENCODE_START(1, 1, bl);
+   const size_t max_calls_per_sec = t.get_max_calls_per_second();
+   encode(max_calls_per_sec, bl);
+   ENCODE_FINISH(bl);
+ }
+
+ //---------------------------------------------------------------------------
+ void decode(Throttle& t, ceph::bufferlist::const_iterator& bl)
+ {
+   // Mirror of encode(): reads the limit and installs it on @t.
+   // The value travels as size_t and is narrowed by set_max_calls_per_sec().
+   DECODE_START(1, bl);
+   size_t limit = 0;
+   decode(limit, bl);
+   t.set_max_calls_per_sec(limit);
+   DECODE_FINISH(bl);
+ }
+
+ //---------------------------------------------------------------------------
+ std::ostream& operator<<(std::ostream &out, const throttle_action_t& msg)
+ {
+   // Writes a human readable description of a single throttle action.
+   // STAT is a legal action for the notify handler, so it gets its own text
+   // instead of falling into the "unexpected" branch.
+
+   // IOPS limits use 0 as the "no limit" marker
+   auto print_iops_limit = [&out](uint32_t limit) {
+     if (limit) {
+       out << limit << " IOPS";
+     }
+     else {
+       out << "unlimited IOPS";
+     }
+   };
+
+   switch (msg.op_type) {
+   case BUCKET_INDEX_OP:
+     out << "Set Bucket Index Throttling to ";
+     print_iops_limit(msg.limit);
+     break;
+   case METADATA_ACCESS_OP:
+     out << "Set Metadata Throttling to ";
+     print_iops_limit(msg.limit);
+     break;
+   case DATA_READ_WRITE_OP:
+     out << "Set Read/Write Throttling to " << msg.limit << " MB/sec";
+     break;
+   case STAT:
+     out << "Throttle STAT";
+     break;
+   default:
+     out << "\n*** unexpected throttling type ***\n";
+     break;
+   }
+
+   return out;
+ }
+
+ //---------------------------------------------------------------------------
+ std::ostream& operator<<(std::ostream &out, const throttle_msg_t& msg)
+ {
+   // Each action is printed in vector order, followed by a " :: " separator
+   for (const auto& entry : msg.vec) {
+     out << entry;
+     out << " :: ";
+   }
+   return out;
+ }
+
+ //---------------------------------------------------------------------------
+ void encode(const throttle_action_t& m, ceph::bufferlist& bl)
+ {
+   // Wire layout: op_type as int, then the limit
+   ENCODE_START(1, 1, bl);
+   const int op_code = static_cast<int>(m.op_type);
+   encode(op_code, bl);
+   encode(m.limit, bl);
+   ENCODE_FINISH(bl);
+ }
+
+ //---------------------------------------------------------------------------
+ void decode(throttle_action_t& m, ceph::bufferlist::const_iterator& bl)
+ {
+   // Reads op_type (sent as int) and the limit, in that order.
+   DECODE_START(1, bl);
+   int tmp;
+   decode(tmp, bl);
+   // The code comes off the wire; converting a value outside the enumeration
+   // range to op_type_t is undefined behaviour. Map anything unknown to
+   // INVALID_OP so the receiver can reject it.
+   if (tmp < NO_OP || tmp > INVALID_OP) {
+     m.op_type = INVALID_OP;
+   }
+   else {
+     m.op_type = static_cast<op_type_t>(tmp);
+   }
+   decode(m.limit, bl);
+   DECODE_FINISH(bl);
+ }
+
+ //---------------------------------------------------------------------------
+ void encode(const throttle_msg_t& m, ceph::bufferlist& bl)
+ {
+   // The message body is just the vector of actions
+   ENCODE_START(1, 1, bl);
+   const auto& actions = m.vec;
+   encode(actions, bl);
+   ENCODE_FINISH(bl);
+ }
+
+ //---------------------------------------------------------------------------
+ void decode(throttle_msg_t& m, ceph::bufferlist::const_iterator& bl)
+ {
+   // Mirror of encode(): reads the vector of actions into @m
+   DECODE_START(1, bl);
+   auto& actions = m.vec;
+   decode(actions, bl);
+   DECODE_FINISH(bl);
+ }
+
//---------------------------------------------------------------------------
dedup_stats_t& dedup_stats_t::operator+=(const dedup_stats_t& other)
{
"URGENT_MSG_PASUE",
"URGENT_MSG_RESUME",
"URGENT_MSG_RESTART",
+ "URGENT_MSG_THROTTLE",
"URGENT_MSG_INVALID"
};
this->egress_records += other.egress_records;
this->egress_blocks += other.egress_blocks;
this->egress_slabs += other.egress_slabs;
+ this->write_slab_failure += other.write_slab_failure;
+ this->bidx_throttle_sleep_events += other.bidx_throttle_sleep_events;
+ this->bidx_throttle_sleep_time_usec += other.bidx_throttle_sleep_time_usec;
this->single_part_objs += other.single_part_objs;
this->multipart_objs += other.multipart_objs;
this->small_multipart_obj += other.small_multipart_obj;
{
Formatter::ObjectSection notify(*f, "notify");
+ if (this->bidx_throttle_sleep_events) {
+ f->dump_unsigned("Bucket-Index Throttle Sleep Events",
+ this->bidx_throttle_sleep_events);
+ f->dump_unsigned("Bucket-Index Throttle Sleep Time (sec)",
+ this->bidx_throttle_sleep_time_usec/MICROSECONDS_PER_SECOND);
+ }
- if(this->non_default_storage_class_objs) {
+ if (this->non_default_storage_class_objs) {
f->dump_unsigned("non default storage class objs",
this->non_default_storage_class_objs);
f->dump_unsigned("non default storage class objs bytes",
{
Formatter::ObjectSection skipped(*f, "skipped");
- if(this->ingress_skip_too_small) {
+ if (this->ingress_skip_too_small) {
f->dump_unsigned("Ingress skip: too small objs",
this->ingress_skip_too_small);
f->dump_unsigned("Ingress skip: too small bytes",
{
Formatter::ObjectSection failed(*f, "failed");
- if(this->ingress_corrupted_etag) {
+ if (this->write_slab_failure) {
+ f->dump_unsigned("Write SLAB failures", this->write_slab_failure);
+ }
+ if (this->ingress_corrupted_etag) {
f->dump_unsigned("Corrupted ETAG", this->ingress_corrupted_etag);
}
}
encode(w.egress_records, bl);
encode(w.egress_blocks, bl);
encode(w.egress_slabs, bl);
+ encode(w.write_slab_failure, bl);
+ encode(w.bidx_throttle_sleep_events, bl);
+ encode(w.bidx_throttle_sleep_time_usec, bl);
encode(w.single_part_objs, bl);
encode(w.multipart_objs, bl);
decode(w.egress_records, bl);
decode(w.egress_blocks, bl);
decode(w.egress_slabs, bl);
+ decode(w.write_slab_failure, bl);
+ decode(w.bidx_throttle_sleep_events, bl);
+ decode(w.bidx_throttle_sleep_time_usec, bl);
decode(w.single_part_objs, bl);
decode(w.multipart_objs, bl);
decode(w.small_multipart_obj, bl);
{
this->small_objs_stat += other.small_objs_stat;
this->big_objs_stat += other.big_objs_stat;
+ this->ingress_slabs += other.ingress_slabs;
this->ingress_failed_load_bucket += other.ingress_failed_load_bucket;
this->ingress_failed_get_object += other.ingress_failed_get_object;
this->ingress_failed_get_obj_attrs += other.ingress_failed_get_obj_attrs;
this->dup_head_bytes += other.dup_head_bytes;
this->failed_dedup += other.failed_dedup;
+ this->md_throttle_sleep_events += other.md_throttle_sleep_events;
+ this->md_throttle_sleep_time_usec += other.md_throttle_sleep_time_usec;
this->failed_table_load += other.failed_table_load;
this->failed_map_overflow += other.failed_map_overflow;
return *this;
f->dump_unsigned("Total processed objects", this->processed_objects);
f->dump_unsigned("Loaded objects", this->loaded_objects);
+ f->dump_unsigned("Ingress Slabs", this->ingress_slabs);
f->dump_unsigned("Set Shared-Manifest SRC", this->set_shared_manifest_src);
f->dump_unsigned("Deduped Obj (this cycle)", this->deduped_objects);
f->dump_unsigned("Deduped Bytes(this cycle)", this->deduped_objects_bytes);
{
Formatter::ObjectSection notify(*f, "notify");
+ if (this->md_throttle_sleep_events) {
+ f->dump_unsigned("Metadata Throttle Sleep Events", this->md_throttle_sleep_events);
+ f->dump_unsigned("Metadata Throttle Sleep Time (sec)",
+ this->md_throttle_sleep_time_usec/MICROSECONDS_PER_SECOND);
+ }
+
if (this->failed_table_load) {
f->dump_unsigned("Failed Table Load", this->failed_table_load);
}
encode(m.small_objs_stat, bl);
encode(m.big_objs_stat, bl);
+ encode(m.ingress_slabs, bl);
encode(m.ingress_failed_load_bucket, bl);
encode(m.ingress_failed_get_object, bl);
encode(m.ingress_failed_get_obj_attrs, bl);
encode(m.deduped_objects_bytes, bl);
encode(m.dup_head_bytes, bl);
encode(m.failed_dedup, bl);
+ encode(m.md_throttle_sleep_events, bl);
+ encode(m.md_throttle_sleep_time_usec, bl);
encode(m.failed_table_load, bl);
encode(m.failed_map_overflow, bl);
DECODE_START(1, bl);
decode(m.small_objs_stat, bl);
decode(m.big_objs_stat, bl);
+ decode(m.ingress_slabs, bl);
decode(m.ingress_failed_load_bucket, bl);
decode(m.ingress_failed_get_object, bl);
decode(m.ingress_failed_get_obj_attrs, bl);
decode(m.deduped_objects_bytes, bl);
decode(m.dup_head_bytes, bl);
decode(m.failed_dedup, bl);
+ decode(m.md_throttle_sleep_events, bl);
+ decode(m.md_throttle_sleep_time_usec, bl);
decode(m.failed_table_load, bl);
decode(m.failed_map_overflow, bl);
#include "common/Formatter.h"
#include "common/ceph_json.h"
#include <time.h>
+#include <chrono>
#include "include/utime.h"
#include "include/encoding.h"
#include "common/dout.h"
#define FULL_DEDUP_SUPPORT
namespace rgw::dedup {
+ using namespace std::chrono;
using work_shard_t = uint16_t;
using md5_shard_t = uint16_t;
+ const uint64_t MICROSECONDS_PER_SECOND = 1000000;
// settings to help debug small systems
const work_shard_t MIN_WORK_SHARD = 2;
const md5_shard_t MIN_MD5_SHARD = 4;
enum dedup_req_type_t {
DEDUP_TYPE_NONE = 0,
DEDUP_TYPE_ESTIMATE = 1,
- DEDUP_TYPE_FULL = 2
+ DEDUP_TYPE_EXEC = 2
};
std::ostream& operator<<(std::ostream &out, const dedup_req_type_t& dedup_type);
uint8_t flags;
};
+ // Simple per-second call limiter.
+ // Up to max_calls calls to acquire() are let through in each one-second
+ // window; once the window's tokens are used up, acquire() sleeps until the
+ // window ends. A limit of 0 disables throttling.
+ // acquire() is meant to be called from a single thread. The limit itself may
+ // be changed while acquire() is running, so acquire() never assumes that the
+ // limit it checked on entry is still the one in effect later on.
+ // NOTE(review): max_calls is a plain uint32_t; if it really is written from
+ // another thread, std::atomic<uint32_t> with relaxed ordering would make
+ // that well-defined - confirm.
+ // Uses std::this_thread::sleep_for (declared in <thread>).
+ class alignas(8) Throttle {
+   friend void validate_max_calls_offset();
+ public:
+   // @max_calls_per_sec - max requests per second allowed, 0 means unlimited
+   // disabled by default
+   // NOTE: the value is narrowed to 32 bits by set_max_calls_per_sec()
+   Throttle(size_t max_calls_per_sec=0) {
+     set_max_calls_per_sec(max_calls_per_sec);
+     reset();
+   }
+
+   // set the number of calls per second
+   // zero means unlimited
+   inline void set_max_calls_per_sec(uint32_t max_calls_per_sec) {
+     max_calls = max_calls_per_sec;
+   }
+
+   inline size_t get_max_calls_per_second() const {
+     return max_calls;
+   }
+
+   // number of times acquire() had to sleep
+   inline uint64_t get_sleep_events() const {
+     return sleep_events;
+   }
+
+   // total time acquire() decided to sleep, in microseconds
+   inline uint64_t get_sleep_time_usec() const {
+     return sleep_time_usec;
+   }
+
+   inline void disable() {
+     set_max_calls_per_sec(0);
+   }
+
+   inline bool is_disabled() const {
+     return (max_calls == 0);
+   }
+
+   // Blocks until allowed to proceed
+   void acquire() {
+     // Read the limit once so the "disabled" check and the refill below are
+     // based on the same value.
+     const uint32_t limit = max_calls;
+     if (limit == 0) {
+       return;
+     }
+
+     const steady_clock::time_point now = steady_clock::now();
+     uint64_t elapsed_usec = duration_cast<microseconds>(now - last_reset).count();
+     if (elapsed_usec >= MICROSECONDS_PER_SECOND || last_reset > now) {
+       // Renew tokens if a second (or more) has passed since last_reset
+       refill(limit);
+       // limit > 0 here, so this cannot underflow
+       --tokens;
+       return;
+     }
+
+     if (tokens > 0) {
+       --tokens;
+       return;
+     }
+
+     // if reached here, all tokens were exhausted, wait for the next time slot
+     ceph_assert(MICROSECONDS_PER_SECOND > elapsed_usec);
+     const uint64_t wait_time_usec = MICROSECONDS_PER_SECOND - elapsed_usec;
+     ++sleep_events;
+     sleep_time_usec += wait_time_usec;
+
+     std::this_thread::sleep_for(microseconds(wait_time_usec));
+
+     // After sleeping, start a new window with the limit that is in effect
+     // now. It may have been set to 0 while we slept; in that case there is
+     // no token to take, and decrementing would wrap tokens to 2^32-1.
+     refill(max_calls);
+     if (tokens > 0) {
+       --tokens;
+     }
+   }
+
+ private:
+   // start a new window using the currently configured limit
+   void reset() {
+     refill(max_calls);
+   }
+
+   // start a new window holding @limit tokens
+   void refill(uint32_t limit) {
+     tokens = limit;
+     last_reset = steady_clock::now();
+   }
+
+   // @max_calls must be the first data member to guarantee 8 Bytes alignment
+   // (checked by validate_max_calls_offset())
+   uint32_t max_calls = 0;
+   uint32_t tokens = 0;
+   steady_clock::time_point last_reset;
+   uint64_t sleep_events = 0;
+   uint64_t sleep_time_usec = 0;
+ };
+
+ void validate_max_calls_offset();
+ void encode(const Throttle& t, ceph::bufferlist& bl);
+ void decode(Throttle& t, ceph::bufferlist::const_iterator& bl);
+
struct dedup_stats_t {
dedup_stats_t& operator+=(const dedup_stats_t& other);
uint64_t egress_records = 0;
uint64_t egress_blocks = 0;
uint64_t egress_slabs = 0;
+ uint64_t write_slab_failure = 0;
+ uint64_t bidx_throttle_sleep_events = 0;
+ uint64_t bidx_throttle_sleep_time_usec = 0;
uint64_t single_part_objs = 0;
uint64_t multipart_objs = 0;
dedup_stats_t small_objs_stat;
dedup_stats_t big_objs_stat;
+ uint64_t ingress_slabs = 0;
uint64_t ingress_failed_load_bucket = 0;
uint64_t ingress_failed_get_object = 0;
uint64_t ingress_failed_get_obj_attrs = 0;
uint64_t deduped_objects_bytes = 0;
uint64_t dup_head_bytes = 0;
uint64_t failed_dedup = 0;
+ uint64_t md_throttle_sleep_events = 0;
+ uint64_t md_throttle_sleep_time_usec = 0;
uint64_t failed_table_load = 0;
uint64_t failed_map_overflow = 0;
utime_t duration = {0, 0};
}
enum urgent_msg_t {
- URGENT_MSG_NONE = 0,
- URGENT_MSG_ABORT = 1,
- URGENT_MSG_PASUE = 2,
- URGENT_MSG_RESUME = 3,
- URGENT_MSG_RESTART = 4,
- URGENT_MSG_INVALID = 5
+ // These codes are encoded as integers in dedup notify messages, so the
+ // numeric values are spelled out.
+ // NOTE(review): presumably must stay in step with the name table used by
+ // get_urgent_msg_names() - confirm.
+ URGENT_MSG_NONE = 0,
+ URGENT_MSG_ABORT = 1,
+ URGENT_MSG_PASUE = 2,
+ URGENT_MSG_RESUME = 3,
+ URGENT_MSG_RESTART = 4,
+ URGENT_MSG_THROTTLE = 5,
+ URGENT_MSG_INVALID = 6
};
-
const char* get_urgent_msg_names(int msg);
+ // Kind of operation a throttle action refers to.
+ // The value is sent as an int inside throttle_action_t, so the numeric
+ // values are spelled out.
+ enum op_type_t {
+   NO_OP = 0,
+   BUCKET_INDEX_OP = 1,
+   METADATA_ACCESS_OP = 2,
+   DATA_READ_WRITE_OP = 3,
+   STAT = 4,
+   INVALID_OP = 5
+ };
+
+ // A single throttle request: which kind of operation to limit and the new
+ // limit for it (for the IOPS limits, 0 means unlimited).
+ // Members carry defaults so that a default-constructed action (e.g. the
+ // target of a decode) never holds indeterminate values.
+ struct throttle_action_t {
+   op_type_t op_type = NO_OP;
+   uint32_t limit = 0;
+ };
+ void encode(const throttle_action_t& m, ceph::bufferlist& bl);
+ void decode(throttle_action_t& m, ceph::bufferlist::const_iterator& bl);
+ std::ostream& operator<<(std::ostream &out, const throttle_action_t& action);
+ struct throttle_msg_t { // payload decoded after the message code when it is URGENT_MSG_THROTTLE
+ std::vector<throttle_action_t> vec; // actions to apply; sent empty by `dedup throttle --stat`
+ };
+
+ std::ostream& operator<<(std::ostream &out, const throttle_msg_t& msg);
+ void encode(const throttle_msg_t& m, ceph::bufferlist& bl);
+ void decode(throttle_msg_t& m, ceph::bufferlist::const_iterator& bl);
+
bool hex2int(const char *p, const char *p_end, uint64_t *p_val);
bool parse_etag_string(const std::string& etag, parsed_etag_t *parsed_etag);
void etag_to_bufferlist(uint64_t md5_high, uint64_t md5_low, uint16_t num_parts,
cout << " caps rm remove user capabilities\n";
cout << " dedup stats Display dedup statistics from the last run\n";
cout << " dedup estimate Runs dedup in estimate mode (no changes will be made)\n";
- cout << " dedup restart Restart dedup; must include --yes-i-really-mean-it to activate\n";
+ cout << " dedup exec Execute dedup (duplicated tail objects will be deleted); must include --yes-i-really-mean-it to activate\n";
cout << " dedup abort Abort dedup\n";
cout << " dedup pause Pause dedup\n";
cout << " dedup resume Resume paused dedup\n";
+ cout << " dedup throttle Throttle dedup execution\n";
cout << " subuser create create a new subuser\n" ;
cout << " subuser modify modify subuser\n";
cout << " subuser rm remove subuser\n";
cout << " --disable-feature disable a zone/zonegroup feature\n";
cout << "\n";
cout << "<date> := \"YYYY-MM-DD[ hh:mm:ss]\"\n";
+ cout << "\nDedup throttle options:\n";
+ cout << " --max-bucket-index-ops specify max bucket-index requests per second allowed for an RGW during dedup, 0 means unlimited\n";
+ cout << " --max-metadata-ops specify max metadata requests per second allowed for an RGW during dedup, 0 means unlimited\n";
+ cout << " --stat display dedup throttle setting\n";
cout << "\nQuota options:\n";
cout << " --max-objects specify max objects (negative value to disable)\n";
cout << " --max-size specify max size (in B/K/M/G/T, negative value to disable)\n";
DEDUP_STATS,
DEDUP_ESTIMATE,
DEDUP_ABORT,
- DEDUP_RESTART,
+ DEDUP_EXEC,
DEDUP_PAUSE,
DEDUP_RESUME,
+ DEDUP_THROTTLE,
GC_LIST,
GC_PROCESS,
LC_LIST,
{ "dedup stats", OPT::DEDUP_STATS },
{ "dedup estimate", OPT::DEDUP_ESTIMATE },
{ "dedup abort", OPT::DEDUP_ABORT },
- { "dedup restart", OPT::DEDUP_RESTART },
+ { "dedup restart", OPT::DEDUP_EXEC },
+ { "dedup exec", OPT::DEDUP_EXEC },
{ "dedup pause", OPT::DEDUP_PAUSE },
{ "dedup resume", OPT::DEDUP_RESUME },
+ { "dedup throttle", OPT::DEDUP_THROTTLE },
{ "gc list", OPT::GC_LIST },
{ "gc process", OPT::GC_PROCESS },
{ "lc list", OPT::LC_LIST },
int skip_zero_entries = false; // log show
int purge_keys = false;
int yes_i_really_mean_it = false;
+ int throttle_stat = false;
int delete_child_objects = false;
int fix = false;
int remove_bad = false;
int64_t max_write_ops = 0;
int64_t max_read_bytes = 0;
int64_t max_write_bytes = 0;
+ uint32_t max_bucket_index_ops = 0;
+ uint32_t max_metadata_ops = 0;
bool have_max_objects = false;
bool have_max_size = false;
bool have_max_write_ops = false;
bool have_max_read_ops = false;
bool have_max_write_bytes = false;
bool have_max_read_bytes = false;
+ bool have_max_bucket_index_ops = false;
+ bool have_max_metadata_ops = false;
int include_all = false;
int allow_unordered = false;
return EINVAL;
}
have_max_write_bytes = true;
+ } else if (ceph_argparse_witharg(args, i, &val, "--max-bucket-index-ops", (char*)NULL)) {
+   // Parse into a signed 64-bit value first: the option is stored in a
+   // uint32_t, and assigning the parsed value directly would silently wrap
+   // negative or oversized input (e.g. -1 would become 4294967295).
+   const int64_t parsed_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err);
+   if (!err.empty()) {
+     cerr << "ERROR: failed to parse max bucket index ops: " << err << std::endl;
+     return EINVAL;
+   }
+   if (parsed_ops < 0 || parsed_ops > 0xFFFFFFFFLL) {
+     cerr << "ERROR: max bucket index ops must be between 0 and 4294967295" << std::endl;
+     return EINVAL;
+   }
+   max_bucket_index_ops = static_cast<uint32_t>(parsed_ops);
+   have_max_bucket_index_ops = true;
+ } else if (ceph_argparse_witharg(args, i, &val, "--max-metadata-ops", (char*)NULL)) {
+   // same range handling as for --max-bucket-index-ops
+   const int64_t parsed_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err);
+   if (!err.empty()) {
+     cerr << "ERROR: failed to parse max metadata ops: " << err << std::endl;
+     return EINVAL;
+   }
+   if (parsed_ops < 0 || parsed_ops > 0xFFFFFFFFLL) {
+     cerr << "ERROR: max metadata ops must be between 0 and 4294967295" << std::endl;
+     return EINVAL;
+   }
+   max_metadata_ops = static_cast<uint32_t>(parsed_ops);
+   have_max_metadata_ops = true;
} else if (ceph_argparse_witharg(args, i, &val, "--date", "--time", (char*)NULL)) {
date = val;
if (end_date.empty())
// do nothing
} else if (ceph_argparse_binary_flag(args, i, &yes_i_really_mean_it, NULL, "--yes-i-really-mean-it", (char*)NULL)) {
// do nothing
+ } else if (ceph_argparse_binary_flag(args, i, &throttle_stat, NULL, "--stat", (char*)NULL)) {
+ // do nothing
} else if (ceph_argparse_binary_flag(args, i, &fix, NULL, "--fix", (char*)NULL)) {
// do nothing
} else if (ceph_argparse_binary_flag(args, i, &remove_bad, NULL, "--remove-bad", (char*)NULL)) {
OPT::DEDUP_STATS,
OPT::DEDUP_ESTIMATE,
OPT::DEDUP_ABORT, // TBD - not READ-ONLY
- OPT::DEDUP_RESTART, // TBD - not READ-ONLY
+ OPT::DEDUP_EXEC, // TBD - not READ-ONLY
OPT::DEDUP_PAUSE,
OPT::DEDUP_RESUME,
+ OPT::DEDUP_THROTTLE,
OPT::GC_LIST,
OPT::LC_LIST,
OPT::ORPHANS_LIST_JOBS,
opt_cmd == OPT::DEDUP_ABORT ||
opt_cmd == OPT::DEDUP_PAUSE ||
opt_cmd == OPT::DEDUP_RESUME ||
- opt_cmd == OPT::DEDUP_RESTART) {
+ opt_cmd == OPT::DEDUP_THROTTLE ||
+ opt_cmd == OPT::DEDUP_EXEC) {
using namespace rgw::dedup;
rgw::sal::RadosStore *store = dynamic_cast<rgw::sal::RadosStore*>(driver);
return ret;
}
- if (opt_cmd == OPT::DEDUP_ABORT || opt_cmd == OPT::DEDUP_PAUSE || opt_cmd == OPT::DEDUP_RESUME) {
+ if (opt_cmd == OPT::DEDUP_THROTTLE) {
+ // Handle "dedup throttle": the control payload is the encoded
+ // URGENT_MSG_THROTTLE tag followed by an encoded throttle_msg_t.
+ bufferlist urgent_msg_bl;
+ urgent_msg_t urgent_msg = URGENT_MSG_THROTTLE;
+ ceph::encode(urgent_msg, urgent_msg_bl);
+ throttle_msg_t throttle_msg;
+
+ // --stat: encode throttle_msg without adding any action and return right
+ // away, so any --max-*-ops value passed together with --stat is ignored.
+ // NOTE(review): presumably the receiver treats an action-less message as
+ // a request to report the current throttle settings -- confirm.
+ if (throttle_stat) {
+ encode(throttle_msg, urgent_msg_bl);
+ return cluster::dedup_control_bl(store, dpp(), urgent_msg, urgent_msg_bl);
+ }
+
+ // Without --stat at least one of the two limits has to be supplied.
+ if (unlikely(!have_max_bucket_index_ops && !have_max_metadata_ops)) {
+ std::cerr << "dedup throttle must set either --max-bucket-index-ops or --max-metadata-ops" << std::endl;
+ return EINVAL;
+ }
+
+ // Queue one action per limit that was given on the command line.
+ if (have_max_bucket_index_ops) {
+ throttle_action_t action = { .op_type = BUCKET_INDEX_OP,
+ .limit = max_bucket_index_ops};
+ throttle_msg.vec.push_back(action);
+ }
+
+ if (have_max_metadata_ops) {
+ throttle_action_t action = { .op_type = METADATA_ACCESS_OP,
+ .limit = max_metadata_ops};
+ throttle_msg.vec.push_back(action);
+ }
+
+ // Append the queued actions after the message tag; the result of
+ // cluster::dedup_control_bl() becomes the command's return value.
+ encode(throttle_msg, urgent_msg_bl);
+ return cluster::dedup_control_bl(store, dpp(), urgent_msg, urgent_msg_bl);
+ }
+
+ if (opt_cmd == OPT::DEDUP_ABORT ||
+ opt_cmd == OPT::DEDUP_PAUSE ||
+ opt_cmd == OPT::DEDUP_RESUME) {
urgent_msg_t urgent_msg;
if (opt_cmd == OPT::DEDUP_ABORT) {
urgent_msg = URGENT_MSG_ABORT;
return cluster::dedup_control(store, dpp(), urgent_msg);
}
- if (opt_cmd == OPT::DEDUP_RESTART || opt_cmd == OPT::DEDUP_ESTIMATE) {
+ if (opt_cmd == OPT::DEDUP_EXEC || opt_cmd == OPT::DEDUP_ESTIMATE) {
dedup_req_type_t dedup_type = dedup_req_type_t::DEDUP_TYPE_NONE;
if (opt_cmd == OPT::DEDUP_ESTIMATE) {
dedup_type = dedup_req_type_t::DEDUP_TYPE_ESTIMATE;
<< std::endl;
return EINVAL;
}
- dedup_type = dedup_req_type_t::DEDUP_TYPE_FULL;
+ dedup_type = dedup_req_type_t::DEDUP_TYPE_EXEC;
#ifndef FULL_DEDUP_SUPPORT
std::cerr << "Only dedup estimate is supported!" << std::endl;
return EPERM;
caps rm remove user capabilities
dedup stats Display dedup statistics from the last run
dedup estimate Runs dedup in estimate mode (no changes will be made)
- dedup restart Restart dedup; must include --yes-i-really-mean-it to activate
+ dedup exec Execute dedup (duplicated tail objects will be deleted); must include --yes-i-really-mean-it to activate
dedup abort Abort dedup
dedup pause Pause dedup
dedup resume Resume paused dedup
+ dedup throttle Throttle dedup execution
subuser create create a new subuser
subuser modify modify subuser
subuser rm remove subuser
<date> := "YYYY-MM-DD[ hh:mm:ss]"
+ Dedup throttle options:
+ --max-bucket-index-ops specify max bucket-index requests per second allowed for an RGW during dedup, 0 means unlimited
+ --max-metadata-ops specify max metadata requests per second allowed for an RGW during dedup, 0 means unlimited
+ --stat display dedup throttle setting
+
Quota options:
--max-objects specify max objects (negative value to disable)
--max-size specify max size (in B/K/M/G/T, negative value to disable)
return (dedup_work_was_completed, dedup_stats, dedup_ratio_estimate, dedup_ratio_actual)
+#-------------------------------------------------------------------------------
+def set_bucket_index_throttling(limit):
+ # Runs "dedup throttle --max-bucket-index-ops <limit>" through admin(),
+ # asserts that result[1] is 0 (presumably the command's exit status --
+ # confirm against admin()) and logs result[0] at debug level.
+ cmd = ['dedup', 'throttle', '--max-bucket-index-ops', str(limit)]
+ result = admin(cmd)
+ assert result[1] == 0
+ log.debug(result[0])
+
#-------------------------------------------------------------------------------
def exec_dedup_internal(expected_dedup_stats, dry_run, max_dedup_time):
+ ### throttle to a random limit of 50-200 bucket-index reads per second (each read returns 1000 entries, i.e. 50K-200K objects/sec)
+ limit=random.randint(50, 200)
+ set_bucket_index_throttling(limit)
+
log.debug("sending exec_dedup request: dry_run=%d", dry_run)
if dry_run:
result = admin(['dedup', 'estimate'])
reset_full_dedup_stats(expected_dedup_stats)
else:
- result = admin(['dedup', 'restart', '--yes-i-really-mean-it'])
+ result = admin(['dedup', 'exec', '--yes-i-really-mean-it'])
assert result[1] == 0
log.debug("wait for dedup to complete")
dedup_time = 0
- dedup_timeout = 5
+ dedup_timeout = 3
dedup_stats = Dedup_Stats()
dedup_ratio=Dedup_Ratio()
wait_for_completion = True
wait_for_completion = False
log.info("dedup completed in %d seconds", dedup_time)
return (dedup_time, ret[1], ret[2], ret[3])
-
+ else:
+ ### throttle to a random limit of 50-200 bucket-index reads per second (each read returns 1000 entries, i.e. 50K-200K objects/sec)
+ limit=random.randint(50, 200)
+ set_bucket_index_throttling(limit)
#-------------------------------------------------------------------------------
def exec_dedup(expected_dedup_stats, dry_run, verify_stats=True):
global full_dedup_state_was_checked
global full_dedup_state_disabled
log.debug("check_full_dedup_state:: sending FULL Dedup request")
- result = admin(['dedup', 'restart', '--yes-i-really-mean-it'])
+ result = admin(['dedup', 'exec', '--yes-i-really-mean-it'])
if result[1] == 0:
- log.info("full dedup is enabled!")
+ log.debug("full dedup is enabled!")
full_dedup_state_disabled = False
result = admin(['dedup', 'abort'])
assert result[1] == 0
else:
- log.info("full dedup is disabled, skip all full dedup tests")
+ log.debug("full dedup is disabled, skip all full dedup tests")
full_dedup_state_disabled = True
full_dedup_state_was_checked = True
# REMOVE some objects and update stats/expected
src_record=0
shared_manifest=0
- valid_hash=0
+ valid_sha=0
object_keys=[]
files_sub=[]
dedup_stats = Dedup_Stats()
log.debug("objects::%s::size=%d, num_copies=%d", filename, obj_size, num_copies_2);
if num_copies_2:
if num_copies_2 > 1 and obj_size > RADOS_OBJ_SIZE:
- valid_hash += num_copies_2
+ valid_sha += num_copies_2
src_record += 1
shared_manifest += (num_copies_2 - 1)
dedup_stats.deduped_obj_bytes=0
dedup_stats.skip_src_record=src_record
dedup_stats.skip_shared_manifest=shared_manifest
- dedup_stats.valid_hash=valid_hash
+ dedup_stats.valid_hash=valid_sha
dedup_stats.invalid_hash=0
dedup_stats.set_hash=0
# REMOVE some objects and update stats/expected
src_record=0
shared_manifest=0
- valid_hash=0
+ valid_sha=0
object_keys=[]
files_sub=[]
dedup_stats = Dedup_Stats()
log.debug("objects::%s::size=%d, num_copies=%d", filename, obj_size, num_copies_2);
if num_copies_2:
if num_copies_2 > 1 and obj_size > RADOS_OBJ_SIZE:
- valid_hash += num_copies_2
+ valid_sha += num_copies_2
src_record += 1
shared_manifest += (num_copies_2 - 1)
dedup_stats.deduped_obj_bytes=0
dedup_stats.skip_src_record=src_record
dedup_stats.skip_shared_manifest=shared_manifest
- dedup_stats.valid_hash=valid_hash
+ dedup_stats.valid_hash=valid_sha
dedup_stats.invalid_hash=0
dedup_stats.set_hash=0
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_large_scale_with_tenants():
- #return
+ return
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_large_scale():
- #return
+ return
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_empty_bucket():
- #return
+ return
if full_dedup_is_disabled():
return