From 71ffbaee08c0a1d31e3f6676ebdf2a2c22cc8950 Mon Sep 17 00:00:00 2001 From: Or Friedmann Date: Tue, 1 Jun 2021 15:45:09 +0300 Subject: [PATCH] rgw: Add rgw rate limiting per user and per bucket Add rgw rate limiting per user and per bucket Signed-off-by: Or Friedmann --- doc/radosgw/admin.rst | 170 +++++++++- src/rgw/rgw_admin.cc | 417 +++++++++++++++++++++++++ src/rgw/rgw_asio_frontend.cc | 5 +- src/rgw/rgw_bucket.h | 4 + src/rgw/rgw_common.cc | 18 ++ src/rgw/rgw_common.h | 46 +++ src/rgw/rgw_loadgen_process.cc | 5 +- src/rgw/rgw_main.cc | 7 +- src/rgw/rgw_process.cc | 86 ++++- src/rgw/rgw_process.h | 8 +- src/rgw/rgw_ratelimit.h | 292 +++++++++++++++++ src/rgw/rgw_rest.cc | 26 +- src/rgw/rgw_rest_swift.cc | 2 +- src/rgw/rgw_sal.h | 22 +- src/rgw/rgw_sal_dbstore.cc | 17 +- src/rgw/rgw_sal_dbstore.h | 2 + src/rgw/rgw_sal_rados.cc | 15 +- src/rgw/rgw_sal_rados.h | 1 + src/rgw/rgw_user.h | 14 + src/rgw/rgw_zone.cc | 6 + src/rgw/rgw_zone.h | 16 +- src/test/cli/radosgw-admin/help.t | 16 + src/test/rgw/CMakeLists.txt | 10 + src/test/rgw/bench_rgw_ratelimit.cc | 247 +++++++++++++++ src/test/rgw/bench_rgw_ratelimit_gc.cc | 52 +++ src/test/rgw/test_rgw_lua.cc | 4 +- src/test/rgw/test_rgw_ratelimit.cc | 376 ++++++++++++++++++++++ 27 files changed, 1848 insertions(+), 36 deletions(-) create mode 100644 src/rgw/rgw_ratelimit.h create mode 100644 src/test/rgw/bench_rgw_ratelimit.cc create mode 100644 src/test/rgw/bench_rgw_ratelimit_gc.cc create mode 100644 src/test/rgw/test_rgw_ratelimit.cc diff --git a/doc/radosgw/admin.rst b/doc/radosgw/admin.rst index 5f47471acf7fd..fa00a5d03bca9 100644 --- a/doc/radosgw/admin.rst +++ b/doc/radosgw/admin.rst @@ -311,7 +311,7 @@ To add administrative capabilities to a user, execute the following:: You can add read, write or all capabilities to users, buckets, metadata and usage (utilization). 
For example:: - --caps="[users|buckets|metadata|usage|zone]=[*|read|write|read, write]" + --caps="[users|buckets|metadata|usage|zone|amz-cache|info|bilog|mdlog|datalog|user-policy|oidc-provider|roles]=[*|read|write|read, write]" For example:: @@ -473,6 +473,174 @@ commands. :: be restarted for the changes to take effect. +Rate Limit Management +===================== + +The Ceph Object Gateway enables you to set rate limits on users and buckets. +A rate limit includes the maximum number of read ops and write ops per minute +and how many bytes per minute can be written or read per user or per bucket. +Requests that use the GET or HEAD method in the REST request are considered "read requests"; all other requests are considered "write requests". +Every Object Gateway tracks per-user and per-bucket metrics separately; these metrics are not shared with other gateways. +That means that the desired limits should be divided by the number of active Object Gateways. +For example, if userA should be limited to 10 ops per minute and there are 2 Object Gateways in the cluster, +the limit for userA should be 5 (10 ops per minute / 2 RGWs). +If the requests are ``not`` balanced between RGWs, the rate limit may be underutilized. +For example, if the ops limit is 5 and there are 2 RGWs, ``but`` the load balancer sends load to only one of those RGWs, +the effective limit would be 5 ops, because this limit is enforced per RGW. +If a limit is reached for the bucket but not for the user, or vice versa, the request is cancelled as well. +The bandwidth counting happens after the request has been accepted; as a result, even if the bucket/user reaches its bandwidth limit in the middle of a request, that request will proceed. +The RGW keeps a "debt" of the bytes used beyond the configured value and prevents this user/bucket from sending more requests until its "debt" is paid. +The maximum size of the "debt" is twice the max-read/write-bytes per minute. 
+If userA has a 1 byte read limit per minute and tries to GET a 1 GB object, the user will be able to do it. +After userA completes this 1 GB operation, the RGW will block the user's requests for up to 2 minutes, until userA is able to send GET requests again. + + +- **Bucket:** The ``--bucket`` option allows you to specify a rate limit for a + bucket. + +- **User:** The ``--uid`` option allows you to specify a rate limit for a + user. + +- **Maximum Read Ops:** The ``--max-read-ops`` setting allows you to specify + the maximum number of read ops per minute per RGW. A 0 value disables this setting (which means unlimited access). + +- **Maximum Read Bytes:** The ``--max-read-bytes`` setting allows you to specify + the maximum number of read bytes per minute per RGW. A 0 value disables this setting (which means unlimited access). + +- **Maximum Write Ops:** The ``--max-write-ops`` setting allows you to specify + the maximum number of write ops per minute per RGW. A 0 value disables this setting (which means unlimited access). + +- **Maximum Write Bytes:** The ``--max-write-bytes`` setting allows you to specify + the maximum number of write bytes per minute per RGW. A 0 value disables this setting (which means unlimited access). + +- **Rate Limit Scope:** The ``--ratelimit-scope`` option sets the scope for the rate limit. + The options are ``bucket``, ``user`` and ``anonymous``. The bucket rate limit applies to buckets. + The user rate limit applies to a user. Anonymous applies to an unauthenticated user. + The anonymous scope is only available for the global rate limit. + + +Set User Rate Limit +------------------- + +Before you enable a rate limit, you must first set the rate limit parameters. 
+For example:: + + radosgw-admin ratelimit set --ratelimit-scope=user --uid=<uid> <[--max-read-ops=<num ops>] [--max-read-bytes=<num bytes>] + [--max-write-ops=<num ops>] [--max-write-bytes=<num bytes>]> + +For example:: + + radosgw-admin ratelimit set --ratelimit-scope=user --uid=johndoe --max-read-ops=1024 --max-write-bytes=10240 + + +A 0 value for num ops and / or num bytes means that the +specific rate limit attribute check is disabled. + +Get User Rate Limit +------------------- + +Get the currently configured rate limit parameters. +For example:: + + radosgw-admin ratelimit get --ratelimit-scope=user --uid=<uid> + +For example:: + + radosgw-admin ratelimit get --ratelimit-scope=user --uid=johndoe + + +A 0 value for num ops and / or num bytes means that the +specific rate limit attribute check is disabled. + + +Enable/Disable User Rate Limit +------------------------------ + +Once you set a user rate limit, you may enable it. For example:: + + radosgw-admin ratelimit enable --ratelimit-scope=user --uid=<uid> + +You may disable an enabled user rate limit. For example:: + + radosgw-admin ratelimit disable --ratelimit-scope=user --uid=johndoe + + +Set Bucket Rate Limit +--------------------- + +Before you enable a rate limit, you must first set the rate limit parameters. +For example:: + + radosgw-admin ratelimit set --ratelimit-scope=bucket --bucket=<bucket> <[--max-read-ops=<num ops>] [--max-read-bytes=<num bytes>] + [--max-write-ops=<num ops>] [--max-write-bytes=<num bytes>]> + +For example:: + + radosgw-admin ratelimit set --ratelimit-scope=bucket --bucket=mybucket --max-read-ops=1024 --max-write-bytes=10240 + + +A 0 value for num ops and / or num bytes means that the +specific rate limit attribute check is disabled. 
+ +Get Bucket Rate Limit +--------------------- + +Get the currently configured rate limit parameters. +For example:: + + radosgw-admin ratelimit get --ratelimit-scope=bucket --bucket=<bucket> + +For example:: + + radosgw-admin ratelimit get --ratelimit-scope=bucket --bucket=mybucket + + +A 0 value for num ops and / or num bytes means that the +specific rate limit attribute check is disabled. + + +Enable/Disable Bucket Rate Limit +-------------------------------- + +Once you set a bucket rate limit, you may enable it. For example:: + + radosgw-admin ratelimit enable --ratelimit-scope=bucket --bucket=<bucket> + +You may disable an enabled bucket rate limit. For example:: + + radosgw-admin ratelimit disable --ratelimit-scope=bucket --bucket=mybucket + + +Reading / Writing Global Rate Limit Configuration +------------------------------------------------- + +You can read and write global rate limit settings in the period configuration. To +view the global rate limit settings:: + + radosgw-admin global ratelimit get + +The global rate limit settings can be manipulated with the ``global ratelimit`` +counterparts of the ``ratelimit set``, ``ratelimit enable``, and ``ratelimit disable`` +commands. Per-user and per-bucket rate limit configuration overrides the global configuration:: + + radosgw-admin global ratelimit set --ratelimit-scope bucket --max-read-ops=1024 + radosgw-admin global ratelimit enable --ratelimit-scope bucket + +The global rate limit can configure rate limit scope for all authenticated users:: + + radosgw-admin global ratelimit set --ratelimit-scope user --max-read-ops=1024 + radosgw-admin global ratelimit enable --ratelimit-scope user + +The global rate limit can configure rate limit scope for all unauthenticated users:: + + radosgw-admin global ratelimit set --ratelimit-scope=anonymous --max-read-ops=1024 + radosgw-admin global ratelimit enable --ratelimit-scope=anonymous + +.. 
note:: In a multisite configuration, where there is a realm and period + present, changes to the global quotas must be committed using ``period + update --commit``. If there is no period present, the rados gateway(s) must + be restarted for the changes to take effect. + Usage ===== diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 1c21d7267002d..f72e6f7204fdd 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -164,10 +164,18 @@ void usage() cout << " quota set set quota params\n"; cout << " quota enable enable quota\n"; cout << " quota disable disable quota\n"; + cout << " ratelimit get get ratelimit params\n"; + cout << " ratelimit set set ratelimit params\n"; + cout << " ratelimit enable enable ratelimit\n"; + cout << " ratelimit disable disable ratelimit\n"; cout << " global quota get view global quota params\n"; cout << " global quota set set global quota params\n"; cout << " global quota enable enable a global quota\n"; cout << " global quota disable disable a global quota\n"; + cout << " global ratelimit get view global ratelimit params\n"; + cout << " global ratelimit set set global ratelimit params\n"; + cout << " global ratelimit enable enable a ratelimit quota\n"; + cout << " global ratelimit disable disable a ratelimit quota\n"; cout << " realm create create a new realm\n"; cout << " realm rm remove a realm\n"; cout << " realm get show realm info\n"; @@ -416,6 +424,13 @@ void usage() cout << " --max-objects specify max objects (negative value to disable)\n"; cout << " --max-size specify max size (in B/K/M/G/T, negative value to disable)\n"; cout << " --quota-scope scope of quota (bucket, user)\n"; + cout << "\nRate limiting options:\n"; + cout << " --max-read-ops specify max requests per minute for READ ops per RGW (GET and HEAD request methods), 0 means unlimited\n"; + cout << " --max-read-bytes specify max bytes per minute for READ ops per RGW (GET and HEAD request methods), 0 means unlimited\n"; + cout << " --max-write-ops 
specify max requests per minute for WRITE ops per RGW (Not GET or HEAD request methods), 0 means unlimited\n"; + cout << " --max-write-bytes specify max bytes per minute for WRITE ops per RGW (Not GET or HEAD request methods), 0 means unlimited\n"; + cout << " --ratelimit-scope scope of rate limiting: bucket, user, anonymous\n"; + cout << " anonymous can be configured only with global rate limit\n"; cout << "\nOrphans search options:\n"; cout << " --num-shards num of shards to use for keeping the temporary scan info\n"; cout << " --orphan-stale-secs num of seconds to wait before declaring an object to be an orphan (default: 86400)\n"; @@ -660,6 +675,10 @@ enum class OPT { ORPHANS_FIND, ORPHANS_FINISH, ORPHANS_LIST_JOBS, + RATELIMIT_GET, + RATELIMIT_SET, + RATELIMIT_ENABLE, + RATELIMIT_DISABLE, ZONEGROUP_ADD, ZONEGROUP_CREATE, ZONEGROUP_DEFAULT, @@ -750,6 +769,10 @@ enum class OPT { GLOBAL_QUOTA_SET, GLOBAL_QUOTA_ENABLE, GLOBAL_QUOTA_DISABLE, + GLOBAL_RATELIMIT_GET, + GLOBAL_RATELIMIT_SET, + GLOBAL_RATELIMIT_ENABLE, + GLOBAL_RATELIMIT_DISABLE, SYNC_INFO, SYNC_STATUS, ROLE_CREATE, @@ -859,6 +882,10 @@ static SimpleCmd::Commands all_cmds = { { "quota set", OPT::QUOTA_SET }, { "quota enable", OPT::QUOTA_ENABLE }, { "quota disable", OPT::QUOTA_DISABLE }, + { "ratelimit get", OPT::RATELIMIT_GET }, + { "ratelimit set", OPT::RATELIMIT_SET }, + { "ratelimit enable", OPT::RATELIMIT_ENABLE }, + { "ratelimit disable", OPT::RATELIMIT_DISABLE }, { "gc list", OPT::GC_LIST }, { "gc process", OPT::GC_PROCESS }, { "lc list", OPT::LC_LIST }, @@ -965,6 +992,10 @@ static SimpleCmd::Commands all_cmds = { { "global quota set", OPT::GLOBAL_QUOTA_SET }, { "global quota enable", OPT::GLOBAL_QUOTA_ENABLE }, { "global quota disable", OPT::GLOBAL_QUOTA_DISABLE }, + { "global ratelimit get", OPT::GLOBAL_RATELIMIT_GET }, + { "global ratelimit set", OPT::GLOBAL_RATELIMIT_SET }, + { "global ratelimit enable", OPT::GLOBAL_RATELIMIT_ENABLE }, + { "global ratelimit disable", 
OPT::GLOBAL_RATELIMIT_DISABLE }, { "sync info", OPT::SYNC_INFO }, { "sync status", OPT::SYNC_STATUS }, { "role create", OPT::ROLE_CREATE }, @@ -1261,6 +1292,57 @@ static bool dump_string(const char *field_name, bufferlist& bl, Formatter *f) return true; } +bool set_ratelimit_info(RGWRateLimitInfo& ratelimit, OPT opt_cmd, int64_t max_read_ops, int64_t max_write_ops, + int64_t max_read_bytes, int64_t max_write_bytes, + bool have_max_read_ops, bool have_max_write_ops, + bool have_max_read_bytes, bool have_max_write_bytes) +{ + bool ratelimit_configured = true; + switch (opt_cmd) { + case OPT::RATELIMIT_ENABLE: + case OPT::GLOBAL_RATELIMIT_ENABLE: + ratelimit.enabled = true; + break; + + + case OPT::RATELIMIT_SET: + case OPT::GLOBAL_RATELIMIT_SET: + ratelimit_configured = false; + if (have_max_read_ops) { + if (max_read_ops >= 0) { + ratelimit.max_read_ops = max_read_ops; + ratelimit_configured = true; + } + } + if (have_max_write_ops) { + if (max_write_ops >= 0) { + ratelimit.max_write_ops = max_write_ops; + ratelimit_configured = true; + } + } + if (have_max_read_bytes) { + if (max_read_bytes >= 0) { + ratelimit.max_read_bytes = max_read_bytes; + ratelimit_configured = true; + } + } + if (have_max_write_bytes) { + if (max_write_bytes >= 0) { + ratelimit.max_write_bytes = max_write_bytes; + ratelimit_configured = true; + } + } + break; + case OPT::RATELIMIT_DISABLE: + case OPT::GLOBAL_RATELIMIT_DISABLE: + ratelimit.enabled = false; + break; + default: + break; + } + return ratelimit_configured; +} + void set_quota_info(RGWQuotaInfo& quota, OPT opt_cmd, int64_t max_size, int64_t max_objects, bool have_max_size, bool have_max_objects) { @@ -1319,6 +1401,141 @@ int set_bucket_quota(rgw::sal::Store* store, OPT opt_cmd, return 0; } +int set_bucket_ratelimit(rgw::sal::Store* store, OPT opt_cmd, + const string& tenant_name, const string& bucket_name, + int64_t max_read_ops, int64_t max_write_ops, + int64_t max_read_bytes, int64_t max_write_bytes, + bool have_max_read_ops, 
bool have_max_write_ops, + bool have_max_read_bytes, bool have_max_write_bytes) +{ + std::unique_ptr bucket; + int r = store->get_bucket(dpp(), nullptr, tenant_name, bucket_name, &bucket, null_yield); + if (r < 0) { + cerr << "could not get bucket info for bucket=" << bucket_name << ": " << cpp_strerror(-r) << std::endl; + return -r; + } + RGWRateLimitInfo ratelimit_info; + auto iter = bucket->get_attrs().find(RGW_ATTR_RATELIMIT); + if(iter != bucket->get_attrs().end()) { + try { + bufferlist& bl = iter->second; + auto biter = bl.cbegin(); + decode(ratelimit_info, biter); + } catch (buffer::error& err) { + ldpp_dout(dpp(), 0) << "ERROR: failed to decode rate limit" << dendl; + return -EIO; + } + } + bool ratelimit_configured = set_ratelimit_info(ratelimit_info, opt_cmd, max_read_ops, max_write_ops, + max_read_bytes, max_write_bytes, + have_max_read_ops, have_max_write_ops, + have_max_read_bytes, have_max_write_bytes); + if (!ratelimit_configured) { + ldpp_dout(dpp(), 0) << "ERROR: no rate limit values have been specified" << dendl; + return -EINVAL; + } + bufferlist bl; + ratelimit_info.encode(bl); + rgw::sal::Attrs attr; + attr[RGW_ATTR_RATELIMIT] = bl; + r = bucket->merge_and_store_attrs(dpp(), attr, null_yield); + if (r < 0) { + cerr << "ERROR: failed writing bucket instance info: " << cpp_strerror(-r) << std::endl; + return -r; + } + return 0; +} + +int set_user_ratelimit(OPT opt_cmd, std::unique_ptr& user, + int64_t max_read_ops, int64_t max_write_ops, + int64_t max_read_bytes, int64_t max_write_bytes, + bool have_max_read_ops, bool have_max_write_ops, + bool have_max_read_bytes, bool have_max_write_bytes) +{ + RGWRateLimitInfo ratelimit_info; + user->load_user(dpp(), null_yield); + auto iter = user->get_attrs().find(RGW_ATTR_RATELIMIT); + if(iter != user->get_attrs().end()) { + try { + bufferlist& bl = iter->second; + auto biter = bl.cbegin(); + decode(ratelimit_info, biter); + } catch (buffer::error& err) { + ldpp_dout(dpp(), 0) << "ERROR: failed to decode 
rate limit" << dendl; + return -EIO; + } + } + bool ratelimit_configured = set_ratelimit_info(ratelimit_info, opt_cmd, max_read_ops, max_write_ops, + max_read_bytes, max_write_bytes, + have_max_read_ops, have_max_write_ops, + have_max_read_bytes, have_max_write_bytes); + if (!ratelimit_configured) { + ldpp_dout(dpp(), 0) << "ERROR: no rate limit values have been specified" << dendl; + return -EINVAL; + } + bufferlist bl; + ratelimit_info.encode(bl); + rgw::sal::Attrs attr; + attr[RGW_ATTR_RATELIMIT] = bl; + int r = user->merge_and_store_attrs(dpp(), attr, null_yield); + if (r < 0) { + cerr << "ERROR: failed writing user instance info: " << cpp_strerror(-r) << std::endl; + return -r; + } + return 0; +} + +int show_user_ratelimit(std::unique_ptr& user, Formatter *formatter) +{ + RGWRateLimitInfo ratelimit_info; + user->load_user(dpp(), null_yield); + auto iter = user->get_attrs().find(RGW_ATTR_RATELIMIT); + if(iter != user->get_attrs().end()) { + try { + bufferlist& bl = iter->second; + auto biter = bl.cbegin(); + decode(ratelimit_info, biter); + } catch (buffer::error& err) { + ldpp_dout(dpp(), 0) << "ERROR: failed to decode rate limit" << dendl; + return -EIO; + } + } + formatter->open_object_section("user_ratelimit"); + encode_json("user_ratelimit", ratelimit_info, formatter); + formatter->close_section(); + formatter->flush(cout); + cout << std::endl; + return 0; +} + +int show_bucket_ratelimit(rgw::sal::Store* store, const string& tenant_name, + const string& bucket_name, Formatter *formatter) +{ + std::unique_ptr bucket; + int r = store->get_bucket(dpp(), nullptr, tenant_name, bucket_name, &bucket, null_yield); + if (r < 0) { + cerr << "could not get bucket info for bucket=" << bucket_name << ": " << cpp_strerror(-r) << std::endl; + return -r; + } + RGWRateLimitInfo ratelimit_info; + auto iter = bucket->get_attrs().find(RGW_ATTR_RATELIMIT); + if (iter != bucket->get_attrs().end()) { + try { + bufferlist& bl = iter->second; + auto biter = bl.cbegin(); + 
decode(ratelimit_info, biter); + } catch (buffer::error& err) { + ldpp_dout(dpp(), 0) << "ERROR: failed to decode rate limit" << dendl; + return -EIO; + } + } + formatter->open_object_section("bucket_ratelimit"); + encode_json("bucket_ratelimit", ratelimit_info, formatter); + formatter->close_section(); + formatter->flush(cout); + cout << std::endl; + return 0; +} int set_user_bucket_quota(OPT opt_cmd, RGWUser& user, RGWUserAdminOpState& op_state, int64_t max_size, int64_t max_objects, bool have_max_size, bool have_max_objects) { @@ -3255,6 +3472,7 @@ int main(int argc, const char **argv) string op_id; string op_mask_str; string quota_scope; + string ratelimit_scope; string object_version; string placement_id; std::optional opt_storage_class; @@ -3264,8 +3482,16 @@ int main(int argc, const char **argv) int64_t max_objects = -1; int64_t max_size = -1; + int64_t max_read_ops = 0; + int64_t max_write_ops = 0; + int64_t max_read_bytes = 0; + int64_t max_write_bytes = 0; bool have_max_objects = false; bool have_max_size = false; + bool have_max_write_ops = false; + bool have_max_read_ops = false; + bool have_max_write_bytes = false; + bool have_max_read_bytes = false; int include_all = false; int allow_unordered = false; @@ -3487,6 +3713,34 @@ int main(int argc, const char **argv) return EINVAL; } have_max_objects = true; + } else if (ceph_argparse_witharg(args, i, &val, "--max-read-ops", (char*)NULL)) { + max_read_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err); + if (!err.empty()) { + cerr << "ERROR: failed to parse max read requests: " << err << std::endl; + return EINVAL; + } + have_max_read_ops = true; + } else if (ceph_argparse_witharg(args, i, &val, "--max-write-ops", (char*)NULL)) { + max_write_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err); + if (!err.empty()) { + cerr << "ERROR: failed to parse max write requests: " << err << std::endl; + return EINVAL; + } + have_max_write_ops = true; + } else if (ceph_argparse_witharg(args, i, &val, 
"--max-read-bytes", (char*)NULL)) { + max_read_bytes = (int64_t)strict_strtoll(val.c_str(), 10, &err); + if (!err.empty()) { + cerr << "ERROR: failed to parse max read bytes: " << err << std::endl; + return EINVAL; + } + have_max_read_bytes = true; + } else if (ceph_argparse_witharg(args, i, &val, "--max-write-bytes", (char*)NULL)) { + max_write_bytes = (int64_t)strict_strtoll(val.c_str(), 10, &err); + if (!err.empty()) { + cerr << "ERROR: failed to parse max write bytes: " << err << std::endl; + return EINVAL; + } + have_max_write_bytes = true; } else if (ceph_argparse_witharg(args, i, &val, "--date", "--time", (char*)NULL)) { date = val; if (end_date.empty()) @@ -3604,6 +3858,8 @@ int main(int argc, const char **argv) end_marker = val; } else if (ceph_argparse_witharg(args, i, &val, "--quota-scope", (char*)NULL)) { quota_scope = val; + } else if (ceph_argparse_witharg(args, i, &val, "--ratelimit-scope", (char*)NULL)) { + ratelimit_scope = val; } else if (ceph_argparse_witharg(args, i, &val, "--index-type", (char*)NULL)) { string index_type_str = val; bi_index_type = get_bi_index_type(index_type_str); @@ -3902,6 +4158,8 @@ int main(int argc, const char **argv) OPT::PERIOD_GET_CURRENT, OPT::PERIOD_LIST, OPT::GLOBAL_QUOTA_GET, OPT::GLOBAL_QUOTA_SET, OPT::GLOBAL_QUOTA_ENABLE, OPT::GLOBAL_QUOTA_DISABLE, + OPT::GLOBAL_RATELIMIT_GET, OPT::GLOBAL_RATELIMIT_SET, + OPT::GLOBAL_RATELIMIT_ENABLE, OPT::GLOBAL_RATELIMIT_DISABLE, OPT::REALM_DELETE, OPT::REALM_GET, OPT::REALM_LIST, OPT::REALM_LIST_PERIODS, OPT::REALM_GET_DEFAULT, @@ -3958,6 +4216,7 @@ int main(int argc, const char **argv) OPT::PERIOD_GET_CURRENT, OPT::PERIOD_LIST, OPT::GLOBAL_QUOTA_GET, + OPT::GLOBAL_RATELIMIT_GET, OPT::SYNC_INFO, OPT::SYNC_STATUS, OPT::ROLE_GET, @@ -4252,6 +4511,100 @@ int main(int argc, const char **argv) formatter->flush(cout); } break; + case OPT::GLOBAL_RATELIMIT_GET: + case OPT::GLOBAL_RATELIMIT_SET: + case OPT::GLOBAL_RATELIMIT_ENABLE: + case OPT::GLOBAL_RATELIMIT_DISABLE: + { + if 
(realm_id.empty()) { + RGWRealm realm(g_ceph_context, static_cast(store)->svc()->sysobj); + if (!realm_name.empty()) { + // look up realm_id for the given realm_name + int ret = realm.read_id(dpp(), realm_name, realm_id, null_yield); + if (ret < 0) { + cerr << "ERROR: failed to read realm for " << realm_name + << ": " << cpp_strerror(-ret) << std::endl; + return -ret; + } + } else { + // use default realm_id when none is given + int ret = realm.read_default_id(dpp(), realm_id, null_yield); + if (ret < 0 && ret != -ENOENT) { // on ENOENT, use empty realm_id + cerr << "ERROR: failed to read default realm: " + << cpp_strerror(-ret) << std::endl; + return -ret; + } + } + } + + RGWPeriodConfig period_config; + int ret = period_config.read(dpp(), static_cast(store)->svc()->sysobj, realm_id, null_yield); + if (ret < 0 && ret != -ENOENT) { + cerr << "ERROR: failed to read period config: " + << cpp_strerror(-ret) << std::endl; + return -ret; + } + bool ratelimit_configured = true; + formatter->open_object_section("period_config"); + if (ratelimit_scope == "bucket") { + ratelimit_configured = set_ratelimit_info(period_config.bucket_ratelimit, opt_cmd, + max_read_ops, max_write_ops, + max_read_bytes, max_write_bytes, + have_max_read_ops, have_max_write_ops, + have_max_read_bytes, have_max_write_bytes); + encode_json("bucket_ratelimit", period_config.bucket_ratelimit, formatter.get()); + } else if (ratelimit_scope == "user") { + ratelimit_configured = set_ratelimit_info(period_config.user_ratelimit, opt_cmd, + max_read_ops, max_write_ops, + max_read_bytes, max_write_bytes, + have_max_read_ops, have_max_write_ops, + have_max_read_bytes, have_max_write_bytes); + encode_json("user_ratelimit", period_config.user_ratelimit, formatter.get()); + } else if (ratelimit_scope == "anonymous") { + ratelimit_configured = set_ratelimit_info(period_config.anon_ratelimit, opt_cmd, + max_read_ops, max_write_ops, + max_read_bytes, max_write_bytes, + have_max_read_ops, have_max_write_ops, + 
have_max_read_bytes, have_max_write_bytes); + encode_json("anonymous_ratelimit", period_config.anon_ratelimit, formatter.get()); + } else if (ratelimit_scope.empty() && opt_cmd == OPT::GLOBAL_RATELIMIT_GET) { + // if no scope is given for GET, print both + encode_json("bucket_ratelimit", period_config.bucket_ratelimit, formatter.get()); + encode_json("user_ratelimit", period_config.user_ratelimit, formatter.get()); + encode_json("anonymous_ratelimit", period_config.anon_ratelimit, formatter.get()); + } else { + cerr << "ERROR: invalid rate limit scope specification. Please specify " + "either --ratelimit-scope=bucket, or --ratelimit-scope=user or --ratelimit-scope=anonymous" << std::endl; + return EINVAL; + } + if (!ratelimit_configured) { + cerr << "ERROR: no rate limit values have been specified" << std::endl; + return EINVAL; + } + + formatter->close_section(); + + if (opt_cmd != OPT::GLOBAL_RATELIMIT_GET) { + // write the modified period config + ret = period_config.write(dpp(), static_cast(store)->svc()->sysobj, realm_id, null_yield); + if (ret < 0) { + cerr << "ERROR: failed to write period config: " + << cpp_strerror(-ret) << std::endl; + return -ret; + } + if (!realm_id.empty()) { + cout << "Global ratelimit changes saved. Use 'period update' to apply " + "them to the staging period, and 'period commit' to commit the " + "new period." << std::endl; + } else { + cout << "Global ratelimit changes saved. They will take effect as " + "the gateways are restarted." 
<< std::endl; + } + } + + formatter->flush(cout); + } + break; case OPT::GLOBAL_QUOTA_GET: case OPT::GLOBAL_QUOTA_SET: case OPT::GLOBAL_QUOTA_ENABLE: @@ -6413,6 +6766,19 @@ int main(int argc, const char **argv) cerr << "failure: " << cpp_strerror(-r) << ": " << err << std::endl; return -r; } + RGWRateLimitInfo ratelimit_info; + std::unique_ptr bucket_sal; + auto iter = bucket->get_attrs().find(RGW_ATTR_RATELIMIT); + if(iter != bucket->get_attrs().end()) { + try { + std::cerr << "here" << std::endl; + bufferlist& bl = iter->second; + auto biter = bl.cbegin(); + decode(ratelimit_info, biter); + } catch (buffer::error& err) { + cerr << "ERROR: failed to decode rate limit" << std::endl; + } + } } if (opt_cmd == OPT::BUCKET_LINK) { @@ -9134,6 +9500,57 @@ next: } } + bool ratelimit_op_set = (opt_cmd == OPT::RATELIMIT_SET || opt_cmd == OPT::RATELIMIT_ENABLE || opt_cmd == OPT::RATELIMIT_DISABLE); + bool ratelimit_op_get = opt_cmd == OPT::RATELIMIT_GET; + if (ratelimit_op_set) { + if (bucket_name.empty() && rgw::sal::User::empty(user)) { + cerr << "ERROR: bucket name or uid is required for ratelimit operation" << std::endl; + return EINVAL; + } + + if (!bucket_name.empty()) { + if (!ratelimit_scope.empty() && ratelimit_scope != "bucket") { + cerr << "ERROR: invalid ratelimit scope specification. (bucket scope is not bucket but bucket has been specified)" << std::endl; + return EINVAL; + } + return set_bucket_ratelimit(store, opt_cmd, tenant, bucket_name, + max_read_ops, max_write_ops, + max_read_bytes, max_write_bytes, + have_max_read_ops, have_max_write_ops, + have_max_read_bytes, have_max_write_bytes); + } else if (!rgw::sal::User::empty(user)) { + } if (ratelimit_scope == "user") { + return set_user_ratelimit(opt_cmd, user, max_read_ops, max_write_ops, + max_read_bytes, max_write_bytes, + have_max_read_ops, have_max_write_ops, + have_max_read_bytes, have_max_write_bytes); + } else { + cerr << "ERROR: invalid ratelimit scope specification. 
Please specify either --ratelimit-scope=bucket, or --ratelimit-scope=user" << std::endl; + return EINVAL; + } + } + + if (ratelimit_op_get) { + if (bucket_name.empty() && rgw::sal::User::empty(user)) { + cerr << "ERROR: bucket name or uid is required for ratelimit operation" << std::endl; + return EINVAL; + } + + if (!bucket_name.empty()) { + if (!ratelimit_scope.empty() && ratelimit_scope != "bucket") { + cerr << "ERROR: invalid ratelimit scope specification. (bucket scope is not bucket but bucket has been specified)" << std::endl; + return EINVAL; + } + return show_bucket_ratelimit(store, tenant, bucket_name, formatter.get()); + } else if (!rgw::sal::User::empty(user)) { + } if (ratelimit_scope == "user") { + return show_user_ratelimit(user, formatter.get()); + } else { + cerr << "ERROR: invalid ratelimit scope specification. Please specify either --ratelimit-scope=bucket, or --ratelimit-scope=user" << std::endl; + return EINVAL; + } + } + if (opt_cmd == OPT::MFA_CREATE) { rados::cls::otp::otp_info_t config; diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc index fca29fddbd271..d6001a67f16ca 100644 --- a/src/rgw/rgw_asio_frontend.cc +++ b/src/rgw/rgw_asio_frontend.cc @@ -265,10 +265,11 @@ void handle_connection(boost::asio::io_context& context, string user = "-"; const auto started = ceph::coarse_real_clock::now(); ceph::coarse_real_clock::duration latency{}; - process_request(env.store, env.rest, &req, env.uri_prefix, *env.auth_registry, &client, env.olog, y, - scheduler, &user, &latency, &http_ret); + scheduler, &user, &latency, + env.ratelimiting->get_active(), + &http_ret); if (cct->_conf->subsys.should_gather(dout_subsys, 1)) { // access log line elements begin per Apache Combined Log Format with additions following diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 1f0c1ef94f853..a2797b47d0c22 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -247,6 +247,7 @@ struct RGWBucketAdminOpState { std::unique_ptr 
bucket; RGWQuotaInfo quota; + RGWRateLimitInfo ratelimit_info; void set_fetch_stats(bool value) { stat_buckets = value; } void set_check_objects(bool value) { check_objects = value; } @@ -274,6 +275,9 @@ struct RGWBucketAdminOpState { void set_quota(RGWQuotaInfo& value) { quota = value; } + void set_bucket_ratelimit(RGWRateLimitInfo& value) { + ratelimit_info = value; + } void set_sync_bucket(bool value) { sync_bucket = value; } diff --git a/src/rgw/rgw_common.cc b/src/rgw/rgw_common.cc index 6fb22dbb22581..5262460905c82 100644 --- a/src/rgw/rgw_common.cc +++ b/src/rgw/rgw_common.cc @@ -2628,6 +2628,24 @@ void op_type_to_str(uint32_t mask, char *buf, int len) return mask_to_str(op_type_flags, mask, buf, len); } +void RGWRateLimitInfo::decode_json(JSONObj *obj) /* reads back the same keys that dump() writes, each into its own field */ +{ + JSONDecoder::decode_json("max_read_ops", max_read_ops, obj); + JSONDecoder::decode_json("max_write_ops", max_write_ops, obj); + JSONDecoder::decode_json("max_read_bytes", max_read_bytes, obj); + JSONDecoder::decode_json("max_write_bytes", max_write_bytes, obj); + JSONDecoder::decode_json("enabled", enabled, obj); +} + +void RGWRateLimitInfo::dump(Formatter *f) const +{ + f->dump_int("max_read_ops", max_read_ops); + f->dump_int("max_write_ops", max_write_ops); + f->dump_int("max_read_bytes", max_read_bytes); + f->dump_int("max_write_bytes", max_write_bytes); + f->dump_bool("enabled", enabled); +} + void RGWUserInfo::dump(Formatter *f) const { diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index f7170a2af285d..1f7fa4f26f8cc 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -18,6 +18,8 @@ #include #include +#include +#include #include "common/ceph_crypto.h" #include "common/random_string.h" @@ -66,6 +68,7 @@ using ceph::crypto::MD5; #define RGW_SYS_PARAM_PREFIX "rgwx-" #define RGW_ATTR_ACL RGW_ATTR_PREFIX "acl" +#define RGW_ATTR_RATELIMIT RGW_ATTR_PREFIX "ratelimit" #define RGW_ATTR_LC RGW_ATTR_PREFIX "lc" #define RGW_ATTR_CORS RGW_ATTR_PREFIX "cors" #define RGW_ATTR_ETAG RGW_ATTR_PREFIX 
"etag" @@ -304,6 +307,7 @@ struct rgw_err { std::string message; }; + /* Helper class used for RGWHTTPArgs parsing */ class NameVal { @@ -691,6 +695,43 @@ void decode_json_obj(rgw_placement_rule& v, JSONObj *obj); inline std::ostream& operator<<(std::ostream& out, const rgw_placement_rule& rule) { return out << rule.to_str(); } + +class RateLimiter; +struct RGWRateLimitInfo { + int64_t max_write_ops; + int64_t max_read_ops; + int64_t max_write_bytes; + int64_t max_read_bytes; + bool enabled = false; + RGWRateLimitInfo() + : max_write_ops(0), max_read_ops(0), max_write_bytes(0), max_read_bytes(0) {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(max_write_ops, bl); + encode(max_read_ops, bl); + encode(max_write_bytes, bl); + encode(max_read_bytes, bl); + encode(enabled, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(max_write_ops,bl); + decode(max_read_ops, bl); + decode(max_write_bytes,bl); + decode(max_read_bytes, bl); + decode(enabled, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + + void decode_json(JSONObj *obj); + +}; +WRITE_CLASS_ENCODER(RGWRateLimitInfo) + struct RGWUserInfo { rgw_user user_id; @@ -1510,6 +1551,11 @@ struct req_state : DoutPrefixProvider { rgw::io::BasicClient *cio{nullptr}; http_op op{OP_UNKNOWN}; RGWOpType op_type{}; + std::shared_ptr ratelimit_data; + RGWRateLimitInfo user_ratelimit; + RGWRateLimitInfo bucket_ratelimit; + std::string ratelimit_bucket_marker; + std::string ratelimit_user_name; bool content_started{false}; int format{0}; ceph::Formatter *formatter{nullptr}; diff --git a/src/rgw/rgw_loadgen_process.cc b/src/rgw/rgw_loadgen_process.cc index 547e74ef4b984..27078f04c4e89 100644 --- a/src/rgw/rgw_loadgen_process.cc +++ b/src/rgw/rgw_loadgen_process.cc @@ -133,10 +133,11 @@ void RGWLoadGenProcess::handle_request(const DoutPrefixProvider *dpp, RGWRequest RGWLoadGenIO real_client_io(&env); RGWRestfulIO client_io(cct, 
&real_client_io); - + ActiveRateLimiter ratelimit(cct); int ret = process_request(store, rest, req, uri_prefix, *auth_registry, &client_io, olog, - null_yield, nullptr, nullptr, nullptr); + null_yield, nullptr, nullptr, nullptr, + ratelimit.get_active()); if (ret < 0) { /* we don't really care about return code */ dout(20) << "process_request() returned " << ret << dendl; diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 5442d3d117543..a932433e8c41c 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -536,6 +536,9 @@ int radosgw_Main(int argc, const char **argv) rgw::dmclock::SchedulerCtx sched_ctx{cct.get()}; OpsLogManifold *olog = new OpsLogManifold(); + ActiveRateLimiter ratelimiting{cct.get()}; + ratelimiting.start(); + if (!g_conf()->rgw_ops_log_socket_path.empty()) { OpsLogSocket* olog_socket = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog); olog_socket->init(g_conf()->rgw_ops_log_socket_path); @@ -603,7 +606,7 @@ int radosgw_Main(int argc, const char **argv) std::string uri_prefix; config->get_val("prefix", "", &uri_prefix); - RGWProcessEnv env = { store, &rest, olog, port, uri_prefix, auth_registry }; + RGWProcessEnv env = { store, &rest, olog, port, uri_prefix, auth_registry, &ratelimiting }; fe = new RGWLoadGenFrontend(env, config); } @@ -612,7 +615,7 @@ int radosgw_Main(int argc, const char **argv) config->get_val("port", 80, &port); std::string uri_prefix; config->get_val("prefix", "", &uri_prefix); - RGWProcessEnv env{ store, &rest, olog, port, uri_prefix, auth_registry }; + RGWProcessEnv env{ store, &rest, olog, port, uri_prefix, auth_registry, &ratelimiting }; fe = new RGWAsioFrontend(env, config, sched_ctx); } diff --git a/src/rgw/rgw_process.cc b/src/rgw/rgw_process.cc index 80338086c7509..f8d43a3258d22 100644 --- a/src/rgw/rgw_process.cc +++ b/src/rgw/rgw_process.cc @@ -6,6 +6,7 @@ #include "common/WorkQueue.h" #include "include/scope_guard.h" +#include #include "rgw_dmclock_scheduler.h" #include 
"rgw_rest.h"
 #include "rgw_frontend.h"
@@ -18,6 +19,8 @@
 #include "rgw_lua.h"
 #include "rgw_lua_request.h"
 #include "rgw_tracer.h"
+#include "rgw_ratelimit.h"
+
 #include "services/svc_zone_utils.h"

 #define dout_subsys ceph_subsys_rgw
@@ -87,12 +90,83 @@ void RGWProcess::RGWWQ::_process(RGWRequest *req, ThreadPool::TPHandle &) {
   process->req_throttle.put(1);
   perfcounter->inc(l_rgw_qactive, -1);
 }
+bool rate_limit(rgw::sal::Store* store, req_state* s) { // returns true when the request must be rejected
+  // we don't want to limit health check or system or admin requests
+  const auto& is_admin_or_system = s->user->get_info();
+  if ((s->op_type ==  RGW_OP_GET_HEALTH_CHECK) || is_admin_or_system.admin || is_admin_or_system.system)
+    return false;
+  std::string userfind;
+  RGWRateLimitInfo global_user;
+  RGWRateLimitInfo global_bucket;
+  RGWRateLimitInfo global_anon;
+  RGWRateLimitInfo* bucket_ratelimit;
+  RGWRateLimitInfo* user_ratelimit;
+  store->get_ratelimit(global_bucket, global_user, global_anon);
+  bucket_ratelimit = &global_bucket;
+  user_ratelimit = &global_user;
+  s->user->get_id().to_str(userfind);
+  userfind = "u" + userfind; // "u"/"b" prefixes keep user and bucket keys apart in the shared map
+  s->ratelimit_user_name = userfind;
+  std::string bucketfind = !rgw::sal::Bucket::empty(s->bucket.get()) ? "b" + s->bucket->get_marker() : "";
+  s->ratelimit_bucket_marker = bucketfind;
+  const char *method = s->info.method;
+
+  auto iter = s->user->get_attrs().find(RGW_ATTR_RATELIMIT);
+  if(iter != s->user->get_attrs().end()) {
+    try {
+      RGWRateLimitInfo user_ratelimit_temp;
+      bufferlist& bl = iter->second;
+      auto biter = bl.cbegin();
+      decode(user_ratelimit_temp, biter);
+      // override global rate limiting only if local rate limiting is enabled
+      if (user_ratelimit_temp.enabled)
+        *user_ratelimit = user_ratelimit_temp;
+    } catch (buffer::error& err) {
+      ldpp_dout(s, 0) << "ERROR: failed to decode rate limit" << dendl;
+      return true; // was `return -EIO`, which converts to true in this bool function: reject the request
+    }
+  }
+  if (s->user->get_id().id == RGW_USER_ANON_ID && global_anon.enabled) {
+    *user_ratelimit = global_anon;
+  }
+  bool limit_bucket = false;
+  bool limit_user = s->ratelimit_data->should_rate_limit(method, s->ratelimit_user_name, s->time, user_ratelimit);
+
+  if(!rgw::sal::Bucket::empty(s->bucket.get()))
+  {
+    iter = s->bucket->get_attrs().find(RGW_ATTR_RATELIMIT);
+    if(iter != s->bucket->get_attrs().end()) {
+      try {
+        RGWRateLimitInfo bucket_ratelimit_temp;
+        bufferlist& bl = iter->second;
+        auto biter = bl.cbegin();
+        decode(bucket_ratelimit_temp, biter);
+        // override global rate limiting only if local rate limiting is enabled
+        if (bucket_ratelimit_temp.enabled)
+          *bucket_ratelimit = bucket_ratelimit_temp;
+      } catch (buffer::error& err) {
+        ldpp_dout(s, 0) << "ERROR: failed to decode rate limit" << dendl;
+        return true; // was `return -EIO`, which converts to true in this bool function: reject the request
+      }
+    }
+    if (!limit_user) {
+      limit_bucket = s->ratelimit_data->should_rate_limit(method, s->ratelimit_bucket_marker, s->time, bucket_ratelimit);
+    }
+  }
+  if(limit_bucket && !limit_user) { // the user check passed but the bucket check rejected: hand the user token back
+    s->ratelimit_data->giveback_tokens(method, s->ratelimit_user_name);
+  }
+  s->user_ratelimit = *user_ratelimit;
+  s->bucket_ratelimit = *bucket_ratelimit;
+  return (limit_user || limit_bucket);
+}

 int rgw_process_authenticated(RGWHandler_REST * const handler,
                               RGWOp *& op,
                               RGWRequest * const req,
                               req_state * const s,
-
optional_yield y, + optional_yield y, + rgw::sal::Store* store, const bool skip_retarget) { ldpp_dout(op, 2) << "init permissions" << dendl; @@ -169,6 +243,10 @@ int rgw_process_authenticated(RGWHandler_REST * const handler, ldpp_dout(op, 2) << "pre-executing" << dendl; op->pre_exec(); + ldpp_dout(op, 2) << "check rate limiting" << dendl; + if (rate_limit(store, s)) { + return -ERR_RATE_LIMITED; + } ldpp_dout(op, 2) << "executing" << dendl; { auto span = tracing::rgw::tracer.add_span("execute", s->trace); @@ -194,6 +272,7 @@ int process_request(rgw::sal::Store* const store, rgw::dmclock::Scheduler *scheduler, string* user, ceph::coarse_real_clock::duration* latency, + std::shared_ptr ratelimit, int* http_ret) { int ret = client_io->init(g_ceph_context); @@ -207,6 +286,7 @@ int process_request(rgw::sal::Store* const store, struct req_state rstate(g_ceph_context, &rgw_env, req->id); struct req_state *s = &rstate; + s->ratelimit_data = ratelimit; std::unique_ptr u = store->get_user(rgw_user()); s->set_user(u); @@ -311,7 +391,7 @@ int process_request(rgw::sal::Store* const store, s->trace->SetAttribute(tracing::rgw::OP, op->name()); s->trace->SetAttribute(tracing::rgw::TYPE, tracing::rgw::REQUEST); - ret = rgw_process_authenticated(handler, op, req, s, yield); + ret = rgw_process_authenticated(handler, op, req, s, yield, store); if (ret < 0) { abort_early(s, op, ret, handler, yield); goto done; @@ -353,7 +433,6 @@ done: dout(0) << "ERROR: client_io->complete_request() returned " << e.what() << dendl; } - if (should_log) { rgw_log_op(rest, s, (op ? 
op->name() : "unknown"), olog); } @@ -382,7 +461,6 @@ done: if (latency) { *latency = lat; } - dout(1) << "====== req done req=" << hex << req << dec << " op status=" << op_ret << " http_status=" << s->err.http_ret diff --git a/src/rgw/rgw_process.h b/src/rgw/rgw_process.h index 1aac4a6cdf1bc..edf76a443e30f 100644 --- a/src/rgw/rgw_process.h +++ b/src/rgw/rgw_process.h @@ -10,7 +10,7 @@ #include "rgw_user.h" #include "rgw_op.h" #include "rgw_rest.h" - +#include "rgw_ratelimit.h" #include "include/ceph_assert.h" #include "common/WorkQueue.h" @@ -38,6 +38,8 @@ struct RGWProcessEnv { int port; std::string uri_prefix; std::shared_ptr auth_registry; + //maybe there is a better place to store the rate limit data structure + ActiveRateLimiter* ratelimiting; }; class RGWFrontendConfig; @@ -174,13 +176,15 @@ extern int process_request(rgw::sal::Store* store, rgw::dmclock::Scheduler *scheduler, std::string* user, ceph::coarse_real_clock::duration* latency, + std::shared_ptr ratelimit, int* http_ret = nullptr); extern int rgw_process_authenticated(RGWHandler_REST* handler, RGWOp*& op, RGWRequest* req, req_state* s, - optional_yield y, + optional_yield y, + rgw::sal::Store* store, bool skip_retarget = false); #if defined(def_dout_subsys) diff --git a/src/rgw/rgw_ratelimit.h b/src/rgw/rgw_ratelimit.h new file mode 100644 index 0000000000000..2639d4d42749f --- /dev/null +++ b/src/rgw/rgw_ratelimit.h @@ -0,0 +1,292 @@ +#pragma once +#include +#include +#include +#include "rgw_common.h" + + +class RateLimiterEntry { + /* + fixed_point_rgw_ratelimit is important to preserve the precision of the token calculation + for example: a user have a limit of single op per minute, the user will consume its single token and then will send another request, 1s after it. + in that case, without this method, the user will get 0 tokens although it should get 0.016 tokens. 
+ using this method it will add 16 tokens to the user, and the user will have 16 tokens, each time rgw will do comparison rgw will divide by fixed_point_rgw_ratelimit, so the user will be blocked anyway until it has enough tokens. + */ + static constexpr int64_t fixed_point_rgw_ratelimit = 1000; + // counters are tracked in multiples of fixed_point_rgw_ratelimit + struct counters { + int64_t ops = 0; + int64_t bytes = 0; + }; + counters read; + counters write; + ceph::timespan ts; + bool first_run = true; + std::mutex ts_lock; + // Those functions are returning the integer value of the tokens + int64_t read_ops () const + { + return read.ops / fixed_point_rgw_ratelimit; + } + int64_t write_ops() const + { + return write.ops / fixed_point_rgw_ratelimit; + } + int64_t read_bytes() const + { + return read.bytes / fixed_point_rgw_ratelimit; + } + int64_t write_bytes() const + { + return write.bytes / fixed_point_rgw_ratelimit; + } + bool should_rate_limit_read(int64_t ops_limit, int64_t bw_limit) { + //check if tenants did not reach their bw or ops limits and that the limits are not 0 (which is unlimited) + if(((read_ops() - 1 < 0) && (ops_limit > 0)) || + (read_bytes() < 0 && bw_limit > 0)) + { + return true; + } + // we don't want to reduce ops' tokens if we've rejected it. + read.ops -= fixed_point_rgw_ratelimit; + return false; + } + bool should_rate_limit_write(int64_t ops_limit, int64_t bw_limit) + { + //check if tenants did not reach their bw or ops limits and that the limits are not 0 (which is unlimited) + if(((write_ops() - 1 < 0) && (ops_limit > 0)) || + (write_bytes() < 0 && bw_limit > 0)) + { + return true; + } + + // we don't want to reduce ops' tokens if we've rejected it. + write.ops -= fixed_point_rgw_ratelimit; + return false; + } + /* The purpose of this function is to minimum time before overriding the stored timestamp + This function is necessary to force the increase tokens add at least 1 token when it updates the last stored timestamp. 
+ That way the user/bucket will not lose tokens because of rounding + */ + bool minimum_time_reached(ceph::timespan curr_timestamp) const + { + using namespace std::chrono; + constexpr auto min_duration = duration_cast(seconds(60)) / fixed_point_rgw_ratelimit; + const auto delta = curr_timestamp - ts; + if (delta < min_duration) + { + return false; + } + return true; + } + + void increase_tokens(ceph::timespan curr_timestamp, + const RGWRateLimitInfo* info) + { + constexpr int fixed_point = fixed_point_rgw_ratelimit; + if (first_run) + { + write.ops = info->max_write_ops * fixed_point; + write.bytes = info->max_write_bytes * fixed_point; + read.ops = info->max_read_ops * fixed_point; + read.bytes = info->max_read_bytes * fixed_point; + ts = curr_timestamp; + first_run = false; + return; + } + else if(curr_timestamp > ts && minimum_time_reached(curr_timestamp)) + { + const int64_t time_in_ms = std::chrono::duration_cast(curr_timestamp - ts).count() / 60.0 / std::milli::den * fixed_point; // / 60 to make it work with 1 min token bucket + ts = curr_timestamp; + const int64_t write_ops = info->max_write_ops * time_in_ms; + const int64_t write_bw = info->max_write_bytes * time_in_ms; + const int64_t read_ops = info->max_read_ops * time_in_ms; + const int64_t read_bw = info->max_read_bytes * time_in_ms; + read.ops = std::min(info->max_read_ops * fixed_point, read_ops + read.ops); + read.bytes = std::min(info->max_read_bytes * fixed_point, read_bw + read.bytes); + write.ops = std::min(info->max_write_ops * fixed_point, write_ops + write.ops); + write.bytes = std::min(info->max_write_bytes * fixed_point, write_bw + write.bytes); + } + } + + public: + bool should_rate_limit(bool is_read, const RGWRateLimitInfo* ratelimit_info, ceph::timespan curr_timestamp) + { + std::unique_lock lock(ts_lock); + increase_tokens(curr_timestamp, ratelimit_info); + if (is_read) + { + return should_rate_limit_read(ratelimit_info->max_read_ops, ratelimit_info->max_read_bytes); + } + return 
should_rate_limit_write(ratelimit_info->max_write_ops, ratelimit_info->max_write_bytes); + } + void decrease_bytes(bool is_read, int64_t amount, const RGWRateLimitInfo* info) { + std::unique_lock lock(ts_lock); + // we don't want the tenant to be with higher debt than 120 seconds(2 min) of its limit + if (is_read) + { + read.bytes = std::max(read.bytes - amount * fixed_point_rgw_ratelimit,info->max_read_bytes * fixed_point_rgw_ratelimit * -2); + } else { + write.bytes = std::max(write.bytes - amount * fixed_point_rgw_ratelimit,info->max_write_bytes * fixed_point_rgw_ratelimit * -2); + } + } + void giveback_tokens(bool is_read) + { + std::unique_lock lock(ts_lock); + if (is_read) + { + read.ops += fixed_point_rgw_ratelimit; + } else { + write.ops += fixed_point_rgw_ratelimit; + } + } +}; + +class RateLimiter { + + static constexpr size_t map_size = 2000000; // will create it with the closest upper prime number + std::shared_mutex insert_lock; + std::atomic_bool& replacing; + std::condition_variable& cv; + typedef std::unordered_map hash_map; + hash_map ratelimit_entries{map_size}; + static bool is_read_op(const std::string_view method) { + if (method == "GET" || method == "HEAD") + { + return true; + } + return false; + } + + // find or create an entry, and return its iterator + auto& find_or_create(const std::string& key) { + std::shared_lock rlock(insert_lock); + if (ratelimit_entries.size() > 0.9 * map_size && replacing == false) + { + replacing = true; + cv.notify_all(); + } + auto ret = ratelimit_entries.find(key); + rlock.unlock(); + if (ret == ratelimit_entries.end()) + { + std::unique_lock wlock(insert_lock); + ret = ratelimit_entries.emplace(std::piecewise_construct, + std::forward_as_tuple(key), + std::forward_as_tuple()).first; + } + return ret->second; + } + + + + public: + RateLimiter(const RateLimiter&) = delete; + RateLimiter& operator =(const RateLimiter&) = delete; + RateLimiter(RateLimiter&&) = delete; + RateLimiter& operator =(RateLimiter&&) = 
delete; + RateLimiter() = delete; + RateLimiter(std::atomic_bool& replacing, std::condition_variable& cv) + : replacing(replacing), cv(cv) + { + // prevents rehash, so no iterators invalidation + ratelimit_entries.max_load_factor(1000); + }; + + bool should_rate_limit(const char *method, const std::string& key, ceph::coarse_real_time curr_timestamp, const RGWRateLimitInfo* ratelimit_info) { + if (key.empty() || key.length() == 1 || !ratelimit_info->enabled) + { + return false; + } + bool is_read = is_read_op(method); + auto& it = find_or_create(key); + auto curr_ts = curr_timestamp.time_since_epoch(); + return it.should_rate_limit(is_read ,ratelimit_info, curr_ts); + } + void giveback_tokens(const char *method, const std::string& key) + { + bool is_read = is_read_op(method); + auto& it = find_or_create(key); + it.giveback_tokens(is_read); + } + void decrease_bytes(const char *method, const std::string& key, const int64_t amount, const RGWRateLimitInfo* info) { + if (key.empty() || key.length() == 1 || !info->enabled) + { + return; + } + bool is_read = is_read_op(method); + if ((is_read && !info->max_read_bytes) || (!is_read && !info->max_write_bytes)) + { + return; + } + auto& it = find_or_create(key); + it.decrease_bytes(is_read, amount, info); + } + void clear() { + ratelimit_entries.clear(); + } +}; +// This class purpose is to hold 2 RateLimiter instances, one active and one passive. 
+// once the active has reached the watermark for clearing it will wake the replace_active() thread using cv
+// The replace_active will clear the previous RateLimiter once no request holds it any more (it waits while use_count() > 1)
+// In the meanwhile new requests will come into the newer active
+class ActiveRateLimiter : public DoutPrefix  {
+  std::atomic_uint8_t stopped = {false};
+  std::condition_variable cv;
+  std::mutex cv_m;
+  std::thread runner;
+  std::atomic_bool replacing = false;
+  std::atomic_uint8_t current_active = 0;
+  std::shared_ptr<RateLimiter> ratelimit[2];
+  void replace_active() {
+    using namespace std::chrono_literals;
+    std::unique_lock lk(cv_m);
+    while (!stopped) {
+      cv.wait(lk, [this] { return stopped || replacing; }); // predicate: a spurious wakeup must not swap the limiters
+      current_active = current_active ^ 1;
+      ldpp_dout(this, 20) << "replacing active ratelimit data structure" << dendl;
+      while (!stopped && ratelimit[(current_active ^ 1)].use_count() > 1 ) {
+        if (cv.wait_for(lk, 1min) != std::cv_status::timeout && stopped)
+        {
+          return;
+        }
+      }
+      if (stopped)
+      {
+        return;
+      }
+      ldpp_dout(this, 20) << "clearing passive ratelimit data structure" << dendl;
+      ratelimit[(current_active ^ 1)]->clear();
+      replacing = false;
+    }
+  }
+  public:
+    ActiveRateLimiter(const ActiveRateLimiter&) = delete;
+    ActiveRateLimiter& operator =(const ActiveRateLimiter&) = delete;
+    ActiveRateLimiter(ActiveRateLimiter&&) = delete;
+    ActiveRateLimiter& operator =(ActiveRateLimiter&&) = delete;
+    ActiveRateLimiter() = delete;
+    ActiveRateLimiter(CephContext* cct) :
+      DoutPrefix(cct, ceph_subsys_rgw, "rate limiter: ")
+    {
+      ratelimit[0] = std::make_shared<RateLimiter>(replacing, cv);
+      ratelimit[1] = std::make_shared<RateLimiter>(replacing, cv);
+    }
+    ~ActiveRateLimiter() {
+      ldpp_dout(this, 20) << "stopping ratelimit_gc thread" << dendl;
+      cv_m.lock();
+      stopped = true;
+      cv_m.unlock();
+      cv.notify_all();
+      if (runner.joinable()) runner.join(); // start() may never have been called; join() on a non-joinable thread throws
+    }
+    std::shared_ptr<RateLimiter> get_active() {
+      return ratelimit[current_active];
+    }
+    void start() {
+      ldpp_dout(this, 20) << "starting ratelimit_gc thread" << dendl;
+      runner =
std::thread(&ActiveRateLimiter::replace_active, this); + const auto rc = ceph_pthread_setname(runner.native_handle(), "ratelimit_gc"); + ceph_assert(rc==0); + } +}; diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc index 51e89fd10a48b..fd2b781fa2095 100644 --- a/src/rgw/rgw_rest.cc +++ b/src/rgw/rgw_rest.cc @@ -28,6 +28,7 @@ #include "rgw_resolve.h" #include "rgw_sal_rados.h" +#include "rgw_ratelimit.h" #include #define dout_subsys ceph_subsys_rgw @@ -759,6 +760,16 @@ int dump_body(struct req_state* const s, const char* const buf, const size_t len) { + bool healthchk = false; + // we dont want to limit health checks + if(s->op_type == RGW_OP_GET_HEALTH_CHECK) + healthchk = true; + if(len > 0 && !healthchk) { + const char *method = s->info.method; + s->ratelimit_data->decrease_bytes(method, s->ratelimit_user_name, len, &s->user_ratelimit); + if(!rgw::sal::Bucket::empty(s->bucket.get())) + s->ratelimit_data->decrease_bytes(method, s->ratelimit_bucket_marker, len, &s->bucket_ratelimit); + } try { return RESTFUL_IO(s)->send_body(buf, len); } catch (rgw::io::Exception& e) { @@ -780,11 +791,24 @@ int recv_body(struct req_state* const s, char* const buf, const size_t max) { + int len; try { - return RESTFUL_IO(s)->recv_body(buf, max); + len = RESTFUL_IO(s)->recv_body(buf, max); } catch (rgw::io::Exception& e) { return -e.code().value(); } + bool healthchk = false; + // we dont want to limit health checks + if(s->op_type == RGW_OP_GET_HEALTH_CHECK) + healthchk = true; + if(len > 0 && !healthchk) { + const char *method = s->info.method; + s->ratelimit_data->decrease_bytes(method, s->ratelimit_user_name, len, &s->user_ratelimit); + if(!rgw::sal::Bucket::empty(s->bucket.get())) + s->ratelimit_data->decrease_bytes(method, s->ratelimit_bucket_marker, len, &s->bucket_ratelimit); + } + return len; + } int RGWGetObj_ObjStore::get_params(optional_yield y) diff --git a/src/rgw/rgw_rest_swift.cc b/src/rgw/rgw_rest_swift.cc index 414f3a31c1a4c..f8a966d851c46 100644 --- 
a/src/rgw/rgw_rest_swift.cc +++ b/src/rgw/rgw_rest_swift.cc @@ -2378,7 +2378,7 @@ int RGWSwiftWebsiteHandler::serve_errordoc(const int http_ret, RGWOp* newop = &get_errpage_op; RGWRequest req(0); - return rgw_process_authenticated(handler, newop, &req, s, y, true); + return rgw_process_authenticated(handler, newop, &req, s, y, store, true); } int RGWSwiftWebsiteHandler::error_handler(const int err_no, diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index c815032164f11..baa535058a558 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -311,6 +311,8 @@ class Store { const std::map& meta) = 0; /** Get default quota info. Used as fallback if a user or bucket has no quota set*/ virtual void get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota) = 0; + /** Get global rate limit configuration*/ + virtual void get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit) = 0; /** Enable or disable a set of bucket. e.g. if a User is suspended */ virtual int set_buckets_enabled(const DoutPrefixProvider* dpp, std::vector& buckets, bool enabled) = 0; /** Get a new request ID */ @@ -486,12 +488,7 @@ class User { virtual int read_attrs(const DoutPrefixProvider* dpp, optional_yield y) = 0; /** Set the attributes in attrs, leaving any other existing attrs set, and * write them to the backing store; a merge operation */ - virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) { - for(auto& it : new_attrs) { - attrs[it.first] = it.second; - } - return 0; - } + virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) = 0; virtual int read_stats(const DoutPrefixProvider *dpp, optional_yield y, RGWStorageStats* stats, ceph::real_time* last_stats_sync = nullptr, @@ -507,8 +504,10 @@ class User { std::map& usage) = 0; /** Trim User usage stats to the given epoch range */ virtual int trim_usage(const DoutPrefixProvider *dpp, 
uint64_t start_epoch, uint64_t end_epoch) = 0; - virtual RGWObjVersionTracker& get_version_tracker() { return objv_tracker; } - virtual Attrs& get_attrs() { return attrs; } + + /** Load this User from the backing store. requires ID to be set, fills all other fields. */ + virtual int load_user(const DoutPrefixProvider* dpp, optional_yield y) = 0; + /** Store this User to the backing store */ virtual int store_user(const DoutPrefixProvider* dpp, optional_yield y, bool exclusive, RGWUserInfo* old_info = nullptr) = 0; /** Remove this User from the backing store */ virtual int remove_user(const DoutPrefixProvider* dpp, optional_yield y) = 0; @@ -681,12 +680,7 @@ class Bucket { virtual int check_quota(const DoutPrefixProvider *dpp, RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size, optional_yield y, bool check_size_only = false) = 0; /** Set the attributes in attrs, leaving any other existing attrs set, and * write them to the backing store; a merge operation */ - virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) { - for(auto& it : new_attrs) { - attrs[it.first] = it.second; - } - return 0; - } + virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) = 0; /** Try to refresh the cached bucket info from the backing store. Used in * read-modify-update loop. 
*/ virtual int try_refresh_info(const DoutPrefixProvider* dpp, ceph::real_time* pmtime) = 0; diff --git a/src/rgw/rgw_sal_dbstore.cc b/src/rgw/rgw_sal_dbstore.cc index a81fc94685b18..2026c8e5b7ef4 100644 --- a/src/rgw/rgw_sal_dbstore.cc +++ b/src/rgw/rgw_sal_dbstore.cc @@ -199,7 +199,13 @@ namespace rgw::sal { return ret; } - + int DBUser::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) + { + for(auto& it : new_attrs) { + attrs[it.first] = it.second; + } + return store_user(dpp, y, false); + } int DBUser::store_user(const DoutPrefixProvider* dpp, optional_yield y, bool exclusive, RGWUserInfo* old_info) { int ret = 0; @@ -322,7 +328,9 @@ namespace rgw::sal { { int ret = 0; - Bucket::merge_and_store_attrs(dpp, new_attrs, y); + for(auto& it : new_attrs) { + attrs[it.first] = it.second; + } /* XXX: handle has_instance_obj like in set_bucket_instance_attrs() */ @@ -1753,6 +1761,11 @@ namespace rgw::sal { return 0; } + void DBStore::get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit) + { + return; + } + void DBStore::get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota) { // XXX: Not handled for the first pass diff --git a/src/rgw/rgw_sal_dbstore.h b/src/rgw/rgw_sal_dbstore.h index 30da100d1084a..02d622a0682e2 100644 --- a/src/rgw/rgw_sal_dbstore.h +++ b/src/rgw/rgw_sal_dbstore.h @@ -114,6 +114,7 @@ protected: virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) override; /* Placeholders */ + virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) override; virtual int load_user(const DoutPrefixProvider* dpp, optional_yield y) override; virtual int store_user(const DoutPrefixProvider* dpp, optional_yield y, bool exclusive, RGWUserInfo* old_info = nullptr) override; virtual int remove_user(const DoutPrefixProvider* dpp, optional_yield y) override; @@ -731,6 +732,7 @@ 
public: virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override; virtual int register_to_service_map(const DoutPrefixProvider *dpp, const string& daemon_type, const map& meta) override; + virtual void get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit) override; virtual void get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota) override; virtual int set_buckets_enabled(const DoutPrefixProvider *dpp, vector& buckets, bool enabled) override; virtual uint64_t get_new_req_id() override { return 0; } diff --git a/src/rgw/rgw_sal_rados.cc b/src/rgw/rgw_sal_rados.cc index be602880d0228..638b2270ab05b 100644 --- a/src/rgw/rgw_sal_rados.cc +++ b/src/rgw/rgw_sal_rados.cc @@ -285,7 +285,9 @@ int RadosUser::read_attrs(const DoutPrefixProvider* dpp, optional_yield y) int RadosUser::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) { - User::merge_and_store_attrs(dpp, new_attrs, y); + for(auto& it : new_attrs) { + attrs[it.first] = it.second; + } return store_user(dpp, y, false); } @@ -728,7 +730,9 @@ int RadosBucket::check_quota(const DoutPrefixProvider *dpp, RGWQuotaInfo& user_q int RadosBucket::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) { - Bucket::merge_and_store_attrs(dpp, new_attrs, y); + for(auto& it : new_attrs) { + attrs[it.first] = it.second; + } return store->ctl()->bucket->set_bucket_instance_attrs(get_info(), new_attrs, &get_info().objv_tracker, y, dpp); } @@ -1213,6 +1217,13 @@ void RadosStore::get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota) user_quota = svc()->quota->get_user_quota(); } +void RadosStore::get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit) +{ + bucket_ratelimit = svc()->zone->get_current_period().get_config().bucket_ratelimit; + user_ratelimit = 
svc()->zone->get_current_period().get_config().user_ratelimit; + anon_ratelimit = svc()->zone->get_current_period().get_config().anon_ratelimit; +} + int RadosStore::set_buckets_enabled(const DoutPrefixProvider* dpp, vector& buckets, bool enabled) { return rados->set_buckets_enabled(buckets, enabled, dpp); diff --git a/src/rgw/rgw_sal_rados.h b/src/rgw/rgw_sal_rados.h index 981bb2335b21f..4f5ef28988a02 100644 --- a/src/rgw/rgw_sal_rados.h +++ b/src/rgw/rgw_sal_rados.h @@ -411,6 +411,7 @@ class RadosStore : public Store { virtual int register_to_service_map(const DoutPrefixProvider *dpp, const std::string& daemon_type, const std::map& meta) override; virtual void get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota) override; + virtual void get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit) override; virtual int set_buckets_enabled(const DoutPrefixProvider* dpp, std::vector& buckets, bool enabled) override; virtual uint64_t get_new_req_id() override { return rados->get_new_req_id(); } virtual int get_sync_policy_handler(const DoutPrefixProvider* dpp, diff --git a/src/rgw/rgw_user.h b/src/rgw/rgw_user.h index c29224cbeb48e..68bbcb552042b 100644 --- a/src/rgw/rgw_user.h +++ b/src/rgw/rgw_user.h @@ -173,9 +173,13 @@ struct RGWUserAdminOpState { bool bucket_quota_specified{false}; bool user_quota_specified{false}; + bool bucket_ratelimit_specified{false}; + bool user_ratelimit_specified{false}; RGWQuotaInfo bucket_quota; RGWQuotaInfo user_quota; + RGWRateLimitInfo user_ratelimit; + RGWRateLimitInfo bucket_ratelimit; // req parameters for listing user std::string marker{""}; @@ -339,6 +343,16 @@ struct RGWUserAdminOpState { user_quota_specified = true; } + void set_bucket_ratelimit(RGWRateLimitInfo& ratelimit) { + bucket_ratelimit = ratelimit; + bucket_ratelimit_specified = true; + } + + void set_user_ratelimit(RGWRateLimitInfo& ratelimit) { + user_ratelimit = ratelimit; + 
user_ratelimit_specified = true; + } + void set_mfa_ids(const std::set& ids) { mfa_ids = ids; mfa_ids_specified = true; diff --git a/src/rgw/rgw_zone.cc b/src/rgw/rgw_zone.cc index e5ea2d625f8e2..5daffe58a8ac9 100644 --- a/src/rgw/rgw_zone.cc +++ b/src/rgw/rgw_zone.cc @@ -2755,12 +2755,18 @@ void RGWPeriodConfig::dump(Formatter *f) const { encode_json("bucket_quota", bucket_quota, f); encode_json("user_quota", user_quota, f); + encode_json("user_ratelimit", user_ratelimit, f); + encode_json("bucket_ratelimit", bucket_ratelimit, f); + encode_json("anonymous_ratelimit", anon_ratelimit, f); } void RGWPeriodConfig::decode_json(JSONObj *obj) { JSONDecoder::decode_json("bucket_quota", bucket_quota, obj); JSONDecoder::decode_json("user_quota", user_quota, obj); + JSONDecoder::decode_json("user_ratelimit", user_ratelimit, obj); + JSONDecoder::decode_json("bucket_ratelimit", bucket_ratelimit, obj); + JSONDecoder::decode_json("anonymous_ratelimit", anon_ratelimit, obj); } void RGWRegionMap::dump(Formatter *f) const diff --git a/src/rgw/rgw_zone.h b/src/rgw/rgw_zone.h index 9a0066d9ad152..a84d492e1f52c 100644 --- a/src/rgw/rgw_zone.h +++ b/src/rgw/rgw_zone.h @@ -1028,18 +1028,30 @@ struct RGWPeriodConfig { RGWQuotaInfo bucket_quota; RGWQuotaInfo user_quota; + RGWRateLimitInfo user_ratelimit; + RGWRateLimitInfo bucket_ratelimit; + // rate limit unauthenticated user + RGWRateLimitInfo anon_ratelimit; void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(bucket_quota, bl); encode(user_quota, bl); + encode(bucket_ratelimit, bl); + encode(user_ratelimit, bl); + encode(anon_ratelimit, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); decode(bucket_quota, bl); decode(user_quota, bl); + if (struct_v >= 2) { + decode(bucket_ratelimit, bl); + decode(user_ratelimit, bl); + decode(anon_ratelimit, bl); + } DECODE_FINISH(bl); } diff --git a/src/test/cli/radosgw-admin/help.t 
b/src/test/cli/radosgw-admin/help.t index 1fc852247f400..6784b966ca445 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -56,10 +56,18 @@ quota set set quota params quota enable enable quota quota disable disable quota + ratelimit get get ratelimit params + ratelimit set set ratelimit params + ratelimit enable enable ratelimit + ratelimit disable disable ratelimit global quota get view global quota params global quota set set global quota params global quota enable enable a global quota global quota disable disable a global quota + global ratelimit get view global ratelimit params + global ratelimit set set global ratelimit params + global ratelimit enable enable a ratelimit quota + global ratelimit disable disable a ratelimit quota realm create create a new realm realm rm remove a realm realm get show realm info @@ -310,6 +318,14 @@ --max-size specify max size (in B/K/M/G/T, negative value to disable) --quota-scope scope of quota (bucket, user) + Rate limiting options: + --max-read-ops specify max requests per minute for READ ops per RGW (GET and HEAD request methods), 0 means unlimited + --max-read-bytes specify max bytes per minute for READ ops per RGW (GET and HEAD request methods), 0 means unlimited + --max-write-ops specify max requests per minute for WRITE ops per RGW (Not GET or HEAD request methods), 0 means unlimited + --max-write-bytes specify max bytes per minute for WRITE ops per RGW (Not GET or HEAD request methods), 0 means unlimited + --ratelimit-scope scope of rate limiting: bucket, user, anonymous + anonymous can be configured only with global rate limit + Orphans search options: --num-shards num of shards to use for keeping the temporary scan info --orphan-stale-secs num of seconds to wait before declaring an object to be an orphan (default: 86400) diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 641fd69cd91cf..2df70d88e2ef1 100644 --- a/src/test/rgw/CMakeLists.txt +++ 
b/src/test/rgw/CMakeLists.txt @@ -52,6 +52,16 @@ set(test_rgw_a_src test_rgw_common.cc) add_library(test_rgw_a STATIC ${test_rgw_a_src}) target_link_libraries(test_rgw_a ${rgw_libs}) +add_executable(bench_rgw_ratelimit bench_rgw_ratelimit.cc) +target_link_libraries(bench_rgw_ratelimit ${rgw_libs}) + +add_executable(bench_rgw_ratelimit_gc bench_rgw_ratelimit_gc.cc ) +target_link_libraries(bench_rgw_ratelimit_gc ${rgw_libs}) + +add_executable(unittest_rgw_ratelimit test_rgw_ratelimit.cc $) +target_link_libraries(unittest_rgw_ratelimit ${rgw_libs}) +add_ceph_unittest(unittest_rgw_ratelimit) + # ceph_test_rgw_manifest set(test_rgw_manifest_srcs test_rgw_manifest.cc) add_executable(ceph_test_rgw_manifest diff --git a/src/test/rgw/bench_rgw_ratelimit.cc b/src/test/rgw/bench_rgw_ratelimit.cc new file mode 100644 index 0000000000000..6c682fcc01b4f --- /dev/null +++ b/src/test/rgw/bench_rgw_ratelimit.cc @@ -0,0 +1,247 @@ +#include "rgw/rgw_ratelimit.h" +#include "rgw/rgw_common.h" +#include "random" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +using Executor = boost::asio::io_context::executor_type; +std::uniform_int_distribution dist(0, 1); +std::random_device rd; +std::default_random_engine rng{rd()}; +std::uniform_int_distribution disttenant(2, 100000000); +struct client_info { + uint64_t accepted = 0; + uint64_t rejected = 0; + uint64_t ops = 0; + uint64_t bytes = 0; + uint64_t num_retries = 0; + std::string tenant; +}; + +struct parameters { + int64_t req_size = 1; + int64_t backend_bandwidth = 1; + size_t wait_between_retries_ms = 1; + int num_clients = 1; +}; +std::shared_ptr> ds = std::make_shared>(std::vector()); + +std::string method[2] = {"PUT", "GET"}; +void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr ratelimit, const parameters& params, spawn::yield_context& yield, boost::asio::io_context& ioctx) +{ + auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate 
limiter: "); + boost::asio::steady_timer timer(ioctx); + int rw = 0; // will always use PUT method as there is no difference + std::string methodop(method[rw]); + auto req_size = params.req_size; + auto backend_bandwidth = params.backend_bandwidth; +// the 4 * 1024 * 1024 is the RGW default we are sending in a typical environment + while (req_size) { + if (req_size <= backend_bandwidth) { + while (req_size > 0) { + if(req_size > 4*1024*1024) { + ratelimit->decrease_bytes(methodop.c_str(),it.tenant, 4*1024*1024, info); + it.bytes += 4*1024*1024; + req_size = req_size - 4*1024*1024; + } + else { + ratelimit->decrease_bytes(methodop.c_str(),it.tenant, req_size, info); + req_size = 0; + } + } + } else { + int64_t total_bytes = 0; + while (req_size > 0) { + if (req_size >= 4*1024*1024) { + if (total_bytes >= backend_bandwidth) + { + timer.expires_after(std::chrono::seconds(1)); + timer.async_wait(yield); + total_bytes = 0; + } + ratelimit->decrease_bytes(methodop.c_str(),it.tenant, 4*1024*1024, info); + it.bytes += 4*1024*1024; + req_size = req_size - 4*1024*1024; + total_bytes += 4*1024*1024; + } + else { + ratelimit->decrease_bytes(methodop.c_str(),it.tenant, req_size, info); + it.bytes += req_size; + total_bytes += req_size; + req_size = 0; + } + } + } + } +} +bool simulate_request(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr ratelimit) +{ + boost::asio::io_context context; + auto time = ceph::coarse_real_clock::now(); + int rw = 0; // will always use PUT method as there is no different + std::string methodop = method[rw]; + auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: "); + bool to_fail = ratelimit->should_rate_limit(methodop.c_str(), it.tenant, time, &info); + if(to_fail) + { + it.rejected++; + it.ops++; + return true; + } + it.accepted++; + return false; +} +void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr ratelimit, const parameters& params, spawn::yield_context& ctx, bool& to_run, 
boost::asio::io_context& ioctx) +{ + for (;;) + { + bool to_retry = simulate_request(it, info, ratelimit); + while (to_retry && to_run) + { + if (params.wait_between_retries_ms) + { + boost::asio::steady_timer timer(ioctx); + timer.expires_after(std::chrono::milliseconds(params.wait_between_retries_ms)); + timer.async_wait(ctx); + } + to_retry = simulate_request(it, info, ratelimit); + } + if (!to_run) + { + return; + } + simulate_transfer(it, &info, ratelimit, params, ctx, ioctx); + } +} +void simulate_clients(boost::asio::io_context& context, std::string tenant, const RGWRateLimitInfo& info, std::shared_ptr ratelimit, const parameters& params, bool& to_run) +{ + for (int i = 0; i < params.num_clients; i++) + { + auto& it = ds->emplace_back(client_info()); + it.tenant = tenant; + int x = ds->size() - 1; + spawn::spawn(context, + [&to_run ,x, ratelimit, info, params, &context](spawn::yield_context ctx) + { + auto& it = ds.get()->operator[](x); + simulate_client(it, info, ratelimit, params, ctx, to_run, context); + }); + } +} +int main(int argc, char **argv) +{ + int num_ratelimit_classes = 1; + int64_t ops_limit = 1; + int64_t bw_limit = 1; + int thread_count = 512; + int runtime = 60; + parameters params; + try + { + using namespace boost::program_options; + options_description desc{"Options"}; + desc.add_options() + ("help,h", "Help screen") + ("num_ratelimit_classes", value()->default_value(1), "how many ratelimit tenants") + ("request_size", value()->default_value(1), "what is the request size we are testing if 0, it will be randomized") + ("backend_bandwidth", value()->default_value(1), "what is the backend bandwidth, so there will be wait between decrease_bytes") + ("wait_between_retries_ms", value()->default_value(1), "time in seconds to wait between retries") + ("ops_limit", value()->default_value(1), "ops limit for the tenants") + ("bw_limit", value()->default_value(1), "bytes per second limit") + ("threads", value()->default_value(512), "server's threads 
count") + ("runtime", value()->default_value(60), "For how many seconds the test will run") + ("num_clients", value()->default_value(1), "number of clients per tenant to run"); + variables_map vm; + store(parse_command_line(argc, argv, desc), vm); + if (vm.count("help")) { + std::cout << desc << std::endl; + return EXIT_SUCCESS; + } + num_ratelimit_classes = vm["num_ratelimit_classes"].as(); + params.req_size = vm["request_size"].as(); + params.backend_bandwidth = vm["backend_bandwidth"].as(); + params.wait_between_retries_ms = vm["wait_between_retries_ms"].as(); + params.num_clients = vm["num_clients"].as(); + ops_limit = vm["ops_limit"].as(); + bw_limit = vm["bw_limit"].as(); + thread_count = vm["threads"].as(); + runtime = vm["runtime"].as(); + } + catch (const boost::program_options::error &ex) + { + std::cerr << ex.what() << std::endl; + return EXIT_FAILURE; + } + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_bytes = bw_limit; + info.max_write_bytes = bw_limit; + info.max_read_ops = ops_limit; + info.max_write_ops = ops_limit; + std::unique_ptr cct = std::make_unique(CEPH_ENTITY_TYPE_ANY); + if (!g_ceph_context) + { + g_ceph_context = cct.get(); + } + std::shared_ptr ratelimit(new ActiveRateLimiter(g_ceph_context)); + ratelimit->start(); + std::vector threads; + using Executor = boost::asio::io_context::executor_type; + std::optional> work; + threads.reserve(thread_count); + boost::asio::io_context context; + boost::asio::io_context stopme; + work.emplace(boost::asio::make_work_guard(context)); + // server execution + for (int i = 0; i < thread_count; i++) { + threads.emplace_back([&]() noexcept { + context.run(); + }); + } + //client execution + bool to_run = true; + ds->reserve(num_ratelimit_classes*params.num_clients); + for (int i = 0; i < num_ratelimit_classes; i++) + { + unsigned long long tenantid = disttenant(rng); + std::string tenantuser = "uuser" + std::to_string(tenantid); + simulate_clients(context, tenantuser, info, 
ratelimit->get_active(), params, to_run); + } + boost::asio::steady_timer timer_runtime(stopme); + timer_runtime.expires_after(std::chrono::seconds(runtime)); + timer_runtime.wait(); + work.reset(); + context.stop(); + to_run = false; + + for (auto& i : threads) + { + i.join(); + } + std::unordered_map metrics_by_tenant; + for(auto& i : *ds.get()) + { + auto it = metrics_by_tenant.emplace(i.tenant, client_info()).first; + std::cout << i.accepted << std::endl; + it->second.accepted += i.accepted; + it->second.rejected += i.rejected; + } + // TODO sum the results by tenant + for(auto& i : metrics_by_tenant) + { + std::cout << "Tenant is: " << i.first << std::endl; + std::cout << "Simulator finished accepted sum : " << i.second.accepted << std::endl; + std::cout << "Simulator finished rejected sum : " << i.second.rejected << std::endl; + } + + return 0; +} diff --git a/src/test/rgw/bench_rgw_ratelimit_gc.cc b/src/test/rgw/bench_rgw_ratelimit_gc.cc new file mode 100644 index 0000000000000..7b07dc2b0fc8e --- /dev/null +++ b/src/test/rgw/bench_rgw_ratelimit_gc.cc @@ -0,0 +1,52 @@ +#include "rgw/rgw_ratelimit.h" +#include "rgw/rgw_common.h" +#include "random" +#include +#include +#include +#include +int main(int argc, char **argv) +{ + int num_qos_classes = 1; + try + { + using namespace boost::program_options; + options_description desc{"Options"}; + desc.add_options() + ("help,h", "Help screen") + ("num_qos_classes", value()->default_value(1), "how many qos tenants"); + variables_map vm; + store(parse_command_line(argc, argv, desc), vm); + if (vm.count("help")) { + std::cout << desc << std::endl; + return EXIT_SUCCESS; + } + num_qos_classes = vm["num_qos_classes"].as(); + } + catch (const boost::program_options::error &ex) + { + std::cerr << ex.what() << std::endl; + return EXIT_FAILURE; + } + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_bytes = 0; + info.max_write_bytes = 0; + info.max_read_ops = 0; + info.max_write_ops = 0; + std::unique_ptr cct = 
std::make_unique(CEPH_ENTITY_TYPE_ANY); + if (!g_ceph_context) + { + g_ceph_context = cct.get(); + } + std::shared_ptr ratelimit(new ActiveRateLimiter(g_ceph_context)); + ratelimit->start(); + auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: "); + for(int i = 0; i < num_qos_classes; i++) + { + std::string tenant = "uuser" + std::to_string(i); + auto time = ceph::coarse_real_clock::now(); + ratelimit->get_active()->should_rate_limit("PUT", tenant, time, &info); + } + +} diff --git a/src/test/rgw/test_rgw_lua.cc b/src/test/rgw/test_rgw_lua.cc index 32ec599985d73..501144d5a34d2 100644 --- a/src/test/rgw/test_rgw_lua.cc +++ b/src/test/rgw/test_rgw_lua.cc @@ -115,7 +115,9 @@ public: virtual int remove_user(const DoutPrefixProvider* dpp, optional_yield y) override { return 0; } - + virtual int merge_and_store_attrs(const DoutPrefixProvider *dpp, rgw::sal::Attrs& attrs, optional_yield y) override { + return 0; + } virtual ~TestUser() = default; }; diff --git a/src/test/rgw/test_rgw_ratelimit.cc b/src/test/rgw/test_rgw_ratelimit.cc new file mode 100644 index 0000000000000..875ec8392ae48 --- /dev/null +++ b/src/test/rgw/test_rgw_ratelimit.cc @@ -0,0 +1,376 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include +#include "rgw/rgw_ratelimit.h" + + +using namespace std::chrono_literals; + +TEST(RGWRateLimit, op_limit_not_enabled) +{ + // info.enabled = false, so no limit + std::atomic_bool replacing; + std::condition_variable cv; + RateLimiter ratelimit(replacing, cv); + RGWRateLimitInfo info; + auto time = ceph::coarse_real_clock::now(); + std::string key = "uuser123"; + bool success = ratelimit.should_rate_limit("PUT", key, time, &info); + EXPECT_EQ(false, success); +} +TEST(RGWRateLimit, reject_op_over_limit) +{ + // check that request is being rejected because there are not enough tokens + std::atomic_bool replacing; + std::condition_variable cv; + RateLimiter ratelimit(replacing, cv); 
+ RGWRateLimitInfo info; + info.enabled = true; + info.max_read_ops = 1; + auto time = ceph::coarse_real_clock::now(); + std::string key = "uuser123"; + bool success = ratelimit.should_rate_limit("GET", key, time, &info); + time = ceph::coarse_real_clock::now(); + success = ratelimit.should_rate_limit("GET", key, time, &info); + EXPECT_EQ(true, success); +} +TEST(RGWRateLimit, accept_op_after_giveback) +{ + // check that giveback is working fine + std::atomic_bool replacing; + std::condition_variable cv; + RateLimiter ratelimit(replacing, cv); + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_ops = 1; + auto time = ceph::coarse_real_clock::now(); + std::string key = "uuser123"; + bool success = ratelimit.should_rate_limit("GET", key, time, &info); + ratelimit.giveback_tokens("GET", key); + time = ceph::coarse_real_clock::now(); + success = ratelimit.should_rate_limit("GET", key, time, &info); + EXPECT_EQ(false, success); +} +TEST(RGWRateLimit, accept_op_after_refill) +{ + // check that tokens are being filled properly + std::atomic_bool replacing; + std::condition_variable cv; + RateLimiter ratelimit(replacing, cv); + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_ops = 1; + auto time = ceph::coarse_real_clock::now(); + std::string key = "uuser123"; + bool success = ratelimit.should_rate_limit("GET", key, time, &info); + time += 61s; + success = ratelimit.should_rate_limit("GET", key, time, &info); + EXPECT_EQ(false, success); +} +TEST(RGWRateLimit, reject_bw_over_limit) +{ + // check that a newer request is rejected if there is no enough tokens (bw) + std::atomic_bool replacing; + std::condition_variable cv; + RateLimiter ratelimit(replacing, cv); + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_bytes = 1; + auto time = ceph::coarse_real_clock::now(); + std::string key = "uuser123"; + bool success = ratelimit.should_rate_limit("GET", key, time, &info); + ratelimit.decrease_bytes("GET",key, 2, &info); + time = 
ceph::coarse_real_clock::now(); + success = ratelimit.should_rate_limit("GET", key, time, &info); + EXPECT_EQ(true, success); +} +TEST(RGWRateLimit, accept_bw) +{ + // check that when there are enough tokens (bw) the request is still being served + std::atomic_bool replacing; + std::condition_variable cv; + RateLimiter ratelimit(replacing, cv); + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_bytes = 2; + auto time = ceph::coarse_real_clock::now(); + std::string key = "uuser123"; + bool success = ratelimit.should_rate_limit("GET", key, time, &info); + ratelimit.decrease_bytes("GET",key, 1, &info); + time = ceph::coarse_real_clock::now(); + success = ratelimit.should_rate_limit("GET", key, time, &info); + EXPECT_EQ(false, success); +} +TEST(RGWRateLimit, check_bw_debt_at_max_120secs) +{ + // check that the bandwidth debt is not larger than 120 seconds + std::atomic_bool replacing; + std::condition_variable cv; + RateLimiter ratelimit(replacing, cv); + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_bytes = 2; + auto time = ceph::coarse_real_clock::now(); + std::string key = "uuser123"; + bool success = ratelimit.should_rate_limit("GET", key, time, &info); + ratelimit.decrease_bytes("GET",key, 100, &info); + time += 121s; + success = ratelimit.should_rate_limit("GET", key, time, &info); + EXPECT_EQ(false, success); +} +TEST(RGWRateLimit, check_that_bw_limit_not_affect_ops) +{ + // check that high read bytes limit, does not affect ops limit + std::atomic_bool replacing; + std::condition_variable cv; + RateLimiter ratelimit(replacing, cv); + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_ops = 1; + info.max_read_bytes = 100000000; + auto time = ceph::coarse_real_clock::now(); + std::string key = "uuser123"; + bool success = ratelimit.should_rate_limit("GET", key, time, &info); + ratelimit.decrease_bytes("GET",key, 10000, &info); + time = ceph::coarse_real_clock::now(); + success = ratelimit.should_rate_limit("GET", key, time, 
&info); + EXPECT_EQ(true, success); +} +TEST(RGWRateLimit, read_limit_does_not_affect_writes) +{ + // read limit does not affect writes + std::atomic_bool replacing; + std::condition_variable cv; + RateLimiter ratelimit(replacing, cv); + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_ops = 1; + info.max_read_bytes = 100000000; + auto time = ceph::coarse_real_clock::now(); + std::string key = "uuser123"; + bool success = ratelimit.should_rate_limit("PUT", key, time, &info); + ratelimit.decrease_bytes("PUT",key, 10000, &info); + time = ceph::coarse_real_clock::now(); + success = ratelimit.should_rate_limit("PUT", key, time, &info); + EXPECT_EQ(false, success); +} +TEST(RGWRateLimit, write_limit_does_not_affect_reads) +{ + // write limit does not affect reads + std::atomic_bool replacing; + std::condition_variable cv; + RateLimiter ratelimit(replacing, cv); + RGWRateLimitInfo info; + info.enabled = true; + info.max_write_ops = 1; + info.max_write_bytes = 100000000; + auto time = ceph::coarse_real_clock::now(); + std::string key = "uuser123"; + bool success = ratelimit.should_rate_limit("GET", key, time, &info); + ratelimit.decrease_bytes("GET",key, 10000, &info); + time = ceph::coarse_real_clock::now(); + success = ratelimit.should_rate_limit("GET", key, time, &info); + EXPECT_EQ(false, success); +} + +TEST(RGWRateLimit, allow_unlimited_access) +{ + // 0 values in RGWRateLimitInfo should allow unlimited access + std::atomic_bool replacing; + std::condition_variable cv; + RateLimiter ratelimit(replacing, cv); + RGWRateLimitInfo info; + info.enabled = true; + auto time = ceph::coarse_real_clock::now(); + std::string key = "uuser123"; + bool success = ratelimit.should_rate_limit("GET", key, time, &info); + EXPECT_EQ(false, success); +} + +TEST(RGWRateLimitGC, NO_GC_AHEAD_OF_TIME) +{ + // Test if GC is not starting the replace before getting to map_size * 0.9 + // Please make sure to change those values when you change the map_size in the code + + 
std::shared_ptr ratelimit(new ActiveRateLimiter(g_ceph_context)); + ratelimit->start(); + auto active = ratelimit->get_active(); + RGWRateLimitInfo info; + auto time = ceph::coarse_real_clock::now(); + std::string key = "uuser123"; + active->should_rate_limit("GET", key, time, &info); + auto activegc = ratelimit->get_active(); + EXPECT_EQ(activegc, active); +} +TEST(RGWRateLimiterGC, GC_IS_WORKING) +{ + // Test if GC is replacing the active RateLimiter + // Please make sure to change those values when you change the map_size in the code + + std::shared_ptr ratelimit(new ActiveRateLimiter(g_ceph_context)); + ratelimit->start(); + auto active = ratelimit->get_active(); + RGWRateLimitInfo info; + info.enabled = true; + auto time = ceph::coarse_real_clock::now(); + std::string key = "-1"; + for(int i = 0; i < 2000000; i++) + { + active->should_rate_limit("GET", key, time, &info); + key = std::to_string(i); + } + auto activegc = ratelimit->get_active(); + EXPECT_NE(activegc, active); +} + + +TEST(RGWRateLimitEntry, op_limit_not_enabled) +{ + // info.enabled = false, so no limit + RateLimiterEntry entry; + RGWRateLimitInfo info; + auto time = ceph::coarse_real_clock::now().time_since_epoch(); + bool success = entry.should_rate_limit(false, &info, time); + EXPECT_EQ(false, success); +} +TEST(RGWRateLimitEntry, reject_op_over_limit) +{ + // check that request is being rejected because there are not enough tokens + + RGWRateLimitInfo info; + RateLimiterEntry entry; + info.enabled = true; + info.max_read_ops = 1; + auto time = ceph::coarse_real_clock::now().time_since_epoch(); + bool success = entry.should_rate_limit(true, &info, time); + time = ceph::coarse_real_clock::now().time_since_epoch(); + success = entry.should_rate_limit(true, &info, time); + EXPECT_EQ(true, success); +} +TEST(RGWRateLimitEntry, accept_op_after_giveback) +{ + // check that giveback is working fine + RGWRateLimitInfo info; + RateLimiterEntry entry; + info.enabled = true; + info.max_read_ops = 1; + 
auto time = ceph::coarse_real_clock::now().time_since_epoch(); + bool success = entry.should_rate_limit(true, &info, time); + entry.giveback_tokens(true); + time = ceph::coarse_real_clock::now().time_since_epoch(); + success = entry.should_rate_limit(true, &info, time); + EXPECT_EQ(false, success); +} +TEST(RGWRateLimitEntry, accept_op_after_refill) +{ + // check that tokens are being filled properly + RateLimiterEntry entry; + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_ops = 1; + auto time = ceph::coarse_real_clock::now().time_since_epoch(); + bool success = entry.should_rate_limit(true, &info, time); + time += 61s; + success = entry.should_rate_limit(true, &info, time); + EXPECT_EQ(false, success); +} +TEST(RGWRateLimitEntry, reject_bw_over_limit) +{ + // check that a newer request is rejected if there is no enough tokens (bw) + RateLimiterEntry entry; + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_bytes = 1; + auto time = ceph::coarse_real_clock::now().time_since_epoch(); + bool success = entry.should_rate_limit(true, &info, time); + entry.decrease_bytes(true, 2, &info); + time = ceph::coarse_real_clock::now().time_since_epoch(); + success = entry.should_rate_limit(true, &info, time); + EXPECT_EQ(true, success); +} +TEST(RGWRateLimitEntry, accept_bw) +{ + // check that when there are enough tokens (bw) the request is still being served + RateLimiterEntry entry; + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_bytes = 2; + auto time = ceph::coarse_real_clock::now().time_since_epoch(); + bool success = entry.should_rate_limit(true, &info, time); + entry.decrease_bytes(true, 1, &info); + time = ceph::coarse_real_clock::now().time_since_epoch(); + success = entry.should_rate_limit(true, &info, time); + EXPECT_EQ(false, success); +} +TEST(RGWRateLimitEntry, check_bw_debt_at_max_120secs) +{ + // check that the bandwidth debt is not larger than 120 seconds + RateLimiterEntry entry; + RGWRateLimitInfo info; + 
info.enabled = true; + info.max_read_bytes = 2; + auto time = ceph::coarse_real_clock::now().time_since_epoch(); + bool success = entry.should_rate_limit(true, &info, time); + entry.decrease_bytes(true, 100, &info); + time += 121s; + success = entry.should_rate_limit(true, &info, time); + EXPECT_EQ(false, success); +} +TEST(RGWRateLimitEntry, check_that_bw_limit_not_affect_ops) +{ + // check that high read bytes limit, does not affect ops limit + RateLimiterEntry entry; + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_ops = 1; + info.max_read_bytes = 100000000; + auto time = ceph::coarse_real_clock::now().time_since_epoch(); + bool success = entry.should_rate_limit(true, &info, time); + entry.decrease_bytes(true, 10000, &info); + time = ceph::coarse_real_clock::now().time_since_epoch(); + success = entry.should_rate_limit(true, &info, time); + EXPECT_EQ(true, success); +} +TEST(RGWRateLimitEntry, read_limit_does_not_affect_writes) +{ + // read limit does not affect writes + RateLimiterEntry entry; + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_ops = 1; + info.max_read_bytes = 100000000; + auto time = ceph::coarse_real_clock::now().time_since_epoch(); + bool success = entry.should_rate_limit(false, &info, time); + entry.decrease_bytes(false, 10000, &info); + time = ceph::coarse_real_clock::now().time_since_epoch(); + success = entry.should_rate_limit(false, &info, time); + EXPECT_EQ(false, success); +} +TEST(RGWRateLimitEntry, write_limit_does_not_affect_reads) +{ + // write limit does not affect reads + RateLimiterEntry entry; + RGWRateLimitInfo info; + info.enabled = true; + info.max_write_ops = 1; + info.max_write_bytes = 100000000; + auto time = ceph::coarse_real_clock::now().time_since_epoch(); + std::string key = "uuser123"; + bool success = entry.should_rate_limit(true, &info, time); + entry.decrease_bytes(true, 10000, &info); + time = ceph::coarse_real_clock::now().time_since_epoch(); + success = entry.should_rate_limit(true, 
&info, time); + EXPECT_EQ(false, success); +} + +TEST(RGWRateLimitEntry, allow_unlimited_access) +{ + // 0 values in RGWRateLimitInfo should allow unlimited access (default value) + RateLimiterEntry entry; + RGWRateLimitInfo info; + info.enabled = true; + auto time = ceph::coarse_real_clock::now().time_since_epoch(); + bool success = entry.should_rate_limit(true, &info, time); + EXPECT_EQ(false, success); +} -- 2.39.5