- ``radosgw-admin dedup throttle --stat``:
Displays dedup throttle setting.
+The ``dedup estimate`` and ``dedup exec`` commands also accept filter options:
+
+- ``--allow-bucket-list <file>``:
+ Path to a file listing bucket names to include (allowlist mode).
+ Only buckets listed in the file will be processed.
+ Mutually exclusive with ``--deny-bucket-list``.
+
+- ``--deny-bucket-list <file>``:
+ Path to a file listing bucket names to exclude (denylist mode).
+ All buckets except those listed in the file will be processed.
+ Mutually exclusive with ``--allow-bucket-list``.
+
+- ``--allow-storage-class-list <file>``:
+ Path to a file listing storage class names to include (allowlist mode).
+ Mutually exclusive with ``--deny-storage-class-list``.
+
+- ``--deny-storage-class-list <file>``:
+ Path to a file listing storage class names to exclude (denylist mode).
+ Mutually exclusive with ``--allow-storage-class-list``.
+
+**File format:** One name per line. Everything from a ``#`` to the end of the
+line is treated as a comment. Leading and trailing whitespace is trimmed and
+blank lines are skipped.
+
Skipped Objects
===============
driver/rados/config/zone.cc
driver/rados/config/zonegroup.cc
driver/rados/rgw_dedup.cc
+ driver/rados/rgw_dedup_filter.cc
driver/rados/rgw_dedup_table.cc
driver/rados/rgw_dedup_store.cc
driver/rados/rgw_dedup_utils.cc
p_worker_stats->ingress_obj_bytes += ondisk_byte_size;
// We limit dedup to objects from the same storage_class
- // TBD-Future:
- // Should we use a skip-list of storage_classes we should skip (like glacier) ?
const std::string& storage_class =
rgw_placement_rule::get_canonical_storage_class(entry.meta.storage_class);
+ if (!d_filter.allow_storage_class(storage_class)) {
+ ldpp_dout(dpp, 20) << __func__ << "::skip storage_class (filter): "
+ << storage_class << dendl;
+ p_worker_stats->ingress_skip_filtered_storage_class++;
+ return 0;
+ }
if (storage_class == RGW_STORAGE_CLASS_STANDARD) {
p_worker_stats->default_storage_class_objs++;
p_worker_stats->default_storage_class_objs_bytes += ondisk_byte_size;
continue;
}
ldpp_dout(dpp, 20) <<__func__ << "::bucket=" << bucket << dendl;
+ if (!d_filter.allow_bucket(bucket.name)) {
+ ldpp_dout(dpp, 10) << __func__ << "::worker_id=" << worker_id
+ << "::skip bucket (filter): " << bucket.name << dendl;
+ p_worker_stats->ingress_skip_filtered_bucket++;
+ continue;
+ }
ret = ingress_bucket_objects_single_shard(disk_arr, bucket, worker_id,
num_work_shards, p_worker_stats);
if (unlikely(ret != 0)) {
break;
case URGENT_MSG_RESTART:
if (!d_ctl.dedup_exec) {
+ // Decode optional filter (may not be present for older senders)
+ {
+ bool has_filter = false;
+ try {
+ ceph::decode(has_filter, bl_iter);
+ if (has_filter) {
+ decode(d_filter, bl_iter);
+ ldpp_dout(dpp, 5) << __func__ << "::RESTART with filter" << dendl;
+ }
+ else {
+ d_filter = dedup_filter_t{};
+ }
+ } catch (buffer::error&) {
+ // older sender without filter - reset to no-filter
+ d_filter = dedup_filter_t{};
+ }
+ }
d_ctl.remote_restart_req = true;
d_cond.notify_all();
}
#include "rgw_dedup_utils.h"
#include "rgw_dedup_table.h"
#include "rgw_dedup_cluster.h"
+#include "rgw_dedup_filter.h"
#include "rgw_realm_reloader.h"
#include <string>
#include <unordered_map>
bool d_split_head = true;
uint32_t d_head_object_size = (4ULL * 1024 * 1024);
control_t d_ctl;
+ dedup_filter_t d_filter;
uint64_t d_watch_handle = 0;
DedupWatcher d_watcher_ctx;
collect_single_shard_stats(dpp, owner_map, sp_arr.data(), shard, &show_time, "WORKER");
}
Formatter::ObjectSection worker_stats(*fmt, "worker_stats");
- wrk_stats_sum.dump(fmt);
+ wrk_stats_sum.dump(fmt, num_work_shards);
show_incomplete_shards_fmt(has_incomplete_shards, num_work_shards, sp_arr.data(), fmt);
md5_start_time = show_time_func_fmt(epoch.time, show_time, owner_map, fmt);
}
// command-line called from radosgw-admin.cc
int cluster::dedup_restart_scan(rgw::sal::RadosStore *store,
dedup_req_type_t dedup_type,
- const DoutPrefixProvider *dpp)
+ const DoutPrefixProvider *dpp,
+ const dedup_filter_t *p_filter)
{
ldpp_dout(dpp, 1) << __func__ << "::dedup_type = " << dedup_type << dendl;
ret = swap_epoch(store, dpp, &old_epoch, dedup_type, 0, 0);
if (ret == 0) {
ldpp_dout(dpp, 10) << __func__ << "::Epoch object was reset" << dendl;
- return dedup_control(store, dpp, URGENT_MSG_RESTART);
+
+ // Build the RESTART bufferlist, optionally including the filter
+ bufferlist urgent_msg_bl;
+ ceph::encode(URGENT_MSG_RESTART, urgent_msg_bl);
+ bool has_filter = (p_filter != nullptr);
+ ceph::encode(has_filter, urgent_msg_bl);
+ if (has_filter) {
+ const auto& bucket_set = p_filter->get_bucket_filter();
+ ldpp_dout(dpp, 20) << __func__ << "::bucket_set.size()="
+ << bucket_set.size() << dendl;
+ for (const auto& name : bucket_set) {
+ ldpp_dout(dpp, 20) << __func__ << "::bucket_set::" << name << dendl;
+ }
+
+ const auto& sc_vec = p_filter->get_storage_class_filter();
+ ldpp_dout(dpp, 20) << __func__ << "::sc_vec.size()="
+ << sc_vec.size() << dendl;
+ for (const auto& name : sc_vec) {
+ ldpp_dout(dpp, 20) << __func__ << "::sc_vec::" << name << dendl;
+ }
+
+ encode(*p_filter, urgent_msg_bl);
+ }
+ return dedup_control_bl(store, dpp, URGENT_MSG_RESTART, urgent_msg_bl);
}
else {
return ret;
#include "common/dout.h"
#include "rgw_dedup_utils.h"
#include "rgw_dedup_store.h"
+#include "rgw_dedup_filter.h"
#include <string>
namespace rgw::dedup {
urgent_msg_t urgent_msg);
static int dedup_restart_scan(rgw::sal::RadosStore *store,
dedup_req_type_t dedup_type,
- const DoutPrefixProvider *dpp);
+ const DoutPrefixProvider *dpp,
+ const dedup_filter_t *p_filter = nullptr);
//---------------------------------------------------------------------------
int mark_work_shard_token_completed(rgw::sal::RadosStore *store,
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2;
+// vim: ts=8 sw=2 sts=2 expandtab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Author: Gabriel BenHanokh <gbenhano@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "rgw_dedup_filter.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "include/encoding.h"
+#include "rgw_rest_s3.h"
+#include <algorithm>
+#include <cctype>
+#include <fstream>
+#include <iostream>
+#include <string>
+
+#define dout_subsys ceph_subsys_rgw_dedup
+
+namespace rgw::dedup {
+
+ //---------------------------------------------------------------------------
+ bool dedup_filter_t::allow_bucket(const std::string& bucket_name) const
+ {
+   // Allowlist: the bucket passes only when it is listed.
+   if (bucket_mode == filter_mode_t::FILTER_ALLOW) {
+     return bucket_set.contains(bucket_name);
+   }
+
+   // Denylist: the bucket passes unless it is listed.
+   if (bucket_mode == filter_mode_t::FILTER_DENY) {
+     return !bucket_set.contains(bucket_name);
+   }
+
+   // FILTER_NONE, or any value outside the enum: nothing is filtered.
+   return true;
+ }
+
+ //---------------------------------------------------------------------------
+ bool dedup_filter_t::allow_storage_class(const std::string& storage_class) const
+ {
+   // Returns true when objects of @storage_class should be processed.
+   // sc_vec is searched linearly; it is expected to hold only a few names.
+   switch (storage_class_mode) {
+   case filter_mode_t::FILTER_ALLOW:
+     // Allowlist: pass only when the storage class is listed.
+     return std::find(sc_vec.begin(), sc_vec.end(), storage_class) != sc_vec.end();
+   case filter_mode_t::FILTER_DENY:
+     // Denylist: pass unless the storage class is listed.
+     return std::find(sc_vec.begin(), sc_vec.end(), storage_class) == sc_vec.end();
+   case filter_mode_t::FILTER_NONE:
+   default:
+     // A value outside the enum can arrive through decode(), which casts the
+     // wire byte without range-checking it. Treat it as "no filter", exactly
+     // like allow_bucket() does, instead of silently acting as a denylist.
+     return true;
+   }
+ }
+
+ //---------------------------------------------------------------------------
+ static inline int valid_bucket_name(const std::string& /*name*/)
+ {
+   // Deliberately accepts every name (per Casey's instructions nothing should
+   // be filtered here). The stricter check that was considered:
+   //   return -(valid_s3_bucket_name(name, true /* relaxed */));
+   constexpr int accepted = 0;
+   return accepted;
+ }
+
+ //---------------------------------------------------------------------------
+ static int valid_storage_class_name(const std::string& /*name*/)
+ {
+   // Deliberately accepts every name (per Casey's instructions nothing should
+   // be filtered here); 0 means "valid" to read_filter_file().
+   constexpr int accepted = 0;
+   return accepted;
+ }
+
+ //---------------------------------------------------------------------------
+ // Private static helper: read a filter file, one name per line, into @name_set.
+ //
+ // For every line, everything from the first '#' onward is dropped, then
+ // leading/trailing whitespace is trimmed; lines that end up empty are skipped.
+ // Each remaining name is passed to @validator (0 means valid) and inserted
+ // into @name_set.
+ //
+ // Returns 0 on success or a positive errno value:
+ //   ENOENT - the file could not be opened
+ //   EINVAL - @validator rejected a name (names read before it stay in @name_set)
+ //   EIO    - the stream reported a read error before reaching end-of-file
+ int dedup_filter_t::read_filter_file(const std::string& path,
+                                      std::unordered_set<std::string>& name_set,
+                                      int (*validator)(const std::string&),
+                                      const DoutPrefixProvider* dpp)
+ {
+   std::ifstream f(path);
+   if (!f.is_open()) {
+     ldpp_dout(dpp, 1) << __func__ << ":: failed to open filter file: " << path << dendl;
+     return ENOENT;
+   }
+
+   const char* ws = " \t\r\n";
+   std::string line;
+   int line_num = 0;
+   while (std::getline(f, line)) {
+     line_num++;
+
+     // Strip comment (everything from '#' onward)
+     const auto comment_pos = line.find('#');
+     if (comment_pos != std::string::npos) {
+       line.erase(comment_pos);
+     }
+
+     // Trim leading and trailing whitespace
+     const auto start = line.find_first_not_of(ws);
+     if (start == std::string::npos) {
+       continue; // blank line after stripping
+     }
+     const auto end = line.find_last_not_of(ws);
+
+     // start != npos guarantees at least one non-whitespace character, so
+     // the extracted name is never empty.
+     std::string name = line.substr(start, end - start + 1);
+     if (validator(name) != 0) {
+       ldpp_dout(dpp, 1) << __func__ << "::" << path << "::" << line_num
+                         << "::invalid name '" << name << "'" << dendl;
+       return EINVAL;
+     }
+
+     ldpp_dout(dpp, 20) << __func__ << "::" << name << dendl;
+     name_set.insert(std::move(name));
+   }
+
+   // getline() stops both at end-of-file and on a read error. Only the former
+   // is success: a silently truncated list would let a denylist admit names
+   // it was meant to exclude.
+   if (f.bad()) {
+     ldpp_dout(dpp, 1) << __func__ << ":: failed to read filter file: " << path << dendl;
+     return EIO;
+   }
+
+   return 0;
+ }
+
+ //---------------------------------------------------------------------------
+ // Builds a filter from up to four list files. An empty path means "no filter"
+ // for that dimension; allow and deny are mutually exclusive per dimension.
+ // Nothing is thrown: on failure d_errcode holds a positive errno value and
+ // the caller is expected to check errcode().
+ dedup_filter_t::dedup_filter_t(const std::string& allow_bucket_file,
+                                const std::string& deny_bucket_file,
+                                const std::string& allow_sc_file,
+                                const std::string& deny_sc_file,
+                                const DoutPrefixProvider* dpp)
+ {
+   // Validate mutual exclusivity
+   if (!allow_bucket_file.empty() && !deny_bucket_file.empty()) {
+     ldpp_dout(dpp, 1) << __func__
+                       << ":: --allow-bucket-list and --deny-bucket-list are mutually exclusive"
+                       << dendl;
+     d_errcode = EINVAL;
+     return;
+   }
+   if (!allow_sc_file.empty() && !deny_sc_file.empty()) {
+     ldpp_dout(dpp, 1) << __func__
+                       << ":: --allow-storage-class-list and --deny-storage-class-list are mutually exclusive"
+                       << dendl;
+     d_errcode = EINVAL;
+     return;
+   }
+
+   // Loads one filter dimension: reads whichever of the two files was given
+   // into @names and, only on success, sets @mode to match. With neither file
+   // given, @mode is left untouched and 0 is returned.
+   auto load_dimension = [dpp](const std::string& allow_file,
+                               const std::string& deny_file,
+                               std::unordered_set<std::string>& names,
+                               int (*validator)(const std::string&),
+                               filter_mode_t& mode) -> int {
+     const bool use_allow = !allow_file.empty();
+     if (!use_allow && deny_file.empty()) {
+       return 0;
+     }
+     const int ret = read_filter_file(use_allow ? allow_file : deny_file,
+                                      names, validator, dpp);
+     if (ret == 0) {
+       mode = use_allow ? filter_mode_t::FILTER_ALLOW : filter_mode_t::FILTER_DENY;
+     }
+     return ret;
+   };
+
+   // Bucket filter
+   d_errcode = load_dimension(allow_bucket_file, deny_bucket_file, bucket_set,
+                              valid_bucket_name, bucket_mode);
+   if (d_errcode != 0) {
+     return;
+   }
+
+   // Storage-class filter
+   // Names are first collected in a set so duplicates collapse, then copied
+   // into sc_vec, which is what allow_storage_class() scans.
+   std::unordered_set<std::string> name_set;
+   d_errcode = load_dimension(allow_sc_file, deny_sc_file, name_set,
+                              valid_storage_class_name, storage_class_mode);
+   if (d_errcode != 0) {
+     return;
+   }
+   // copy the de-duplicated names from the set into the vector
+   sc_vec.assign(name_set.begin(), name_set.end());
+ }
+
+ //---------------------------------------------------------------------------
+ // Serializes the filter into @bl inside an ENCODE_START/ENCODE_FINISH envelope.
+ // Field order: bucket mode, bucket names, storage-class mode, storage-class
+ // names. Each mode is written as a uint8_t. decode() reads the fields back in
+ // this same order, so the two functions must be changed together.
+ void encode(const dedup_filter_t& f, ceph::bufferlist& bl)
+ {
+ ENCODE_START(1, 1, bl);
+ encode(static_cast<uint8_t>(f.bucket_mode), bl);
+ encode(f.bucket_set, bl);
+ encode(static_cast<uint8_t>(f.storage_class_mode), bl);
+ encode(f.sc_vec, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ //---------------------------------------------------------------------------
+ // Deserializes a filter written by encode(), reading the fields in the same
+ // order: bucket mode, bucket names, storage-class mode, storage-class names.
+ // Fields are decoded straight into @f, so if decoding fails part-way @f may be
+ // left partially updated.
+ // NOTE(review): each mode byte is cast to filter_mode_t without a range
+ // check, so a value outside the enum is stored as-is.
+ void decode(dedup_filter_t& f, ceph::bufferlist::const_iterator& bl)
+ {
+ DECODE_START(1, bl);
+ // one scratch byte, reused for both mode fields
+ uint8_t mode;
+ decode(mode, bl);
+ f.bucket_mode = static_cast<filter_mode_t>(mode);
+ decode(f.bucket_set, bl);
+ decode(mode, bl);
+ f.storage_class_mode = static_cast<filter_mode_t>(mode);
+ decode(f.sc_vec, bl);
+ DECODE_FINISH(bl);
+ }
+
+} // namespace rgw::dedup
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2;
+// vim: ts=8 sw=2 sts=2 expandtab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Author: Gabriel BenHanokh <gbenhano@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+#include "common/dout.h"
+#include "include/rados/buffer.h"
+#include "include/encoding.h"
+#include <string>
+#include <unordered_set>
+
+namespace rgw::dedup {
+
+ // How a filter dimension (buckets or storage classes) treats its name list.
+ // The numeric values are spelled out because encode()/decode() transfer the
+ // mode as a raw uint8_t.
+ enum class filter_mode_t : uint8_t {
+   FILTER_NONE  = 0, // no filter active
+   FILTER_ALLOW = 1, // allowlist: only listed items pass
+   FILTER_DENY  = 2  // denylist: listed items are blocked
+ };
+
+ struct dedup_filter_t {
+   // encode()/decode() read and write the private members directly
+   friend void encode(const dedup_filter_t& f, ceph::bufferlist& bl);
+   friend void decode(dedup_filter_t& f, ceph::bufferlist::const_iterator& bl);
+
+   // An unfiltered instance: every bucket and every storage class passes.
+   dedup_filter_t() = default;
+
+   // Builds the filter from list files. Pass an empty string to leave a
+   // dimension unfiltered. Giving both the allow and the deny file for the
+   // same dimension is an error.
+   // Failures are reported through errcode(), so check it after construction.
+   dedup_filter_t(const std::string& allow_bucket_file,
+                  const std::string& deny_bucket_file,
+                  const std::string& allow_sc_file,
+                  const std::string& deny_sc_file,
+                  const DoutPrefixProvider* dpp);
+
+   // 0 when construction succeeded, otherwise an errno value.
+   int errcode() const { return d_errcode; }
+
+   // True when at least one dimension (bucket or storage class) is filtered.
+   bool is_active() const {
+     const bool no_bucket_filter = (bucket_mode == filter_mode_t::FILTER_NONE);
+     const bool no_sc_filter = (storage_class_mode == filter_mode_t::FILTER_NONE);
+     return !(no_bucket_filter && no_sc_filter);
+   }
+
+   // True when the named bucket should be processed.
+   bool allow_bucket(const std::string& bucket_name) const;
+   // True when objects of the named storage class should be processed.
+   bool allow_storage_class(const std::string& storage_class) const;
+
+   // Read-only view of the bucket names loaded for the bucket filter.
+   const std::unordered_set<std::string>& get_bucket_filter() const {
+     return bucket_set;
+   }
+
+   // Read-only view of the names loaded for the storage-class filter.
+   const std::vector<std::string>& get_storage_class_filter() const {
+     return sc_vec;
+   }
+
+ private:
+   // Parses a filter file: one name per line, '#' begins a comment and
+   // surrounding whitespace is trimmed.
+   // Returns 0 and fills name_set on success; returns an errno code on error.
+   static int read_filter_file(const std::string& path,
+                               std::unordered_set<std::string>& name_set,
+                               int (*validator)(const std::string&),
+                               const DoutPrefixProvider* dpp);
+
+   filter_mode_t bucket_mode = filter_mode_t::FILTER_NONE;
+   // there may be many buckets, so an unordered_set keeps lookups quick
+   std::unordered_set<std::string> bucket_set;
+
+   filter_mode_t storage_class_mode = filter_mode_t::FILTER_NONE;
+   // only a few storage classes are active, so a vector will suffice
+   std::vector<std::string> sc_vec;
+
+   // result of construction, see errcode()
+   int d_errcode = 0;
+ };
+
+ void encode(const dedup_filter_t& f, ceph::bufferlist& bl);
+ void decode(dedup_filter_t& f, ceph::bufferlist::const_iterator& bl);
+
+} // namespace rgw::dedup
this->ingress_corrupted_etag += other.ingress_corrupted_etag;
this->ingress_skip_too_small_bytes += other.ingress_skip_too_small_bytes;
this->ingress_skip_too_small += other.ingress_skip_too_small;
+ this->ingress_skip_filtered_bucket += other.ingress_skip_filtered_bucket;
+ this->ingress_skip_filtered_storage_class += other.ingress_skip_filtered_storage_class;
return *this;
}
//---------------------------------------------------------------------------
- void worker_stats_t::dump(Formatter *f) const
+ void worker_stats_t::dump(Formatter *f, unsigned num_shards) const
{
// main section
{
Formatter::ObjectSection main(*f, "main");
-
+ f->dump_unsigned("Num Work-Shards", num_shards);
f->dump_unsigned("Ingress Objs count", this->ingress_obj);
f->dump_unsigned("Accum byte size Ingress Objs", this->ingress_obj_bytes);
f->dump_unsigned("Egress Records count", this->egress_records);
f->dump_unsigned("Ingress skip: too small bytes",
this->ingress_skip_too_small_bytes);
}
+
+ if (this->ingress_skip_filtered_bucket && num_shards) {
+ // buckets are scanned once per worker-shard
+ f->dump_unsigned("Ingress skip: filtered bucket",
+ this->ingress_skip_filtered_bucket/num_shards);
+ }
+ if (this->ingress_skip_filtered_storage_class) {
+ f->dump_unsigned("Ingress skipped filtered storage class, num objects skipped",
+ this->ingress_skip_filtered_storage_class);
+ }
}
{
std::ostream& operator<<(std::ostream &out, const worker_stats_t &s)
{
JSONFormatter formatter(false);
- s.dump(&formatter);
+ s.dump(&formatter, 1);
std::stringstream sstream;
formatter.flush(sstream);
out << sstream.str();
encode(w.ingress_skip_too_small_bytes, bl);
encode(w.ingress_skip_too_small, bl);
+ encode(w.ingress_skip_filtered_bucket, bl);
+ encode(w.ingress_skip_filtered_storage_class, bl);
encode(w.duration, bl);
ENCODE_FINISH(bl);
decode(w.ingress_corrupted_etag, bl);
decode(w.ingress_skip_too_small_bytes, bl);
decode(w.ingress_skip_too_small, bl);
+ decode(w.ingress_skip_filtered_bucket, bl);
+ decode(w.ingress_skip_filtered_storage_class, bl);
decode(w.duration, bl);
DECODE_FINISH(bl);
struct worker_stats_t {
worker_stats_t& operator +=(const worker_stats_t& other);
- void dump(Formatter *f) const;
+ void dump(Formatter *f, unsigned num_shards) const;
uint64_t ingress_obj = 0;
uint64_t ingress_obj_bytes = 0;
uint64_t ingress_skip_too_small_bytes = 0;
uint64_t ingress_skip_too_small = 0;
+ uint64_t ingress_skip_filtered_bucket = 0;
+ uint64_t ingress_skip_filtered_storage_class = 0;
+
utime_t duration = {0, 0};
};
std::ostream& operator<<(std::ostream &out, const worker_stats_t &s);
#include "rgw_account.h"
#include "rgw_bucket_logging.h"
#include "rgw_dedup_cluster.h"
+#include "rgw_dedup_filter.h"
#include "services/svc_sync_modules.h"
#include "services/svc_cls.h"
#include "services/svc_bilog_rados.h"
cout << " --max-bucket-index-ops specify max bucket-index requests per second allowed for an RGW during dedup, 0 means unlimited\n";
cout << " --max-metadata-ops specify max metadata requests per second allowed for an RGW during dedup, 0 means unlimited\n";
cout << " --stat display dedup throttle setting\n";
+ cout << "\nDedup filter options:\n";
+ cout << " --allow-bucket-list=<file> file with bucket names to allow in dedup (mutually exclusive with --deny-bucket-list)\n";
+ cout << " --deny-bucket-list=<file> file with bucket names to deny in dedup (mutually exclusive with --allow-bucket-list)\n";
+ cout << " --allow-storage-class-list=<file> file with storage class names to allow in dedup (mutually exclusive with --deny-storage-class-list)\n";
+ cout << " --deny-storage-class-list=<file> file with storage class names to deny in dedup (mutually exclusive with --allow-storage-class-list)\n";
cout << "\nQuota options:\n";
cout << " --max-objects specify max objects (negative value to disable)\n";
cout << " --max-size specify max size (in B/K/M/G/T, negative value to disable)\n";
bool have_max_read_bytes = false;
bool have_max_bucket_index_ops = false;
bool have_max_metadata_ops = false;
+ std::string allow_bucket_list_file;
+ std::string deny_bucket_list_file;
+ std::string allow_storage_class_list_file;
+ std::string deny_storage_class_list_file;
int include_all = false;
int allow_unordered = false;
return EINVAL;
}
have_max_metadata_ops = true;
+ } else if (ceph_argparse_witharg(args, i, &val, "--allow-bucket-list", (char*)NULL)) {
+ allow_bucket_list_file = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--deny-bucket-list", (char*)NULL)) {
+ deny_bucket_list_file = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--allow-storage-class-list", (char*)NULL)) {
+ allow_storage_class_list_file = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--deny-storage-class-list", (char*)NULL)) {
+ deny_storage_class_list_file = val;
} else if (ceph_argparse_witharg(args, i, &val, "--date", "--time", (char*)NULL)) {
date = val;
if (end_date.empty())
#endif
}
- int ret = cluster::dedup_restart_scan(store, dedup_type, dpp());
+ // Build the dedup filter from the supplied file paths
+ dedup_filter_t dedup_filter(allow_bucket_list_file, deny_bucket_list_file,
+ allow_storage_class_list_file,
+ deny_storage_class_list_file, dpp());
+ int filter_err = dedup_filter.errcode();
+ if (filter_err != 0) {
+ cerr << "ERROR: failed to build dedup filter: "
+ << cpp_strerror(filter_err) << std::endl;
+ return filter_err;
+ }
+
+ int ret = cluster::dedup_restart_scan(store, dedup_type, dpp(),
+ dedup_filter.is_active() ? &dedup_filter : nullptr);
if (ret == 0) {
std::cout << "Dedup was restarted successfully" << std::endl;
}
--max-metadata-ops specify max metadata requests per second allowed for an RGW during dedup, 0 means unlimited
--stat display dedup throttle setting
+ Dedup filter options:
+ --allow-bucket-list=<file> file with bucket names to allow in dedup (mutually exclusive with --deny-bucket-list)
+ --deny-bucket-list=<file> file with bucket names to deny in dedup (mutually exclusive with --allow-bucket-list)
+ --allow-storage-class-list=<file> file with storage class names to allow in dedup (mutually exclusive with --deny-storage-class-list)
+ --deny-storage-class-list=<file> file with storage class names to deny in dedup (mutually exclusive with --allow-storage-class-list)
+
Quota options:
--max-objects specify max objects (negative value to disable)
--max-size specify max size (in B/K/M/G/T, negative value to disable)
#==============================================
+#-------------------------------------------------------------------------------
+def verify_no_forgotten_buckets(conn):
+    """Delete every bucket still visible through conn.
+
+    Each leftover bucket is logged as a warning and then deleted.
+    Returns True when no bucket was found, False otherwise.
+    """
+    leftovers = 0
+    # The 'Buckets' key always exists in a successful response
+    for entry in conn.list_buckets()['Buckets']:
+        name = entry['Name']
+        log.warning("Forgotten bucket name = %s", name)
+        conn.delete_bucket(Bucket=name)
+        leftovers += 1
+
+    return leftovers == 0
+
+
g_tenant_connections=[]
g_tenants=[]
g_simple_connection=[]
for conn in g_simple_connection:
log.debug("close simple connection")
+ verify_no_forgotten_buckets(conn)
conn.close()
for conn in g_tenant_connections:
log.debug("close tenant connection")
+ verify_no_forgotten_buckets(conn)
conn.close()
#-----------------------------------------------
conn.delete_bucket(Bucket=bucket_name)
#-------------------------------------------------------------------------------
-def verify_pool_is_empty():
+def verify_pool_is_empty(conn, skip_bucket_check=False):
result = admin(['gc', 'process', '--include-all'])
assert result[1] == 0
assert count_object_parts_in_all_buckets(False, 0) == 0
-
+ if not skip_bucket_check:
+ assert verify_no_forgotten_buckets(conn)
#-------------------------------------------------------------------------------
def cleanup(bucket_name, conn):
log.debug("delete_all_objects for bucket <%s>",bucket_name)
delete_bucket_with_all_objects(bucket_name, conn)
- verify_pool_is_empty()
+ verify_pool_is_empty(conn)
#-------------------------------------------------------------------------------
log.debug("delete_all_objects for bucket <%s>",bucket_name)
delete_bucket_with_all_objects(bucket_name, conn)
- verify_pool_is_empty()
+ verify_pool_is_empty(conns[0], True)
+ for conn in conns:
+ assert verify_no_forgotten_buckets(conn)
#-------------------------------------------------------------------------------
finally:
# cleanup must be executed even after a failure
if multi_buckets:
- for bucket_name in bucket_names:
- cleanup(bucket_name, conn)
+ conns=[conn]*len(bucket_names)
+ cleanup_all_buckets(bucket_names, conns)
else:
cleanup(bucket_names[0], conn)
bucket_cp= gen_bucket_name()
bucket_names=[]
+ conns=[]
try:
conn = get_single_connection()
conn.create_bucket(Bucket=bucket_cp)
bucket_names=create_buckets(conn, max_copies_count)
+ # need a vector holding multiple copies of conns to support
+ # upload_objects_multi()/verify_objects_multi()
conns=[conn] * max_copies_count
indices=[0] * len(files)
ret=upload_objects_multi(files, conns, bucket_names, indices, config)
# object and linking to the existing tail-objects
assert (expected_results + cp_head_count) == count_object_parts_in_all_buckets(False, 0)
# delete the original objects and verify server-side-copy objects are valid
- for (bucket_name, conn) in zip(bucket_names, conns):
+ for bucket_name in bucket_names:
delete_bucket_with_all_objects(bucket_name, conn)
result = admin(['gc', 'process', '--include-all'])
# At this point the original obejcts are all removed
# Objects created by server-side-copy should keep the tail in place
- # because of teh refcount
+ # because of the refcount
verify_objects_copy(bucket_cp, files, conn, expected_results, config)
finally:
# cleanup must be executed even after a failure
delete_bucket_with_all_objects(bucket_cp, conn)
- cleanup_all_buckets(bucket_names, conns)
+ if len(bucket_names) > 0:
+ cleanup_all_buckets(bucket_names, conns)
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
threads_dedup_basic_with_tenants_common(files, num_threads, config, False)
-#-------------------------------------------------------------------------------
-@pytest.mark.basic_test
-def test_empty_bucket():
- if full_dedup_is_disabled():
- return
-
- prepare_test()
- log.debug("test_empty_bucket: connect to AWS ...")
-
- max_copies_count=2
- config = default_config
-
- files=[]
- try:
- ret=gen_connections_multi2(max_copies_count)
- tenants=ret[0]
- bucket_names=ret[1]
- conns=ret[2]
- finally:
- # cleanup must be executed even after a failure
- cleanup_all_buckets(bucket_names, conns)
-
-
#-------------------------------------------------------------------------------
def inc_step_with_tenants(stats_base, files, conns, bucket_names, config):
max_copies_count=len(conns)
log.info("test_dedup_identical_copies_multipart:full test")
__test_dedup_identical_copies(files, config, dry_run, verify, force_clean)
+
+#===============================================================================
+# Test Group 1: Filter File Parsing / Name Validation
+#===============================================================================
+#-------------------------------------------------------------------------------
+def write_filter_list_file(filepath, lines):
+    """Create (or overwrite) the filter list file at filepath.
+
+    Every entry of lines is written on its own newline-terminated line.
+    """
+    with open(filepath, 'w') as out:
+        out.writelines(entry + '\n' for entry in lines)
+
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_filter_bucket_list_parsing():
+    """CLI validation of the bucket allow/deny filter-list options.
+
+    `dedup estimate` is expected to return a non-zero status when both lists
+    are passed together and when the list file does not exist.
+    """
+    prepare_test()
+    try:
+        # Case 1: --allow-bucket-list and --deny-bucket-list are mutually exclusive.
+        allow_path = OUT_DIR + "allow_buckets.txt"
+        deny_path = OUT_DIR + "deny_buckets.txt"
+        write_filter_list_file(allow_path, ['my-bucket-1'])
+        write_filter_list_file(deny_path, ['my-bucket-2'])
+        both_lists_cmd = ['dedup', 'estimate',
+                          '--allow-bucket-list', allow_path,
+                          '--deny-bucket-list', deny_path]
+        outcome = admin(both_lists_cmd)
+        assert outcome[1] != 0, "Expected failure when both allow and deny lists are given"
+        for list_path in (allow_path, deny_path):
+            os.remove(list_path)
+
+        # Case 2: the list file path does not exist.
+        missing_file_cmd = ['dedup', 'estimate', '--allow-bucket-list',
+                            '/nonexistent/bucket_list.txt']
+        outcome = admin(missing_file_cmd)
+        assert outcome[1] != 0, "Expected failure for non-existent filter file"
+    finally:
+        cleanup_local()
+
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_filter_storage_class_list_parsing():
+    """CLI validation of the storage-class allow/deny filter-list options.
+
+    `dedup estimate` is expected to return a non-zero status when both lists
+    are passed together and when the list file does not exist.
+    """
+    prepare_test()
+    try:
+        # Case 1: --allow-storage-class-list and --deny-storage-class-list
+        # are mutually exclusive.
+        allow_path = OUT_DIR + "allow_storage_classs.txt"
+        deny_path = OUT_DIR + "deny_storage_classs.txt"
+        write_filter_list_file(allow_path, ['STORAGECLASS1'])
+        write_filter_list_file(deny_path, ['STORAGECLASS2'])
+        both_lists_cmd = ['dedup', 'estimate',
+                          '--allow-storage-class-list', allow_path,
+                          '--deny-storage-class-list', deny_path]
+        outcome = admin(both_lists_cmd)
+        assert outcome[1] != 0, "Expected failure when both allow and deny lists are given"
+        for list_path in (allow_path, deny_path):
+            os.remove(list_path)
+
+        # Case 2: the list file path does not exist.
+        missing_file_cmd = ['dedup', 'estimate', '--allow-storage-class-list',
+                            '/nonexistent/storage_class_list.txt']
+        outcome = admin(missing_file_cmd)
+        assert outcome[1] != 0, "Expected failure for non-existent filter file"
+    finally:
+        cleanup_local()
+
+
+#-------------------------------------------------------------------------------
+def read_filter_skip_stats():
+    """Fetch the filter-skip counters reported by `dedup stats`.
+
+    Returns a (bucket, storage_class) tuple read from the 'skipped' section of
+    'worker_stats' in the JSON output; a counter that is absent reads as 0.
+    """
+    stats_out = admin(['dedup', 'stats'])
+    assert stats_out[1] == 0
+    skipped = json.loads(stats_out[0])['worker_stats']['skipped']
+    return (
+        skipped.get('Ingress skip: filtered bucket', 0),
+        skipped.get('Ingress skipped filtered storage class, num objects skipped', 0),
+    )
+
+#-------------------------------------------------------------------------------
+def exec_dedup_with_filter(dry_run, deny_bucket_list=None, allow_bucket_list=None,
+                           deny_storage_class_list=None, allow_storage_class_list=None,
+                           max_dedup_time=300):
+    """Start a dedup session with optional filter lists and wait for completion.
+
+    dry_run selects `dedup estimate`; otherwise `dedup exec` is run together
+    with `--yes-i-really-mean-it`. Each *_list argument, when set, is the path
+    of a filter-list file handed to the matching command-line option.
+
+    Progress is polled through read_dedup_stats() every 3 seconds. Returns the
+    read_dedup_stats() result as soon as its first element reports completion.
+    Fails with an AssertionError when the admin command returns a non-zero
+    status or when the session has not completed within max_dedup_time seconds.
+    """
+    cmd = ['dedup', 'estimate' if dry_run else 'exec']
+    if not dry_run:
+        cmd += ['--yes-i-really-mean-it']
+
+    # options are appended in a fixed order; unset (falsy) lists are left out
+    filter_options = (
+        ('--deny-bucket-list', deny_bucket_list),
+        ('--allow-bucket-list', allow_bucket_list),
+        ('--deny-storage-class-list', deny_storage_class_list),
+        ('--allow-storage-class-list', allow_storage_class_list),
+    )
+    for option, list_path in filter_options:
+        if list_path:
+            cmd += [option, list_path]
+
+    result = admin(cmd)
+    assert result[1] == 0, "failed to start dedup, cmd=%s" % (cmd,)
+
+    dedup_time = 0
+    poll_interval = 3
+    while dedup_time < max_dedup_time:
+        time.sleep(poll_interval)
+        dedup_time += poll_interval
+        ret = read_dedup_stats(dry_run)
+        if ret[0]: # completed
+            return ret
+
+    assert False, "dedup did not complete within %s seconds" % (max_dedup_time,)
+
+
+
+#==============================================================================
+# Test Group 2: Storage-Class list Filters
+#===============================================================================
+
+#-------------------------------------------------------------------------------
+def dedup_filter_allow_deny_storage_class_common(dry_run, filter_mode_allow):
+    """Check storage-class filtering for `dedup estimate` (dry_run) and `dedup exec`.
+
+    Copies of each generated file are uploaded to a single bucket and dedup is
+    run with a filter list holding only STANDARD:
+      - filter_mode_allow=True:  STANDARD is allowed, no object may be skipped
+        and the dedup stats must match the unfiltered expectation
+      - filter_mode_allow=False: STANDARD is denied, every uploaded object must
+        be counted as skipped and the rados object count must stay unchanged
+    """
+    prepare_test()
+    config=default_config
+    # the same file name is used for both the allow list and the deny list
+    filter_file = OUT_DIR + "deny_storage_classs.txt"
+    bucket_name = gen_bucket_name()
+    conn=get_single_connection()
+    files=[]
+    num_files=7
+    base_size = 2*MB
+    log.debug("generate files: base size=%d MiB, max_size=%d MiB",
+              base_size/MB, (pow(2, num_files) * base_size)/MB)
+    gen_files(files, base_size, num_files)
+    expected_dedup_stats = Dedup_Stats() # start with an empty-stats
+    split_head_objs=0
+    rados_objects_total=0
+
+    try:
+        obj_count = 0
+        conn.create_bucket(Bucket=bucket_name)
+
+        for f in files:
+            filename=f[0]
+            obj_size=f[1]
+            num_copies=f[2]
+
+            if filter_mode_allow:
+                # nothing is filtered: accumulate the regular dedup expectation
+                split_head_objs += calc_split_objs_count(obj_size, num_copies, config)
+                calc_expected_stats(expected_dedup_stats, obj_size, num_copies, config)
+            else:
+                # everything is filtered: only track the pre-dedup rados count
+                rados_obj_count=calc_rados_obj_count(num_copies, obj_size, config)
+                rados_objects_total += (rados_obj_count * num_copies)
+
+            for i in range(0, num_copies):
+                key = gen_object_name(filename, i)
+                log.debug("upload: %s -> %s", OUT_DIR + filename, key)
+                obj_count += 1
+                conn.upload_file(OUT_DIR + filename, bucket_name, key, Config=config)
+
+
+        write_filter_list_file(filter_file, ['STANDARD'])
+        if filter_mode_allow:
+            ret = exec_dedup_with_filter(dry_run, allow_storage_class_list=filter_file)
+        else:
+            ret = exec_dedup_with_filter(dry_run, deny_storage_class_list=filter_file)
+
+        dedup_stats=ret[1]
+
+        (skip_bucket, skip_sc) = read_filter_skip_stats()
+        assert skip_bucket == 0
+        log.debug("filtered storage class, num objects skipped = %d", skip_sc)
+        if filter_mode_allow:
+            assert skip_sc == 0
+            if not dry_run:
+                # the number of left rados-object post-dedup should equal the expected_results
+                # since we didn't filter any object
+                expected_results=calc_expected_results(files, config)
+                expected_results += split_head_objs
+                log.debug("expected_results=%d, split_head_objs=%d", expected_results, split_head_objs)
+                assert expected_results == count_object_parts_in_all_buckets(False)
+        else:
+            assert skip_sc == obj_count
+            # the number of left rados-object post-dedup should equal the rados_obj_count
+            # pre-dedup count since all objects were filtered out
+            assert rados_objects_total == count_object_parts_in_all_buckets(False)
+
+        if dry_run:
+            reset_full_dedup_stats(expected_dedup_stats)
+
+        # size_before_dedup is copied from the actual stats, so it is excluded
+        # from the comparison below
+        expected_dedup_stats.size_before_dedup = dedup_stats.size_before_dedup
+        assert dedup_stats == expected_dedup_stats
+    finally:
+        cleanup_local()
+        try:
+            delete_bucket_with_all_objects(bucket_name, conn)
+        except Exception as e:
+            # best-effort cleanup: a failure here must not mask the test result
+            log.warning("Failed to cleanup bucket %s: %s", bucket_name, e)
+
+    verify_pool_is_empty(conn)
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_filter_storage_class_estimate():
+    """Run the storage-class filter scenario in dry-run mode: allow, then deny."""
+    dry_run=True
+
+    log.info("dedup_filter_storage_class_estimate: filter_mode_allow")
+    dedup_filter_allow_deny_storage_class_common(dry_run, filter_mode_allow=True)
+
+    # the message used to name the bucket test (copy-paste slip)
+    log.info("dedup_filter_storage_class_estimate: filter_mode_deny")
+    dedup_filter_allow_deny_storage_class_common(dry_run, filter_mode_allow=False)
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_filter_storage_class_exec():
+    """Run the storage-class filter scenario with dry_run off: allow, then deny."""
+    scenarios = (
+        ("dedup_filter_storage_class_exec: filter_mode_allow", True),
+        ("dedup_filter_storage_class_exec: filter_mode_deny", False),
+    )
+    for banner, allow in scenarios:
+        log.info(banner)
+        dedup_filter_allow_deny_storage_class_common(False, filter_mode_allow=allow)
+
+
+#==============================================================================
+# Test Group 3: Bucket list Filters
+#===============================================================================
+
+#-------------------------------------------------------------------------------
+def dedup_filter_allow_deny_bucket_common(dry_run, filter_mode_allow):
+    """Check bucket filtering for `dedup estimate` (dry_run) and `dedup exec`.
+
+    The same files are uploaded to 4 buckets. bucket_a and bucket_b are the
+    filtered buckets, bucket_c and bucket_d are the visible ones:
+      - filter_mode_allow=True:  the allow-list holds bucket_c and bucket_d
+      - filter_mode_allow=False: the deny-list holds bucket_a and bucket_b
+    Verify:
+    - the filtered-bucket skip counter == 2 (two buckets skipped)
+    - no object is skipped because of its storage class
+    - on exec, the rados pool holds the untouched objects of bucket_a and
+      bucket_b plus the post-dedup objects of bucket_c and bucket_d
+    """
+    prepare_test()
+    config=default_config
+    filter_file = OUT_DIR + 'filter_bucket_list.txt'
+    conn = get_single_connection()
+    files = []
+    num_files = 11
+    bucket_a = gen_bucket_name()
+    bucket_b = gen_bucket_name()
+    bucket_c = gen_bucket_name()
+    bucket_d = gen_bucket_name()
+
+    filtered_bucket_names = [bucket_a, bucket_b]
+    num_filtered=len(filtered_bucket_names)
+    visible_bucket_names = [bucket_c, bucket_d]
+    num_visible=len(visible_bucket_names)
+    bucket_names = filtered_bucket_names + visible_bucket_names
+    log.debug("filtered=%s, visible=%s, buckets=%s",
+              filtered_bucket_names, visible_bucket_names, bucket_names)
+
+    base_size = 16*KB
+    # base size is below 1 MiB, so it is reported in KiB
+    log.debug("generate files: base size=%d KiB, max_size=%d MiB",
+              base_size/KB, (pow(2, num_files) * base_size)/MB)
+    gen_files(files, base_size, num_files)
+
+    try:
+        if filter_mode_allow:
+            # forgotten buckets from previous tests will cause skip_bucket count
+            # to be too high in allow_mode (as they won't appear in the allow list)
+            verify_no_forgotten_buckets(conn)
+
+        for b in bucket_names:
+            conn.create_bucket(Bucket=b)
+
+        # Upload one copy of every file to each FILTERED bucket
+        rados_filtered=0
+        for f in files:
+            filename=f[0]
+            obj_size=f[1]
+            rados_obj_count = calc_rados_obj_count(1, obj_size, config)
+            rados_filtered += (rados_obj_count * num_filtered)
+            for i, bkt in enumerate(filtered_bucket_names):
+                key = gen_object_name("filtered" + filename, i)
+                conn.upload_file(OUT_DIR + filename, bkt, key, Config=config)
+
+        log.debug("rados_filtered=%d", rados_filtered)
+        assert rados_filtered == count_object_parts_in_all_buckets(False, 0)
+
+        # Build expected stats for only the VISIBLE copies
+        expected_dedup_stats = Dedup_Stats()
+        rados_visible=0
+        rados_visible_post_dedup=0
+        split_head_objs=0
+        for f in files:
+            filename=f[0]
+            obj_size=f[1]
+            calc_expected_stats(expected_dedup_stats, obj_size, num_visible, config)
+            rados_obj_count = calc_rados_obj_count(1, obj_size, config)
+            split_head = calc_split_objs_count(obj_size, num_visible, config)
+            split_head_objs += split_head
+            tail_count = ((rados_obj_count + split_head) - 1)
+            rados_visible_post_dedup += (tail_count + num_visible)
+            rados_visible += (rados_obj_count*num_visible)
+            for i, bkt in enumerate(visible_bucket_names):
+                key = gen_object_name("visible" + filename, i)
+                conn.upload_file(OUT_DIR + filename, bkt, key, Config=config)
+
+        log.debug("rados_visible=%d, split_head_objs=%d", rados_visible, split_head_objs)
+        assert (rados_filtered + rados_visible) == count_object_parts_in_all_buckets(False, 0)
+
+        if filter_mode_allow:
+            # Write the allow-list (only the visible buckets are allowed)
+            write_filter_list_file(filter_file, visible_bucket_names)
+
+            # Run dedup with allow filter
+            exec_dedup_with_filter(dry_run, allow_bucket_list=filter_file)
+        else:
+            # Write the deny-list (only the filtered buckets are denied)
+            write_filter_list_file(filter_file, filtered_bucket_names)
+
+            # Run dedup with deny filter
+            exec_dedup_with_filter(dry_run, deny_bucket_list=filter_file)
+
+        if not dry_run:
+            result = admin(['gc', 'process', '--include-all'])
+            assert result[1] == 0
+            actual_rados = count_object_parts_in_all_buckets(False, 0)
+            log.debug("rados_filtered=%d, rados_visible_post_dedup=%d, combined=%d",
+                      rados_filtered, rados_visible_post_dedup,
+                      (rados_filtered + rados_visible_post_dedup))
+            assert actual_rados == (rados_filtered + rados_visible_post_dedup)
+
+        (skip_bucket, skip_sc) = read_filter_skip_stats()
+        assert skip_sc == 0
+        assert skip_bucket == num_filtered
+    finally:
+        cleanup_local()
+        for b in bucket_names:
+            try:
+                delete_bucket_with_all_objects(b, conn)
+            except Exception as e:
+                # best-effort cleanup: keep going so the other buckets are removed
+                log.warning("Failed to cleanup bucket %s: %s", b, e)
+
+    verify_pool_is_empty(conn)
+
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_filter_bucket_estimate():
+    """Run the bucket filter scenario in dry-run mode: deny first, then allow."""
+    scenarios = (
+        ("dedup_filter_bucket_estimate: filter_mode_deny", False),
+        ("dedup_filter_bucket_estimate: filter_mode_allow", True),
+    )
+    for banner, allow in scenarios:
+        log.info(banner)
+        dedup_filter_allow_deny_bucket_common(True, filter_mode_allow=allow)
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_filter_bucket_exec():
+    """Run the bucket filter scenario with dry_run off: deny first, then allow."""
+    scenarios = (
+        ("dedup_filter_bucket_exec: filter_mode_deny", False),
+        ("dedup_filter_bucket_exec: filter_mode_allow", True),
+    )
+    for banner, allow in scenarios:
+        log.info(banner)
+        dedup_filter_allow_deny_bucket_common(False, filter_mode_allow=allow)