--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab ft=cpp
+
+#include "rgw_rest_dedup.h"
+#include "rgw_op.h"
+#include "rgw_sal.h"
+#include "rgw_sal_rados.h"
+#include "rgw_dedup_cluster.h"
+#include "rgw_dedup_utils.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+using namespace std;
+
+// Downcast the generic SAL driver to the RADOS-backed store.
+// The result is nullptr when `driver` is not a rgw::sal::RadosStore.
+static rgw::sal::RadosStore* get_rados_store(rgw::sal::Driver* driver)
+{
+  auto* const rados_store = dynamic_cast<rgw::sal::RadosStore*>(driver);
+  return rados_store;
+}
+
+// GET /admin/dedup?op=stats
+// Requires the "dedup" read cap; hands the request formatter to
+// collect_all_shard_stats() and stores its return code in op_ret.
+class RGWOp_Dedup_Stats : public RGWRESTOp {
+public:
+  const char* name() const override { return "get_dedup_stats"; }
+
+  int check_caps(const RGWUserCaps& caps) override {
+    return caps.check_cap("dedup", RGW_CAP_READ);
+  }
+
+  void execute(optional_yield y) override {
+    rgw::sal::RadosStore* const rados = get_rados_store(driver);
+    if (rados == nullptr) {
+      // the driver is not a RadosStore
+      op_ret = -EPERM;
+      return;
+    }
+
+    op_ret = rgw::dedup::cluster::collect_all_shard_stats(rados,
+                                                          s->formatter,
+                                                          this);
+  }
+};
+
+// GET /admin/dedup?op=throttle
+// Requires the "dedup" read cap; sends a throttle message that carries a
+// default-constructed throttle_msg_t (no limits added) through
+// dedup_control_bl() together with the request formatter.
+class RGWOp_Dedup_Throttle_Get : public RGWRESTOp {
+public:
+  const char* name() const override { return "get_dedup_throttle"; }
+
+  int check_caps(const RGWUserCaps& caps) override {
+    return caps.check_cap("dedup", RGW_CAP_READ);
+  }
+
+  void execute(optional_yield y) override {
+    using namespace rgw::dedup;
+
+    auto* const rados = get_rados_store(driver);
+    if (rados == nullptr) {
+      // the driver is not a RadosStore
+      op_ret = -EPERM;
+      return;
+    }
+
+    urgent_msg_t msg_type = URGENT_MSG_THROTTLE;
+    throttle_msg_t empty_request;
+    bufferlist payload;
+
+    // wire layout: message type first, then the throttle body
+    ceph::encode(msg_type, payload);
+    encode(empty_request, payload);
+
+    op_ret = cluster::dedup_control_bl(rados, this, msg_type, payload,
+                                       s->formatter);
+  }
+};
+
+// POST /admin/dedup?op=estimate|exec
+// Requires the "dedup" write cap and restarts a dedup scan of the type chosen
+// at construction. An exec scan additionally needs the
+// "yes-i-really-mean-it" argument.
+class RGWOp_Dedup_Scan : public RGWRESTOp {
+  rgw::dedup::dedup_req_type_t dedup_type;
+public:
+  // explicit: a request type must not convert to an op implicitly
+  explicit RGWOp_Dedup_Scan(rgw::dedup::dedup_req_type_t dedup_type)
+    : dedup_type(dedup_type) {}
+
+  int check_caps(const RGWUserCaps& caps) override {
+    return caps.check_cap("dedup", RGW_CAP_WRITE);
+  }
+
+  void execute(optional_yield y) override {
+    auto store = get_rados_store(driver);
+    if (!store) {
+      // the driver is not a RadosStore
+      op_ret = -EPERM;
+      return;
+    }
+
+    if (dedup_type == rgw::dedup::dedup_req_type_t::DEDUP_TYPE_EXEC) {
+      // false is the default handed to get_bool, so a missing argument
+      // leaves the request unconfirmed
+      bool confirmed = false;
+      RESTArgs::get_bool(s, "yes-i-really-mean-it", false, &confirmed);
+      if (!confirmed) {
+        op_ret = -EINVAL;
+        return;
+      }
+#ifndef FULL_DEDUP_SUPPORT
+      // built without FULL_DEDUP_SUPPORT: a confirmed exec is still refused
+      op_ret = -EPERM;
+      return;
+#endif
+    }
+
+    op_ret = rgw::dedup::cluster::dedup_restart_scan(store, dedup_type, this);
+  }
+
+  const char* name() const override {
+    return dedup_type == rgw::dedup::dedup_req_type_t::DEDUP_TYPE_EXEC
+      ? "run dedup_exec" : "run dedup_estimate";
+  }
+};
+
+// POST /admin/dedup?op=abort|pause|resume
+// Requires the "dedup" write cap and forwards the urgent message chosen at
+// construction to dedup_control().
+class RGWOp_Dedup_Control : public RGWRESTOp {
+  rgw::dedup::urgent_msg_t msg;
+public:
+  // explicit: a message id must not convert to an op implicitly
+  explicit RGWOp_Dedup_Control(rgw::dedup::urgent_msg_t msg) : msg(msg) {}
+
+  int check_caps(const RGWUserCaps& caps) override {
+    return caps.check_cap("dedup", RGW_CAP_WRITE);
+  }
+
+  void execute(optional_yield y) override {
+    auto store = get_rados_store(driver);
+    if (!store) {
+      // the driver is not a RadosStore
+      op_ret = -EPERM;
+      return;
+    }
+
+    op_ret = rgw::dedup::cluster::dedup_control(store, this, msg);
+  }
+
+  // Op name derived from the message; any other message id gets a generic
+  // label.
+  const char* name() const override {
+    switch (msg) {
+    case rgw::dedup::URGENT_MSG_ABORT:  return "abort dedup";
+    case rgw::dedup::URGENT_MSG_PASUE:  return "pause dedup";
+    case rgw::dedup::URGENT_MSG_RESUME: return "resume dedup";
+    default:                            return "dedup control";
+    }
+  }
+};
+
+// POST /admin/dedup?op=throttle&max-bucket-index-ops=N&max-metadata-ops=M
+// Requires the "dedup" write cap. At least one of the two limits must be
+// given; each supplied limit must be a base-10 integer that fits in uint32_t.
+class RGWOp_Dedup_Throttle_Set : public RGWRESTOp {
+public:
+  int check_caps(const RGWUserCaps& caps) override {
+    return caps.check_cap("dedup", RGW_CAP_WRITE);
+  }
+
+  void execute(optional_yield y) override {
+    using namespace rgw::dedup;
+    auto store = get_rados_store(driver);
+    if (!store) {
+      // the driver is not a RadosStore
+      op_ret = -EPERM;
+      return;
+    }
+
+    bufferlist urgent_msg_bl;
+    urgent_msg_t urgent_msg = URGENT_MSG_THROTTLE;
+    ceph::encode(urgent_msg, urgent_msg_bl);
+    throttle_msg_t throttle_msg;
+
+    // Reads one optional limit argument and, when present and valid, appends
+    // it to throttle_msg. Returns 0 on success or -EINVAL on a bad value.
+    auto parse_limit = [&](const char* param, op_type_t op_type) {
+      string val;
+      RESTArgs::get_string(s, param, "", &val);
+      if (val.empty()) {
+        // argument absent or empty: nothing is added for this op type
+        return 0;
+      }
+
+      string err;
+      const int64_t limit = strict_strtoll(val.c_str(), 10, &err);
+      if (!err.empty()) {
+        return -EINVAL;
+      }
+      // the value is narrowed to uint32_t below; reject anything the
+      // conversion would wrap instead of silently sending a different limit
+      if (limit < 0 || limit > static_cast<int64_t>(UINT32_MAX)) {
+        return -EINVAL;
+      }
+
+      throttle_msg.vec.push_back({ .op_type = op_type,
+                                   .limit = static_cast<uint32_t>(limit) });
+      return 0;
+    };
+
+    op_ret = parse_limit("max-bucket-index-ops", BUCKET_INDEX_OP);
+    if (op_ret) return;
+    op_ret = parse_limit("max-metadata-ops", METADATA_ACCESS_OP);
+    if (op_ret) return;
+
+    if (throttle_msg.vec.empty()) {
+      // neither limit was supplied
+      op_ret = -EINVAL;
+      return;
+    }
+
+    encode(throttle_msg, urgent_msg_bl);
+    op_ret = cluster::dedup_control_bl(store, this, urgent_msg, urgent_msg_bl,
+                                       s->formatter);
+  }
+
+  const char* name() const override { return "set_dedup_throttle"; }
+};
+
+// Pick the GET handler from the "op" query argument.
+// Returns nullptr when the argument is missing or names no known GET op.
+RGWOp *RGWHandler_Dedup::op_get()
+{
+  string requested;
+  RESTArgs::get_string(s, "op", "", &requested);
+
+  if (requested == "stats") {
+    return new RGWOp_Dedup_Stats;
+  } else if (requested == "throttle") {
+    return new RGWOp_Dedup_Throttle_Get;
+  }
+  return nullptr;
+}
+
+// Pick the POST handler from the "op" query argument.
+// Returns nullptr when the argument is missing or names no known POST op.
+RGWOp *RGWHandler_Dedup::op_post()
+{
+  using rgw::dedup::dedup_req_type_t;
+
+  string requested;
+  RESTArgs::get_string(s, "op", "", &requested);
+
+  // scan requests
+  if (requested == "estimate") {
+    return new RGWOp_Dedup_Scan(dedup_req_type_t::DEDUP_TYPE_ESTIMATE);
+  } else if (requested == "exec") {
+    return new RGWOp_Dedup_Scan(dedup_req_type_t::DEDUP_TYPE_EXEC);
+  }
+
+  // control requests
+  if (requested == "abort") {
+    return new RGWOp_Dedup_Control(rgw::dedup::URGENT_MSG_ABORT);
+  } else if (requested == "pause") {
+    return new RGWOp_Dedup_Control(rgw::dedup::URGENT_MSG_PASUE);
+  } else if (requested == "resume") {
+    return new RGWOp_Dedup_Control(rgw::dedup::URGENT_MSG_RESUME);
+  }
+
+  // throttle update
+  if (requested == "throttle") {
+    return new RGWOp_Dedup_Throttle_Set;
+  }
+
+  return nullptr;
+}
import math
import time
import subprocess
-import urllib.request
import hashlib
from multiprocessing import Process
import filecmp
import boto3
from boto3.s3.transfer import TransferConfig
from dataclasses import dataclass
+import urllib.parse
+import urllib.request
+import urllib.error
+from botocore.auth import HmacV1Auth
+from botocore.credentials import Credentials
+from botocore.awsrequest import AWSRequest
from . import(
configfile,
cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_rados', 'noname'] + args
return bash(cmd, **kwargs)
+#------------------------------------------------------------------
+# REST API helper functions
+#------------------------------------------------------------------
+
+_dedup_caps_granted = False
+#------------------------------------------------------------------------
+def _ensure_dedup_caps():
+    """Give the test user 'dedup=*' caps so REST calls pass the caps check.
+
+    The work is done at most once per process; later calls return at once.
+    The user is looked up by access key and, when it has a tenant, addressed
+    as 'tenant$user_id'.
+    """
+    global _dedup_caps_granted
+    if _dedup_caps_granted:
+        return
+
+    lookup = admin(['user', 'info', '--access-key', get_access_key()])
+    assert lookup[1] == 0, "failed to look up test user"
+
+    user = json.loads(lookup[0])
+    uid = user['user_id']
+    tenant = user.get('tenant', '')
+    if tenant:
+        uid = tenant + '$' + uid
+
+    grant = admin(['caps', 'add', '--uid', uid, '--caps', 'dedup=*'])
+    assert grant[1] == 0, "failed to add dedup caps"
+
+    log.debug("granted dedup=* caps to uid=%s", uid)
+    _dedup_caps_granted = True
+
+#-------------------------------------------------------------------------
+def _admin_rest_url():
+    """Return the base URL of the /admin/dedup endpoint.
+
+    Host and port come from the test configuration; ports 443 and 8443
+    select https, every other port selects http.
+    """
+    host = get_config_host()
+    port = get_config_port()
+    if port in (443, 8443):
+        scheme = 'https'
+    else:
+        scheme = 'http'
+    return '{}://{}:{}/admin/dedup'.format(scheme, host, port)
+
+#--------------------------------------------------------------------------
+def admin_rest(method, params):
+    """Send a signed GET/POST to /admin/dedup and return
+    (body, returncode) matching the tuple that ``admin()`` returns.
+
+    method -- HTTP method name, e.g. 'GET' or 'POST'
+    params -- mapping of query parameters; url-encoded with doseq=True
+
+    returncode is 0 on success and 1 when the server answers with an HTTP
+    error status (the error body is logged and returned). Other exceptions
+    raised by urlopen are not caught here.
+    """
+    _ensure_dedup_caps()
+    url = f'{_admin_rest_url()}?{urllib.parse.urlencode(params, doseq=True)}'
+
+    # sign with the test user's credentials; only the resulting headers are
+    # reused for the actual urllib request
+    creds = Credentials(get_access_key(), get_secret_key())
+    aws_req = AWSRequest(method=method, url=url)
+    HmacV1Auth(creds).add_auth(aws_req)
+
+    req = urllib.request.Request(url, method=method,
+                                 headers=dict(aws_req.headers))
+    try:
+        # the context manager closes the response once it has been read
+        with urllib.request.urlopen(req, timeout=120) as resp:
+            body = resp.read().decode('utf-8')
+        return (body, 0)
+    except urllib.error.HTTPError as e:
+        body = e.read().decode('utf-8', errors='replace')
+        log.error("admin_rest %s [params=%s] HTTP %d: %s",
+                  method, params, e.code, body)
+        return (body, 1)
+
+#--------------------------------------------------------------
+def dedup_admin(subcmd, **kwargs):
+    """Invoke a dedup admin operation via REST API.
+
+    subcmd -- value of the 'op' query parameter
+    stat   -- optional keyword; with subcmd 'throttle' a true value selects
+              the read (GET) variant. It is never sent to the server.
+    Remaining keywords become query parameters, with '_' in the name
+    replaced by '-' and the value converted with str().
+
+    'stats' and 'throttle' with stat are sent as GET, everything else as
+    POST. Returns the (body, returncode) tuple from admin_rest().
+    """
+    # consume 'stat' for every subcommand so it cannot leak into the query
+    # string as a bogus 'stat=...' parameter
+    stat = kwargs.pop('stat', False)
+    is_read = subcmd == 'stats' or (subcmd == 'throttle' and stat)
+    method = 'GET' if is_read else 'POST'
+    params = {'op': subcmd}
+    if subcmd == 'exec':
+        # exec must carry the confirmation argument; it is sent with an
+        # empty value
+        params['yes-i-really-mean-it'] = ''
+    for k, v in kwargs.items():
+        params[k.replace('_', '-')] = str(v)
+    log.debug("dedup_admin [REST %s]: params=%s", method, params)
+    return admin_rest(method, params)
+
+#--------------------------------------------------------------
+def dedup_admin_cli(subcmd, *args):
+    """Run 'dedup <subcmd>' through the radosgw-admin CLI wrapper admin().
+
+    'exec' gets '--yes-i-really-mean-it' inserted right after the
+    subcommand; any extra positional arguments follow. Returns whatever
+    admin() returns.
+    """
+    confirm = ['--yes-i-really-mean-it'] if subcmd == 'exec' else []
+    cli_args = ['dedup', subcmd, *confirm, *args]
+    log.debug("dedup_admin_cli: args=%s", cli_args)
+    return admin(cli_args)
+
#-----------------------------------------------
def gen_bucket_name():
global num_buckets
dedup_ratio_estimate=Dedup_Ratio()
dedup_ratio_actual=Dedup_Ratio()
- result = admin(['dedup', 'stats'])
+ result = dedup_admin('stats')
assert result[1] == 0
jstats=json.loads(result[0])
#-------------------------------------------------------------------------------
def set_bucket_index_throttling(limit):
- cmd = ['dedup', 'throttle', '--max-bucket-index-ops', str(limit)]
- result = admin(cmd)
+ result = dedup_admin('throttle', max_bucket_index_ops=limit)
assert result[1] == 0
log.debug(result[0])
log.debug("sending exec_dedup request: dry_run=%d", dry_run)
if dry_run:
- result = admin(['dedup', 'estimate'])
+ result = dedup_admin('estimate')
reset_full_dedup_stats(expected_dedup_stats)
else:
- result = admin(['dedup', 'exec', '--yes-i-really-mean-it'])
+ result = dedup_admin('exec')
assert result[1] == 0
log.debug("wait for dedup to complete")
global full_dedup_state_was_checked
global full_dedup_state_disabled
log.debug("check_full_dedup_state:: sending FULL Dedup request")
- result = admin(['dedup', 'exec', '--yes-i-really-mean-it'])
+ result = dedup_admin('exec')
if result[1] == 0:
log.debug("full dedup is enabled!")
full_dedup_state_disabled = False
- result = admin(['dedup', 'abort'])
+ result = dedup_admin('abort')
assert result[1] == 0
else:
log.debug("full dedup is disabled, skip all full dedup tests")
names=result[0].split()
for name in names:
log.debug("name=%s", name)
- if key in name:
- log.debug("key=%s is a substring of name=%s", key, name);
+ if name.endswith(key):
+ log.debug("key=%s is a suffix of name=%s", key, name);
rados_name = name
break;
try:
threads_simple_dedup_with_tenants(files, conns, bucket_names, config, True)
except Exception:
- log.warning("test_dedup_dry_large_scale: failed!!")
+ log.warning("test_dedup_dry_large_scale_with_tenants: failed!!")
+ assert 0, "abort test_dedup_dry_large_scale_with_tenants "
finally:
# cleanup must be executed even after a failure
cleanup_all_buckets(bucket_names, conns)
cleanup(bucket_name, conn)
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_cli_operations():
+    """Exercise all dedup CLI subcommands: estimate, stats, exec, pause, resume,
+    abort, throttle.
+
+    Skipped (plain return) when full dedup is disabled. Only the exit status
+    of each CLI call is checked.
+    """
+    if full_dedup_is_disabled():
+        return
+
+    prepare_test()
+    bucket_name = gen_bucket_name()
+    conn = get_single_connection()
+    try:
+        files = []
+        gen_files(files, 16*KB, 3)
+        conn.create_bucket(Bucket=bucket_name)
+        indices = [0] * len(files)
+        upload_objects(bucket_name, files, indices, conn, default_config, True)
+
+        log.info("Test radosgw-admin dedup estimate")
+        result = dedup_admin_cli('estimate')
+        assert result[1] == 0, "CLI estimate failed"
+
+        # poll the dry-run stats every dedup_timeout seconds until
+        # read_dedup_stats() reports a true first element; give up once
+        # max_dedup_time seconds have been spent waiting
+        dedup_time = 0
+        dedup_timeout = 3
+        max_dedup_time = 30
+        while True:
+            assert dedup_time < max_dedup_time
+            time.sleep(dedup_timeout)
+            dedup_time += dedup_timeout
+            ret = read_dedup_stats(dry_run=True)
+            if ret[0]:
+                break
+
+        log.info("Test radosgw-admin dedup stats")
+        result = dedup_admin_cli('stats')
+        assert result[1] == 0, "CLI stats after estimate failed"
+
+        log.info("Test radosgw-admin dedup exec")
+        result = dedup_admin_cli('exec')
+        assert result[1] == 0, "CLI exec failed"
+
+        log.info("Test radosgw-admin dedup throttle")
+        result = dedup_admin_cli('throttle', '--max-bucket-index-ops', '100')
+        assert result[1] == 0, "CLI throttle failed"
+
+        log.info("Test radosgw-admin dedup throttle stat")
+        result = dedup_admin_cli('throttle', '--stat')
+        # distinct message so a failure here is not mistaken for the
+        # throttle-set call above
+        assert result[1] == 0, "CLI throttle stat failed"
+
+        log.info("Test radosgw-admin dedup pause")
+        result = dedup_admin_cli('pause')
+        assert result[1] == 0, "CLI pause failed"
+
+        log.info("Test radosgw-admin dedup resume")
+        result = dedup_admin_cli('resume')
+        assert result[1] == 0, "CLI resume failed"
+
+        log.info("Test radosgw-admin dedup abort")
+        result = dedup_admin_cli('abort')
+        assert result[1] == 0, "CLI abort failed"
+
+        log.info("Test radosgw-admin dedup stats")
+        result = dedup_admin_cli('stats')
+        assert result[1] == 0, "CLI stats after abort failed"
+    finally:
+        # cleanup must run even when an assertion above fails
+        cleanup(bucket_name, conn)
+
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_rest_pause_resume():
+    """Drive exec, pause, throttle-stat, resume, abort and stats through the
+    REST API, in that order, and require each call to succeed.
+
+    Skipped (plain return) when full dedup is disabled.
+    """
+    if full_dedup_is_disabled():
+        return
+
+    # (subcommand, extra keyword arguments, assertion message)
+    steps = (
+        ('exec', {}, "REST exec failed"),
+        ('pause', {}, "REST pause failed"),
+        ('throttle', {'stat': True}, "REST throttle stat failed"),
+        ('resume', {}, "REST resume failed"),
+        ('abort', {}, "REST abort failed"),
+        ('stats', {}, "REST stats after pause/resume failed"),
+    )
+
+    prepare_test()
+    bucket_name = gen_bucket_name()
+    conn = get_single_connection()
+    try:
+        files = []
+        gen_files(files, 16*KB, 3)
+        conn.create_bucket(Bucket=bucket_name)
+        indices = [0] * len(files)
+        upload_objects(bucket_name, files, indices, conn, default_config, True)
+
+        for subcmd, extra, failure in steps:
+            result = dedup_admin(subcmd, **extra)
+            assert result[1] == 0, failure
+    finally:
+        # cleanup must run even when a step fails
+        cleanup(bucket_name, conn)
+
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_rest_throttle():
+    """Check that setting one throttle limit over REST leaves the other one
+    unchanged, that a later read reports both new values, and finally send
+    the initial values back.
+    """
+    bucket_key = 'bucket_index_throttle'
+    metadata_key = 'metadata_throttle'
+
+    def parse_throttle(result):
+        # an empty body counts as an empty dict; when the reply has a
+        # 'throttle' member that member is used, otherwise the whole reply
+        text = result[0]
+        raw = json.loads(text) if text.strip() else {}
+        return raw.get('throttle', raw)
+
+    # remember the starting values so they can be sent back at the end
+    result = dedup_admin('throttle', stat=True)
+    assert result[1] == 0, "REST throttle initial stat failed"
+    initial = parse_throttle(result)
+    orig_bucket = initial.get(bucket_key, 0)
+    orig_metadata = initial.get(metadata_key, 0)
+    log.info("throttle initial: bucket_index=%s, metadata=%s",
+             orig_bucket, orig_metadata)
+
+    new_bucket = orig_bucket + 17
+    new_metadata = orig_metadata + 17
+
+    # change only the bucket-index limit
+    result = dedup_admin('throttle', max_bucket_index_ops=new_bucket)
+    assert result[1] == 0, "REST throttle set bucket-index failed"
+    reply = parse_throttle(result)
+    log.info("throttle after set bucket_index=%d:",
+             reply.get(bucket_key, 0))
+    assert reply.get(bucket_key) == new_bucket
+    assert reply.get(metadata_key, 0) == orig_metadata
+
+    # change only the metadata limit
+    result = dedup_admin('throttle', max_metadata_ops=new_metadata)
+    assert result[1] == 0, "REST throttle set metadata failed"
+    reply = parse_throttle(result)
+    log.info("throttle after set metadata=%d",
+             reply.get(metadata_key, 0))
+    assert reply.get(bucket_key) == new_bucket
+    assert reply.get(metadata_key) == new_metadata
+
+    # a plain read must report both new values
+    result = dedup_admin('throttle', stat=True)
+    assert result[1] == 0, "REST throttle final stat failed"
+    reply = parse_throttle(result)
+    assert reply.get(bucket_key) == new_bucket
+    assert reply.get(metadata_key) == new_metadata
+
+    # send both initial values back in a single request
+    result = dedup_admin('throttle',
+                         max_bucket_index_ops=orig_bucket,
+                         max_metadata_ops=orig_metadata)
+    assert result[1] == 0, "REST throttle restore failed"
+    reply = parse_throttle(result)
+    log.info("throttle after restore: bucket_index_throttle=%d, metadata=%d",
+             reply.get(bucket_key, 0),
+             reply.get(metadata_key, 0))
+
+    log.info("throttle restored to: bucket_index=%s, metadata=%s",
+             orig_bucket, orig_metadata)
+
+
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_cleanup():
#-------------------------------------------------------------------------------
def read_filter_skip_stats():
"""Read ingress_skip_filtered_bucket/storage_class from dedup stats JSON."""
- result = admin(['dedup', 'stats'])
+ result = dedup_admin('stats')
assert result[1] == 0
jstats = json.loads(result[0])
worker_stats = jstats['worker_stats']