You can add read, write or all capabilities to users, buckets, metadata and
usage (utilization). For example::
- --caps="[users|buckets|metadata|usage|zone]=[*|read|write|read, write]"
+ --caps="[users|buckets|metadata|usage|zone|amz-cache|info|bilog|mdlog|datalog|user-policy|oidc-provider|roles]=[*|read|write|read, write]"
For example::
be restarted for the changes to take effect.
+Rate Limit Management
+=====================
+
+The Ceph Object Gateway enables you to set rate limits on users and buckets.
+Rate limit includes the maximum number of read ops and write ops per minute
+and how many bytes per minute could be written or read per user or per bucket.
+Requests that are using GET or HEAD method in the REST request are considered as "read requests", otherwise they are considered as "write requests".
+Every Object Gateway tracks per user and bucket metrics separately, these metrics are not shared with other gateways.
+That means that the desired limits configured should be divide by the number of active Object Gateways.
+For example, if userA should be limited by 10 ops per minute and there are 2 Object Gateways in the cluster,
+the limit over userA should be 5 (10 ops per minute / 2 RGWs).
+if the requests are ``not`` balanced between RGWs, the rate limit may be underutilized.
+For example, if the ops limit is 5 and there are 2 RGWs, ``but`` the Load Balancer send load only to one of those RGWs,
+The effective limit would be 5 ops, because this limit is enforced per RGW.
+If there is a limit reached for bucket not for user or vice versa the request would be cancelled as well.
+The bandwidth counting happens after the request is being accepted, as a result, even if in the middle of the request the bucket/user has reached its bandwidth limit this request will proceed.
+The RGW will keep a "debt" of used bytes more than the configured value and will prevent this user/bucket from sending more requests until there "debt" is being paid.
+The "debt" maximum size is twice the max-read/write-bytes per minute.
+If userA has 1 byte read limit per minute and this user tries to GET 1 GB object, the user will be able to do it.
+After userA completes this 1GB operation, the RGW will block the user request for up to 2 minutes until userA will be able to send GET request again.
+
+
+- **Bucket:** The ``--bucket`` option allows you to specify a rate limit for a
+ bucket.
+
+- **User:** The ``--uid`` option allows you to specify a rate limit for a
+ user.
+
+- **Maximum Read Ops:** The ``--max-read-ops`` setting allows you to specify
+ the maximum number of read ops per minute per RGW. A 0 value disables this setting (which means unlimited access).
+
+- **Maximum Read Bytes:** The ``--max-read-bytes`` setting allows you to specify
+ the maximum number of read bytes per minute per RGW. A 0 value disables this setting (which means unlimited access).
+
+- **Maximum Write Ops:** The ``--max-write-ops`` setting allows you to specify
+ the maximum number of write ops per minute per RGW. A 0 value disables this setting (which means unlimited access).
+
+- **Maximum Write Bytes:** The ``--max-write-bytes`` setting allows you to specify
+ the maximum number of write bytes per minute per RGW. A 0 value disables this setting (which means unlimited access).
+
+- **Rate Limit Scope:** The ``--ratelimit-scope `` option sets the scope for the rate limit.
+ The options are ``bucket`` , ``user`` and ``anonymous``. Bucket rate limit apply to buckets.
+ The user rate limit applies to a user. Anonymous applies to an unauthenticated user.
+ Anonymous scope is only available for global rate limit.
+
+
+Set User Rate Limit
+-------------------
+
+Before you enable a rate limit, you must first set the rate limit parameters.
+For example::
+
+ radosgw-admin ratelimit set --ratelimit-scope=user --uid=<uid> <[--max-read-ops=<num ops>] [--max-read-bytes=<num bytes>]
+ [--max-write-ops=<num ops>] [--max-write-bytes=<num bytes>]>
+
+For example::
+
+ radosgw-admin ratelimit set --ratelimit-scope=user --uid=johndoe --max-read-ops=1024 --max-write-bytes=10240
+
+
+A 0 value for num ops and / or num bytes means that the
+specific rate limit attribute check is disabled.
+
+Get User Rate Limit
+-------------------
+
+Get the current configured rate limit parameters
+For example::
+
+ radosgw-admin ratelimit set --ratelimit-scope=user --uid=<uid>
+
+For example::
+
+ radosgw-admin ratelimit get --ratelimit-scope=user --uid=johndoe
+
+
+A 0 value for num ops and / or num bytes means that the
+specific rate limit attribute check is disabled.
+
+
+Enable/Disable User Rate Limit
+------------------------------
+
+Once you set a user rate limit, you may enable it. For example::
+
+ radosgw-admin ratelimit enable --ratelimit-scope=user --uid=<uid>
+
+You may disable an enabled user rate limit. For example::
+
+ radosgw-admin ratelimit disable --ratelimit-scope=user --uid=johndoe
+
+
+Set Bucket Rate Limit
+---------------------
+
+Before you enable a rate limit, you must first set the rate limit parameters.
+For example::
+
+ radosgw-admin ratelimit set --ratelimit-scope=bucket --bucket=<bucket> <[--max-read-ops=<num ops>] [--max-read-bytes=<num bytes>]
+ [--max-write-ops=<num ops>] [--max-write-bytes=<num bytes>]>
+
+For example::
+
+ radosgw-admin ratelimit set --ratelimit-scope=bucket --bucket=mybucket --max-read-ops=1024 --max-write-bytes=10240
+
+
+A 0 value for num ops and / or num bytes means that the
+specific rate limit attribute check is disabled.
+
+Get Bucket Rate Limit
+---------------------
+
+Get the current configured rate limit parameters
+For example::
+
+ radosgw-admin ratelimit set --ratelimit-scope=bucket --bucket=<bucket>
+
+For example::
+
+ radosgw-admin ratelimit get --ratelimit-scope=bucket --bucket=mybucket
+
+
+A 0 value for num ops and / or num bytes means that the
+specific rate limit attribute check is disabled.
+
+
+Enable/Disable Bucket Rate Limit
+--------------------------------
+
+Once you set a bucket rate limit, you may enable it. For example::
+
+ radosgw-admin ratelimit enable --ratelimit-scope=bucket --bucket=<bucket>
+
+You may disable an enabled bucket rate limit. For example::
+
+ radosgw-admin ratelimit disable --ratelimit-scope=bucket --uid=mybucket
+
+
+Reading / Writing Global Rate Limit Configuration
+-------------------------------------------------
+
+You can read and write global rate limit settings in the period configuration. To
+view the global rate limit settings::
+
+ radosgw-admin global rate limit get
+
+The global rate limit settings can be manipulated with the ``global ratelimit``
+counterparts of the ``ratelimit set``, ``ratelimit enable``, and ``ratelimit disable``
+commands. ``Per user and per bucket ratelimit configuration is overriding the global configuration``::
+
+ radosgw-admin global ratelimit set --ratelimit-scope bucket --max-read-ops=1024
+ radosgw-admin global ratelimit enable --ratelimit-scope bucket
+
+The global rate limit can configure rate limit scope for all authenticated users::
+
+ radosgw-admin global ratelimit set --ratelimit-scope user --max-read-ops=1024
+ radosgw-admin global ratelimit enable --ratelimit-scope user
+
+The global rate limit can configure rate limit scope for all unauthenticated users::
+
+ radosgw-admin global ratelimit set --ratelimit-scope=anonymous --max-read-ops=1024
+ radosgw-admin global ratelimit enable --ratelimit-scope=anonymous
+
+.. note:: In a multisite configuration, where there is a realm and period
+ present, changes to the global quotas must be committed using ``period
+ update --commit``. If there is no period present, the rados gateway(s) must
+ be restarted for the changes to take effect.
+
Usage
=====
cout << " quota set set quota params\n";
cout << " quota enable enable quota\n";
cout << " quota disable disable quota\n";
+ cout << " ratelimit get get ratelimit params\n";
+ cout << " ratelimit set set ratelimit params\n";
+ cout << " ratelimit enable enable ratelimit\n";
+ cout << " ratelimit disable disable ratelimit\n";
cout << " global quota get view global quota params\n";
cout << " global quota set set global quota params\n";
cout << " global quota enable enable a global quota\n";
cout << " global quota disable disable a global quota\n";
+ cout << " global ratelimit get view global ratelimit params\n";
+ cout << " global ratelimit set set global ratelimit params\n";
+ cout << " global ratelimit enable enable a ratelimit quota\n";
+ cout << " global ratelimit disable disable a ratelimit quota\n";
cout << " realm create create a new realm\n";
cout << " realm rm remove a realm\n";
cout << " realm get show realm info\n";
cout << " --max-objects specify max objects (negative value to disable)\n";
cout << " --max-size specify max size (in B/K/M/G/T, negative value to disable)\n";
cout << " --quota-scope scope of quota (bucket, user)\n";
+ cout << "\nRate limiting options:\n";
+ cout << " --max-read-ops specify max requests per minute for READ ops per RGW (GET and HEAD request methods), 0 means unlimited\n";
+ cout << " --max-read-bytes specify max bytes per minute for READ ops per RGW (GET and HEAD request methods), 0 means unlimited\n";
+ cout << " --max-write-ops specify max requests per minute for WRITE ops per RGW (Not GET or HEAD request methods), 0 means unlimited\n";
+ cout << " --max-write-bytes specify max bytes per minute for WRITE ops per RGW (Not GET or HEAD request methods), 0 means unlimited\n";
+ cout << " --ratelimit-scope scope of rate limiting: bucket, user, anonymous\n";
+ cout << " anonymous can be configured only with global rate limit\n";
cout << "\nOrphans search options:\n";
cout << " --num-shards num of shards to use for keeping the temporary scan info\n";
cout << " --orphan-stale-secs num of seconds to wait before declaring an object to be an orphan (default: 86400)\n";
ORPHANS_FIND,
ORPHANS_FINISH,
ORPHANS_LIST_JOBS,
+ RATELIMIT_GET,
+ RATELIMIT_SET,
+ RATELIMIT_ENABLE,
+ RATELIMIT_DISABLE,
ZONEGROUP_ADD,
ZONEGROUP_CREATE,
ZONEGROUP_DEFAULT,
GLOBAL_QUOTA_SET,
GLOBAL_QUOTA_ENABLE,
GLOBAL_QUOTA_DISABLE,
+ GLOBAL_RATELIMIT_GET,
+ GLOBAL_RATELIMIT_SET,
+ GLOBAL_RATELIMIT_ENABLE,
+ GLOBAL_RATELIMIT_DISABLE,
SYNC_INFO,
SYNC_STATUS,
ROLE_CREATE,
{ "quota set", OPT::QUOTA_SET },
{ "quota enable", OPT::QUOTA_ENABLE },
{ "quota disable", OPT::QUOTA_DISABLE },
+ { "ratelimit get", OPT::RATELIMIT_GET },
+ { "ratelimit set", OPT::RATELIMIT_SET },
+ { "ratelimit enable", OPT::RATELIMIT_ENABLE },
+ { "ratelimit disable", OPT::RATELIMIT_DISABLE },
{ "gc list", OPT::GC_LIST },
{ "gc process", OPT::GC_PROCESS },
{ "lc list", OPT::LC_LIST },
{ "global quota set", OPT::GLOBAL_QUOTA_SET },
{ "global quota enable", OPT::GLOBAL_QUOTA_ENABLE },
{ "global quota disable", OPT::GLOBAL_QUOTA_DISABLE },
+ { "global ratelimit get", OPT::GLOBAL_RATELIMIT_GET },
+ { "global ratelimit set", OPT::GLOBAL_RATELIMIT_SET },
+ { "global ratelimit enable", OPT::GLOBAL_RATELIMIT_ENABLE },
+ { "global ratelimit disable", OPT::GLOBAL_RATELIMIT_DISABLE },
{ "sync info", OPT::SYNC_INFO },
{ "sync status", OPT::SYNC_STATUS },
{ "role create", OPT::ROLE_CREATE },
return true;
}
+bool set_ratelimit_info(RGWRateLimitInfo& ratelimit, OPT opt_cmd, int64_t max_read_ops, int64_t max_write_ops,
+ int64_t max_read_bytes, int64_t max_write_bytes,
+ bool have_max_read_ops, bool have_max_write_ops,
+ bool have_max_read_bytes, bool have_max_write_bytes)
+{
+ bool ratelimit_configured = true;
+ switch (opt_cmd) {
+ case OPT::RATELIMIT_ENABLE:
+ case OPT::GLOBAL_RATELIMIT_ENABLE:
+ ratelimit.enabled = true;
+ break;
+
+
+ case OPT::RATELIMIT_SET:
+ case OPT::GLOBAL_RATELIMIT_SET:
+ ratelimit_configured = false;
+ if (have_max_read_ops) {
+ if (max_read_ops >= 0) {
+ ratelimit.max_read_ops = max_read_ops;
+ ratelimit_configured = true;
+ }
+ }
+ if (have_max_write_ops) {
+ if (max_write_ops >= 0) {
+ ratelimit.max_write_ops = max_write_ops;
+ ratelimit_configured = true;
+ }
+ }
+ if (have_max_read_bytes) {
+ if (max_read_bytes >= 0) {
+ ratelimit.max_read_bytes = max_read_bytes;
+ ratelimit_configured = true;
+ }
+ }
+ if (have_max_write_bytes) {
+ if (max_write_bytes >= 0) {
+ ratelimit.max_write_bytes = max_write_bytes;
+ ratelimit_configured = true;
+ }
+ }
+ break;
+ case OPT::RATELIMIT_DISABLE:
+ case OPT::GLOBAL_RATELIMIT_DISABLE:
+ ratelimit.enabled = false;
+ break;
+ default:
+ break;
+ }
+ return ratelimit_configured;
+}
+
void set_quota_info(RGWQuotaInfo& quota, OPT opt_cmd, int64_t max_size, int64_t max_objects,
bool have_max_size, bool have_max_objects)
{
return 0;
}
+int set_bucket_ratelimit(rgw::sal::Store* store, OPT opt_cmd,
+ const string& tenant_name, const string& bucket_name,
+ int64_t max_read_ops, int64_t max_write_ops,
+ int64_t max_read_bytes, int64_t max_write_bytes,
+ bool have_max_read_ops, bool have_max_write_ops,
+ bool have_max_read_bytes, bool have_max_write_bytes)
+{
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ int r = store->get_bucket(dpp(), nullptr, tenant_name, bucket_name, &bucket, null_yield);
+ if (r < 0) {
+ cerr << "could not get bucket info for bucket=" << bucket_name << ": " << cpp_strerror(-r) << std::endl;
+ return -r;
+ }
+ RGWRateLimitInfo ratelimit_info;
+ auto iter = bucket->get_attrs().find(RGW_ATTR_RATELIMIT);
+ if(iter != bucket->get_attrs().end()) {
+ try {
+ bufferlist& bl = iter->second;
+ auto biter = bl.cbegin();
+ decode(ratelimit_info, biter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp(), 0) << "ERROR: failed to decode rate limit" << dendl;
+ return -EIO;
+ }
+ }
+ bool ratelimit_configured = set_ratelimit_info(ratelimit_info, opt_cmd, max_read_ops, max_write_ops,
+ max_read_bytes, max_write_bytes,
+ have_max_read_ops, have_max_write_ops,
+ have_max_read_bytes, have_max_write_bytes);
+ if (!ratelimit_configured) {
+ ldpp_dout(dpp(), 0) << "ERROR: no rate limit values have been specified" << dendl;
+ return -EINVAL;
+ }
+ bufferlist bl;
+ ratelimit_info.encode(bl);
+ rgw::sal::Attrs attr;
+ attr[RGW_ATTR_RATELIMIT] = bl;
+ r = bucket->merge_and_store_attrs(dpp(), attr, null_yield);
+ if (r < 0) {
+ cerr << "ERROR: failed writing bucket instance info: " << cpp_strerror(-r) << std::endl;
+ return -r;
+ }
+ return 0;
+}
+
+int set_user_ratelimit(OPT opt_cmd, std::unique_ptr<rgw::sal::User>& user,
+ int64_t max_read_ops, int64_t max_write_ops,
+ int64_t max_read_bytes, int64_t max_write_bytes,
+ bool have_max_read_ops, bool have_max_write_ops,
+ bool have_max_read_bytes, bool have_max_write_bytes)
+{
+ RGWRateLimitInfo ratelimit_info;
+ user->load_user(dpp(), null_yield);
+ auto iter = user->get_attrs().find(RGW_ATTR_RATELIMIT);
+ if(iter != user->get_attrs().end()) {
+ try {
+ bufferlist& bl = iter->second;
+ auto biter = bl.cbegin();
+ decode(ratelimit_info, biter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp(), 0) << "ERROR: failed to decode rate limit" << dendl;
+ return -EIO;
+ }
+ }
+ bool ratelimit_configured = set_ratelimit_info(ratelimit_info, opt_cmd, max_read_ops, max_write_ops,
+ max_read_bytes, max_write_bytes,
+ have_max_read_ops, have_max_write_ops,
+ have_max_read_bytes, have_max_write_bytes);
+ if (!ratelimit_configured) {
+ ldpp_dout(dpp(), 0) << "ERROR: no rate limit values have been specified" << dendl;
+ return -EINVAL;
+ }
+ bufferlist bl;
+ ratelimit_info.encode(bl);
+ rgw::sal::Attrs attr;
+ attr[RGW_ATTR_RATELIMIT] = bl;
+ int r = user->merge_and_store_attrs(dpp(), attr, null_yield);
+ if (r < 0) {
+ cerr << "ERROR: failed writing user instance info: " << cpp_strerror(-r) << std::endl;
+ return -r;
+ }
+ return 0;
+}
+
+int show_user_ratelimit(std::unique_ptr<rgw::sal::User>& user, Formatter *formatter)
+{
+ RGWRateLimitInfo ratelimit_info;
+ user->load_user(dpp(), null_yield);
+ auto iter = user->get_attrs().find(RGW_ATTR_RATELIMIT);
+ if(iter != user->get_attrs().end()) {
+ try {
+ bufferlist& bl = iter->second;
+ auto biter = bl.cbegin();
+ decode(ratelimit_info, biter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp(), 0) << "ERROR: failed to decode rate limit" << dendl;
+ return -EIO;
+ }
+ }
+ formatter->open_object_section("user_ratelimit");
+ encode_json("user_ratelimit", ratelimit_info, formatter);
+ formatter->close_section();
+ formatter->flush(cout);
+ cout << std::endl;
+ return 0;
+}
+
+int show_bucket_ratelimit(rgw::sal::Store* store, const string& tenant_name,
+ const string& bucket_name, Formatter *formatter)
+{
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ int r = store->get_bucket(dpp(), nullptr, tenant_name, bucket_name, &bucket, null_yield);
+ if (r < 0) {
+ cerr << "could not get bucket info for bucket=" << bucket_name << ": " << cpp_strerror(-r) << std::endl;
+ return -r;
+ }
+ RGWRateLimitInfo ratelimit_info;
+ auto iter = bucket->get_attrs().find(RGW_ATTR_RATELIMIT);
+ if (iter != bucket->get_attrs().end()) {
+ try {
+ bufferlist& bl = iter->second;
+ auto biter = bl.cbegin();
+ decode(ratelimit_info, biter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp(), 0) << "ERROR: failed to decode rate limit" << dendl;
+ return -EIO;
+ }
+ }
+ formatter->open_object_section("bucket_ratelimit");
+ encode_json("bucket_ratelimit", ratelimit_info, formatter);
+ formatter->close_section();
+ formatter->flush(cout);
+ cout << std::endl;
+ return 0;
+}
int set_user_bucket_quota(OPT opt_cmd, RGWUser& user, RGWUserAdminOpState& op_state, int64_t max_size, int64_t max_objects,
bool have_max_size, bool have_max_objects)
{
string op_id;
string op_mask_str;
string quota_scope;
+ string ratelimit_scope;
string object_version;
string placement_id;
std::optional<string> opt_storage_class;
int64_t max_objects = -1;
int64_t max_size = -1;
+ int64_t max_read_ops = 0;
+ int64_t max_write_ops = 0;
+ int64_t max_read_bytes = 0;
+ int64_t max_write_bytes = 0;
bool have_max_objects = false;
bool have_max_size = false;
+ bool have_max_write_ops = false;
+ bool have_max_read_ops = false;
+ bool have_max_write_bytes = false;
+ bool have_max_read_bytes = false;
int include_all = false;
int allow_unordered = false;
return EINVAL;
}
have_max_objects = true;
+ } else if (ceph_argparse_witharg(args, i, &val, "--max-read-ops", (char*)NULL)) {
+ max_read_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err);
+ if (!err.empty()) {
+ cerr << "ERROR: failed to parse max read requests: " << err << std::endl;
+ return EINVAL;
+ }
+ have_max_read_ops = true;
+ } else if (ceph_argparse_witharg(args, i, &val, "--max-write-ops", (char*)NULL)) {
+ max_write_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err);
+ if (!err.empty()) {
+ cerr << "ERROR: failed to parse max write requests: " << err << std::endl;
+ return EINVAL;
+ }
+ have_max_write_ops = true;
+ } else if (ceph_argparse_witharg(args, i, &val, "--max-read-bytes", (char*)NULL)) {
+ max_read_bytes = (int64_t)strict_strtoll(val.c_str(), 10, &err);
+ if (!err.empty()) {
+ cerr << "ERROR: failed to parse max read bytes: " << err << std::endl;
+ return EINVAL;
+ }
+ have_max_read_bytes = true;
+ } else if (ceph_argparse_witharg(args, i, &val, "--max-write-bytes", (char*)NULL)) {
+ max_write_bytes = (int64_t)strict_strtoll(val.c_str(), 10, &err);
+ if (!err.empty()) {
+ cerr << "ERROR: failed to parse max write bytes: " << err << std::endl;
+ return EINVAL;
+ }
+ have_max_write_bytes = true;
} else if (ceph_argparse_witharg(args, i, &val, "--date", "--time", (char*)NULL)) {
date = val;
if (end_date.empty())
end_marker = val;
} else if (ceph_argparse_witharg(args, i, &val, "--quota-scope", (char*)NULL)) {
quota_scope = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--ratelimit-scope", (char*)NULL)) {
+ ratelimit_scope = val;
} else if (ceph_argparse_witharg(args, i, &val, "--index-type", (char*)NULL)) {
string index_type_str = val;
bi_index_type = get_bi_index_type(index_type_str);
OPT::PERIOD_GET_CURRENT, OPT::PERIOD_LIST,
OPT::GLOBAL_QUOTA_GET, OPT::GLOBAL_QUOTA_SET,
OPT::GLOBAL_QUOTA_ENABLE, OPT::GLOBAL_QUOTA_DISABLE,
+ OPT::GLOBAL_RATELIMIT_GET, OPT::GLOBAL_RATELIMIT_SET,
+ OPT::GLOBAL_RATELIMIT_ENABLE, OPT::GLOBAL_RATELIMIT_DISABLE,
OPT::REALM_DELETE, OPT::REALM_GET, OPT::REALM_LIST,
OPT::REALM_LIST_PERIODS,
OPT::REALM_GET_DEFAULT,
OPT::PERIOD_GET_CURRENT,
OPT::PERIOD_LIST,
OPT::GLOBAL_QUOTA_GET,
+ OPT::GLOBAL_RATELIMIT_GET,
OPT::SYNC_INFO,
OPT::SYNC_STATUS,
OPT::ROLE_GET,
formatter->flush(cout);
}
break;
+ case OPT::GLOBAL_RATELIMIT_GET:
+ case OPT::GLOBAL_RATELIMIT_SET:
+ case OPT::GLOBAL_RATELIMIT_ENABLE:
+ case OPT::GLOBAL_RATELIMIT_DISABLE:
+ {
+ if (realm_id.empty()) {
+ RGWRealm realm(g_ceph_context, static_cast<rgw::sal::RadosStore*>(store)->svc()->sysobj);
+ if (!realm_name.empty()) {
+ // look up realm_id for the given realm_name
+ int ret = realm.read_id(dpp(), realm_name, realm_id, null_yield);
+ if (ret < 0) {
+ cerr << "ERROR: failed to read realm for " << realm_name
+ << ": " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ } else {
+ // use default realm_id when none is given
+ int ret = realm.read_default_id(dpp(), realm_id, null_yield);
+ if (ret < 0 && ret != -ENOENT) { // on ENOENT, use empty realm_id
+ cerr << "ERROR: failed to read default realm: "
+ << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ }
+ }
+
+ RGWPeriodConfig period_config;
+ int ret = period_config.read(dpp(), static_cast<rgw::sal::RadosStore*>(store)->svc()->sysobj, realm_id, null_yield);
+ if (ret < 0 && ret != -ENOENT) {
+ cerr << "ERROR: failed to read period config: "
+ << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ bool ratelimit_configured = true;
+ formatter->open_object_section("period_config");
+ if (ratelimit_scope == "bucket") {
+ ratelimit_configured = set_ratelimit_info(period_config.bucket_ratelimit, opt_cmd,
+ max_read_ops, max_write_ops,
+ max_read_bytes, max_write_bytes,
+ have_max_read_ops, have_max_write_ops,
+ have_max_read_bytes, have_max_write_bytes);
+ encode_json("bucket_ratelimit", period_config.bucket_ratelimit, formatter.get());
+ } else if (ratelimit_scope == "user") {
+ ratelimit_configured = set_ratelimit_info(period_config.user_ratelimit, opt_cmd,
+ max_read_ops, max_write_ops,
+ max_read_bytes, max_write_bytes,
+ have_max_read_ops, have_max_write_ops,
+ have_max_read_bytes, have_max_write_bytes);
+ encode_json("user_ratelimit", period_config.user_ratelimit, formatter.get());
+ } else if (ratelimit_scope == "anonymous") {
+ ratelimit_configured = set_ratelimit_info(period_config.anon_ratelimit, opt_cmd,
+ max_read_ops, max_write_ops,
+ max_read_bytes, max_write_bytes,
+ have_max_read_ops, have_max_write_ops,
+ have_max_read_bytes, have_max_write_bytes);
+ encode_json("anonymous_ratelimit", period_config.anon_ratelimit, formatter.get());
+ } else if (ratelimit_scope.empty() && opt_cmd == OPT::GLOBAL_RATELIMIT_GET) {
+ // if no scope is given for GET, print both
+ encode_json("bucket_ratelimit", period_config.bucket_ratelimit, formatter.get());
+ encode_json("user_ratelimit", period_config.user_ratelimit, formatter.get());
+ encode_json("anonymous_ratelimit", period_config.anon_ratelimit, formatter.get());
+ } else {
+ cerr << "ERROR: invalid rate limit scope specification. Please specify "
+ "either --ratelimit-scope=bucket, or --ratelimit-scope=user or --ratelimit-scope=anonymous" << std::endl;
+ return EINVAL;
+ }
+ if (!ratelimit_configured) {
+ cerr << "ERROR: no rate limit values have been specified" << std::endl;
+ return EINVAL;
+ }
+
+ formatter->close_section();
+
+ if (opt_cmd != OPT::GLOBAL_RATELIMIT_GET) {
+ // write the modified period config
+ ret = period_config.write(dpp(), static_cast<rgw::sal::RadosStore*>(store)->svc()->sysobj, realm_id, null_yield);
+ if (ret < 0) {
+ cerr << "ERROR: failed to write period config: "
+ << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ if (!realm_id.empty()) {
+ cout << "Global ratelimit changes saved. Use 'period update' to apply "
+ "them to the staging period, and 'period commit' to commit the "
+ "new period." << std::endl;
+ } else {
+ cout << "Global ratelimit changes saved. They will take effect as "
+ "the gateways are restarted." << std::endl;
+ }
+ }
+
+ formatter->flush(cout);
+ }
+ break;
case OPT::GLOBAL_QUOTA_GET:
case OPT::GLOBAL_QUOTA_SET:
case OPT::GLOBAL_QUOTA_ENABLE:
cerr << "failure: " << cpp_strerror(-r) << ": " << err << std::endl;
return -r;
}
+ RGWRateLimitInfo ratelimit_info;
+ std::unique_ptr<rgw::sal::Bucket> bucket_sal;
+ auto iter = bucket->get_attrs().find(RGW_ATTR_RATELIMIT);
+ if(iter != bucket->get_attrs().end()) {
+ try {
+ std::cerr << "here" << std::endl;
+ bufferlist& bl = iter->second;
+ auto biter = bl.cbegin();
+ decode(ratelimit_info, biter);
+ } catch (buffer::error& err) {
+ cerr << "ERROR: failed to decode rate limit" << std::endl;
+ }
+ }
}
if (opt_cmd == OPT::BUCKET_LINK) {
}
}
+ bool ratelimit_op_set = (opt_cmd == OPT::RATELIMIT_SET || opt_cmd == OPT::RATELIMIT_ENABLE || opt_cmd == OPT::RATELIMIT_DISABLE);
+ bool ratelimit_op_get = opt_cmd == OPT::RATELIMIT_GET;
+ if (ratelimit_op_set) {
+ if (bucket_name.empty() && rgw::sal::User::empty(user)) {
+ cerr << "ERROR: bucket name or uid is required for ratelimit operation" << std::endl;
+ return EINVAL;
+ }
+
+ if (!bucket_name.empty()) {
+ if (!ratelimit_scope.empty() && ratelimit_scope != "bucket") {
+ cerr << "ERROR: invalid ratelimit scope specification. (bucket scope is not bucket but bucket has been specified)" << std::endl;
+ return EINVAL;
+ }
+ return set_bucket_ratelimit(store, opt_cmd, tenant, bucket_name,
+ max_read_ops, max_write_ops,
+ max_read_bytes, max_write_bytes,
+ have_max_read_ops, have_max_write_ops,
+ have_max_read_bytes, have_max_write_bytes);
+ } else if (!rgw::sal::User::empty(user)) {
+ } if (ratelimit_scope == "user") {
+ return set_user_ratelimit(opt_cmd, user, max_read_ops, max_write_ops,
+ max_read_bytes, max_write_bytes,
+ have_max_read_ops, have_max_write_ops,
+ have_max_read_bytes, have_max_write_bytes);
+ } else {
+ cerr << "ERROR: invalid ratelimit scope specification. Please specify either --ratelimit-scope=bucket, or --ratelimit-scope=user" << std::endl;
+ return EINVAL;
+ }
+ }
+
+ if (ratelimit_op_get) {
+ if (bucket_name.empty() && rgw::sal::User::empty(user)) {
+ cerr << "ERROR: bucket name or uid is required for ratelimit operation" << std::endl;
+ return EINVAL;
+ }
+
+ if (!bucket_name.empty()) {
+ if (!ratelimit_scope.empty() && ratelimit_scope != "bucket") {
+ cerr << "ERROR: invalid ratelimit scope specification. (bucket scope is not bucket but bucket has been specified)" << std::endl;
+ return EINVAL;
+ }
+ return show_bucket_ratelimit(store, tenant, bucket_name, formatter.get());
+ } else if (!rgw::sal::User::empty(user)) {
+ } if (ratelimit_scope == "user") {
+ return show_user_ratelimit(user, formatter.get());
+ } else {
+ cerr << "ERROR: invalid ratelimit scope specification. Please specify either --ratelimit-scope=bucket, or --ratelimit-scope=user" << std::endl;
+ return EINVAL;
+ }
+ }
+
if (opt_cmd == OPT::MFA_CREATE) {
rados::cls::otp::otp_info_t config;
string user = "-";
const auto started = ceph::coarse_real_clock::now();
ceph::coarse_real_clock::duration latency{};
-
process_request(env.store, env.rest, &req, env.uri_prefix,
*env.auth_registry, &client, env.olog, y,
- scheduler, &user, &latency, &http_ret);
+ scheduler, &user, &latency,
+ env.ratelimiting->get_active(),
+ &http_ret);
if (cct->_conf->subsys.should_gather(dout_subsys, 1)) {
// access log line elements begin per Apache Combined Log Format with additions following
std::unique_ptr<rgw::sal::Bucket> bucket;
RGWQuotaInfo quota;
+ RGWRateLimitInfo ratelimit_info;
void set_fetch_stats(bool value) { stat_buckets = value; }
void set_check_objects(bool value) { check_objects = value; }
void set_quota(RGWQuotaInfo& value) {
quota = value;
}
+ void set_bucket_ratelimit(RGWRateLimitInfo& value) {
+ ratelimit_info = value;
+ }
void set_sync_bucket(bool value) { sync_bucket = value; }
return mask_to_str(op_type_flags, mask, buf, len);
}
+void RGWRateLimitInfo::decode_json(JSONObj *obj)
+{
+ JSONDecoder::decode_json("max_read_ops", max_read_ops, obj);
+ JSONDecoder::decode_json("max_write_ops", max_write_ops, obj);
+ JSONDecoder::decode_json("max_read_bytes", max_read_ops, obj);
+ JSONDecoder::decode_json("max_write_bytes", max_write_ops, obj);
+ JSONDecoder::decode_json("enabled", enabled, obj);
+}
+
+void RGWRateLimitInfo::dump(Formatter *f) const
+{
+ f->dump_int("max_read_ops", max_read_ops);
+ f->dump_int("max_write_ops", max_write_ops);
+ f->dump_int("max_read_bytes", max_read_bytes);
+ f->dump_int("max_write_bytes", max_write_bytes);
+ f->dump_bool("enabled", enabled);
+}
+
void RGWUserInfo::dump(Formatter *f) const
{
#include <array>
#include <string_view>
+#include <atomic>
+#include <unordered_map>
#include "common/ceph_crypto.h"
#include "common/random_string.h"
#define RGW_SYS_PARAM_PREFIX "rgwx-"
#define RGW_ATTR_ACL RGW_ATTR_PREFIX "acl"
+#define RGW_ATTR_RATELIMIT RGW_ATTR_PREFIX "ratelimit"
#define RGW_ATTR_LC RGW_ATTR_PREFIX "lc"
#define RGW_ATTR_CORS RGW_ATTR_PREFIX "cors"
#define RGW_ATTR_ETAG RGW_ATTR_PREFIX "etag"
std::string message;
};
+
/* Helper class used for RGWHTTPArgs parsing */
class NameVal
{
inline std::ostream& operator<<(std::ostream& out, const rgw_placement_rule& rule) {
return out << rule.to_str();
}
+
+class RateLimiter;
+struct RGWRateLimitInfo {
+ int64_t max_write_ops;
+ int64_t max_read_ops;
+ int64_t max_write_bytes;
+ int64_t max_read_bytes;
+ bool enabled = false;
+ RGWRateLimitInfo()
+ : max_write_ops(0), max_read_ops(0), max_write_bytes(0), max_read_bytes(0) {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(max_write_ops, bl);
+ encode(max_read_ops, bl);
+ encode(max_write_bytes, bl);
+ encode(max_read_bytes, bl);
+ encode(enabled, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(max_write_ops,bl);
+ decode(max_read_ops, bl);
+ decode(max_write_bytes,bl);
+ decode(max_read_bytes, bl);
+ decode(enabled, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const;
+
+ void decode_json(JSONObj *obj);
+
+};
+WRITE_CLASS_ENCODER(RGWRateLimitInfo)
+
struct RGWUserInfo
{
rgw_user user_id;
rgw::io::BasicClient *cio{nullptr};
http_op op{OP_UNKNOWN};
RGWOpType op_type{};
+ std::shared_ptr<RateLimiter> ratelimit_data;
+ RGWRateLimitInfo user_ratelimit;
+ RGWRateLimitInfo bucket_ratelimit;
+ std::string ratelimit_bucket_marker;
+ std::string ratelimit_user_name;
bool content_started{false};
int format{0};
ceph::Formatter *formatter{nullptr};
RGWLoadGenIO real_client_io(&env);
RGWRestfulIO client_io(cct, &real_client_io);
-
+ ActiveRateLimiter ratelimit(cct);
int ret = process_request(store, rest, req, uri_prefix,
*auth_registry, &client_io, olog,
- null_yield, nullptr, nullptr, nullptr);
+ null_yield, nullptr, nullptr, nullptr,
+ ratelimit.get_active());
if (ret < 0) {
/* we don't really care about return code */
dout(20) << "process_request() returned " << ret << dendl;
rgw::dmclock::SchedulerCtx sched_ctx{cct.get()};
OpsLogManifold *olog = new OpsLogManifold();
+ ActiveRateLimiter ratelimiting{cct.get()};
+ ratelimiting.start();
+
if (!g_conf()->rgw_ops_log_socket_path.empty()) {
OpsLogSocket* olog_socket = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
olog_socket->init(g_conf()->rgw_ops_log_socket_path);
std::string uri_prefix;
config->get_val("prefix", "", &uri_prefix);
- RGWProcessEnv env = { store, &rest, olog, port, uri_prefix, auth_registry };
+ RGWProcessEnv env = { store, &rest, olog, port, uri_prefix, auth_registry, &ratelimiting };
fe = new RGWLoadGenFrontend(env, config);
}
config->get_val("port", 80, &port);
std::string uri_prefix;
config->get_val("prefix", "", &uri_prefix);
- RGWProcessEnv env{ store, &rest, olog, port, uri_prefix, auth_registry };
+ RGWProcessEnv env{ store, &rest, olog, port, uri_prefix, auth_registry, &ratelimiting };
fe = new RGWAsioFrontend(env, config, sched_ctx);
}
#include "common/WorkQueue.h"
#include "include/scope_guard.h"
+#include <utility>
#include "rgw_dmclock_scheduler.h"
#include "rgw_rest.h"
#include "rgw_frontend.h"
#include "rgw_lua.h"
#include "rgw_lua_request.h"
#include "rgw_tracer.h"
+#include "rgw_ratelimit.h"
+
#include "services/svc_zone_utils.h"
#define dout_subsys ceph_subsys_rgw
process->req_throttle.put(1);
perfcounter->inc(l_rgw_qactive, -1);
}
+bool rate_limit(rgw::sal::Store* store, req_state* s) {
+ // we dont want to limit health check or system or admin requests
+ const auto& is_admin_or_system = s->user->get_info();
+ if ((s->op_type == RGW_OP_GET_HEALTH_CHECK) || is_admin_or_system.admin || is_admin_or_system.system)
+ return false;
+ std::string userfind;
+ RGWRateLimitInfo global_user;
+ RGWRateLimitInfo global_bucket;
+ RGWRateLimitInfo global_anon;
+ RGWRateLimitInfo* bucket_ratelimit;
+ RGWRateLimitInfo* user_ratelimit;
+ store->get_ratelimit(global_bucket, global_user, global_anon);
+ bucket_ratelimit = &global_bucket;
+ user_ratelimit = &global_user;
+ s->user->get_id().to_str(userfind);
+ userfind = "u" + userfind;
+ s->ratelimit_user_name = userfind;
+ std::string bucketfind = !rgw::sal::Bucket::empty(s->bucket.get()) ? "b" + s->bucket->get_marker() : "";
+ s->ratelimit_bucket_marker = bucketfind;
+ const char *method = s->info.method;
+
+ auto iter = s->user->get_attrs().find(RGW_ATTR_RATELIMIT);
+ if(iter != s->user->get_attrs().end()) {
+ try {
+ RGWRateLimitInfo user_ratelimit_temp;
+ bufferlist& bl = iter->second;
+ auto biter = bl.cbegin();
+ decode(user_ratelimit_temp, biter);
+ // override global rate limiting only if local rate limiting is enabled
+ if (user_ratelimit_temp.enabled)
+ *user_ratelimit = user_ratelimit_temp;
+ } catch (buffer::error& err) {
+ ldpp_dout(s, 0) << "ERROR: failed to decode rate limit" << dendl;
+ return -EIO;
+ }
+ }
+ if (s->user->get_id().id == RGW_USER_ANON_ID && global_anon.enabled) {
+ *user_ratelimit = global_anon;
+ }
+ bool limit_bucket = false;
+ bool limit_user = s->ratelimit_data->should_rate_limit(method, s->ratelimit_user_name, s->time, user_ratelimit);
+
+ if(!rgw::sal::Bucket::empty(s->bucket.get()))
+ {
+ iter = s->bucket->get_attrs().find(RGW_ATTR_RATELIMIT);
+ if(iter != s->bucket->get_attrs().end()) {
+ try {
+ RGWRateLimitInfo bucket_ratelimit_temp;
+ bufferlist& bl = iter->second;
+ auto biter = bl.cbegin();
+ decode(bucket_ratelimit_temp, biter);
+ // override global rate limiting only if local rate limiting is enabled
+ if (bucket_ratelimit_temp.enabled)
+ *bucket_ratelimit = bucket_ratelimit_temp;
+ } catch (buffer::error& err) {
+ ldpp_dout(s, 0) << "ERROR: failed to decode rate limit" << dendl;
+ return -EIO;
+ }
+ }
+ if (!limit_user) {
+ limit_bucket = s->ratelimit_data->should_rate_limit(method, s->ratelimit_bucket_marker, s->time, bucket_ratelimit);
+ }
+ }
+ if(limit_bucket && !limit_user) {
+ s->ratelimit_data->giveback_tokens(method, s->ratelimit_user_name);
+ }
+ s->user_ratelimit = *user_ratelimit;
+ s->bucket_ratelimit = *bucket_ratelimit;
+ return (limit_user || limit_bucket);
+}
int rgw_process_authenticated(RGWHandler_REST * const handler,
RGWOp *& op,
RGWRequest * const req,
req_state * const s,
- optional_yield y,
+ optional_yield y,
+ rgw::sal::Store* store,
const bool skip_retarget)
{
ldpp_dout(op, 2) << "init permissions" << dendl;
ldpp_dout(op, 2) << "pre-executing" << dendl;
op->pre_exec();
+ ldpp_dout(op, 2) << "check rate limiting" << dendl;
+ if (rate_limit(store, s)) {
+ return -ERR_RATE_LIMITED;
+ }
ldpp_dout(op, 2) << "executing" << dendl;
{
auto span = tracing::rgw::tracer.add_span("execute", s->trace);
rgw::dmclock::Scheduler *scheduler,
string* user,
ceph::coarse_real_clock::duration* latency,
+ std::shared_ptr<RateLimiter> ratelimit,
int* http_ret)
{
int ret = client_io->init(g_ceph_context);
struct req_state rstate(g_ceph_context, &rgw_env, req->id);
struct req_state *s = &rstate;
+ s->ratelimit_data = ratelimit;
std::unique_ptr<rgw::sal::User> u = store->get_user(rgw_user());
s->set_user(u);
s->trace->SetAttribute(tracing::rgw::OP, op->name());
s->trace->SetAttribute(tracing::rgw::TYPE, tracing::rgw::REQUEST);
- ret = rgw_process_authenticated(handler, op, req, s, yield);
+ ret = rgw_process_authenticated(handler, op, req, s, yield, store);
if (ret < 0) {
abort_early(s, op, ret, handler, yield);
goto done;
dout(0) << "ERROR: client_io->complete_request() returned "
<< e.what() << dendl;
}
-
if (should_log) {
rgw_log_op(rest, s, (op ? op->name() : "unknown"), olog);
}
if (latency) {
*latency = lat;
}
-
dout(1) << "====== req done req=" << hex << req << dec
<< " op status=" << op_ret
<< " http_status=" << s->err.http_ret
#include "rgw_user.h"
#include "rgw_op.h"
#include "rgw_rest.h"
-
+#include "rgw_ratelimit.h"
#include "include/ceph_assert.h"
#include "common/WorkQueue.h"
int port;
std::string uri_prefix;
std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
+ //maybe there is a better place to store the rate limit data structure
+ ActiveRateLimiter* ratelimiting;
};
class RGWFrontendConfig;
rgw::dmclock::Scheduler *scheduler,
std::string* user,
ceph::coarse_real_clock::duration* latency,
+ std::shared_ptr<RateLimiter> ratelimit,
int* http_ret = nullptr);
extern int rgw_process_authenticated(RGWHandler_REST* handler,
RGWOp*& op,
RGWRequest* req,
req_state* s,
- optional_yield y,
+ optional_yield y,
+ rgw::sal::Store* store,
bool skip_retarget = false);
#if defined(def_dout_subsys)
--- /dev/null
+#pragma once
+#include <chrono>
+#include <thread>
+#include <condition_variable>
+#include "rgw_common.h"
+
+
+class RateLimiterEntry {
+ /*
+ fixed_point_rgw_ratelimit is important to preserve the precision of the token calculation
+ for example: a user have a limit of single op per minute, the user will consume its single token and then will send another request, 1s after it.
+ in that case, without this method, the user will get 0 tokens although it should get 0.016 tokens.
+ using this method it will add 16 tokens to the user, and the user will have 16 tokens, each time rgw will do comparison rgw will divide by fixed_point_rgw_ratelimit, so the user will be blocked anyway until it has enough tokens.
+ */
+ static constexpr int64_t fixed_point_rgw_ratelimit = 1000;
+ // counters are tracked in multiples of fixed_point_rgw_ratelimit
+ struct counters {
+ int64_t ops = 0;
+ int64_t bytes = 0;
+ };
+ counters read;
+ counters write;
+ ceph::timespan ts;
+ bool first_run = true;
+ std::mutex ts_lock;
+ // Those functions are returning the integer value of the tokens
+ int64_t read_ops () const
+ {
+ return read.ops / fixed_point_rgw_ratelimit;
+ }
+ int64_t write_ops() const
+ {
+ return write.ops / fixed_point_rgw_ratelimit;
+ }
+ int64_t read_bytes() const
+ {
+ return read.bytes / fixed_point_rgw_ratelimit;
+ }
+ int64_t write_bytes() const
+ {
+ return write.bytes / fixed_point_rgw_ratelimit;
+ }
+ bool should_rate_limit_read(int64_t ops_limit, int64_t bw_limit) {
+ //check if tenants did not reach their bw or ops limits and that the limits are not 0 (which is unlimited)
+ if(((read_ops() - 1 < 0) && (ops_limit > 0)) ||
+ (read_bytes() < 0 && bw_limit > 0))
+ {
+ return true;
+ }
+ // we don't want to reduce ops' tokens if we've rejected it.
+ read.ops -= fixed_point_rgw_ratelimit;
+ return false;
+ }
+ bool should_rate_limit_write(int64_t ops_limit, int64_t bw_limit)
+ {
+ //check if tenants did not reach their bw or ops limits and that the limits are not 0 (which is unlimited)
+ if(((write_ops() - 1 < 0) && (ops_limit > 0)) ||
+ (write_bytes() < 0 && bw_limit > 0))
+ {
+ return true;
+ }
+
+ // we don't want to reduce ops' tokens if we've rejected it.
+ write.ops -= fixed_point_rgw_ratelimit;
+ return false;
+ }
+ /* The purpose of this function is to minimum time before overriding the stored timestamp
+ This function is necessary to force the increase tokens add at least 1 token when it updates the last stored timestamp.
+ That way the user/bucket will not lose tokens because of rounding
+ */
+ bool minimum_time_reached(ceph::timespan curr_timestamp) const
+ {
+ using namespace std::chrono;
+ constexpr auto min_duration = duration_cast<ceph::timespan>(seconds(60)) / fixed_point_rgw_ratelimit;
+ const auto delta = curr_timestamp - ts;
+ if (delta < min_duration)
+ {
+ return false;
+ }
+ return true;
+ }
+
+ void increase_tokens(ceph::timespan curr_timestamp,
+ const RGWRateLimitInfo* info)
+ {
+ constexpr int fixed_point = fixed_point_rgw_ratelimit;
+ if (first_run)
+ {
+ write.ops = info->max_write_ops * fixed_point;
+ write.bytes = info->max_write_bytes * fixed_point;
+ read.ops = info->max_read_ops * fixed_point;
+ read.bytes = info->max_read_bytes * fixed_point;
+ ts = curr_timestamp;
+ first_run = false;
+ return;
+ }
+ else if(curr_timestamp > ts && minimum_time_reached(curr_timestamp))
+ {
+ const int64_t time_in_ms = std::chrono::duration_cast<std::chrono::milliseconds>(curr_timestamp - ts).count() / 60.0 / std::milli::den * fixed_point; // / 60 to make it work with 1 min token bucket
+ ts = curr_timestamp;
+ const int64_t write_ops = info->max_write_ops * time_in_ms;
+ const int64_t write_bw = info->max_write_bytes * time_in_ms;
+ const int64_t read_ops = info->max_read_ops * time_in_ms;
+ const int64_t read_bw = info->max_read_bytes * time_in_ms;
+ read.ops = std::min(info->max_read_ops * fixed_point, read_ops + read.ops);
+ read.bytes = std::min(info->max_read_bytes * fixed_point, read_bw + read.bytes);
+ write.ops = std::min(info->max_write_ops * fixed_point, write_ops + write.ops);
+ write.bytes = std::min(info->max_write_bytes * fixed_point, write_bw + write.bytes);
+ }
+ }
+
+ public:
+ bool should_rate_limit(bool is_read, const RGWRateLimitInfo* ratelimit_info, ceph::timespan curr_timestamp)
+ {
+ std::unique_lock lock(ts_lock);
+ increase_tokens(curr_timestamp, ratelimit_info);
+ if (is_read)
+ {
+ return should_rate_limit_read(ratelimit_info->max_read_ops, ratelimit_info->max_read_bytes);
+ }
+ return should_rate_limit_write(ratelimit_info->max_write_ops, ratelimit_info->max_write_bytes);
+ }
+ void decrease_bytes(bool is_read, int64_t amount, const RGWRateLimitInfo* info) {
+ std::unique_lock lock(ts_lock);
+ // we don't want the tenant to be with higher debt than 120 seconds(2 min) of its limit
+ if (is_read)
+ {
+ read.bytes = std::max(read.bytes - amount * fixed_point_rgw_ratelimit,info->max_read_bytes * fixed_point_rgw_ratelimit * -2);
+ } else {
+ write.bytes = std::max(write.bytes - amount * fixed_point_rgw_ratelimit,info->max_write_bytes * fixed_point_rgw_ratelimit * -2);
+ }
+ }
+ void giveback_tokens(bool is_read)
+ {
+ std::unique_lock lock(ts_lock);
+ if (is_read)
+ {
+ read.ops += fixed_point_rgw_ratelimit;
+ } else {
+ write.ops += fixed_point_rgw_ratelimit;
+ }
+ }
+};
+
+class RateLimiter {
+
+ static constexpr size_t map_size = 2000000; // will create it with the closest upper prime number
+ std::shared_mutex insert_lock;
+ std::atomic_bool& replacing;
+ std::condition_variable& cv;
+ typedef std::unordered_map<std::string, RateLimiterEntry> hash_map;
+ hash_map ratelimit_entries{map_size};
+ static bool is_read_op(const std::string_view method) {
+ if (method == "GET" || method == "HEAD")
+ {
+ return true;
+ }
+ return false;
+ }
+
+ // find or create an entry, and return its iterator
+ auto& find_or_create(const std::string& key) {
+ std::shared_lock rlock(insert_lock);
+ if (ratelimit_entries.size() > 0.9 * map_size && replacing == false)
+ {
+ replacing = true;
+ cv.notify_all();
+ }
+ auto ret = ratelimit_entries.find(key);
+ rlock.unlock();
+ if (ret == ratelimit_entries.end())
+ {
+ std::unique_lock wlock(insert_lock);
+ ret = ratelimit_entries.emplace(std::piecewise_construct,
+ std::forward_as_tuple(key),
+ std::forward_as_tuple()).first;
+ }
+ return ret->second;
+ }
+
+
+
+ public:
+ RateLimiter(const RateLimiter&) = delete;
+ RateLimiter& operator =(const RateLimiter&) = delete;
+ RateLimiter(RateLimiter&&) = delete;
+ RateLimiter& operator =(RateLimiter&&) = delete;
+ RateLimiter() = delete;
+ RateLimiter(std::atomic_bool& replacing, std::condition_variable& cv)
+ : replacing(replacing), cv(cv)
+ {
+ // prevents rehash, so no iterators invalidation
+ ratelimit_entries.max_load_factor(1000);
+ };
+
+ bool should_rate_limit(const char *method, const std::string& key, ceph::coarse_real_time curr_timestamp, const RGWRateLimitInfo* ratelimit_info) {
+ if (key.empty() || key.length() == 1 || !ratelimit_info->enabled)
+ {
+ return false;
+ }
+ bool is_read = is_read_op(method);
+ auto& it = find_or_create(key);
+ auto curr_ts = curr_timestamp.time_since_epoch();
+ return it.should_rate_limit(is_read ,ratelimit_info, curr_ts);
+ }
+ void giveback_tokens(const char *method, const std::string& key)
+ {
+ bool is_read = is_read_op(method);
+ auto& it = find_or_create(key);
+ it.giveback_tokens(is_read);
+ }
+ void decrease_bytes(const char *method, const std::string& key, const int64_t amount, const RGWRateLimitInfo* info) {
+ if (key.empty() || key.length() == 1 || !info->enabled)
+ {
+ return;
+ }
+ bool is_read = is_read_op(method);
+ if ((is_read && !info->max_read_bytes) || (!is_read && !info->max_write_bytes))
+ {
+ return;
+ }
+ auto& it = find_or_create(key);
+ it.decrease_bytes(is_read, amount, info);
+ }
+ void clear() {
+ ratelimit_entries.clear();
+ }
+};
+// This class purpose is to hold 2 RateLimiter instances, one active and one passive.
+// once the active has reached the watermark for clearing it will call the replace_active() thread using cv
+// The replace_active will clear the previous RateLimiter after all requests to it has been done (use_count() > 1)
+// In the meanwhile new requests will come into the newer active
+class ActiveRateLimiter : public DoutPrefix {
+ std::atomic_uint8_t stopped = {false};
+ std::condition_variable cv;
+ std::mutex cv_m;
+ std::thread runner;
+ std::atomic_bool replacing = false;
+ std::atomic_uint8_t current_active = 0;
+ std::shared_ptr<RateLimiter> ratelimit[2];
+ void replace_active() {
+ using namespace std::chrono_literals;
+ std::unique_lock<std::mutex> lk(cv_m);
+ while (!stopped) {
+ cv.wait(lk);
+ current_active = current_active ^ 1;
+ ldpp_dout(this, 20) << "replacing active ratelimit data structure" << dendl;
+ while (!stopped && ratelimit[(current_active ^ 1)].use_count() > 1 ) {
+ if (cv.wait_for(lk, 1min) != std::cv_status::timeout && stopped)
+ {
+ return;
+ }
+ }
+ if (stopped)
+ {
+ return;
+ }
+ ldpp_dout(this, 20) << "clearing passive ratelimit data structure" << dendl;
+ ratelimit[(current_active ^ 1)]->clear();
+ replacing = false;
+ }
+ }
+ public:
+ ActiveRateLimiter(const ActiveRateLimiter&) = delete;
+ ActiveRateLimiter& operator =(const ActiveRateLimiter&) = delete;
+ ActiveRateLimiter(ActiveRateLimiter&&) = delete;
+ ActiveRateLimiter& operator =(ActiveRateLimiter&&) = delete;
+ ActiveRateLimiter() = delete;
+ ActiveRateLimiter(CephContext* cct) :
+ DoutPrefix(cct, ceph_subsys_rgw, "rate limiter: ")
+ {
+ ratelimit[0] = std::make_shared<RateLimiter>(replacing, cv);
+ ratelimit[1] = std::make_shared<RateLimiter>(replacing, cv);
+ }
+ ~ActiveRateLimiter() {
+ ldpp_dout(this, 20) << "stopping ratelimit_gc thread" << dendl;
+ cv_m.lock();
+ stopped = true;
+ cv_m.unlock();
+ cv.notify_all();
+ runner.join();
+ }
+ std::shared_ptr<RateLimiter> get_active() {
+ return ratelimit[current_active];
+ }
+ void start() {
+ ldpp_dout(this, 20) << "starting ratelimit_gc thread" << dendl;
+ runner = std::thread(&ActiveRateLimiter::replace_active, this);
+ const auto rc = ceph_pthread_setname(runner.native_handle(), "ratelimit_gc");
+ ceph_assert(rc==0);
+ }
+};
#include "rgw_resolve.h"
#include "rgw_sal_rados.h"
+#include "rgw_ratelimit.h"
#include <numeric>
#define dout_subsys ceph_subsys_rgw
const char* const buf,
const size_t len)
{
+ bool healthchk = false;
+ // we dont want to limit health checks
+ if(s->op_type == RGW_OP_GET_HEALTH_CHECK)
+ healthchk = true;
+ if(len > 0 && !healthchk) {
+ const char *method = s->info.method;
+ s->ratelimit_data->decrease_bytes(method, s->ratelimit_user_name, len, &s->user_ratelimit);
+ if(!rgw::sal::Bucket::empty(s->bucket.get()))
+ s->ratelimit_data->decrease_bytes(method, s->ratelimit_bucket_marker, len, &s->bucket_ratelimit);
+ }
try {
return RESTFUL_IO(s)->send_body(buf, len);
} catch (rgw::io::Exception& e) {
char* const buf,
const size_t max)
{
+ int len;
try {
- return RESTFUL_IO(s)->recv_body(buf, max);
+ len = RESTFUL_IO(s)->recv_body(buf, max);
} catch (rgw::io::Exception& e) {
return -e.code().value();
}
+ bool healthchk = false;
+ // we dont want to limit health checks
+ if(s->op_type == RGW_OP_GET_HEALTH_CHECK)
+ healthchk = true;
+ if(len > 0 && !healthchk) {
+ const char *method = s->info.method;
+ s->ratelimit_data->decrease_bytes(method, s->ratelimit_user_name, len, &s->user_ratelimit);
+ if(!rgw::sal::Bucket::empty(s->bucket.get()))
+ s->ratelimit_data->decrease_bytes(method, s->ratelimit_bucket_marker, len, &s->bucket_ratelimit);
+ }
+ return len;
+
}
int RGWGetObj_ObjStore::get_params(optional_yield y)
RGWOp* newop = &get_errpage_op;
RGWRequest req(0);
- return rgw_process_authenticated(handler, newop, &req, s, y, true);
+ return rgw_process_authenticated(handler, newop, &req, s, y, store, true);
}
int RGWSwiftWebsiteHandler::error_handler(const int err_no,
const std::map<std::string, std::string>& meta) = 0;
/** Get default quota info. Used as fallback if a user or bucket has no quota set*/
virtual void get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota) = 0;
+ /** Get global rate limit configuration*/
+ virtual void get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit) = 0;
/** Enable or disable a set of bucket. e.g. if a User is suspended */
virtual int set_buckets_enabled(const DoutPrefixProvider* dpp, std::vector<rgw_bucket>& buckets, bool enabled) = 0;
/** Get a new request ID */
virtual int read_attrs(const DoutPrefixProvider* dpp, optional_yield y) = 0;
/** Set the attributes in attrs, leaving any other existing attrs set, and
* write them to the backing store; a merge operation */
- virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) {
- for(auto& it : new_attrs) {
- attrs[it.first] = it.second;
- }
- return 0;
- }
+ virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) = 0;
virtual int read_stats(const DoutPrefixProvider *dpp,
optional_yield y, RGWStorageStats* stats,
ceph::real_time* last_stats_sync = nullptr,
std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) = 0;
/** Trim User usage stats to the given epoch range */
virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) = 0;
- virtual RGWObjVersionTracker& get_version_tracker() { return objv_tracker; }
- virtual Attrs& get_attrs() { return attrs; }
+
+ /** Load this User from the backing store. requires ID to be set, fills all other fields. */
+ virtual int load_user(const DoutPrefixProvider* dpp, optional_yield y) = 0;
+ /** Store this User to the backing store */
virtual int store_user(const DoutPrefixProvider* dpp, optional_yield y, bool exclusive, RGWUserInfo* old_info = nullptr) = 0;
/** Remove this User from the backing store */
virtual int remove_user(const DoutPrefixProvider* dpp, optional_yield y) = 0;
virtual int check_quota(const DoutPrefixProvider *dpp, RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size, optional_yield y, bool check_size_only = false) = 0;
/** Set the attributes in attrs, leaving any other existing attrs set, and
* write them to the backing store; a merge operation */
- virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) {
- for(auto& it : new_attrs) {
- attrs[it.first] = it.second;
- }
- return 0;
- }
+ virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) = 0;
/** Try to refresh the cached bucket info from the backing store. Used in
* read-modify-update loop. */
virtual int try_refresh_info(const DoutPrefixProvider* dpp, ceph::real_time* pmtime) = 0;
return ret;
}
-
+ int DBUser::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y)
+ {
+ for(auto& it : new_attrs) {
+ attrs[it.first] = it.second;
+ }
+ return store_user(dpp, y, false);
+ }
int DBUser::store_user(const DoutPrefixProvider* dpp, optional_yield y, bool exclusive, RGWUserInfo* old_info)
{
int ret = 0;
{
int ret = 0;
- Bucket::merge_and_store_attrs(dpp, new_attrs, y);
+ for(auto& it : new_attrs) {
+ attrs[it.first] = it.second;
+ }
/* XXX: handle has_instance_obj like in set_bucket_instance_attrs() */
return 0;
}
+ void DBStore::get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit)
+ {
+ return;
+ }
+
void DBStore::get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota)
{
// XXX: Not handled for the first pass
virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) override;
/* Placeholders */
+ virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) override;
virtual int load_user(const DoutPrefixProvider* dpp, optional_yield y) override;
virtual int store_user(const DoutPrefixProvider* dpp, optional_yield y, bool exclusive, RGWUserInfo* old_info = nullptr) override;
virtual int remove_user(const DoutPrefixProvider* dpp, optional_yield y) override;
virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override;
virtual int register_to_service_map(const DoutPrefixProvider *dpp, const string& daemon_type,
const map<string, string>& meta) override;
+ virtual void get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit) override;
virtual void get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota) override;
virtual int set_buckets_enabled(const DoutPrefixProvider *dpp, vector<rgw_bucket>& buckets, bool enabled) override;
virtual uint64_t get_new_req_id() override { return 0; }
int RadosUser::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y)
{
- User::merge_and_store_attrs(dpp, new_attrs, y);
+ for(auto& it : new_attrs) {
+ attrs[it.first] = it.second;
+ }
return store_user(dpp, y, false);
}
int RadosBucket::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y)
{
- Bucket::merge_and_store_attrs(dpp, new_attrs, y);
+ for(auto& it : new_attrs) {
+ attrs[it.first] = it.second;
+ }
return store->ctl()->bucket->set_bucket_instance_attrs(get_info(),
new_attrs, &get_info().objv_tracker, y, dpp);
}
user_quota = svc()->quota->get_user_quota();
}
+void RadosStore::get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit)
+{
+ bucket_ratelimit = svc()->zone->get_current_period().get_config().bucket_ratelimit;
+ user_ratelimit = svc()->zone->get_current_period().get_config().user_ratelimit;
+ anon_ratelimit = svc()->zone->get_current_period().get_config().anon_ratelimit;
+}
+
int RadosStore::set_buckets_enabled(const DoutPrefixProvider* dpp, vector<rgw_bucket>& buckets, bool enabled)
{
return rados->set_buckets_enabled(buckets, enabled, dpp);
virtual int register_to_service_map(const DoutPrefixProvider *dpp, const std::string& daemon_type,
const std::map<std::string, std::string>& meta) override;
virtual void get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota) override;
+ virtual void get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit) override;
virtual int set_buckets_enabled(const DoutPrefixProvider* dpp, std::vector<rgw_bucket>& buckets, bool enabled) override;
virtual uint64_t get_new_req_id() override { return rados->get_new_req_id(); }
virtual int get_sync_policy_handler(const DoutPrefixProvider* dpp,
bool bucket_quota_specified{false};
bool user_quota_specified{false};
+ bool bucket_ratelimit_specified{false};
+ bool user_ratelimit_specified{false};
RGWQuotaInfo bucket_quota;
RGWQuotaInfo user_quota;
+ RGWRateLimitInfo user_ratelimit;
+ RGWRateLimitInfo bucket_ratelimit;
// req parameters for listing user
std::string marker{""};
user_quota_specified = true;
}
+ void set_bucket_ratelimit(RGWRateLimitInfo& ratelimit) {
+ bucket_ratelimit = ratelimit;
+ bucket_ratelimit_specified = true;
+ }
+
+ void set_user_ratelimit(RGWRateLimitInfo& ratelimit) {
+ user_ratelimit = ratelimit;
+ user_ratelimit_specified = true;
+ }
+
void set_mfa_ids(const std::set<std::string>& ids) {
mfa_ids = ids;
mfa_ids_specified = true;
{
encode_json("bucket_quota", bucket_quota, f);
encode_json("user_quota", user_quota, f);
+ encode_json("user_ratelimit", user_ratelimit, f);
+ encode_json("bucket_ratelimit", bucket_ratelimit, f);
+ encode_json("anonymous_ratelimit", anon_ratelimit, f);
}
void RGWPeriodConfig::decode_json(JSONObj *obj)
{
JSONDecoder::decode_json("bucket_quota", bucket_quota, obj);
JSONDecoder::decode_json("user_quota", user_quota, obj);
+ JSONDecoder::decode_json("user_ratelimit", user_ratelimit, obj);
+ JSONDecoder::decode_json("bucket_ratelimit", bucket_ratelimit, obj);
+ JSONDecoder::decode_json("anonymous_ratelimit", anon_ratelimit, obj);
}
void RGWRegionMap::dump(Formatter *f) const
{
RGWQuotaInfo bucket_quota;
RGWQuotaInfo user_quota;
+ RGWRateLimitInfo user_ratelimit;
+ RGWRateLimitInfo bucket_ratelimit;
+ // rate limit unauthenticated user
+ RGWRateLimitInfo anon_ratelimit;
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(bucket_quota, bl);
encode(user_quota, bl);
+ encode(bucket_ratelimit, bl);
+ encode(user_ratelimit, bl);
+ encode(anon_ratelimit, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(bucket_quota, bl);
decode(user_quota, bl);
+ if (struct_v >= 2) {
+ decode(bucket_ratelimit, bl);
+ decode(user_ratelimit, bl);
+ decode(anon_ratelimit, bl);
+ }
DECODE_FINISH(bl);
}
quota set set quota params
quota enable enable quota
quota disable disable quota
+ ratelimit get get ratelimit params
+ ratelimit set set ratelimit params
+ ratelimit enable enable ratelimit
+ ratelimit disable disable ratelimit
global quota get view global quota params
global quota set set global quota params
global quota enable enable a global quota
global quota disable disable a global quota
+ global ratelimit get view global ratelimit params
+ global ratelimit set set global ratelimit params
+ global ratelimit enable enable a ratelimit quota
+ global ratelimit disable disable a ratelimit quota
realm create create a new realm
realm rm remove a realm
realm get show realm info
--max-size specify max size (in B/K/M/G/T, negative value to disable)
--quota-scope scope of quota (bucket, user)
+ Rate limiting options:
+ --max-read-ops specify max requests per minute for READ ops per RGW (GET and HEAD request methods), 0 means unlimited
+ --max-read-bytes specify max bytes per minute for READ ops per RGW (GET and HEAD request methods), 0 means unlimited
+ --max-write-ops specify max requests per minute for WRITE ops per RGW (Not GET or HEAD request methods), 0 means unlimited
+ --max-write-bytes specify max bytes per minute for WRITE ops per RGW (Not GET or HEAD request methods), 0 means unlimited
+ --ratelimit-scope scope of rate limiting: bucket, user, anonymous
+ anonymous can be configured only with global rate limit
+
Orphans search options:
--num-shards num of shards to use for keeping the temporary scan info
--orphan-stale-secs num of seconds to wait before declaring an object to be an orphan (default: 86400)
add_library(test_rgw_a STATIC ${test_rgw_a_src})
target_link_libraries(test_rgw_a ${rgw_libs})
+add_executable(bench_rgw_ratelimit bench_rgw_ratelimit.cc)
+target_link_libraries(bench_rgw_ratelimit ${rgw_libs})
+
+add_executable(bench_rgw_ratelimit_gc bench_rgw_ratelimit_gc.cc )
+target_link_libraries(bench_rgw_ratelimit_gc ${rgw_libs})
+
+add_executable(unittest_rgw_ratelimit test_rgw_ratelimit.cc $<TARGET_OBJECTS:unit-main>)
+target_link_libraries(unittest_rgw_ratelimit ${rgw_libs})
+add_ceph_unittest(unittest_rgw_ratelimit)
+
# ceph_test_rgw_manifest
set(test_rgw_manifest_srcs test_rgw_manifest.cc)
add_executable(ceph_test_rgw_manifest
--- /dev/null
+#include "rgw/rgw_ratelimit.h"
+#include "rgw/rgw_common.h"
+#include "random"
+#include <cstdlib>
+#include <string>
+#include <boost/asio.hpp>
+#include <spawn/spawn.hpp>
+#include <boost/asio/steady_timer.hpp>
+#include <chrono>
+#include <mutex>
+#include <unordered_map>
+#include <atomic>
+#include <boost/program_options.hpp>
+
+
+using Executor = boost::asio::io_context::executor_type;
+std::uniform_int_distribution<unsigned int> dist(0, 1);
+std::random_device rd;
+std::default_random_engine rng{rd()};
+std::uniform_int_distribution<unsigned long long> disttenant(2, 100000000);
+struct client_info {
+ uint64_t accepted = 0;
+ uint64_t rejected = 0;
+ uint64_t ops = 0;
+ uint64_t bytes = 0;
+ uint64_t num_retries = 0;
+ std::string tenant;
+};
+
+struct parameters {
+ int64_t req_size = 1;
+ int64_t backend_bandwidth = 1;
+ size_t wait_between_retries_ms = 1;
+ int num_clients = 1;
+};
+std::shared_ptr<std::vector<client_info>> ds = std::make_shared<std::vector<client_info>>(std::vector<client_info>());
+
+std::string method[2] = {"PUT", "GET"};
+void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& yield, boost::asio::io_context& ioctx)
+{
+ auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: ");
+ boost::asio::steady_timer timer(ioctx);
+ int rw = 0; // will always use PUT method as there is no difference
+ std::string methodop(method[rw]);
+ auto req_size = params.req_size;
+ auto backend_bandwidth = params.backend_bandwidth;
+// the 4 * 1024 * 1024 is the RGW default we are sending in a typical environment
+ while (req_size) {
+ if (req_size <= backend_bandwidth) {
+ while (req_size > 0) {
+ if(req_size > 4*1024*1024) {
+ ratelimit->decrease_bytes(methodop.c_str(),it.tenant, 4*1024*1024, info);
+ it.bytes += 4*1024*1024;
+ req_size = req_size - 4*1024*1024;
+ }
+ else {
+ ratelimit->decrease_bytes(methodop.c_str(),it.tenant, req_size, info);
+ req_size = 0;
+ }
+ }
+ } else {
+ int64_t total_bytes = 0;
+ while (req_size > 0) {
+ if (req_size >= 4*1024*1024) {
+ if (total_bytes >= backend_bandwidth)
+ {
+ timer.expires_after(std::chrono::seconds(1));
+ timer.async_wait(yield);
+ total_bytes = 0;
+ }
+ ratelimit->decrease_bytes(methodop.c_str(),it.tenant, 4*1024*1024, info);
+ it.bytes += 4*1024*1024;
+ req_size = req_size - 4*1024*1024;
+ total_bytes += 4*1024*1024;
+ }
+ else {
+ ratelimit->decrease_bytes(methodop.c_str(),it.tenant, req_size, info);
+ it.bytes += req_size;
+ total_bytes += req_size;
+ req_size = 0;
+ }
+ }
+ }
+ }
+}
+bool simulate_request(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit)
+{
+ boost::asio::io_context context;
+ auto time = ceph::coarse_real_clock::now();
+ int rw = 0; // will always use PUT method as there is no different
+ std::string methodop = method[rw];
+ auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: ");
+ bool to_fail = ratelimit->should_rate_limit(methodop.c_str(), it.tenant, time, &info);
+ if(to_fail)
+ {
+ it.rejected++;
+ it.ops++;
+ return true;
+ }
+ it.accepted++;
+ return false;
+}
+void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx)
+{
+ for (;;)
+ {
+ bool to_retry = simulate_request(it, info, ratelimit);
+ while (to_retry && to_run)
+ {
+ if (params.wait_between_retries_ms)
+ {
+ boost::asio::steady_timer timer(ioctx);
+ timer.expires_after(std::chrono::milliseconds(params.wait_between_retries_ms));
+ timer.async_wait(ctx);
+ }
+ to_retry = simulate_request(it, info, ratelimit);
+ }
+ if (!to_run)
+ {
+ return;
+ }
+ simulate_transfer(it, &info, ratelimit, params, ctx, ioctx);
+ }
+}
+void simulate_clients(boost::asio::io_context& context, std::string tenant, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, bool& to_run)
+{
+ for (int i = 0; i < params.num_clients; i++)
+ {
+ auto& it = ds->emplace_back(client_info());
+ it.tenant = tenant;
+ int x = ds->size() - 1;
+ spawn::spawn(context,
+ [&to_run ,x, ratelimit, info, params, &context](spawn::yield_context ctx)
+ {
+ auto& it = ds.get()->operator[](x);
+ simulate_client(it, info, ratelimit, params, ctx, to_run, context);
+ });
+ }
+}
+int main(int argc, char **argv)
+{
+ int num_ratelimit_classes = 1;
+ int64_t ops_limit = 1;
+ int64_t bw_limit = 1;
+ int thread_count = 512;
+ int runtime = 60;
+ parameters params;
+ try
+ {
+ using namespace boost::program_options;
+ options_description desc{"Options"};
+ desc.add_options()
+ ("help,h", "Help screen")
+ ("num_ratelimit_classes", value<int>()->default_value(1), "how many ratelimit tenants")
+ ("request_size", value<int64_t>()->default_value(1), "what is the request size we are testing if 0, it will be randomized")
+ ("backend_bandwidth", value<int64_t>()->default_value(1), "what is the backend bandwidth, so there will be wait between decrease_bytes")
+ ("wait_between_retries_ms", value<size_t>()->default_value(1), "time in seconds to wait between retries")
+ ("ops_limit", value<int64_t>()->default_value(1), "ops limit for the tenants")
+ ("bw_limit", value<int64_t>()->default_value(1), "bytes per second limit")
+ ("threads", value<int>()->default_value(512), "server's threads count")
+ ("runtime", value<int>()->default_value(60), "For how many seconds the test will run")
+ ("num_clients", value<int>()->default_value(1), "number of clients per tenant to run");
+ variables_map vm;
+ store(parse_command_line(argc, argv, desc), vm);
+ if (vm.count("help")) {
+ std::cout << desc << std::endl;
+ return EXIT_SUCCESS;
+ }
+ num_ratelimit_classes = vm["num_ratelimit_classes"].as<int>();
+ params.req_size = vm["request_size"].as<int64_t>();
+ params.backend_bandwidth = vm["backend_bandwidth"].as<int64_t>();
+ params.wait_between_retries_ms = vm["wait_between_retries_ms"].as<size_t>();
+ params.num_clients = vm["num_clients"].as<int>();
+ ops_limit = vm["ops_limit"].as<int64_t>();
+ bw_limit = vm["bw_limit"].as<int64_t>();
+ thread_count = vm["threads"].as<int>();
+ runtime = vm["runtime"].as<int>();
+ }
+ catch (const boost::program_options::error &ex)
+ {
+ std::cerr << ex.what() << std::endl;
+ return EXIT_FAILURE;
+ }
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_bytes = bw_limit;
+ info.max_write_bytes = bw_limit;
+ info.max_read_ops = ops_limit;
+ info.max_write_ops = ops_limit;
+ std::unique_ptr<CephContext> cct = std::make_unique<CephContext>(CEPH_ENTITY_TYPE_ANY);
+ if (!g_ceph_context)
+ {
+ g_ceph_context = cct.get();
+ }
+ std::shared_ptr<ActiveRateLimiter> ratelimit(new ActiveRateLimiter(g_ceph_context));
+ ratelimit->start();
+ std::vector<std::thread> threads;
+ using Executor = boost::asio::io_context::executor_type;
+ std::optional<boost::asio::executor_work_guard<Executor>> work;
+ threads.reserve(thread_count);
+ boost::asio::io_context context;
+ boost::asio::io_context stopme;
+ work.emplace(boost::asio::make_work_guard(context));
+ // server execution
+ for (int i = 0; i < thread_count; i++) {
+ threads.emplace_back([&]() noexcept {
+ context.run();
+ });
+ }
+ //client execution
+ bool to_run = true;
+ ds->reserve(num_ratelimit_classes*params.num_clients);
+ for (int i = 0; i < num_ratelimit_classes; i++)
+ {
+ unsigned long long tenantid = disttenant(rng);
+ std::string tenantuser = "uuser" + std::to_string(tenantid);
+ simulate_clients(context, tenantuser, info, ratelimit->get_active(), params, to_run);
+ }
+ boost::asio::steady_timer timer_runtime(stopme);
+ timer_runtime.expires_after(std::chrono::seconds(runtime));
+ timer_runtime.wait();
+ work.reset();
+ context.stop();
+ to_run = false;
+
+ for (auto& i : threads)
+ {
+ i.join();
+ }
+ std::unordered_map<std::string,client_info> metrics_by_tenant;
+ for(auto& i : *ds.get())
+ {
+ auto it = metrics_by_tenant.emplace(i.tenant, client_info()).first;
+ std::cout << i.accepted << std::endl;
+ it->second.accepted += i.accepted;
+ it->second.rejected += i.rejected;
+ }
+ // TODO sum the results by tenant
+ for(auto& i : metrics_by_tenant)
+ {
+ std::cout << "Tenant is: " << i.first << std::endl;
+ std::cout << "Simulator finished accepted sum : " << i.second.accepted << std::endl;
+ std::cout << "Simulator finished rejected sum : " << i.second.rejected << std::endl;
+ }
+
+ return 0;
+}
--- /dev/null
+#include "rgw/rgw_ratelimit.h"
+#include "rgw/rgw_common.h"
+#include "random"
+#include <cstdlib>
+#include <string>
+#include <chrono>
+#include <boost/program_options.hpp>
+int main(int argc, char **argv)
+{
+ int num_qos_classes = 1;
+ try
+ {
+ using namespace boost::program_options;
+ options_description desc{"Options"};
+ desc.add_options()
+ ("help,h", "Help screen")
+ ("num_qos_classes", value<int>()->default_value(1), "how many qos tenants");
+ variables_map vm;
+ store(parse_command_line(argc, argv, desc), vm);
+ if (vm.count("help")) {
+ std::cout << desc << std::endl;
+ return EXIT_SUCCESS;
+ }
+ num_qos_classes = vm["num_qos_classes"].as<int>();
+ }
+ catch (const boost::program_options::error &ex)
+ {
+ std::cerr << ex.what() << std::endl;
+ return EXIT_FAILURE;
+ }
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_bytes = 0;
+ info.max_write_bytes = 0;
+ info.max_read_ops = 0;
+ info.max_write_ops = 0;
+ std::unique_ptr<CephContext> cct = std::make_unique<CephContext>(CEPH_ENTITY_TYPE_ANY);
+ if (!g_ceph_context)
+ {
+ g_ceph_context = cct.get();
+ }
+ std::shared_ptr<ActiveRateLimiter> ratelimit(new ActiveRateLimiter(g_ceph_context));
+ ratelimit->start();
+ auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: ");
+ for(int i = 0; i < num_qos_classes; i++)
+ {
+ std::string tenant = "uuser" + std::to_string(i);
+ auto time = ceph::coarse_real_clock::now();
+ ratelimit->get_active()->should_rate_limit("PUT", tenant, time, &info);
+ }
+
+}
virtual int remove_user(const DoutPrefixProvider* dpp, optional_yield y) override {
return 0;
}
-
+ virtual int merge_and_store_attrs(const DoutPrefixProvider *dpp, rgw::sal::Attrs& attrs, optional_yield y) override {
+ return 0;
+ }
virtual ~TestUser() = default;
};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <gtest/gtest.h>
+#include "rgw/rgw_ratelimit.h"
+
+
+using namespace std::chrono_literals;
+
+TEST(RGWRateLimit, op_limit_not_enabled)
+{
+ // info.enabled = false, so no limit
+ std::atomic_bool replacing;
+ std::condition_variable cv;
+ RateLimiter ratelimit(replacing, cv);
+ RGWRateLimitInfo info;
+ auto time = ceph::coarse_real_clock::now();
+ std::string key = "uuser123";
+ bool success = ratelimit.should_rate_limit("PUT", key, time, &info);
+ EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimit, reject_op_over_limit)
+{
+ // check that request is being rejected because there are not enough tokens
+ std::atomic_bool replacing;
+ std::condition_variable cv;
+ RateLimiter ratelimit(replacing, cv);
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_ops = 1;
+ auto time = ceph::coarse_real_clock::now();
+ std::string key = "uuser123";
+ bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+ time = ceph::coarse_real_clock::now();
+ success = ratelimit.should_rate_limit("GET", key, time, &info);
+ EXPECT_EQ(true, success);
+}
+TEST(RGWRateLimit, accept_op_after_giveback)
+{
+ // check that giveback is working fine
+ std::atomic_bool replacing;
+ std::condition_variable cv;
+ RateLimiter ratelimit(replacing, cv);
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_ops = 1;
+ auto time = ceph::coarse_real_clock::now();
+ std::string key = "uuser123";
+ bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+ ratelimit.giveback_tokens("GET", key);
+ time = ceph::coarse_real_clock::now();
+ success = ratelimit.should_rate_limit("GET", key, time, &info);
+ EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimit, accept_op_after_refill)
+{
+ // check that tokens are being filled properly
+ std::atomic_bool replacing;
+ std::condition_variable cv;
+ RateLimiter ratelimit(replacing, cv);
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_ops = 1;
+ auto time = ceph::coarse_real_clock::now();
+ std::string key = "uuser123";
+ bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+ time += 61s;
+ success = ratelimit.should_rate_limit("GET", key, time, &info);
+ EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimit, reject_bw_over_limit)
+{
+ // check that a newer request is rejected if there is no enough tokens (bw)
+ std::atomic_bool replacing;
+ std::condition_variable cv;
+ RateLimiter ratelimit(replacing, cv);
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_bytes = 1;
+ auto time = ceph::coarse_real_clock::now();
+ std::string key = "uuser123";
+ bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+ ratelimit.decrease_bytes("GET",key, 2, &info);
+ time = ceph::coarse_real_clock::now();
+ success = ratelimit.should_rate_limit("GET", key, time, &info);
+ EXPECT_EQ(true, success);
+}
+TEST(RGWRateLimit, accept_bw)
+{
+ // check that when there are enough tokens (bw) the request is still being served
+ std::atomic_bool replacing;
+ std::condition_variable cv;
+ RateLimiter ratelimit(replacing, cv);
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_bytes = 2;
+ auto time = ceph::coarse_real_clock::now();
+ std::string key = "uuser123";
+ bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+ ratelimit.decrease_bytes("GET",key, 1, &info);
+ time = ceph::coarse_real_clock::now();
+ success = ratelimit.should_rate_limit("GET", key, time, &info);
+ EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimit, check_bw_debt_at_max_120secs)
+{
+ // check that the bandwidth debt is not larger than 120 seconds
+ std::atomic_bool replacing;
+ std::condition_variable cv;
+ RateLimiter ratelimit(replacing, cv);
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_bytes = 2;
+ auto time = ceph::coarse_real_clock::now();
+ std::string key = "uuser123";
+ bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+ ratelimit.decrease_bytes("GET",key, 100, &info);
+ time += 121s;
+ success = ratelimit.should_rate_limit("GET", key, time, &info);
+ EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimit, check_that_bw_limit_not_affect_ops)
+{
+ // check that high read bytes limit, does not affect ops limit
+ std::atomic_bool replacing;
+ std::condition_variable cv;
+ RateLimiter ratelimit(replacing, cv);
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_ops = 1;
+ info.max_read_bytes = 100000000;
+ auto time = ceph::coarse_real_clock::now();
+ std::string key = "uuser123";
+ bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+ ratelimit.decrease_bytes("GET",key, 10000, &info);
+ time = ceph::coarse_real_clock::now();
+ success = ratelimit.should_rate_limit("GET", key, time, &info);
+ EXPECT_EQ(true, success);
+}
+TEST(RGWRateLimit, read_limit_does_not_affect_writes)
+{
+ // read limit does not affect writes
+ std::atomic_bool replacing;
+ std::condition_variable cv;
+ RateLimiter ratelimit(replacing, cv);
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_ops = 1;
+ info.max_read_bytes = 100000000;
+ auto time = ceph::coarse_real_clock::now();
+ std::string key = "uuser123";
+ bool success = ratelimit.should_rate_limit("PUT", key, time, &info);
+ ratelimit.decrease_bytes("PUT",key, 10000, &info);
+ time = ceph::coarse_real_clock::now();
+ success = ratelimit.should_rate_limit("PUT", key, time, &info);
+ EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimit, write_limit_does_not_affect_reads)
+{
+ // write limit does not affect reads
+ std::atomic_bool replacing;
+ std::condition_variable cv;
+ RateLimiter ratelimit(replacing, cv);
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_write_ops = 1;
+ info.max_write_bytes = 100000000;
+ auto time = ceph::coarse_real_clock::now();
+ std::string key = "uuser123";
+ bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+ ratelimit.decrease_bytes("GET",key, 10000, &info);
+ time = ceph::coarse_real_clock::now();
+ success = ratelimit.should_rate_limit("GET", key, time, &info);
+ EXPECT_EQ(false, success);
+}
+
+TEST(RGWRateLimit, allow_unlimited_access)
+{
+ // 0 values in RGWRateLimitInfo should allow unlimited access
+ std::atomic_bool replacing;
+ std::condition_variable cv;
+ RateLimiter ratelimit(replacing, cv);
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ auto time = ceph::coarse_real_clock::now();
+ std::string key = "uuser123";
+ bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+ EXPECT_EQ(false, success);
+}
+
+TEST(RGWRateLimitGC, NO_GC_AHEAD_OF_TIME)
+{
+ // Test if GC is not starting the replace before getting to map_size * 0.9
+ // Please make sure to change those values when you change the map_size in the code
+
+ std::shared_ptr<ActiveRateLimiter> ratelimit(new ActiveRateLimiter(g_ceph_context));
+ ratelimit->start();
+ auto active = ratelimit->get_active();
+ RGWRateLimitInfo info;
+ auto time = ceph::coarse_real_clock::now();
+ std::string key = "uuser123";
+ active->should_rate_limit("GET", key, time, &info);
+ auto activegc = ratelimit->get_active();
+ EXPECT_EQ(activegc, active);
+}
+TEST(RGWRateLimiterGC, GC_IS_WORKING)
+{
+ // Test if GC is replacing the active RateLimiter
+ // Please make sure to change those values when you change the map_size in the code
+
+ std::shared_ptr<ActiveRateLimiter> ratelimit(new ActiveRateLimiter(g_ceph_context));
+ ratelimit->start();
+ auto active = ratelimit->get_active();
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ auto time = ceph::coarse_real_clock::now();
+ std::string key = "-1";
+ for(int i = 0; i < 2000000; i++)
+ {
+ active->should_rate_limit("GET", key, time, &info);
+ key = std::to_string(i);
+ }
+ auto activegc = ratelimit->get_active();
+ EXPECT_NE(activegc, active);
+}
+
+
+TEST(RGWRateLimitEntry, op_limit_not_enabled)
+{
+ // info.enabled = false, so no limit
+ RateLimiterEntry entry;
+ RGWRateLimitInfo info;
+ auto time = ceph::coarse_real_clock::now().time_since_epoch();
+ bool success = entry.should_rate_limit(false, &info, time);
+ EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimitEntry, reject_op_over_limit)
+{
+ // check that request is being rejected because there are not enough tokens
+
+ RGWRateLimitInfo info;
+ RateLimiterEntry entry;
+ info.enabled = true;
+ info.max_read_ops = 1;
+ auto time = ceph::coarse_real_clock::now().time_since_epoch();
+ bool success = entry.should_rate_limit(true, &info, time);
+ time = ceph::coarse_real_clock::now().time_since_epoch();
+ success = entry.should_rate_limit(true, &info, time);
+ EXPECT_EQ(true, success);
+}
+TEST(RGWRateLimitEntry, accept_op_after_giveback)
+{
+ // check that giveback is working fine
+ RGWRateLimitInfo info;
+ RateLimiterEntry entry;
+ info.enabled = true;
+ info.max_read_ops = 1;
+ auto time = ceph::coarse_real_clock::now().time_since_epoch();
+ bool success = entry.should_rate_limit(true, &info, time);
+ entry.giveback_tokens(true);
+ time = ceph::coarse_real_clock::now().time_since_epoch();
+ success = entry.should_rate_limit(true, &info, time);
+ EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimitEntry, accept_op_after_refill)
+{
+ // check that tokens are being filled properly
+ RateLimiterEntry entry;
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_ops = 1;
+ auto time = ceph::coarse_real_clock::now().time_since_epoch();
+ bool success = entry.should_rate_limit(true, &info, time);
+ time += 61s;
+ success = entry.should_rate_limit(true, &info, time);
+ EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimitEntry, reject_bw_over_limit)
+{
+ // check that a newer request is rejected if there is no enough tokens (bw)
+ RateLimiterEntry entry;
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_bytes = 1;
+ auto time = ceph::coarse_real_clock::now().time_since_epoch();
+ bool success = entry.should_rate_limit(true, &info, time);
+ entry.decrease_bytes(true, 2, &info);
+ time = ceph::coarse_real_clock::now().time_since_epoch();
+ success = entry.should_rate_limit(true, &info, time);
+ EXPECT_EQ(true, success);
+}
+TEST(RGWRateLimitEntry, accept_bw)
+{
+ // check that when there are enough tokens (bw) the request is still being served
+ RateLimiterEntry entry;
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_bytes = 2;
+ auto time = ceph::coarse_real_clock::now().time_since_epoch();
+ bool success = entry.should_rate_limit(true, &info, time);
+ entry.decrease_bytes(true, 1, &info);
+ time = ceph::coarse_real_clock::now().time_since_epoch();
+ success = entry.should_rate_limit(true, &info, time);
+ EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimitEntry, check_bw_debt_at_max_120secs)
+{
+ // check that the bandwidth debt is not larger than 120 seconds
+ RateLimiterEntry entry;
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_bytes = 2;
+ auto time = ceph::coarse_real_clock::now().time_since_epoch();
+ bool success = entry.should_rate_limit(true, &info, time);
+ entry.decrease_bytes(true, 100, &info);
+ time += 121s;
+ success = entry.should_rate_limit(true, &info, time);
+ EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimitEntry, check_that_bw_limit_not_affect_ops)
+{
+ // check that high read bytes limit, does not affect ops limit
+ RateLimiterEntry entry;
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_ops = 1;
+ info.max_read_bytes = 100000000;
+ auto time = ceph::coarse_real_clock::now().time_since_epoch();
+ bool success = entry.should_rate_limit(true, &info, time);
+ entry.decrease_bytes(true, 10000, &info);
+ time = ceph::coarse_real_clock::now().time_since_epoch();
+ success = entry.should_rate_limit(true, &info, time);
+ EXPECT_EQ(true, success);
+}
+TEST(RGWRateLimitEntry, read_limit_does_not_affect_writes)
+{
+ // read limit does not affect writes
+ RateLimiterEntry entry;
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_read_ops = 1;
+ info.max_read_bytes = 100000000;
+ auto time = ceph::coarse_real_clock::now().time_since_epoch();
+ bool success = entry.should_rate_limit(false, &info, time);
+ entry.decrease_bytes(false, 10000, &info);
+ time = ceph::coarse_real_clock::now().time_since_epoch();
+ success = entry.should_rate_limit(false, &info, time);
+ EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimitEntry, write_limit_does_not_affect_reads)
+{
+ // write limit does not affect reads
+ RateLimiterEntry entry;
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ info.max_write_ops = 1;
+ info.max_write_bytes = 100000000;
+ auto time = ceph::coarse_real_clock::now().time_since_epoch();
+ std::string key = "uuser123";
+ bool success = entry.should_rate_limit(true, &info, time);
+ entry.decrease_bytes(true, 10000, &info);
+ time = ceph::coarse_real_clock::now().time_since_epoch();
+ success = entry.should_rate_limit(true, &info, time);
+ EXPECT_EQ(false, success);
+}
+
+TEST(RGWRateLimitEntry, allow_unlimited_access)
+{
+ // 0 values in RGWRateLimitInfo should allow unlimited access (default value)
+ RateLimiterEntry entry;
+ RGWRateLimitInfo info;
+ info.enabled = true;
+ auto time = ceph::coarse_real_clock::now().time_since_epoch();
+ bool success = entry.should_rate_limit(true, &info, time);
+ EXPECT_EQ(false, success);
+}