From: Gabriel BenHanokh Date: Thu, 23 Apr 2026 14:16:31 +0000 (+0000) Subject: rgw/dedup: add --allow/deny-bucket-list and --allow/deny-storage-class-list to dedup... X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e9f603f8ace22e245545621756a3174785c06d75;p=ceph.git rgw/dedup: add --allow/deny-bucket-list and --allow/deny-storage-class-list to dedup commands Resolves: bz#2413730 Signed-off-by: Gabriel BenHanokh --- diff --git a/doc/radosgw/s3_objects_dedup.rst b/doc/radosgw/s3_objects_dedup.rst index 7de91ed00071..0108af23025b 100644 --- a/doc/radosgw/s3_objects_dedup.rst +++ b/doc/radosgw/s3_objects_dedup.rst @@ -33,6 +33,29 @@ Admin Commands - ``radosgw-admin dedup throttle --stat``: Displays dedup throttle setting. +The ``dedup estimate`` and ``dedup exec`` commands also accept filter options: + +- ``--allow-bucket-list ``: + Path to a file listing bucket names to include (allowlist mode). + Only buckets listed in the file will be processed. + Mutually exclusive with ``--deny-bucket-list``. + +- ``--deny-bucket-list ``: + Path to a file listing bucket names to exclude (denylist mode). + All buckets except those listed in the file will be processed. + Mutually exclusive with ``--allow-bucket-list``. + +- ``--allow-storage-class-list ``: + Path to a file listing storage class names to include (allowlist mode). + Mutually exclusive with ``--deny-storage-class-list``. + +- ``--deny-storage-class-list ``: + Path to a file listing storage class names to exclude (denylist mode). + Mutually exclusive with ``--allow-storage-class-list``. + +**File format:** One name per line. Lines starting with or containing ``#`` +are treated as comments. Whitespace is ignored. + Skipped Objects =============== diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index cdb981a7143e..f445184d3189 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -236,6 +236,7 @@ if(WITH_RADOSGW_RADOS) driver/rados/config/zone.cc driver/rados/config/zonegroup.cc driver/rados/rgw_dedup.cc + driver/rados/rgw_dedup_filter.cc driver/rados/rgw_dedup_table.cc driver/rados/rgw_dedup_store.cc driver/rados/rgw_dedup_utils.cc diff --git a/src/rgw/driver/rados/rgw_dedup.cc b/src/rgw/driver/rados/rgw_dedup.cc index a03c78771a26..23f7fd90acc2 100644 --- a/src/rgw/driver/rados/rgw_dedup.cc +++ b/src/rgw/driver/rados/rgw_dedup.cc @@ -2292,10 +2292,14 @@ namespace rgw::dedup { p_worker_stats->ingress_obj_bytes += ondisk_byte_size; // We limit dedup to objects from the same storage_class - // TBD-Future: - // Should we use a skip-list of storage_classes we should skip (like glacier) ? const std::string& storage_class = rgw_placement_rule::get_canonical_storage_class(entry.meta.storage_class); + if (!d_filter.allow_storage_class(storage_class)) { + ldpp_dout(dpp, 20) << __func__ << "::skip storage_class (filter): " + << storage_class << dendl; + p_worker_stats->ingress_skip_filtered_storage_class++; + return 0; + } if (storage_class == RGW_STORAGE_CLASS_STANDARD) { p_worker_stats->default_storage_class_objs++; p_worker_stats->default_storage_class_objs_bytes += ondisk_byte_size; @@ -2746,6 +2750,12 @@ namespace rgw::dedup { continue; } ldpp_dout(dpp, 20) <<__func__ << "::bucket=" << bucket << dendl; + if (!d_filter.allow_bucket(bucket.name)) { + ldpp_dout(dpp, 10) << __func__ << "::worker_id=" << worker_id + << "::skip bucket (filter): " << bucket.name << dendl; + p_worker_stats->ingress_skip_filtered_bucket++; + continue; + } ret = ingress_bucket_objects_single_shard(disk_arr, bucket, worker_id, num_work_shards, p_worker_stats); if (unlikely(ret != 0)) { @@ -3118,6 +3128,23 @@ namespace rgw::dedup { break; case URGENT_MSG_RESTART: if (!d_ctl.dedup_exec) { + // Decode optional filter (may not be present for older senders) + { + bool has_filter = false; + try { + ceph::decode(has_filter, bl_iter); + if (has_filter) { + decode(d_filter, bl_iter); + ldpp_dout(dpp, 5) << __func__ << "::RESTART with filter" << dendl; + } + else { + d_filter = dedup_filter_t{}; + } + } catch (buffer::error&) { + // older sender without filter - reset to no-filter + d_filter = dedup_filter_t{}; + } + } d_ctl.remote_restart_req = true; d_cond.notify_all(); } diff --git a/src/rgw/driver/rados/rgw_dedup.h b/src/rgw/driver/rados/rgw_dedup.h index 71d980fb58b0..dac4a7662be1 100644 --- a/src/rgw/driver/rados/rgw_dedup.h +++ b/src/rgw/driver/rados/rgw_dedup.h @@ -18,6 +18,7 @@ #include "rgw_dedup_utils.h" #include "rgw_dedup_table.h" #include "rgw_dedup_cluster.h" +#include "rgw_dedup_filter.h" #include "rgw_realm_reloader.h" #include #include @@ -259,6 +260,7 @@ namespace rgw::dedup { bool d_split_head = true; uint32_t d_head_object_size = (4ULL * 1024 * 1024); control_t d_ctl; + dedup_filter_t d_filter; uint64_t d_watch_handle = 0; DedupWatcher d_watcher_ctx; diff --git a/src/rgw/driver/rados/rgw_dedup_cluster.cc b/src/rgw/driver/rados/rgw_dedup_cluster.cc index 5d95e15c174b..d3c40036b83e 100644 --- a/src/rgw/driver/rados/rgw_dedup_cluster.cc +++ b/src/rgw/driver/rados/rgw_dedup_cluster.cc @@ -1056,7 +1056,7 @@ namespace rgw::dedup { collect_single_shard_stats(dpp, owner_map, sp_arr.data(), shard, &show_time, "WORKER"); } Formatter::ObjectSection worker_stats(*fmt, "worker_stats"); - wrk_stats_sum.dump(fmt); + wrk_stats_sum.dump(fmt, num_work_shards); show_incomplete_shards_fmt(has_incomplete_shards, num_work_shards, sp_arr.data(), fmt); md5_start_time = show_time_func_fmt(epoch.time, show_time, owner_map, fmt); } @@ -1295,7 +1295,8 @@ namespace rgw::dedup { // command-line called from radosgw-admin.cc int cluster::dedup_restart_scan(rgw::sal::RadosStore *store, dedup_req_type_t dedup_type, - const DoutPrefixProvider *dpp) + const DoutPrefixProvider *dpp, + const dedup_filter_t *p_filter) { ldpp_dout(dpp, 1) << __func__ << "::dedup_type = " << dedup_type << dendl; @@ -1337,7 +1338,30 @@ namespace rgw::dedup { ret = swap_epoch(store, dpp, &old_epoch, dedup_type, 0, 0); if (ret == 0) { ldpp_dout(dpp, 10) << __func__ << "::Epoch object was reset" << dendl; - return dedup_control(store, dpp, URGENT_MSG_RESTART); + + // Build the RESTART bufferlist, optionally including the filter + bufferlist urgent_msg_bl; + ceph::encode(URGENT_MSG_RESTART, urgent_msg_bl); + bool has_filter = (p_filter != nullptr); + ceph::encode(has_filter, urgent_msg_bl); + if (has_filter) { + const auto& bucket_set = p_filter->get_bucket_filter(); + ldpp_dout(dpp, 20) << __func__ << "::bucket_set.size()=" + << bucket_set.size() << dendl; + for (const auto& name : bucket_set) { + ldpp_dout(dpp, 20) << __func__ << "::bucket_set::" << name << dendl; + } + + const auto& sc_vec = p_filter->get_storage_class_filter(); + ldpp_dout(dpp, 20) << __func__ << "::sc_vec.size()=" + << sc_vec.size() << dendl; + for (const auto& name : sc_vec) { + ldpp_dout(dpp, 20) << __func__ << "::sc_vec::" << name << dendl; + } + + encode(*p_filter, urgent_msg_bl); + } + return dedup_control_bl(store, dpp, URGENT_MSG_RESTART, urgent_msg_bl); } else { return ret; diff --git a/src/rgw/driver/rados/rgw_dedup_cluster.h b/src/rgw/driver/rados/rgw_dedup_cluster.h index b897de2548dc..da64b1fd90f2 100644 --- a/src/rgw/driver/rados/rgw_dedup_cluster.h +++ b/src/rgw/driver/rados/rgw_dedup_cluster.h @@ -16,6 +16,7 @@ #include "common/dout.h" #include "rgw_dedup_utils.h" #include "rgw_dedup_store.h" +#include "rgw_dedup_filter.h" #include namespace rgw::dedup { @@ -101,7 +102,8 @@ namespace rgw::dedup { urgent_msg_t urgent_msg); static int dedup_restart_scan(rgw::sal::RadosStore *store, dedup_req_type_t dedup_type, - const DoutPrefixProvider *dpp); + const DoutPrefixProvider *dpp, + const dedup_filter_t *p_filter = nullptr); //--------------------------------------------------------------------------- int mark_work_shard_token_completed(rgw::sal::RadosStore *store, diff --git a/src/rgw/driver/rados/rgw_dedup_filter.cc b/src/rgw/driver/rados/rgw_dedup_filter.cc new file mode 100644 index 000000000000..c3b62f6f4af1 --- /dev/null +++ b/src/rgw/driver/rados/rgw_dedup_filter.cc @@ -0,0 +1,216 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; +// vim: ts=8 sw=2 sts=2 expandtab +/* + * Ceph - scalable distributed file system + * + * Author: Gabriel BenHanokh + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "rgw_dedup_filter.h" +#include "common/dout.h" +#include "common/errno.h" +#include "include/encoding.h" +#include "rgw_rest_s3.h" +#include +#include +#include +#include + +#define dout_subsys ceph_subsys_rgw_dedup + +namespace rgw::dedup { + + //--------------------------------------------------------------------------- + bool dedup_filter_t::allow_bucket(const std::string& bucket_name) const + { + switch (bucket_mode) { + case filter_mode_t::FILTER_NONE: + return true; + case filter_mode_t::FILTER_ALLOW: + return bucket_set.contains(bucket_name); + case filter_mode_t::FILTER_DENY: + return !bucket_set.contains(bucket_name); + default: + return true; + } + } + + //--------------------------------------------------------------------------- + bool dedup_filter_t::allow_storage_class(const std::string& storage_class) const + { + if (storage_class_mode == filter_mode_t::FILTER_NONE) { + return true; + } + + auto it = std::find(sc_vec.begin(), sc_vec.end(), storage_class); + if (storage_class_mode == filter_mode_t::FILTER_ALLOW) { + return it != sc_vec.end(); + } + else { + return it == sc_vec.end(); + } + } + + //--------------------------------------------------------------------------- + static inline int valid_bucket_name(const std::string& name) + { + // per Casey instructions - nothing should be filtered here + return 0; + + // return -(valid_s3_bucket_name(name, true /* relaxed */)); + } + + //--------------------------------------------------------------------------- + static int valid_storage_class_name(const std::string& name) + { + // per Casey instructions - nothing should be filtered here + return 0; + } + + //--------------------------------------------------------------------------- + // Private static helper: read filter file, one name per line. + int dedup_filter_t::read_filter_file(const std::string& path, + std::unordered_set& name_set, + int (*validator)(const std::string&), + const DoutPrefixProvider* dpp) + { + std::ifstream f(path); + if (!f.is_open()) { + ldpp_dout(dpp, 1) << __func__ << ":: failed to open filter file: " << path << dendl; + return ENOENT; + } + + std::string line; + int line_num = 0; + while (std::getline(f, line)) { + line_num++; + + // Strip comment (everything from '#' onward) + auto comment_pos = line.find('#'); + if (comment_pos != std::string::npos) { + line = line.substr(0, comment_pos); + } + + // Trim leading and trailing whitespace + const char* ws = " \t\r\n"; + auto start = line.find_first_not_of(ws); + if (start == std::string::npos) { + continue; // blank line after stripping + } + + auto end = line.find_last_not_of(ws); + std::string name = line.substr(start, end - start + 1); + if (!name.empty()) { + int ret = validator(name); + if (ret == 0) { + ldpp_dout(dpp, 20) << __func__ << "::" << name << dendl; + name_set.insert(std::move(name)); + } + else { + ldpp_dout(dpp, 1) << __func__ << "::" << path << "::" << line_num + << "::invalid name '" << name << "'" << dendl; + return EINVAL; + } + } + } + + return 0; + } + + //--------------------------------------------------------------------------- + dedup_filter_t::dedup_filter_t(const std::string& allow_bucket_file, + const std::string& deny_bucket_file, + const std::string& allow_sc_file, + const std::string& deny_sc_file, + const DoutPrefixProvider* dpp) + { + // Validate mutual exclusivity + if (!allow_bucket_file.empty() && !deny_bucket_file.empty()) { + ldpp_dout(dpp, 1) << __func__ + << ":: --allow-bucket-list and --deny-bucket-list are mutually exclusive" + << dendl; + d_errcode = EINVAL; + return; + } + if (!allow_sc_file.empty() && !deny_sc_file.empty()) { + ldpp_dout(dpp, 1) << __func__ + << ":: --allow-storage-class-list and --deny-storage-class-list are mutually exclusive" + << dendl; + d_errcode = EINVAL; + return; + } + + // Bucket filter + if (!allow_bucket_file.empty()) { + d_errcode = read_filter_file(allow_bucket_file, bucket_set, + valid_bucket_name, dpp); + if (d_errcode != 0) { + return; + } + bucket_mode = filter_mode_t::FILTER_ALLOW; + } + else if (!deny_bucket_file.empty()) { + d_errcode = read_filter_file(deny_bucket_file, bucket_set, + valid_bucket_name, dpp); + if (d_errcode != 0) { + return; + } + bucket_mode = filter_mode_t::FILTER_DENY; + } + + // Storage-class filter + // First, read the filtered storage-class names into a set to prevent duplicates + // and then move them into a vector for better efficiency. + std::unordered_set name_set; + if (!allow_sc_file.empty()) { + d_errcode = read_filter_file(allow_sc_file, name_set, + valid_storage_class_name, dpp); + if (d_errcode != 0) { + return; + } + storage_class_mode = filter_mode_t::FILTER_ALLOW; + } + else if (!deny_sc_file.empty()) { + d_errcode = read_filter_file(deny_sc_file, name_set, + valid_storage_class_name, dpp); + if (d_errcode != 0) { + return; + } + storage_class_mode = filter_mode_t::FILTER_DENY; + } + // move elements from set to vector + sc_vec.assign(name_set.begin(), name_set.end()); + } + + //--------------------------------------------------------------------------- + void encode(const dedup_filter_t& f, ceph::bufferlist& bl) + { + ENCODE_START(1, 1, bl); + encode(static_cast(f.bucket_mode), bl); + encode(f.bucket_set, bl); + encode(static_cast(f.storage_class_mode), bl); + encode(f.sc_vec, bl); + ENCODE_FINISH(bl); + } + + //--------------------------------------------------------------------------- + void decode(dedup_filter_t& f, ceph::bufferlist::const_iterator& bl) + { + DECODE_START(1, bl); + uint8_t mode; + decode(mode, bl); + f.bucket_mode = static_cast(mode); + decode(f.bucket_set, bl); + decode(mode, bl); + f.storage_class_mode = static_cast(mode); + decode(f.sc_vec, bl); + DECODE_FINISH(bl); + } + +} // namespace rgw::dedup diff --git a/src/rgw/driver/rados/rgw_dedup_filter.h b/src/rgw/driver/rados/rgw_dedup_filter.h new file mode 100644 index 000000000000..8bda372e20b1 --- /dev/null +++ b/src/rgw/driver/rados/rgw_dedup_filter.h @@ -0,0 +1,90 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; +// vim: ts=8 sw=2 sts=2 expandtab +/* + * Ceph - scalable distributed file system + * + * Author: Gabriel BenHanokh + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once +#include "common/dout.h" +#include "include/rados/buffer.h" +#include "include/encoding.h" +#include +#include + +namespace rgw::dedup { + + enum class filter_mode_t : uint8_t { + FILTER_NONE, // no filter active + FILTER_ALLOW, // allowlist: only listed items pass + FILTER_DENY // denylist: listed items are blocked + }; + + struct dedup_filter_t { + friend void encode(const dedup_filter_t& f, ceph::bufferlist& bl); + friend void decode(dedup_filter_t& f, ceph::bufferlist::const_iterator& bl); + + // Default constructor: no filter (all buckets/storage classes pass) + dedup_filter_t() = default; + + // Constructor from file paths. Empty string = no filter for that dimension. + // allow and deny are mutually exclusive per dimension. + // Check errcode() after construction to detect any error. + dedup_filter_t(const std::string& allow_bucket_file, + const std::string& deny_bucket_file, + const std::string& allow_sc_file, + const std::string& deny_sc_file, + const DoutPrefixProvider* dpp); + + // Returns 0 on success, errno if construction failed. + int errcode() const { return d_errcode; } + + // Returns true if any filter dimension is active. + bool is_active() const { + return (bucket_mode != filter_mode_t::FILTER_NONE || + storage_class_mode != filter_mode_t::FILTER_NONE); + } + + // Returns true if the bucket should be processed + bool allow_bucket(const std::string& bucket_name) const; + // Returns true if the storage class should be processed + bool allow_storage_class(const std::string& storage_class) const; + + const std::unordered_set& get_bucket_filter() const { + return bucket_set; + } + + const std::vector& get_storage_class_filter() const { + return sc_vec; + } + + private: + // Read filter file: one name per line, '#' starts a comment, whitespace trimmed + // On success returns 0 and populates name_set; on error returns errno code. + static int read_filter_file(const std::string& path, + std::unordered_set& name_set, + int (*validator)(const std::string&), + const DoutPrefixProvider* dpp); + + filter_mode_t bucket_mode = filter_mode_t::FILTER_NONE; + // we can have many buckets, use unordered_set for a quick search + std::unordered_set bucket_set; + + filter_mode_t storage_class_mode = filter_mode_t::FILTER_NONE; + // there are only a few active storage_classes, a vector will suffice + std::vector sc_vec; + + int d_errcode = 0; + }; + + void encode(const dedup_filter_t& f, ceph::bufferlist& bl); + void decode(dedup_filter_t& f, ceph::bufferlist::const_iterator& bl); + +} // namespace rgw::dedup diff --git a/src/rgw/driver/rados/rgw_dedup_utils.cc b/src/rgw/driver/rados/rgw_dedup_utils.cc index 52fdfa2c04f3..fcea57fe9c64 100644 --- a/src/rgw/driver/rados/rgw_dedup_utils.cc +++ b/src/rgw/driver/rados/rgw_dedup_utils.cc @@ -382,16 +382,18 @@ namespace rgw::dedup { this->ingress_corrupted_etag += other.ingress_corrupted_etag; this->ingress_skip_too_small_bytes += other.ingress_skip_too_small_bytes; this->ingress_skip_too_small += other.ingress_skip_too_small; + this->ingress_skip_filtered_bucket += other.ingress_skip_filtered_bucket; + this->ingress_skip_filtered_storage_class += other.ingress_skip_filtered_storage_class; return *this; } //--------------------------------------------------------------------------- - void worker_stats_t::dump(Formatter *f) const + void worker_stats_t::dump(Formatter *f, unsigned num_shards) const { // main section { Formatter::ObjectSection main(*f, "main"); - + f->dump_unsigned("Num Work-Shards", num_shards); f->dump_unsigned("Ingress Objs count", this->ingress_obj); f->dump_unsigned("Accum byte size Ingress Objs", this->ingress_obj_bytes); f->dump_unsigned("Egress Records count", this->egress_records); @@ -439,6 +441,16 @@ namespace rgw::dedup { f->dump_unsigned("Ingress skip: too small bytes", this->ingress_skip_too_small_bytes); } + + if (this->ingress_skip_filtered_bucket && num_shards) { + // buckets are scanned once per worker-shard + f->dump_unsigned("Ingress skip: filtered bucket", + this->ingress_skip_filtered_bucket/num_shards); + } + if (this->ingress_skip_filtered_storage_class) { + f->dump_unsigned("Ingress skipped filtered storage class, num objects skipped", + this->ingress_skip_filtered_storage_class); + } } { @@ -456,7 +468,7 @@ namespace rgw::dedup { std::ostream& operator<<(std::ostream &out, const worker_stats_t &s) { JSONFormatter formatter(false); - s.dump(&formatter); + s.dump(&formatter, 1); std::stringstream sstream; formatter.flush(sstream); out << sstream.str(); @@ -489,6 +501,8 @@ namespace rgw::dedup { encode(w.ingress_skip_too_small_bytes, bl); encode(w.ingress_skip_too_small, bl); + encode(w.ingress_skip_filtered_bucket, bl); + encode(w.ingress_skip_filtered_storage_class, bl); encode(w.duration, bl); ENCODE_FINISH(bl); @@ -516,6 +530,8 @@ namespace rgw::dedup { decode(w.ingress_corrupted_etag, bl); decode(w.ingress_skip_too_small_bytes, bl); decode(w.ingress_skip_too_small, bl); + decode(w.ingress_skip_filtered_bucket, bl); + decode(w.ingress_skip_filtered_storage_class, bl); decode(w.duration, bl); DECODE_FINISH(bl); diff --git a/src/rgw/driver/rados/rgw_dedup_utils.h b/src/rgw/driver/rados/rgw_dedup_utils.h index 6a7f508cc36e..2c2fe4b9c8ab 100644 --- a/src/rgw/driver/rados/rgw_dedup_utils.h +++ b/src/rgw/driver/rados/rgw_dedup_utils.h @@ -172,7 +172,7 @@ namespace rgw::dedup { struct worker_stats_t { worker_stats_t& operator +=(const worker_stats_t& other); - void dump(Formatter *f) const; + void dump(Formatter *f, unsigned num_shards) const; uint64_t ingress_obj = 0; uint64_t ingress_obj_bytes = 0; @@ -198,6 +198,9 @@ namespace rgw::dedup { uint64_t ingress_skip_too_small_bytes = 0; uint64_t ingress_skip_too_small = 0; + uint64_t ingress_skip_filtered_bucket = 0; + uint64_t ingress_skip_filtered_storage_class = 0; + utime_t duration = {0, 0}; }; std::ostream& operator<<(std::ostream &out, const worker_stats_t &s); diff --git a/src/rgw/radosgw-admin/radosgw-admin.cc b/src/rgw/radosgw-admin/radosgw-admin.cc index 7a00982a13b6..bb1170329f80 100644 --- a/src/rgw/radosgw-admin/radosgw-admin.cc +++ b/src/rgw/radosgw-admin/radosgw-admin.cc @@ -82,6 +82,7 @@ extern "C" { #include "rgw_account.h" #include "rgw_bucket_logging.h" #include "rgw_dedup_cluster.h" +#include "rgw_dedup_filter.h" #include "services/svc_sync_modules.h" #include "services/svc_cls.h" #include "services/svc_bilog_rados.h" @@ -502,6 +503,11 @@ void usage() cout << " --max-bucket-index-ops specify max bucket-index requests per second allowed for an RGW during dedup, 0 means unlimited\n"; cout << " --max-metadata-ops specify max metadata requests per second allowed for an RGW during dedup, 0 means unlimited\n"; cout << " --stat display dedup throttle setting\n"; + cout << "\nDedup filter options:\n"; + cout << " --allow-bucket-list= file with bucket names to allow in dedup (mutually exclusive with --deny-bucket-list)\n"; + cout << " --deny-bucket-list= file with bucket names to deny in dedup (mutually exclusive with --allow-bucket-list)\n"; + cout << " --allow-storage-class-list= file with storage class names to allow in dedup (mutually exclusive with --deny-storage-class-list)\n"; + cout << " --deny-storage-class-list= file with storage class names to deny in dedup (mutually exclusive with --allow-storage-class-list)\n"; cout << "\nQuota options:\n"; cout << " --max-objects specify max objects (negative value to disable)\n"; cout << " --max-size specify max size (in B/K/M/G/T, negative value to disable)\n"; @@ -3777,6 +3783,10 @@ int main(int argc, const char **argv) bool have_max_read_bytes = false; bool have_max_bucket_index_ops = false; bool have_max_metadata_ops = false; + std::string allow_bucket_list_file; + std::string deny_bucket_list_file; + std::string allow_storage_class_list_file; + std::string deny_storage_class_list_file; int include_all = false; int allow_unordered = false; @@ -4115,6 +4125,14 @@ int main(int argc, const char **argv) return EINVAL; } have_max_metadata_ops = true; + } else if (ceph_argparse_witharg(args, i, &val, "--allow-bucket-list", (char*)NULL)) { + allow_bucket_list_file = val; + } else if (ceph_argparse_witharg(args, i, &val, "--deny-bucket-list", (char*)NULL)) { + deny_bucket_list_file = val; + } else if (ceph_argparse_witharg(args, i, &val, "--allow-storage-class-list", (char*)NULL)) { + allow_storage_class_list_file = val; + } else if (ceph_argparse_witharg(args, i, &val, "--deny-storage-class-list", (char*)NULL)) { + deny_storage_class_list_file = val; } else if (ceph_argparse_witharg(args, i, &val, "--date", "--time", (char*)NULL)) { date = val; if (end_date.empty()) @@ -9448,7 +9466,19 @@ next: #endif } - int ret = cluster::dedup_restart_scan(store, dedup_type, dpp()); + // Build the dedup filter from the supplied file paths + dedup_filter_t dedup_filter(allow_bucket_list_file, deny_bucket_list_file, + allow_storage_class_list_file, + deny_storage_class_list_file, dpp()); + int filter_err = dedup_filter.errcode(); + if (filter_err != 0) { + cerr << "ERROR: failed to build dedup filter: " + << cpp_strerror(filter_err) << std::endl; + return filter_err; + } + + int ret = cluster::dedup_restart_scan(store, dedup_type, dpp(), + dedup_filter.is_active() ? &dedup_filter : nullptr); if (ret == 0) { std::cout << "Dedup was restarted successfully" << std::endl; } diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index 0a691e71c9c8..008aecd56cbf 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -364,6 +364,12 @@ --max-metadata-ops specify max metadata requests per second allowed for an RGW during dedup, 0 means unlimited --stat display dedup throttle setting + Dedup filter options: + --allow-bucket-list= file with bucket names to allow in dedup (mutually exclusive with --deny-bucket-list) + --deny-bucket-list= file with bucket names to deny in dedup (mutually exclusive with --allow-bucket-list) + --allow-storage-class-list= file with storage class names to allow in dedup (mutually exclusive with --deny-storage-class-list) + --deny-storage-class-list= file with storage class names to deny in dedup (mutually exclusive with --allow-storage-class-list) + Quota options: --max-objects specify max objects (negative value to disable) --max-size specify max size (in B/K/M/G/T, negative value to disable) diff --git a/src/test/rgw/dedup/test_dedup.py b/src/test/rgw/dedup/test_dedup.py index 25177a10ebb1..c8ffa50ddb66 100644 --- a/src/test/rgw/dedup/test_dedup.py +++ b/src/test/rgw/dedup/test_dedup.py @@ -110,6 +110,19 @@ def get_buckets(num_buckets): #============================================== +#------------------------------------------------------------------------------- +def verify_no_forgotten_buckets(conn): + bucket_count = 0 + response = conn.list_buckets() + # The 'Buckets' key always exists in a successful response + for bucket in response['Buckets']: + log.warning("Forgotten bucket name = %s", bucket['Name']) + conn.delete_bucket(Bucket=bucket['Name']) + bucket_count += 1 + + return bucket_count == 0 + + g_tenant_connections=[] g_tenants=[] g_simple_connection=[] @@ -120,10 +133,12 @@ def close_all_connections(): for conn in g_simple_connection: log.debug("close simple connection") + verify_no_forgotten_buckets(conn) conn.close() for conn in g_tenant_connections: log.debug("close tenant connection") + verify_no_forgotten_buckets(conn) conn.close() #----------------------------------------------- @@ -544,11 +559,12 @@ def delete_bucket_with_all_objects(bucket_name, conn): conn.delete_bucket(Bucket=bucket_name) #------------------------------------------------------------------------------- -def verify_pool_is_empty(): +def verify_pool_is_empty(conn, skip_bucket_check=False): result = admin(['gc', 'process', '--include-all']) assert result[1] == 0 assert count_object_parts_in_all_buckets(False, 0) == 0 - + if not skip_bucket_check: + assert verify_no_forgotten_buckets(conn) #------------------------------------------------------------------------------- def cleanup(bucket_name, conn): @@ -556,7 +572,7 @@ def cleanup(bucket_name, conn): log.debug("delete_all_objects for bucket <%s>",bucket_name) delete_bucket_with_all_objects(bucket_name, conn) - verify_pool_is_empty() + verify_pool_is_empty(conn) #------------------------------------------------------------------------------- @@ -566,7 +582,9 @@ def cleanup_all_buckets(bucket_names, conns): log.debug("delete_all_objects for bucket <%s>",bucket_name) delete_bucket_with_all_objects(bucket_name, conn) - verify_pool_is_empty() + verify_pool_is_empty(conns[0], True) + for conn in conns: + assert verify_no_forgotten_buckets(conn) #------------------------------------------------------------------------------- @@ -2252,8 +2270,8 @@ def dedup_copy_internal(multi_buckets): finally: # cleanup must be executed even after a failure if multi_buckets: - for bucket_name in bucket_names: - cleanup(bucket_name, conn) + conns=[conn]*len(bucket_names) + cleanup_all_buckets(bucket_names, conns) else: cleanup(bucket_names[0], conn) @@ -2291,10 +2309,13 @@ def test_copy_after_dedup(): bucket_cp= gen_bucket_name() bucket_names=[] + conns=[] try: conn = get_single_connection() conn.create_bucket(Bucket=bucket_cp) bucket_names=create_buckets(conn, max_copies_count) + # need a vector holding multiple copies of conns to support + # upload_objects_multi()/verify_objects_multi() conns=[conn] * max_copies_count indices=[0] * len(files) ret=upload_objects_multi(files, conns, bucket_names, indices, config) @@ -2323,7 +2344,7 @@ def test_copy_after_dedup(): # object and linking to the existing tail-objects assert (expected_results + cp_head_count) == count_object_parts_in_all_buckets(False, 0) # delete the original objects and verify server-side-copy objects are valid - for (bucket_name, conn) in zip(bucket_names, conns): + for bucket_name in bucket_names: delete_bucket_with_all_objects(bucket_name, conn) result = admin(['gc', 'process', '--include-all']) @@ -2333,13 +2354,14 @@ def test_copy_after_dedup(): # At this point the original obejcts are all removed # Objects created by server-side-copy should keep the tail in place - # because of teh refcount + # because of the refcount verify_objects_copy(bucket_cp, files, conn, expected_results, config) finally: # cleanup must be executed even after a failure delete_bucket_with_all_objects(bucket_cp, conn) - cleanup_all_buckets(bucket_names, conns) + if len(bucket_names) > 0: + cleanup_all_buckets(bucket_names, conns) #------------------------------------------------------------------------------- @pytest.mark.basic_test @@ -3140,29 +3162,6 @@ def test_dedup_large_scale(): threads_dedup_basic_with_tenants_common(files, num_threads, config, False) -#------------------------------------------------------------------------------- -@pytest.mark.basic_test -def test_empty_bucket(): - if full_dedup_is_disabled(): - return - - prepare_test() - log.debug("test_empty_bucket: connect to AWS ...") - - max_copies_count=2 - config = default_config - - files=[] - try: - ret=gen_connections_multi2(max_copies_count) - tenants=ret[0] - bucket_names=ret[1] - conns=ret[2] - finally: - # cleanup must be executed even after a failure - cleanup_all_buckets(bucket_names, conns) - - #------------------------------------------------------------------------------- def inc_step_with_tenants(stats_base, files, conns, bucket_names, config): max_copies_count=len(conns) @@ -3657,3 +3656,363 @@ def test_dedup_identical_copies_multipart_small(): log.info("test_dedup_identical_copies_multipart:full test") __test_dedup_identical_copies(files, config, dry_run, verify, force_clean) + +#=============================================================================== +# Test Group 1: Filter File Parsing / Name Validation +#=============================================================================== +#------------------------------------------------------------------------------- +def write_filter_list_file(filepath, lines): + """Write a filter list file with the given lines, one per line.""" + with open(filepath, 'w') as f: + for line in lines: + f.write(line + '\n') + + +#------------------------------------------------------------------------------- +@pytest.mark.basic_test +def test_dedup_filter_bucket_list_parsing(): + """Validate CLI parsing of bucket filter list files (allow/deny). + Verify that illegal files are rejected + """ + prepare_test() + try: + # 1. Mutual exclusivity: --allow-bucket-list and --deny-bucket-list together. + allow_file = OUT_DIR + "allow_buckets.txt" + deny_file = OUT_DIR + "deny_buckets.txt" + write_filter_list_file(allow_file, ['my-bucket-1']) + write_filter_list_file(deny_file, ['my-bucket-2']) + result = admin(['dedup', 'estimate', + '--allow-bucket-list', allow_file, + '--deny-bucket-list', deny_file]) + assert result[1] != 0, "Expected failure when both allow and deny lists are given" + os.remove(allow_file) + os.remove(deny_file) + + # 2. Non-existent file path. + result = admin(['dedup', 'estimate', '--allow-bucket-list', + '/nonexistent/bucket_list.txt']) + assert result[1] != 0, "Expected failure for non-existent filter file" + finally: + cleanup_local() + + +#------------------------------------------------------------------------------- +@pytest.mark.basic_test +def test_dedup_filter_storage_class_list_parsing(): + """Validate CLI parsing of storage_class filter list files (allow/deny). + Verify that illegal files are rejected + """ + prepare_test() + try: + # 1. Mutual exclusivity: --allow-storage-class-list and --deny-storage-class-list together. + allow_file = OUT_DIR + "allow_storage_classs.txt" + deny_file = OUT_DIR + "deny_storage_classs.txt" + write_filter_list_file(allow_file, ['STORAGECLASS1']) + write_filter_list_file(deny_file, ['STORAGECLASS2']) + result = admin(['dedup', 'estimate', + '--allow-storage-class-list', allow_file, + '--deny-storage-class-list', deny_file]) + assert result[1] != 0, "Expected failure when both allow and deny lists are given" + os.remove(allow_file) + os.remove(deny_file) + + # 2. Non-existent file path. + result = admin(['dedup', 'estimate', '--allow-storage-class-list', + '/nonexistent/storage_class_list.txt']) + assert result[1] != 0, "Expected failure for non-existent filter file" + finally: + cleanup_local() + + +#------------------------------------------------------------------------------- +def read_filter_skip_stats(): + """Read ingress_skip_filtered_bucket/storage_class from dedup stats JSON.""" + result = admin(['dedup', 'stats']) + assert result[1] == 0 + jstats = json.loads(result[0]) + worker_stats = jstats['worker_stats'] + skipped = worker_stats['skipped'] + skip_bucket = skipped.get('Ingress skip: filtered bucket', 0) + skip_sc = skipped.get('Ingress skipped filtered storage class, num objects skipped', 0) + return (skip_bucket, skip_sc) + +#------------------------------------------------------------------------------- +def exec_dedup_with_filter(dry_run, deny_bucket_list=None, allow_bucket_list=None, + deny_storage_class_list=None, allow_storage_class_list=None, + max_dedup_time=300): + cmd = ['dedup', 'estimate' if dry_run else 'exec'] + if not dry_run: + cmd += ['--yes-i-really-mean-it'] + if deny_bucket_list: + cmd += ['--deny-bucket-list', deny_bucket_list] + if allow_bucket_list: + cmd += ['--allow-bucket-list', allow_bucket_list] + if deny_storage_class_list: + cmd += ['--deny-storage-class-list', deny_storage_class_list] + if allow_storage_class_list: + cmd += ['--allow-storage-class-list', allow_storage_class_list] + + result = admin(cmd) + assert result[1] == 0 + dedup_time = 0 + dedup_timeout = 3 + while dedup_time < max_dedup_time: + time.sleep(dedup_timeout) + dedup_time += dedup_timeout + ret = read_dedup_stats(dry_run) + if ret[0]: # completed + return ret + + assert False + + + +#============================================================================== +# Test Group 2: Storage-Class list Filters +#=============================================================================== + +#------------------------------------------------------------------------------- +def dedup_filter_allow_deny_storage_class_common(dry_run, filter_mode_allow): + """Verify that objects whose storage class is denied are skipped during estimate. + Denying STANDARD means every object is filtered + Allowing STANDARD means nothing is filtered + """ + prepare_test() + config=default_config + filter_file = OUT_DIR + "deny_storage_classs.txt" + bucket_name = gen_bucket_name() + conn=get_single_connection() + files=[] + num_files=7 + base_size = 2*MB + log.debug("generate files: base size=%d MiB, max_size=%d MiB", + base_size/MB, (pow(2, num_files) * base_size)/MB) + gen_files(files, base_size, num_files) + expected_dedup_stats = Dedup_Stats() # start with an empty-stats + split_head_objs=0 + rados_objects_total=0 + + try: + obj_count = 0 + bucket = conn.create_bucket(Bucket=bucket_name) + + for f in files: + filename=f[0] + obj_size=f[1] + num_copies=f[2] + + if filter_mode_allow: + split_head_objs += calc_split_objs_count(obj_size, num_copies, config) + calc_expected_stats(expected_dedup_stats, obj_size, num_copies, config) + else: + rados_obj_count=calc_rados_obj_count(num_copies, obj_size, config) + rados_objects_total += (rados_obj_count * num_copies) + + for i in range(0, num_copies): + key = gen_object_name(filename, i) + log.debug("upload: %s -> %s", OUT_DIR + filename, key) + obj_count += 1 + conn.upload_file(OUT_DIR + filename, bucket_name, key, Config=config) + + + write_filter_list_file(filter_file, ['STANDARD']) + if filter_mode_allow: + ret = exec_dedup_with_filter(dry_run, allow_storage_class_list=filter_file) + else: + ret = exec_dedup_with_filter(dry_run, deny_storage_class_list=filter_file) + + dedup_stats=ret[1] + dedup_ratio_estimate=ret[2] + + (skip_bucket, skip_sc) = read_filter_skip_stats() + assert skip_bucket == 0 + log.debug("filtered storage class, num objects skipped = %d", skip_sc) + if filter_mode_allow: + assert skip_sc == 0 + if not dry_run: + # the number of left rados-object post-dedup should equal the expected_results + # since we didn't filter any object + expected_results=calc_expected_results(files, config) + expected_results += split_head_objs + log.debug("expected_results=%d, split_head_objs=%d", expected_results, split_head_objs) + assert expected_results == count_object_parts_in_all_buckets(False) + else: + assert skip_sc == obj_count + # the number of left rados-object post-dedup should equal the rados_obj_count + # pre-dedup count since all objects were filtered out + assert rados_objects_total == count_object_parts_in_all_buckets(False) + + if dry_run: + reset_full_dedup_stats(expected_dedup_stats) + + expected_dedup_stats.size_before_dedup = dedup_stats.size_before_dedup + assert dedup_stats == expected_dedup_stats + finally: + cleanup_local() + try: + delete_bucket_with_all_objects(bucket_name, conn) + except Exception as e: + log.warning("Failed to cleanup bucket %s: %s", bucket_name, e) + + verify_pool_is_empty(conn) + +#------------------------------------------------------------------------------- +@pytest.mark.basic_test +def test_dedup_filter_storage_class_estimate(): + dry_run=True + + log.info("dedup_filter_storage_class_estimate: filter_mode_allow") + dedup_filter_allow_deny_storage_class_common(dry_run, filter_mode_allow=True) + + log.info("dedup_filter_bucket_estimate: filter_mode_deny") + dedup_filter_allow_deny_storage_class_common(dry_run, filter_mode_allow=False) + +#------------------------------------------------------------------------------- +@pytest.mark.basic_test +def test_dedup_filter_storage_class_exec(): + dry_run=False + + log.info("dedup_filter_storage_class_exec: filter_mode_allow") + dedup_filter_allow_deny_storage_class_common(dry_run, filter_mode_allow=True) + + log.info("dedup_filter_storage_class_exec: filter_mode_deny") + dedup_filter_allow_deny_storage_class_common(dry_run, filter_mode_allow=False) + + +#============================================================================== +# Test Group 3: Bucket list Filters +#=============================================================================== + +#------------------------------------------------------------------------------- +def dedup_filter_allow_deny_bucket_common(dry_run, filter_mode_allow): + """ + Upload identical objects to 4 buckets. Deny bucket_a and bucket_b. + Verify: + - ingress_skip_filtered_bucket == 2 (two bucket skipped) + - dedup estimate reflects only bucket_c + bucket_d (2 visible copies) + - Rados pool has MORE tail objects than a full (unfiltered) dedup would leave, + because bucket_a and bucket_b tails were not deduplicated + """ + prepare_test() + config=default_config + filter_file = OUT_DIR + 'filter_bucket_list.txt' + conn = get_single_connection() + files = [] + num_files = 11 + bucket_a = gen_bucket_name() + bucket_b = gen_bucket_name() + bucket_c = gen_bucket_name() + bucket_d = gen_bucket_name() + + filtered_bucket_names = [bucket_a, bucket_b] + num_filtered=len(filtered_bucket_names) + visible_bucket_names = [bucket_c, bucket_d] + num_visible=len(visible_bucket_names) + bucket_names = filtered_bucket_names + visible_bucket_names + log.debug("filtered=%s, visible=%s, buckets=%s", + filtered_bucket_names, visible_bucket_names, bucket_names) + + base_size = 16*KB + log.debug("generate files: base size=%d MiB, max_size=%d MiB", + base_size/MB, (pow(2, num_files) * base_size)/MB) + gen_files(files, base_size, num_files) + + try: + if filter_mode_allow: + # forgotten buckets from previous tests will cause skip_bucket count + # to be too high in allow_mode (as they won't appear in the allow list) + verify_no_forgotten_buckets(conn) + + for b in bucket_names: + conn.create_bucket(Bucket=b) + + rados_filtered=0 + for f in files: + filename=f[0] + obj_size=f[1] + rados_obj_count = calc_rados_obj_count(1, obj_size, config) + rados_filtered += (rados_obj_count * num_filtered) + for i, bkt in enumerate(filtered_bucket_names): + key = gen_object_name("filtered" + filename, i) + conn.upload_file(OUT_DIR + filename, bkt, key, Config=config) + + log.debug("rados_filtered=%d", rados_filtered) + assert rados_filtered == count_object_parts_in_all_buckets(False, 0) + + # Build expected stats for only the VISIBLE copies + expected_dedup_stats = Dedup_Stats() + rados_visible=0 + rados_visible_post_dedup=0 + split_head_objs=0 + for f in files: + filename=f[0] + obj_size=f[1] + calc_expected_stats(expected_dedup_stats, obj_size, num_visible, config) + rados_obj_count = calc_rados_obj_count(1, obj_size, config) + split_head = calc_split_objs_count(obj_size, num_visible, config) + split_head_objs += split_head + tail_count = ((rados_obj_count + split_head) - 1) + rados_visible_post_dedup += (tail_count + num_visible) + rados_visible += (rados_obj_count*num_visible) + for i, bkt in enumerate(visible_bucket_names): + key = gen_object_name("visible" + filename, i) + conn.upload_file(OUT_DIR + filename, bkt, key, Config=config) + + log.debug("rados_visible=%d, split_head_objs=%d", rados_visible, split_head_objs) + assert (rados_filtered + rados_visible) == count_object_parts_in_all_buckets(False, 0) + + if filter_mode_allow: + # Write the allow-list (only bucket_b and bucket_c are allowed) + write_filter_list_file(filter_file, visible_bucket_names) + + # Run dedup with allow filter + ret = exec_dedup_with_filter(dry_run, allow_bucket_list=filter_file) + else: + # Write the deny-list (only bucket_a is denied) + write_filter_list_file(filter_file, filtered_bucket_names) + + # Run dedup with deny filter + ret = exec_dedup_with_filter(dry_run, deny_bucket_list=filter_file) + + if not dry_run: + result = admin(['gc', 'process', '--include-all']) + assert result[1] == 0 + actual_rados = count_object_parts_in_all_buckets(False, 0) + log.debug("rados_filtered=%d, rados_visible_post_dedup=%d, combined=%d", + rados_filtered, rados_visible_post_dedup, + (rados_filtered + rados_visible_post_dedup)) + assert actual_rados == (rados_filtered + rados_visible_post_dedup) + + (skip_bucket, skip_sc) = read_filter_skip_stats() + assert skip_sc == 0 + assert skip_bucket == num_filtered + finally: + cleanup_local() + for b in bucket_names: + try: + delete_bucket_with_all_objects(b, conn) + except Exception as e: + log.warning("Failed to cleanup bucket %s: %s", b, e) + + verify_pool_is_empty(conn) + + +#------------------------------------------------------------------------------- +@pytest.mark.basic_test +def test_dedup_filter_bucket_estimate(): + dry_run=True + log.info("dedup_filter_bucket_estimate: filter_mode_deny") + dedup_filter_allow_deny_bucket_common(dry_run, filter_mode_allow=False) + + log.info("dedup_filter_bucket_estimate: filter_mode_allow") + dedup_filter_allow_deny_bucket_common(dry_run, filter_mode_allow=True) + +#------------------------------------------------------------------------------- +@pytest.mark.basic_test +def test_dedup_filter_bucket_exec(): + dry_run=False + log.info("dedup_filter_bucket_exec: filter_mode_deny") + dedup_filter_allow_deny_bucket_common(dry_run, filter_mode_allow=False) + + log.info("dedup_filter_bucket_exec: filter_mode_allow") + dedup_filter_allow_deny_bucket_common(dry_run, filter_mode_allow=True)