]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: Add rgw rate limiting per user and per bucket
authorOr Friedmann <ofriedma@redhat.com>
Tue, 1 Jun 2021 12:45:09 +0000 (15:45 +0300)
committerOr Friedmann <ofriedma@redhat.com>
Wed, 5 Jan 2022 15:22:13 +0000 (15:22 +0000)
Add rgw rate limiting per user and per bucket

Signed-off-by: Or Friedmann <ofriedma@redhat.com>
27 files changed:
doc/radosgw/admin.rst
src/rgw/rgw_admin.cc
src/rgw/rgw_asio_frontend.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_common.cc
src/rgw/rgw_common.h
src/rgw/rgw_loadgen_process.cc
src/rgw/rgw_main.cc
src/rgw/rgw_process.cc
src/rgw/rgw_process.h
src/rgw/rgw_ratelimit.h [new file with mode: 0644]
src/rgw/rgw_rest.cc
src/rgw/rgw_rest_swift.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_dbstore.cc
src/rgw/rgw_sal_dbstore.h
src/rgw/rgw_sal_rados.cc
src/rgw/rgw_sal_rados.h
src/rgw/rgw_user.h
src/rgw/rgw_zone.cc
src/rgw/rgw_zone.h
src/test/cli/radosgw-admin/help.t
src/test/rgw/CMakeLists.txt
src/test/rgw/bench_rgw_ratelimit.cc [new file with mode: 0644]
src/test/rgw/bench_rgw_ratelimit_gc.cc [new file with mode: 0644]
src/test/rgw/test_rgw_lua.cc
src/test/rgw/test_rgw_ratelimit.cc [new file with mode: 0644]

index 5f47471acf7fd8f9921bb13461f4c19c9d676c31..fa00a5d03bca96211f80ecba203eb2ec3a038818 100644 (file)
@@ -311,7 +311,7 @@ To add administrative capabilities to a user, execute the following::
 You can add read, write or all capabilities to users, buckets, metadata and 
 usage (utilization). For example::
 
-       --caps="[users|buckets|metadata|usage|zone]=[*|read|write|read, write]"
+       --caps="[users|buckets|metadata|usage|zone|amz-cache|info|bilog|mdlog|datalog|user-policy|oidc-provider|roles]=[*|read|write|read, write]"
 
 For example::
 
@@ -473,6 +473,174 @@ commands. ::
    be restarted for the changes to take effect.
 
 
+Rate Limit Management
+=====================
+
+The Ceph Object Gateway enables you to set rate limits on users and buckets. 
+Rate limit includes the maximum number of read ops and write ops per minute
+and how many bytes per minute could be written or read per user or per bucket.
+Requests that are using GET or HEAD method in the REST request are considered as "read requests", otherwise they are considered as "write requests".
+Every Object Gateway tracks per user and bucket metrics separately, these metrics are not shared with other gateways.
+That means that the desired limits configured should be divide by the number of active Object Gateways.
+For example, if userA should be limited by 10 ops per minute and there are 2 Object Gateways in the cluster,
+the limit over userA should be 5 (10 ops per minute / 2 RGWs).
+if the requests are ``not`` balanced between RGWs, the rate limit may be underutilized.
+For example, if the ops limit is 5 and there are 2 RGWs, ``but`` the Load Balancer send load only to one of those RGWs,
+The effective limit would be 5 ops, because this limit is enforced per RGW.
+If there is a limit reached for bucket not for user or vice versa the request would be cancelled as well.
+The bandwidth counting happens after the request is being accepted, as a result, even if in the middle of the request the bucket/user has reached its bandwidth limit this request will proceed.
+The RGW will keep a "debt" of used bytes more than the configured value and will prevent this user/bucket from sending more requests until there "debt" is being paid.
+The "debt" maximum size is twice the max-read/write-bytes per minute.
+If userA has 1 byte read limit per minute and this user tries to GET 1 GB object, the user will be able to do it.
+After userA completes this 1GB operation, the RGW will block the user request for up to 2 minutes until userA will be able to send GET request again.
+
+
+- **Bucket:** The ``--bucket`` option allows you to specify a rate limit for a
+  bucket.
+
+- **User:** The ``--uid`` option allows you to specify a rate limit for a
+  user.
+
+- **Maximum Read Ops:** The ``--max-read-ops`` setting allows you to specify
+  the maximum number of read ops per minute per RGW. A 0 value disables this setting (which means unlimited access).
+  
+- **Maximum Read Bytes:** The ``--max-read-bytes`` setting allows you to specify
+  the maximum number of read bytes per minute per RGW. A 0 value disables this setting (which means unlimited access).
+
+- **Maximum Write Ops:** The ``--max-write-ops`` setting allows you to specify
+  the maximum number of write ops per minute per RGW. A 0 value disables this setting (which means unlimited access).
+  
+- **Maximum Write Bytes:** The ``--max-write-bytes`` setting allows you to specify
+  the maximum number of write bytes per minute per RGW. A 0 value disables this setting (which means unlimited access).
+- **Rate Limit Scope:** The ``--ratelimit-scope `` option sets the scope for the rate limit.
+  The options are ``bucket`` , ``user`` and ``anonymous``. Bucket rate limit apply to buckets. 
+  The user rate limit applies to a user. Anonymous applies to an unauthenticated user.
+  Anonymous scope is only available for global rate limit.
+
+
+Set User Rate Limit
+-------------------
+
+Before you enable a rate limit, you must first set the rate limit parameters.
+For example:: 
+
+       radosgw-admin ratelimit set --ratelimit-scope=user --uid=<uid> <[--max-read-ops=<num ops>] [--max-read-bytes=<num bytes>]
+  [--max-write-ops=<num ops>] [--max-write-bytes=<num bytes>]>
+
+For example:: 
+
+       radosgw-admin ratelimit set --ratelimit-scope=user --uid=johndoe --max-read-ops=1024 --max-write-bytes=10240
+
+
+A 0 value for num ops and / or num bytes means that the
+specific rate limit attribute check is disabled.
+
+Get User Rate Limit
+-------------------
+
+Get the current configured rate limit parameters
+For example:: 
+
+       radosgw-admin ratelimit set --ratelimit-scope=user --uid=<uid>
+
+For example:: 
+
+       radosgw-admin ratelimit get --ratelimit-scope=user --uid=johndoe
+
+
+A 0 value for num ops and / or num bytes means that the
+specific rate limit attribute check is disabled.
+
+
+Enable/Disable User Rate Limit
+------------------------------
+
+Once you set a user rate limit, you may enable it. For example:: 
+
+       radosgw-admin ratelimit enable --ratelimit-scope=user --uid=<uid>
+
+You may disable an enabled user rate limit. For example:: 
+
+       radosgw-admin ratelimit disable --ratelimit-scope=user --uid=johndoe
+
+
+Set Bucket Rate Limit
+---------------------
+
+Before you enable a rate limit, you must first set the rate limit parameters.
+For example:: 
+
+       radosgw-admin ratelimit set --ratelimit-scope=bucket --bucket=<bucket> <[--max-read-ops=<num ops>] [--max-read-bytes=<num bytes>]
+  [--max-write-ops=<num ops>] [--max-write-bytes=<num bytes>]>
+
+For example:: 
+
+       radosgw-admin ratelimit set --ratelimit-scope=bucket --bucket=mybucket --max-read-ops=1024 --max-write-bytes=10240
+
+
+A 0 value for num ops and / or num bytes means that the
+specific rate limit attribute check is disabled.
+
+Get Bucket Rate Limit
+---------------------
+
+Get the current configured rate limit parameters
+For example:: 
+
+       radosgw-admin ratelimit set --ratelimit-scope=bucket --bucket=<bucket>
+
+For example:: 
+
+       radosgw-admin ratelimit get --ratelimit-scope=bucket --bucket=mybucket
+
+
+A 0 value for num ops and / or num bytes means that the
+specific rate limit attribute check is disabled.
+
+
+Enable/Disable Bucket Rate Limit
+--------------------------------
+
+Once you set a bucket rate limit, you may enable it. For example:: 
+
+       radosgw-admin ratelimit enable --ratelimit-scope=bucket --bucket=<bucket>
+
+You may disable an enabled bucket rate limit. For example:: 
+
+       radosgw-admin ratelimit disable --ratelimit-scope=bucket --uid=mybucket
+
+
+Reading / Writing Global Rate Limit Configuration
+-------------------------------------------------
+
+You can read and write global rate limit settings in the period configuration. To
+view the global rate limit settings::
+
+       radosgw-admin global rate limit get
+
+The global rate limit settings can be manipulated with the ``global ratelimit``
+counterparts of the ``ratelimit set``, ``ratelimit enable``, and ``ratelimit disable``
+commands. ``Per user and per bucket ratelimit configuration is overriding the global configuration``::
+
+       radosgw-admin global ratelimit set --ratelimit-scope bucket --max-read-ops=1024
+       radosgw-admin global ratelimit enable --ratelimit-scope bucket
+
+The global rate limit can configure rate limit scope for all authenticated users::
+
+  radosgw-admin global ratelimit set --ratelimit-scope user --max-read-ops=1024
+  radosgw-admin global ratelimit enable --ratelimit-scope user
+
+The global rate limit can configure rate limit scope for all unauthenticated users::
+  
+  radosgw-admin global ratelimit set --ratelimit-scope=anonymous --max-read-ops=1024
+  radosgw-admin global ratelimit enable --ratelimit-scope=anonymous
+
+.. note:: In a multisite configuration, where there is a realm and period
+   present, changes to the global quotas must be committed using ``period
+   update --commit``. If there is no period present, the rados gateway(s) must
+   be restarted for the changes to take effect.
+
 Usage
 =====
 
index 1c21d7267002d9fe0d6f6111bfb0c2769dc8dd56..f72e6f7204fdd97326af4c10fb3375e4481ea887 100644 (file)
@@ -164,10 +164,18 @@ void usage()
   cout << "  quota set                  set quota params\n";
   cout << "  quota enable               enable quota\n";
   cout << "  quota disable              disable quota\n";
+  cout << "  ratelimit get              get ratelimit params\n";
+  cout << "  ratelimit set              set ratelimit params\n";
+  cout << "  ratelimit enable           enable ratelimit\n";
+  cout << "  ratelimit disable          disable ratelimit\n";
   cout << "  global quota get           view global quota params\n";
   cout << "  global quota set           set global quota params\n";
   cout << "  global quota enable        enable a global quota\n";
   cout << "  global quota disable       disable a global quota\n";
+  cout << "  global ratelimit get       view global ratelimit params\n";
+  cout << "  global ratelimit set       set global ratelimit params\n";
+  cout << "  global ratelimit enable    enable a ratelimit quota\n";
+  cout << "  global ratelimit disable   disable a ratelimit quota\n";
   cout << "  realm create               create a new realm\n";
   cout << "  realm rm                   remove a realm\n";
   cout << "  realm get                  show realm info\n";
@@ -416,6 +424,13 @@ void usage()
   cout << "   --max-objects             specify max objects (negative value to disable)\n";
   cout << "   --max-size                specify max size (in B/K/M/G/T, negative value to disable)\n";
   cout << "   --quota-scope             scope of quota (bucket, user)\n";
+  cout << "\nRate limiting options:\n";
+  cout << "   --max-read-ops            specify max requests per minute for READ ops per RGW (GET and HEAD request methods), 0 means unlimited\n";
+  cout << "   --max-read-bytes          specify max bytes per minute for READ ops per RGW (GET and HEAD request methods), 0 means unlimited\n";
+  cout << "   --max-write-ops           specify max requests per minute for WRITE ops per RGW (Not GET or HEAD request methods), 0 means unlimited\n";
+  cout << "   --max-write-bytes         specify max bytes per minute for WRITE ops per RGW (Not GET or HEAD request methods), 0 means unlimited\n";
+  cout << "   --ratelimit-scope         scope of rate limiting: bucket, user, anonymous\n";
+  cout << "                             anonymous can be configured only with global rate limit\n";
   cout << "\nOrphans search options:\n";
   cout << "   --num-shards              num of shards to use for keeping the temporary scan info\n";
   cout << "   --orphan-stale-secs       num of seconds to wait before declaring an object to be an orphan (default: 86400)\n";
@@ -660,6 +675,10 @@ enum class OPT {
   ORPHANS_FIND,
   ORPHANS_FINISH,
   ORPHANS_LIST_JOBS,
+  RATELIMIT_GET,
+  RATELIMIT_SET,
+  RATELIMIT_ENABLE,
+  RATELIMIT_DISABLE,
   ZONEGROUP_ADD,
   ZONEGROUP_CREATE,
   ZONEGROUP_DEFAULT,
@@ -750,6 +769,10 @@ enum class OPT {
   GLOBAL_QUOTA_SET,
   GLOBAL_QUOTA_ENABLE,
   GLOBAL_QUOTA_DISABLE,
+  GLOBAL_RATELIMIT_GET,
+  GLOBAL_RATELIMIT_SET,
+  GLOBAL_RATELIMIT_ENABLE,
+  GLOBAL_RATELIMIT_DISABLE,
   SYNC_INFO,
   SYNC_STATUS,
   ROLE_CREATE,
@@ -859,6 +882,10 @@ static SimpleCmd::Commands all_cmds = {
   { "quota set", OPT::QUOTA_SET },
   { "quota enable", OPT::QUOTA_ENABLE },
   { "quota disable", OPT::QUOTA_DISABLE },
+  { "ratelimit get", OPT::RATELIMIT_GET },
+  { "ratelimit set", OPT::RATELIMIT_SET },
+  { "ratelimit enable", OPT::RATELIMIT_ENABLE },
+  { "ratelimit disable", OPT::RATELIMIT_DISABLE },
   { "gc list", OPT::GC_LIST },
   { "gc process", OPT::GC_PROCESS },
   { "lc list", OPT::LC_LIST },
@@ -965,6 +992,10 @@ static SimpleCmd::Commands all_cmds = {
   { "global quota set", OPT::GLOBAL_QUOTA_SET },
   { "global quota enable", OPT::GLOBAL_QUOTA_ENABLE },
   { "global quota disable", OPT::GLOBAL_QUOTA_DISABLE },
+  { "global ratelimit get", OPT::GLOBAL_RATELIMIT_GET },
+  { "global ratelimit set", OPT::GLOBAL_RATELIMIT_SET },
+  { "global ratelimit enable", OPT::GLOBAL_RATELIMIT_ENABLE },
+  { "global ratelimit disable", OPT::GLOBAL_RATELIMIT_DISABLE },
   { "sync info", OPT::SYNC_INFO },
   { "sync status", OPT::SYNC_STATUS },
   { "role create", OPT::ROLE_CREATE },
@@ -1261,6 +1292,57 @@ static bool dump_string(const char *field_name, bufferlist& bl, Formatter *f)
   return true;
 }
 
+bool set_ratelimit_info(RGWRateLimitInfo& ratelimit, OPT opt_cmd, int64_t max_read_ops, int64_t max_write_ops,
+                    int64_t max_read_bytes, int64_t max_write_bytes,
+                    bool have_max_read_ops, bool have_max_write_ops,
+                    bool have_max_read_bytes, bool have_max_write_bytes)
+{
+  bool ratelimit_configured = true;
+  switch (opt_cmd) {
+    case OPT::RATELIMIT_ENABLE:
+    case OPT::GLOBAL_RATELIMIT_ENABLE:
+      ratelimit.enabled = true;
+      break;
+
+
+    case OPT::RATELIMIT_SET:
+    case OPT::GLOBAL_RATELIMIT_SET:
+      ratelimit_configured = false;
+      if (have_max_read_ops) {
+        if (max_read_ops >= 0) {
+          ratelimit.max_read_ops = max_read_ops;
+          ratelimit_configured = true;
+        }
+      }
+      if (have_max_write_ops) {
+        if (max_write_ops >= 0) {
+          ratelimit.max_write_ops = max_write_ops;
+          ratelimit_configured = true;
+        }
+      }
+      if (have_max_read_bytes) {
+        if (max_read_bytes >= 0) {
+          ratelimit.max_read_bytes = max_read_bytes;
+          ratelimit_configured = true;
+        }
+      }
+      if (have_max_write_bytes) {
+        if (max_write_bytes >= 0) {
+          ratelimit.max_write_bytes = max_write_bytes;
+          ratelimit_configured = true;
+        }
+      }
+      break;
+    case OPT::RATELIMIT_DISABLE:
+    case OPT::GLOBAL_RATELIMIT_DISABLE:
+      ratelimit.enabled = false;
+      break;
+    default:
+      break;
+  }
+  return ratelimit_configured;
+}
+
 void set_quota_info(RGWQuotaInfo& quota, OPT opt_cmd, int64_t max_size, int64_t max_objects,
                     bool have_max_size, bool have_max_objects)
 {
@@ -1319,6 +1401,141 @@ int set_bucket_quota(rgw::sal::Store* store, OPT opt_cmd,
   return 0;
 }
 
+int set_bucket_ratelimit(rgw::sal::Store* store, OPT opt_cmd,
+                     const string& tenant_name, const string& bucket_name,
+                     int64_t max_read_ops, int64_t max_write_ops,
+                     int64_t max_read_bytes, int64_t max_write_bytes,
+                     bool have_max_read_ops, bool have_max_write_ops,
+                     bool have_max_read_bytes, bool have_max_write_bytes)
+{
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  int r = store->get_bucket(dpp(), nullptr, tenant_name, bucket_name, &bucket, null_yield);
+  if (r < 0) {
+    cerr << "could not get bucket info for bucket=" << bucket_name << ": " << cpp_strerror(-r) << std::endl;
+    return -r;
+  }
+  RGWRateLimitInfo ratelimit_info;
+  auto iter = bucket->get_attrs().find(RGW_ATTR_RATELIMIT);
+  if(iter != bucket->get_attrs().end()) {
+    try {
+      bufferlist& bl = iter->second;
+      auto biter = bl.cbegin();
+      decode(ratelimit_info, biter);
+    } catch (buffer::error& err) {
+      ldpp_dout(dpp(), 0) << "ERROR: failed to decode rate limit" << dendl;
+      return -EIO;
+    }
+  }
+  bool ratelimit_configured = set_ratelimit_info(ratelimit_info, opt_cmd, max_read_ops, max_write_ops,
+                         max_read_bytes, max_write_bytes,
+                         have_max_read_ops, have_max_write_ops,
+                         have_max_read_bytes, have_max_write_bytes);
+  if (!ratelimit_configured) {
+    ldpp_dout(dpp(), 0) << "ERROR: no rate limit values have been specified" << dendl;
+    return -EINVAL;
+  }
+  bufferlist bl;
+  ratelimit_info.encode(bl);
+  rgw::sal::Attrs attr;
+  attr[RGW_ATTR_RATELIMIT] = bl;
+  r = bucket->merge_and_store_attrs(dpp(), attr, null_yield);
+  if (r < 0) {
+    cerr << "ERROR: failed writing bucket instance info: " << cpp_strerror(-r) << std::endl;
+    return -r;
+  }
+  return 0;
+}
+
+int set_user_ratelimit(OPT opt_cmd, std::unique_ptr<rgw::sal::User>& user,
+                     int64_t max_read_ops, int64_t max_write_ops,
+                     int64_t max_read_bytes, int64_t max_write_bytes,
+                     bool have_max_read_ops, bool have_max_write_ops,
+                     bool have_max_read_bytes, bool have_max_write_bytes)
+{
+  RGWRateLimitInfo ratelimit_info;
+  user->load_user(dpp(), null_yield);
+  auto iter = user->get_attrs().find(RGW_ATTR_RATELIMIT);
+  if(iter != user->get_attrs().end()) {
+    try {
+      bufferlist& bl = iter->second;
+      auto biter = bl.cbegin();
+      decode(ratelimit_info, biter);
+    } catch (buffer::error& err) {
+      ldpp_dout(dpp(), 0) << "ERROR: failed to decode rate limit" << dendl;
+      return -EIO;
+    }
+  }
+  bool ratelimit_configured = set_ratelimit_info(ratelimit_info, opt_cmd, max_read_ops, max_write_ops,
+                         max_read_bytes, max_write_bytes,
+                         have_max_read_ops, have_max_write_ops,
+                         have_max_read_bytes, have_max_write_bytes);
+  if (!ratelimit_configured) {
+    ldpp_dout(dpp(), 0) << "ERROR: no rate limit values have been specified" << dendl;
+    return -EINVAL;
+  }
+  bufferlist bl;
+  ratelimit_info.encode(bl);
+  rgw::sal::Attrs attr;
+  attr[RGW_ATTR_RATELIMIT] = bl;
+  int r = user->merge_and_store_attrs(dpp(), attr, null_yield);
+  if (r < 0) {
+    cerr << "ERROR: failed writing user instance info: " << cpp_strerror(-r) << std::endl;
+    return -r;
+  }
+  return 0;
+}
+
+int show_user_ratelimit(std::unique_ptr<rgw::sal::User>& user, Formatter *formatter)
+{
+  RGWRateLimitInfo ratelimit_info;
+  user->load_user(dpp(), null_yield);
+  auto iter = user->get_attrs().find(RGW_ATTR_RATELIMIT);
+  if(iter != user->get_attrs().end()) {
+    try {
+      bufferlist& bl = iter->second;
+      auto biter = bl.cbegin();
+      decode(ratelimit_info, biter);
+    } catch (buffer::error& err) {
+      ldpp_dout(dpp(), 0) << "ERROR: failed to decode rate limit" << dendl;
+      return -EIO;
+    }
+  }
+  formatter->open_object_section("user_ratelimit");
+  encode_json("user_ratelimit", ratelimit_info, formatter);
+  formatter->close_section();
+  formatter->flush(cout);
+  cout << std::endl;
+  return 0;
+}
+
+int show_bucket_ratelimit(rgw::sal::Store* store, const string& tenant_name,
+                          const string& bucket_name, Formatter *formatter)
+{
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  int r = store->get_bucket(dpp(), nullptr, tenant_name, bucket_name, &bucket, null_yield);
+  if (r < 0) {
+    cerr << "could not get bucket info for bucket=" << bucket_name << ": " << cpp_strerror(-r) << std::endl;
+    return -r;
+  }
+  RGWRateLimitInfo ratelimit_info;
+  auto iter = bucket->get_attrs().find(RGW_ATTR_RATELIMIT);
+  if (iter != bucket->get_attrs().end()) {
+    try {
+      bufferlist& bl = iter->second;
+      auto biter = bl.cbegin();
+      decode(ratelimit_info, biter);
+    } catch (buffer::error& err) {
+      ldpp_dout(dpp(), 0) << "ERROR: failed to decode rate limit" << dendl;
+      return -EIO;
+    }
+  }
+  formatter->open_object_section("bucket_ratelimit");
+  encode_json("bucket_ratelimit", ratelimit_info, formatter);
+  formatter->close_section();
+  formatter->flush(cout);
+  cout << std::endl;
+  return 0;
+}
 int set_user_bucket_quota(OPT opt_cmd, RGWUser& user, RGWUserAdminOpState& op_state, int64_t max_size, int64_t max_objects,
                           bool have_max_size, bool have_max_objects)
 {
@@ -3255,6 +3472,7 @@ int main(int argc, const char **argv)
   string op_id;
   string op_mask_str;
   string quota_scope;
+  string ratelimit_scope;
   string object_version;
   string placement_id;
   std::optional<string> opt_storage_class;
@@ -3264,8 +3482,16 @@ int main(int argc, const char **argv)
 
   int64_t max_objects = -1;
   int64_t max_size = -1;
+  int64_t max_read_ops = 0;
+  int64_t max_write_ops = 0;
+  int64_t max_read_bytes = 0;
+  int64_t max_write_bytes = 0;
   bool have_max_objects = false;
   bool have_max_size = false;
+  bool have_max_write_ops = false;
+  bool have_max_read_ops = false;
+  bool have_max_write_bytes = false;
+  bool have_max_read_bytes = false;
   int include_all = false;
   int allow_unordered = false;
 
@@ -3487,6 +3713,34 @@ int main(int argc, const char **argv)
         return EINVAL;
       }
       have_max_objects = true;
+    } else if (ceph_argparse_witharg(args, i, &val, "--max-read-ops", (char*)NULL)) {
+      max_read_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err);
+      if (!err.empty()) {
+        cerr << "ERROR: failed to parse max read requests: " << err << std::endl;
+        return EINVAL;
+      }
+      have_max_read_ops = true;
+    } else if (ceph_argparse_witharg(args, i, &val, "--max-write-ops", (char*)NULL)) {
+      max_write_ops = (int64_t)strict_strtoll(val.c_str(), 10, &err);
+      if (!err.empty()) {
+        cerr << "ERROR: failed to parse max write requests: " << err << std::endl;
+        return EINVAL;
+      }
+      have_max_write_ops = true;
+    } else if (ceph_argparse_witharg(args, i, &val, "--max-read-bytes", (char*)NULL)) {
+      max_read_bytes = (int64_t)strict_strtoll(val.c_str(), 10, &err);
+      if (!err.empty()) {
+        cerr << "ERROR: failed to parse max read bytes: " << err << std::endl;
+        return EINVAL;
+      }
+      have_max_read_bytes = true;
+    } else if (ceph_argparse_witharg(args, i, &val, "--max-write-bytes", (char*)NULL)) {
+      max_write_bytes = (int64_t)strict_strtoll(val.c_str(), 10, &err);
+      if (!err.empty()) {
+        cerr << "ERROR: failed to parse max write bytes: " << err << std::endl;
+        return EINVAL;
+      }
+      have_max_write_bytes = true;
     } else if (ceph_argparse_witharg(args, i, &val, "--date", "--time", (char*)NULL)) {
       date = val;
       if (end_date.empty())
@@ -3604,6 +3858,8 @@ int main(int argc, const char **argv)
       end_marker = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--quota-scope", (char*)NULL)) {
       quota_scope = val;
+    } else if (ceph_argparse_witharg(args, i, &val, "--ratelimit-scope", (char*)NULL)) {
+      ratelimit_scope = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--index-type", (char*)NULL)) {
       string index_type_str = val;
       bi_index_type = get_bi_index_type(index_type_str);
@@ -3902,6 +4158,8 @@ int main(int argc, const char **argv)
                         OPT::PERIOD_GET_CURRENT, OPT::PERIOD_LIST,
                         OPT::GLOBAL_QUOTA_GET, OPT::GLOBAL_QUOTA_SET,
                         OPT::GLOBAL_QUOTA_ENABLE, OPT::GLOBAL_QUOTA_DISABLE,
+       OPT::GLOBAL_RATELIMIT_GET, OPT::GLOBAL_RATELIMIT_SET,
+                        OPT::GLOBAL_RATELIMIT_ENABLE, OPT::GLOBAL_RATELIMIT_DISABLE,
                         OPT::REALM_DELETE, OPT::REALM_GET, OPT::REALM_LIST,
                         OPT::REALM_LIST_PERIODS,
                         OPT::REALM_GET_DEFAULT,
@@ -3958,6 +4216,7 @@ int main(int argc, const char **argv)
                         OPT::PERIOD_GET_CURRENT,
                         OPT::PERIOD_LIST,
                         OPT::GLOBAL_QUOTA_GET,
+       OPT::GLOBAL_RATELIMIT_GET,
                         OPT::SYNC_INFO,
                         OPT::SYNC_STATUS,
                         OPT::ROLE_GET,
@@ -4252,6 +4511,100 @@ int main(int argc, const char **argv)
         formatter->flush(cout);
       }
       break;
+    case OPT::GLOBAL_RATELIMIT_GET:
+    case OPT::GLOBAL_RATELIMIT_SET:
+    case OPT::GLOBAL_RATELIMIT_ENABLE:
+    case OPT::GLOBAL_RATELIMIT_DISABLE:
+      {
+        if (realm_id.empty()) {
+          RGWRealm realm(g_ceph_context, static_cast<rgw::sal::RadosStore*>(store)->svc()->sysobj);
+          if (!realm_name.empty()) {
+            // look up realm_id for the given realm_name
+            int ret = realm.read_id(dpp(), realm_name, realm_id, null_yield);
+            if (ret < 0) {
+              cerr << "ERROR: failed to read realm for " << realm_name
+                  << ": " << cpp_strerror(-ret) << std::endl;
+              return -ret;
+            }
+          } else {
+            // use default realm_id when none is given
+            int ret = realm.read_default_id(dpp(), realm_id, null_yield);
+            if (ret < 0 && ret != -ENOENT) { // on ENOENT, use empty realm_id
+              cerr << "ERROR: failed to read default realm: "
+                  << cpp_strerror(-ret) << std::endl;
+              return -ret;
+            }
+          }
+        }
+
+        RGWPeriodConfig period_config;
+        int ret = period_config.read(dpp(), static_cast<rgw::sal::RadosStore*>(store)->svc()->sysobj, realm_id, null_yield);
+        if (ret < 0 && ret != -ENOENT) {
+          cerr << "ERROR: failed to read period config: "
+              << cpp_strerror(-ret) << std::endl;
+          return -ret;
+        }
+        bool ratelimit_configured = true;
+        formatter->open_object_section("period_config");
+        if (ratelimit_scope == "bucket") {
+          ratelimit_configured = set_ratelimit_info(period_config.bucket_ratelimit, opt_cmd,
+                         max_read_ops, max_write_ops,
+                         max_read_bytes, max_write_bytes,
+                         have_max_read_ops, have_max_write_ops,
+                         have_max_read_bytes, have_max_write_bytes);
+          encode_json("bucket_ratelimit", period_config.bucket_ratelimit, formatter.get());
+        } else if (ratelimit_scope == "user") {
+          ratelimit_configured = set_ratelimit_info(period_config.user_ratelimit, opt_cmd,
+                         max_read_ops, max_write_ops,
+                         max_read_bytes, max_write_bytes,
+                         have_max_read_ops, have_max_write_ops,
+                         have_max_read_bytes, have_max_write_bytes);
+          encode_json("user_ratelimit", period_config.user_ratelimit, formatter.get());
+        } else if (ratelimit_scope == "anonymous") {
+          ratelimit_configured = set_ratelimit_info(period_config.anon_ratelimit, opt_cmd,
+                         max_read_ops, max_write_ops,
+                         max_read_bytes, max_write_bytes,
+                         have_max_read_ops, have_max_write_ops,
+                         have_max_read_bytes, have_max_write_bytes);
+          encode_json("anonymous_ratelimit", period_config.anon_ratelimit, formatter.get());
+        } else if (ratelimit_scope.empty() && opt_cmd == OPT::GLOBAL_RATELIMIT_GET) {
+          // if no scope is given for GET, print both
+          encode_json("bucket_ratelimit", period_config.bucket_ratelimit, formatter.get());
+          encode_json("user_ratelimit", period_config.user_ratelimit, formatter.get());
+          encode_json("anonymous_ratelimit", period_config.anon_ratelimit, formatter.get());
+        } else {
+          cerr << "ERROR: invalid rate limit scope specification. Please specify "
+              "either --ratelimit-scope=bucket, or --ratelimit-scope=user or --ratelimit-scope=anonymous" << std::endl;
+          return EINVAL;
+        }
+        if (!ratelimit_configured) {
+          cerr << "ERROR: no rate limit values have been specified" << std::endl;
+          return EINVAL;
+        }
+
+        formatter->close_section();
+
+        if (opt_cmd != OPT::GLOBAL_RATELIMIT_GET) {
+          // write the modified period config
+          ret = period_config.write(dpp(), static_cast<rgw::sal::RadosStore*>(store)->svc()->sysobj, realm_id, null_yield);
+          if (ret < 0) {
+            cerr << "ERROR: failed to write period config: "
+                << cpp_strerror(-ret) << std::endl;
+            return -ret;
+          }
+          if (!realm_id.empty()) {
+            cout << "Global ratelimit changes saved. Use 'period update' to apply "
+                "them to the staging period, and 'period commit' to commit the "
+                "new period." << std::endl;
+          } else {
+            cout << "Global ratelimit changes saved. They will take effect as "
+                "the gateways are restarted." << std::endl;
+          }
+        }
+
+        formatter->flush(cout);
+      }
+      break;
     case OPT::GLOBAL_QUOTA_GET:
     case OPT::GLOBAL_QUOTA_SET:
     case OPT::GLOBAL_QUOTA_ENABLE:
@@ -6413,6 +6766,19 @@ int main(int argc, const char **argv)
       cerr << "failure: " << cpp_strerror(-r) << ": " << err << std::endl;
       return -r;
     }
+    RGWRateLimitInfo ratelimit_info;
+    std::unique_ptr<rgw::sal::Bucket> bucket_sal;
+    auto iter = bucket->get_attrs().find(RGW_ATTR_RATELIMIT);
+    if(iter != bucket->get_attrs().end()) {
+      try {
+        std::cerr << "here" << std::endl;
+        bufferlist& bl = iter->second;
+        auto biter = bl.cbegin();
+        decode(ratelimit_info, biter);
+      } catch (buffer::error& err) {
+        cerr << "ERROR: failed to decode rate limit" << std::endl;
+      }
+    }
   }
 
   if (opt_cmd == OPT::BUCKET_LINK) {
@@ -9134,6 +9500,57 @@ next:
     }
   }
 
+  bool ratelimit_op_set = (opt_cmd == OPT::RATELIMIT_SET || opt_cmd == OPT::RATELIMIT_ENABLE || opt_cmd == OPT::RATELIMIT_DISABLE);
+  bool ratelimit_op_get = opt_cmd == OPT::RATELIMIT_GET;
+  if (ratelimit_op_set) {
+    if (bucket_name.empty() && rgw::sal::User::empty(user)) {
+      cerr << "ERROR: bucket name or uid is required for ratelimit operation" << std::endl;
+      return EINVAL;
+    }
+
+    if (!bucket_name.empty()) {
+      if (!ratelimit_scope.empty() && ratelimit_scope != "bucket") {
+        cerr << "ERROR: invalid ratelimit scope specification. (bucket scope is not bucket but bucket has been specified)" << std::endl;
+        return EINVAL;
+      }
+      return set_bucket_ratelimit(store, opt_cmd, tenant, bucket_name,
+                           max_read_ops, max_write_ops,
+                           max_read_bytes, max_write_bytes,
+                           have_max_read_ops, have_max_write_ops,
+                           have_max_read_bytes, have_max_write_bytes);
+    } else if (!rgw::sal::User::empty(user)) {
+      } if (ratelimit_scope == "user") {
+        return set_user_ratelimit(opt_cmd, user, max_read_ops, max_write_ops,
+                         max_read_bytes, max_write_bytes,
+                         have_max_read_ops, have_max_write_ops,
+                         have_max_read_bytes, have_max_write_bytes);
+      } else {
+        cerr << "ERROR: invalid ratelimit scope specification. Please specify either --ratelimit-scope=bucket, or --ratelimit-scope=user" << std::endl;
+        return EINVAL;
+      }
+  }
+
+  if (ratelimit_op_get) {
+    if (bucket_name.empty() && rgw::sal::User::empty(user)) {
+      cerr << "ERROR: bucket name or uid is required for ratelimit operation" << std::endl;
+      return EINVAL;
+    }
+
+    if (!bucket_name.empty()) {
+      if (!ratelimit_scope.empty() && ratelimit_scope != "bucket") {
+        cerr << "ERROR: invalid ratelimit scope specification. (bucket scope is not bucket but bucket has been specified)" << std::endl;
+        return EINVAL;
+      }
+      return show_bucket_ratelimit(store, tenant, bucket_name, formatter.get());
+    } else if (!rgw::sal::User::empty(user)) {
+      } if (ratelimit_scope == "user") {
+        return show_user_ratelimit(user, formatter.get());
+      } else {
+        cerr << "ERROR: invalid ratelimit scope specification. Please specify either --ratelimit-scope=bucket, or --ratelimit-scope=user" << std::endl;
+        return EINVAL;
+      }
+  }
+
   if (opt_cmd == OPT::MFA_CREATE) {
     rados::cls::otp::otp_info_t config;
 
index fca29fddbd271909f5090d86224c8c0dc265eec8..d6001a67f16ca26aa12dc34c770d1854297b8997 100644 (file)
@@ -265,10 +265,11 @@ void handle_connection(boost::asio::io_context& context,
       string user = "-";
       const auto started = ceph::coarse_real_clock::now();
       ceph::coarse_real_clock::duration latency{};
-
       process_request(env.store, env.rest, &req, env.uri_prefix,
                       *env.auth_registry, &client, env.olog, y,
-                      scheduler, &user, &latency, &http_ret);
+                      scheduler, &user, &latency,
+                      env.ratelimiting->get_active(),
+                      &http_ret);
 
       if (cct->_conf->subsys.should_gather(dout_subsys, 1)) {
         // access log line elements begin per Apache Combined Log Format with additions following
index 1f0c1ef94f853507332a9b97495963fef0c00f1e..a2797b47d0c22da360a7cce402827669ecded673 100644 (file)
@@ -247,6 +247,7 @@ struct RGWBucketAdminOpState {
   std::unique_ptr<rgw::sal::Bucket>  bucket;
 
   RGWQuotaInfo quota;
+  RGWRateLimitInfo ratelimit_info;
 
   void set_fetch_stats(bool value) { stat_buckets = value; }
   void set_check_objects(bool value) { check_objects = value; }
@@ -274,6 +275,9 @@ struct RGWBucketAdminOpState {
   void set_quota(RGWQuotaInfo& value) {
     quota = value;
   }
+  void set_bucket_ratelimit(RGWRateLimitInfo& value) {
+    ratelimit_info = value;
+  }
 
 
   void set_sync_bucket(bool value) { sync_bucket = value; }
index 6fb22dbb2258145a07a5c9d922e6f1a578eb856e..5262460905c828875992d067618bf88aa59f0f1f 100644 (file)
@@ -2628,6 +2628,24 @@ void op_type_to_str(uint32_t mask, char *buf, int len)
   return mask_to_str(op_type_flags, mask, buf, len);
 }
 
+void RGWRateLimitInfo::decode_json(JSONObj *obj)
+{
+  JSONDecoder::decode_json("max_read_ops", max_read_ops, obj);
+  JSONDecoder::decode_json("max_write_ops", max_write_ops, obj);
+  JSONDecoder::decode_json("max_read_bytes", max_read_ops, obj);
+  JSONDecoder::decode_json("max_write_bytes", max_write_ops, obj);
+  JSONDecoder::decode_json("enabled", enabled, obj);
+}
+
+void RGWRateLimitInfo::dump(Formatter *f) const
+{
+  f->dump_int("max_read_ops", max_read_ops);
+  f->dump_int("max_write_ops", max_write_ops);
+  f->dump_int("max_read_bytes", max_read_bytes);
+  f->dump_int("max_write_bytes", max_write_bytes);
+  f->dump_bool("enabled", enabled);
+}
+
 void RGWUserInfo::dump(Formatter *f) const
 {
 
index f7170a2af285d095924cfcc929876e5dc9f050b9..1f7fa4f26f8ccc70f7904d32acdc826b1b9ee318 100644 (file)
@@ -18,6 +18,8 @@
 
 #include <array>
 #include <string_view>
+#include <atomic>
+#include <unordered_map>
 
 #include "common/ceph_crypto.h"
 #include "common/random_string.h"
@@ -66,6 +68,7 @@ using ceph::crypto::MD5;
 #define RGW_SYS_PARAM_PREFIX "rgwx-"
 
 #define RGW_ATTR_ACL           RGW_ATTR_PREFIX "acl"
+#define RGW_ATTR_RATELIMIT             RGW_ATTR_PREFIX "ratelimit"
 #define RGW_ATTR_LC            RGW_ATTR_PREFIX "lc"
 #define RGW_ATTR_CORS          RGW_ATTR_PREFIX "cors"
 #define RGW_ATTR_ETAG          RGW_ATTR_PREFIX "etag"
@@ -304,6 +307,7 @@ struct rgw_err {
   std::string message;
 };
 
+
 /* Helper class used for RGWHTTPArgs parsing */
 class NameVal
 {
@@ -691,6 +695,43 @@ void decode_json_obj(rgw_placement_rule& v, JSONObj *obj);
 inline std::ostream& operator<<(std::ostream& out, const rgw_placement_rule& rule) {
   return out << rule.to_str();
 }
+
+class RateLimiter;
+struct RGWRateLimitInfo {
+  int64_t max_write_ops;
+  int64_t max_read_ops;
+  int64_t max_write_bytes;
+  int64_t max_read_bytes;
+  bool enabled = false;
+  RGWRateLimitInfo()
+    : max_write_ops(0), max_read_ops(0), max_write_bytes(0), max_read_bytes(0)  {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(max_write_ops, bl);
+    encode(max_read_ops, bl);
+    encode(max_write_bytes, bl);
+    encode(max_read_bytes, bl);
+    encode(enabled, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(max_write_ops,bl);
+    decode(max_read_ops, bl);
+    decode(max_write_bytes,bl);
+    decode(max_read_bytes, bl);
+    decode(enabled, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
+
+  void decode_json(JSONObj *obj);
+
+};
+WRITE_CLASS_ENCODER(RGWRateLimitInfo)
+
 struct RGWUserInfo
 {
   rgw_user user_id;
@@ -1510,6 +1551,11 @@ struct req_state : DoutPrefixProvider {
   rgw::io::BasicClient *cio{nullptr};
   http_op op{OP_UNKNOWN};
   RGWOpType op_type{};
+  std::shared_ptr<RateLimiter> ratelimit_data;
+  RGWRateLimitInfo user_ratelimit;
+  RGWRateLimitInfo bucket_ratelimit;
+  std::string ratelimit_bucket_marker;
+  std::string ratelimit_user_name;
   bool content_started{false};
   int format{0};
   ceph::Formatter *formatter{nullptr};
index 547e74ef4b9844f0c87e6844ef509670762940c8..27078f04c4e89d1ff5f22ac44e7ec1a1596950f6 100644 (file)
@@ -133,10 +133,11 @@ void RGWLoadGenProcess::handle_request(const DoutPrefixProvider *dpp, RGWRequest
 
   RGWLoadGenIO real_client_io(&env);
   RGWRestfulIO client_io(cct, &real_client_io);
-
+  ActiveRateLimiter ratelimit(cct);
   int ret = process_request(store, rest, req, uri_prefix,
                             *auth_registry, &client_io, olog,
-                            null_yield, nullptr, nullptr, nullptr);
+                            null_yield, nullptr, nullptr, nullptr,
+                            ratelimit.get_active());
   if (ret < 0) {
     /* we don't really care about return code */
     dout(20) << "process_request() returned " << ret << dendl;
index 5442d3d117543540d6e6a3eef759c393067a4152..a932433e8c41c4b93e1c20183dddc0a6e69d7f40 100644 (file)
@@ -536,6 +536,9 @@ int radosgw_Main(int argc, const char **argv)
   rgw::dmclock::SchedulerCtx sched_ctx{cct.get()};
 
   OpsLogManifold *olog = new OpsLogManifold();
+  ActiveRateLimiter ratelimiting{cct.get()};
+  ratelimiting.start();
+
   if (!g_conf()->rgw_ops_log_socket_path.empty()) {
     OpsLogSocket* olog_socket = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
     olog_socket->init(g_conf()->rgw_ops_log_socket_path);
@@ -603,7 +606,7 @@ int radosgw_Main(int argc, const char **argv)
       std::string uri_prefix;
       config->get_val("prefix", "", &uri_prefix);
 
-      RGWProcessEnv env = { store, &rest, olog, port, uri_prefix, auth_registry };
+      RGWProcessEnv env = { store, &rest, olog, port, uri_prefix, auth_registry, &ratelimiting };
 
       fe = new RGWLoadGenFrontend(env, config);
     }
@@ -612,7 +615,7 @@ int radosgw_Main(int argc, const char **argv)
       config->get_val("port", 80, &port);
       std::string uri_prefix;
       config->get_val("prefix", "", &uri_prefix);
-      RGWProcessEnv env{ store, &rest, olog, port, uri_prefix, auth_registry };
+      RGWProcessEnv env{ store, &rest, olog, port, uri_prefix, auth_registry, &ratelimiting };
       fe = new RGWAsioFrontend(env, config, sched_ctx);
     }
 
index 80338086c750966496ad9307b1621fe75791ba1e..f8d43a3258d225ee735018bb5ee11168a504ff8f 100644 (file)
@@ -6,6 +6,7 @@
 #include "common/WorkQueue.h"
 #include "include/scope_guard.h"
 
+#include <utility>
 #include "rgw_dmclock_scheduler.h"
 #include "rgw_rest.h"
 #include "rgw_frontend.h"
@@ -18,6 +19,8 @@
 #include "rgw_lua.h"
 #include "rgw_lua_request.h"
 #include "rgw_tracer.h"
+#include "rgw_ratelimit.h"
+
 #include "services/svc_zone_utils.h"
 
 #define dout_subsys ceph_subsys_rgw
@@ -87,12 +90,83 @@ void RGWProcess::RGWWQ::_process(RGWRequest *req, ThreadPool::TPHandle &) {
   process->req_throttle.put(1);
   perfcounter->inc(l_rgw_qactive, -1);
 }
+bool rate_limit(rgw::sal::Store* store, req_state* s) {
+  // we dont want to limit health check or system or admin requests
+  const auto& is_admin_or_system = s->user->get_info();
+  if ((s->op_type ==  RGW_OP_GET_HEALTH_CHECK) || is_admin_or_system.admin || is_admin_or_system.system)
+    return false;
+  std::string userfind;
+  RGWRateLimitInfo global_user;
+  RGWRateLimitInfo global_bucket;
+  RGWRateLimitInfo global_anon;
+  RGWRateLimitInfo* bucket_ratelimit;
+  RGWRateLimitInfo* user_ratelimit;
+  store->get_ratelimit(global_bucket, global_user, global_anon);
+  bucket_ratelimit = &global_bucket;
+  user_ratelimit = &global_user;
+  s->user->get_id().to_str(userfind);
+  userfind = "u" + userfind;
+  s->ratelimit_user_name = userfind;
+  std::string bucketfind = !rgw::sal::Bucket::empty(s->bucket.get()) ? "b" + s->bucket->get_marker() : "";
+  s->ratelimit_bucket_marker = bucketfind;
+  const char *method = s->info.method;
+
+  auto iter = s->user->get_attrs().find(RGW_ATTR_RATELIMIT);
+  if(iter != s->user->get_attrs().end()) {
+    try {
+      RGWRateLimitInfo user_ratelimit_temp;
+      bufferlist& bl = iter->second;
+      auto biter = bl.cbegin();
+      decode(user_ratelimit_temp, biter);
+      // override global rate limiting only if local rate limiting is enabled
+      if (user_ratelimit_temp.enabled)
+        *user_ratelimit = user_ratelimit_temp;
+    } catch (buffer::error& err) {
+      ldpp_dout(s, 0) << "ERROR: failed to decode rate limit" << dendl;
+      return -EIO;
+    }
+  }
+  if (s->user->get_id().id == RGW_USER_ANON_ID && global_anon.enabled) {
+    *user_ratelimit = global_anon;
+  }
+  bool limit_bucket = false;
+  bool limit_user = s->ratelimit_data->should_rate_limit(method, s->ratelimit_user_name, s->time, user_ratelimit);
+
+  if(!rgw::sal::Bucket::empty(s->bucket.get()))
+  {
+    iter = s->bucket->get_attrs().find(RGW_ATTR_RATELIMIT);
+    if(iter != s->bucket->get_attrs().end()) {
+      try {
+        RGWRateLimitInfo bucket_ratelimit_temp;
+        bufferlist& bl = iter->second;
+        auto biter = bl.cbegin();
+        decode(bucket_ratelimit_temp, biter);
+        // override global rate limiting only if local rate limiting is enabled
+        if (bucket_ratelimit_temp.enabled)
+          *bucket_ratelimit = bucket_ratelimit_temp;
+      } catch (buffer::error& err) {
+        ldpp_dout(s, 0) << "ERROR: failed to decode rate limit" << dendl;
+        return -EIO;
+      }
+    }
+    if (!limit_user) {
+      limit_bucket = s->ratelimit_data->should_rate_limit(method, s->ratelimit_bucket_marker, s->time, bucket_ratelimit);
+    }
+  }
+  if(limit_bucket && !limit_user) {
+    s->ratelimit_data->giveback_tokens(method, s->ratelimit_user_name);
+  }
+  s->user_ratelimit = *user_ratelimit;
+  s->bucket_ratelimit = *bucket_ratelimit;
+  return (limit_user || limit_bucket);
+}
 
 int rgw_process_authenticated(RGWHandler_REST * const handler,
                               RGWOp *& op,
                               RGWRequest * const req,
                               req_state * const s,
-                             optional_yield y,
+                                               optional_yield y,
+                              rgw::sal::Store* store,
                               const bool skip_retarget)
 {
   ldpp_dout(op, 2) << "init permissions" << dendl;
@@ -169,6 +243,10 @@ int rgw_process_authenticated(RGWHandler_REST * const handler,
   ldpp_dout(op, 2) << "pre-executing" << dendl;
   op->pre_exec();
 
+  ldpp_dout(op, 2) << "check rate limiting" << dendl;
+  if (rate_limit(store, s)) {
+    return -ERR_RATE_LIMITED;
+  }
   ldpp_dout(op, 2) << "executing" << dendl;
   {
     auto span = tracing::rgw::tracer.add_span("execute", s->trace);
@@ -194,6 +272,7 @@ int process_request(rgw::sal::Store* const store,
                    rgw::dmclock::Scheduler *scheduler,
                     string* user,
                     ceph::coarse_real_clock::duration* latency,
+                    std::shared_ptr<RateLimiter> ratelimit,
                     int* http_ret)
 {
   int ret = client_io->init(g_ceph_context);
@@ -207,6 +286,7 @@ int process_request(rgw::sal::Store* const store,
   struct req_state rstate(g_ceph_context, &rgw_env, req->id);
   struct req_state *s = &rstate;
 
+  s->ratelimit_data = ratelimit;
   std::unique_ptr<rgw::sal::User> u = store->get_user(rgw_user());
   s->set_user(u);
 
@@ -311,7 +391,7 @@ int process_request(rgw::sal::Store* const store,
     s->trace->SetAttribute(tracing::rgw::OP, op->name());
     s->trace->SetAttribute(tracing::rgw::TYPE, tracing::rgw::REQUEST);
 
-    ret = rgw_process_authenticated(handler, op, req, s, yield);
+    ret = rgw_process_authenticated(handler, op, req, s, yield, store);
     if (ret < 0) {
       abort_early(s, op, ret, handler, yield);
       goto done;
@@ -353,7 +433,6 @@ done:
     dout(0) << "ERROR: client_io->complete_request() returned "
             << e.what() << dendl;
   }
-
   if (should_log) {
     rgw_log_op(rest, s, (op ? op->name() : "unknown"), olog);
   }
@@ -382,7 +461,6 @@ done:
   if (latency) {
     *latency = lat;
   }
-
   dout(1) << "====== req done req=" << hex << req << dec
          << " op status=" << op_ret
          << " http_status=" << s->err.http_ret
index 1aac4a6cdf1bc8e4cbe7a6cdf6ce4d788178594e..edf76a443e30f4f66d50d8998aff61ec670c5074 100644 (file)
@@ -10,7 +10,7 @@
 #include "rgw_user.h"
 #include "rgw_op.h"
 #include "rgw_rest.h"
-
+#include "rgw_ratelimit.h"
 #include "include/ceph_assert.h"
 
 #include "common/WorkQueue.h"
@@ -38,6 +38,8 @@ struct RGWProcessEnv {
   int port;
   std::string uri_prefix;
   std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
+  //maybe there is a better place to store the rate limit data structure
+  ActiveRateLimiter* ratelimiting;
 };
 
 class RGWFrontendConfig;
@@ -174,13 +176,15 @@ extern int process_request(rgw::sal::Store* store,
                            rgw::dmclock::Scheduler *scheduler,
                            std::string* user,
                            ceph::coarse_real_clock::duration* latency,
+                           std::shared_ptr<RateLimiter> ratelimit,
                            int* http_ret = nullptr);
 
 extern int rgw_process_authenticated(RGWHandler_REST* handler,
                                      RGWOp*& op,
                                      RGWRequest* req,
                                      req_state* s,
-                                    optional_yield y,
+                                                            optional_yield y,
+                                     rgw::sal::Store* store,
                                      bool skip_retarget = false);
 
 #if defined(def_dout_subsys)
diff --git a/src/rgw/rgw_ratelimit.h b/src/rgw/rgw_ratelimit.h
new file mode 100644 (file)
index 0000000..2639d4d
--- /dev/null
@@ -0,0 +1,292 @@
+#pragma once
+#include <chrono>
+#include <thread>
+#include <condition_variable>
+#include "rgw_common.h"
+
+
+class RateLimiterEntry {
+  /* 
+    fixed_point_rgw_ratelimit is important to preserve the precision of the token calculation
+    for example: a user have a limit of single op per minute, the user will consume its single token and then will send another request, 1s after it.
+    in that case, without this method, the user will get 0 tokens although it should get 0.016 tokens.
+    using this method it will add 16 tokens to the user, and the user will have 16 tokens, each time rgw will do comparison rgw will divide by fixed_point_rgw_ratelimit, so the user will be blocked anyway until it has enough tokens.
+  */
+  static constexpr int64_t fixed_point_rgw_ratelimit = 1000;
+  // counters are tracked in multiples of fixed_point_rgw_ratelimit
+  struct counters {
+    int64_t ops = 0;
+    int64_t bytes = 0;
+  };
+  counters read;
+  counters write;
+  ceph::timespan ts;
+  bool first_run = true;
+  std::mutex ts_lock;
+  // Those functions are returning the integer value of the tokens 
+  int64_t read_ops () const
+  {
+    return read.ops / fixed_point_rgw_ratelimit;
+  }
+  int64_t write_ops() const
+  {
+    return write.ops / fixed_point_rgw_ratelimit;
+  }
+  int64_t read_bytes() const
+  {
+    return read.bytes / fixed_point_rgw_ratelimit;
+  }
+  int64_t write_bytes() const
+  {
+    return write.bytes / fixed_point_rgw_ratelimit;
+  }
+  bool should_rate_limit_read(int64_t ops_limit, int64_t bw_limit) {
+    //check if tenants did not reach their bw or ops limits and that the limits are not 0 (which is unlimited)
+    if(((read_ops() - 1 < 0) && (ops_limit > 0)) ||
+      (read_bytes() < 0 && bw_limit > 0))
+  {
+    return true;
+  }
+    // we don't want to reduce ops' tokens if we've rejected it.
+    read.ops -= fixed_point_rgw_ratelimit;
+    return false;
+  }
+  bool should_rate_limit_write(int64_t ops_limit, int64_t bw_limit) 
+  {
+    //check if tenants did not reach their bw or ops limits and that the limits are not 0 (which is unlimited)
+    if(((write_ops() - 1 < 0) && (ops_limit > 0)) ||
+      (write_bytes() < 0 && bw_limit > 0))
+    {
+      return true;
+    }
+
+    // we don't want to reduce ops' tokens if we've rejected it.
+    write.ops -= fixed_point_rgw_ratelimit;
+    return false;
+  }
+  /* The purpose of this function is to minimum time before overriding the stored timestamp
+     This function is necessary to force the increase tokens add at least 1 token when it updates the last stored timestamp.
+     That way the user/bucket will not lose tokens because of rounding
+  */
+  bool minimum_time_reached(ceph::timespan curr_timestamp) const
+  {
+    using namespace std::chrono;
+    constexpr auto min_duration = duration_cast<ceph::timespan>(seconds(60)) / fixed_point_rgw_ratelimit;
+    const auto delta = curr_timestamp - ts;
+    if (delta < min_duration)
+    {
+      return false;
+    }
+    return true;
+  }
+
+  void increase_tokens(ceph::timespan curr_timestamp,
+                       const RGWRateLimitInfo* info)
+  {
+    constexpr int fixed_point = fixed_point_rgw_ratelimit;
+    if (first_run)
+    {
+      write.ops = info->max_write_ops * fixed_point;
+      write.bytes = info->max_write_bytes * fixed_point;
+      read.ops = info->max_read_ops * fixed_point;
+      read.bytes = info->max_read_bytes * fixed_point;
+      ts = curr_timestamp;
+      first_run = false;
+      return;
+    }
+    else if(curr_timestamp > ts && minimum_time_reached(curr_timestamp))
+    {
+      const int64_t time_in_ms = std::chrono::duration_cast<std::chrono::milliseconds>(curr_timestamp - ts).count() / 60.0 / std::milli::den * fixed_point; // / 60 to make it work with 1 min token bucket
+      ts = curr_timestamp;
+      const int64_t write_ops = info->max_write_ops * time_in_ms;
+      const int64_t write_bw = info->max_write_bytes * time_in_ms;
+      const int64_t read_ops = info->max_read_ops * time_in_ms;
+      const int64_t read_bw = info->max_read_bytes * time_in_ms;
+      read.ops = std::min(info->max_read_ops * fixed_point, read_ops + read.ops);
+      read.bytes = std::min(info->max_read_bytes * fixed_point, read_bw + read.bytes);
+      write.ops = std::min(info->max_write_ops * fixed_point, write_ops + write.ops);
+      write.bytes = std::min(info->max_write_bytes * fixed_point, write_bw + write.bytes);
+    }
+  }
+
+  public:
+    bool should_rate_limit(bool is_read, const RGWRateLimitInfo* ratelimit_info, ceph::timespan curr_timestamp)
+    {
+      std::unique_lock lock(ts_lock);
+      increase_tokens(curr_timestamp, ratelimit_info);
+      if (is_read)
+      {
+        return should_rate_limit_read(ratelimit_info->max_read_ops, ratelimit_info->max_read_bytes);
+      }
+      return should_rate_limit_write(ratelimit_info->max_write_ops, ratelimit_info->max_write_bytes);
+    }
+    void decrease_bytes(bool is_read, int64_t amount, const RGWRateLimitInfo* info) {
+      std::unique_lock lock(ts_lock);
+      // we don't want the tenant to be with higher debt than 120 seconds(2 min) of its limit
+      if (is_read)
+      {
+        read.bytes = std::max(read.bytes - amount * fixed_point_rgw_ratelimit,info->max_read_bytes * fixed_point_rgw_ratelimit * -2);
+      } else {
+        write.bytes = std::max(write.bytes - amount * fixed_point_rgw_ratelimit,info->max_write_bytes * fixed_point_rgw_ratelimit * -2);
+      }
+    }
+    void giveback_tokens(bool is_read)
+    {
+      std::unique_lock lock(ts_lock);
+      if (is_read) 
+      {
+        read.ops += fixed_point_rgw_ratelimit;
+      } else {
+        write.ops += fixed_point_rgw_ratelimit;
+      }
+    }
+};
+
+class RateLimiter {
+
+  static constexpr size_t map_size = 2000000; // will create it with the closest upper prime number
+  std::shared_mutex insert_lock;
+  std::atomic_bool& replacing;
+  std::condition_variable& cv;
+  typedef std::unordered_map<std::string, RateLimiterEntry> hash_map;
+  hash_map ratelimit_entries{map_size};
+  static bool is_read_op(const std::string_view method) {
+    if (method == "GET" || method == "HEAD")
+    {
+      return true;
+    }
+    return false;
+  }
+
+    // find or create an entry, and return its iterator
+  auto& find_or_create(const std::string& key) {
+    std::shared_lock rlock(insert_lock);
+    if (ratelimit_entries.size() > 0.9 * map_size && replacing == false)
+    {
+      replacing = true;
+      cv.notify_all();
+    }
+    auto ret = ratelimit_entries.find(key);
+    rlock.unlock();
+    if (ret == ratelimit_entries.end())
+    {
+      std::unique_lock wlock(insert_lock);
+      ret = ratelimit_entries.emplace(std::piecewise_construct,
+                                 std::forward_as_tuple(key),
+                                 std::forward_as_tuple()).first;
+    }
+    return ret->second;
+  }
+
+  
+
+  public:
+    RateLimiter(const RateLimiter&) = delete;
+    RateLimiter& operator =(const RateLimiter&) = delete;
+    RateLimiter(RateLimiter&&) = delete;
+    RateLimiter& operator =(RateLimiter&&) = delete;
+    RateLimiter() = delete;
+    RateLimiter(std::atomic_bool& replacing, std::condition_variable& cv)
+      : replacing(replacing), cv(cv)
+    {
+      // prevents rehash, so no iterators invalidation
+      ratelimit_entries.max_load_factor(1000);
+    };
+
+    bool should_rate_limit(const char *method, const std::string& key, ceph::coarse_real_time curr_timestamp, const RGWRateLimitInfo* ratelimit_info) {
+      if (key.empty() || key.length() == 1 || !ratelimit_info->enabled)
+      {
+        return false;
+      }
+      bool is_read = is_read_op(method);
+      auto& it = find_or_create(key);
+      auto curr_ts = curr_timestamp.time_since_epoch();
+      return it.should_rate_limit(is_read ,ratelimit_info, curr_ts);
+    }
+    void giveback_tokens(const char *method, const std::string& key)
+    {
+      bool is_read = is_read_op(method);
+      auto& it = find_or_create(key);
+      it.giveback_tokens(is_read);
+    }
+    void decrease_bytes(const char *method, const std::string& key, const int64_t amount, const RGWRateLimitInfo* info) {
+      if (key.empty() || key.length() == 1 || !info->enabled)
+      {
+        return;
+      }
+      bool is_read = is_read_op(method);
+      if ((is_read && !info->max_read_bytes) || (!is_read && !info->max_write_bytes))
+      {
+        return;
+      }
+      auto& it = find_or_create(key);
+      it.decrease_bytes(is_read, amount, info);
+    }
+    void clear() {
+      ratelimit_entries.clear();
+    }
+};
+// This class purpose is to hold 2 RateLimiter instances, one active and one passive.
+// once the active has reached the watermark for clearing it will call the replace_active() thread using cv
+// The replace_active will clear the previous RateLimiter after all requests to it has been done (use_count() > 1)
+// In the meanwhile new requests will come into the newer active
+class ActiveRateLimiter : public DoutPrefix  {
+  std::atomic_uint8_t stopped = {false};
+  std::condition_variable cv;
+  std::mutex cv_m;
+  std::thread runner;
+  std::atomic_bool replacing = false;
+  std::atomic_uint8_t current_active = 0;
+  std::shared_ptr<RateLimiter> ratelimit[2];
+  void replace_active() {
+    using namespace std::chrono_literals;
+    std::unique_lock<std::mutex> lk(cv_m);
+    while (!stopped) {
+      cv.wait(lk);
+      current_active = current_active ^ 1;
+      ldpp_dout(this, 20) << "replacing active ratelimit data structure" << dendl;
+      while (!stopped && ratelimit[(current_active ^ 1)].use_count() > 1 ) {
+        if (cv.wait_for(lk, 1min) != std::cv_status::timeout && stopped)
+        {
+          return;
+        }
+      }
+      if (stopped)
+      {
+        return;
+      }
+      ldpp_dout(this, 20) << "clearing passive ratelimit data structure" << dendl;
+      ratelimit[(current_active ^ 1)]->clear();
+      replacing = false;
+    }
+  }
+  public:
+    ActiveRateLimiter(const ActiveRateLimiter&) = delete;
+    ActiveRateLimiter& operator =(const ActiveRateLimiter&) = delete;
+    ActiveRateLimiter(ActiveRateLimiter&&) = delete;
+    ActiveRateLimiter& operator =(ActiveRateLimiter&&) = delete;
+    ActiveRateLimiter() = delete;
+    ActiveRateLimiter(CephContext* cct) :
+      DoutPrefix(cct, ceph_subsys_rgw, "rate limiter: ")
+    {
+      ratelimit[0] = std::make_shared<RateLimiter>(replacing, cv);
+      ratelimit[1] = std::make_shared<RateLimiter>(replacing, cv);
+    }
+    ~ActiveRateLimiter() {
+      ldpp_dout(this, 20) << "stopping ratelimit_gc thread" << dendl;
+      cv_m.lock();
+      stopped = true;
+      cv_m.unlock();
+      cv.notify_all();
+      runner.join();
+    }
+    std::shared_ptr<RateLimiter> get_active() {
+      return ratelimit[current_active];
+    }
+    void start() {
+      ldpp_dout(this, 20) << "starting ratelimit_gc thread" << dendl;
+      runner = std::thread(&ActiveRateLimiter::replace_active, this);
+      const auto rc = ceph_pthread_setname(runner.native_handle(), "ratelimit_gc");
+      ceph_assert(rc==0);
+    }
+};
index 51e89fd10a48b0ce6dc537d9a991c6393a164b7a..fd2b781fa209549ead4b42ac1b40fde403b64cb2 100644 (file)
@@ -28,6 +28,7 @@
 #include "rgw_resolve.h"
 #include "rgw_sal_rados.h"
 
+#include "rgw_ratelimit.h"
 #include <numeric>
 
 #define dout_subsys ceph_subsys_rgw
@@ -759,6 +760,16 @@ int dump_body(struct req_state* const s,
               const char* const buf,
               const size_t len)
 {
+  bool healthchk = false;
+  // we dont want to limit health checks
+  if(s->op_type == RGW_OP_GET_HEALTH_CHECK)
+    healthchk = true;
+  if(len > 0 && !healthchk) {
+    const char *method = s->info.method;
+    s->ratelimit_data->decrease_bytes(method, s->ratelimit_user_name, len, &s->user_ratelimit);
+    if(!rgw::sal::Bucket::empty(s->bucket.get()))
+      s->ratelimit_data->decrease_bytes(method, s->ratelimit_bucket_marker, len, &s->bucket_ratelimit);
+  }
   try {
     return RESTFUL_IO(s)->send_body(buf, len);
   } catch (rgw::io::Exception& e) {
@@ -780,11 +791,24 @@ int recv_body(struct req_state* const s,
               char* const buf,
               const size_t max)
 {
+  int len;
   try {
-    return RESTFUL_IO(s)->recv_body(buf, max);
+    len = RESTFUL_IO(s)->recv_body(buf, max);
   } catch (rgw::io::Exception& e) {
     return -e.code().value();
   }
+  bool healthchk = false;
+  // we dont want to limit health checks
+  if(s->op_type ==  RGW_OP_GET_HEALTH_CHECK)
+    healthchk = true;
+  if(len > 0 && !healthchk) {
+    const char *method = s->info.method;
+    s->ratelimit_data->decrease_bytes(method, s->ratelimit_user_name, len, &s->user_ratelimit);
+    if(!rgw::sal::Bucket::empty(s->bucket.get()))
+      s->ratelimit_data->decrease_bytes(method, s->ratelimit_bucket_marker, len, &s->bucket_ratelimit);
+  }
+  return len;
+
 }
 
 int RGWGetObj_ObjStore::get_params(optional_yield y)
index 414f3a31c1a4c4095c8aae0f550260aebbdf326d..f8a966d851c46a3a8e8838467d8d7c7539599a5f 100644 (file)
@@ -2378,7 +2378,7 @@ int RGWSwiftWebsiteHandler::serve_errordoc(const int http_ret,
 
   RGWOp* newop = &get_errpage_op;
   RGWRequest req(0);
-  return rgw_process_authenticated(handler, newop, &req, s, y, true);
+  return rgw_process_authenticated(handler, newop, &req, s, y, store, true);
 }
 
 int RGWSwiftWebsiteHandler::error_handler(const int err_no,
index c815032164f11f2573407d8b5abc2a74f06a4dd9..baa535058a5587e6d13d99fc1e40142584521eef 100644 (file)
@@ -311,6 +311,8 @@ class Store {
                                        const std::map<std::string, std::string>& meta) = 0;
     /** Get default quota info.  Used as fallback if a user or bucket has no quota set*/
     virtual void get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota) = 0;
+    /** Get global rate limit configuration*/
+    virtual void get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit) = 0;
     /** Enable or disable a set of bucket.  e.g. if a User is suspended */
     virtual int set_buckets_enabled(const DoutPrefixProvider* dpp, std::vector<rgw_bucket>& buckets, bool enabled) = 0;
     /** Get a new request ID */
@@ -486,12 +488,7 @@ class User {
     virtual int read_attrs(const DoutPrefixProvider* dpp, optional_yield y) = 0;
     /** Set the attributes in attrs, leaving any other existing attrs set, and
      * write them to the backing store; a merge operation */
-    virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) {
-      for(auto& it : new_attrs) {
-       attrs[it.first] = it.second;
-      }
-      return 0;
-    }
+    virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) = 0;
     virtual int read_stats(const DoutPrefixProvider *dpp,
                            optional_yield y, RGWStorageStats* stats,
                           ceph::real_time* last_stats_sync = nullptr,
@@ -507,8 +504,10 @@ class User {
                           std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) = 0;
     /** Trim User usage stats to the given epoch range */
     virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) = 0;
-    virtual RGWObjVersionTracker& get_version_tracker() { return objv_tracker; }
-    virtual Attrs& get_attrs() { return attrs; }
+
+    /** Load this User from the backing store.  requires ID to be set, fills all other fields. */
+    virtual int load_user(const DoutPrefixProvider* dpp, optional_yield y) = 0;
+    /** Store this User to the backing store */
     virtual int store_user(const DoutPrefixProvider* dpp, optional_yield y, bool exclusive, RGWUserInfo* old_info = nullptr) = 0;
     /** Remove this User from the backing store */
     virtual int remove_user(const DoutPrefixProvider* dpp, optional_yield y) = 0;
@@ -681,12 +680,7 @@ class Bucket {
     virtual int check_quota(const DoutPrefixProvider *dpp, RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size, optional_yield y, bool check_size_only = false) = 0;
     /** Set the attributes in attrs, leaving any other existing attrs set, and
      * write them to the backing store; a merge operation */
-    virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) {
-      for(auto& it : new_attrs) {
-       attrs[it.first] = it.second;
-      }
-      return 0;
-    }
+    virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) = 0;
     /** Try to refresh the cached bucket info from the backing store.  Used in
      * read-modify-update loop. */
     virtual int try_refresh_info(const DoutPrefixProvider* dpp, ceph::real_time* pmtime) = 0;
index a81fc94685b18ce64da9bcadceddb60ed9f7e3d4..2026c8e5b7ef42d96d72a8d69a9baca5713082ad 100644 (file)
@@ -199,7 +199,13 @@ namespace rgw::sal {
 
     return ret;
   }
-
+  int DBUser::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y)
+  {
+    for(auto& it : new_attrs) {
+         attrs[it.first] = it.second;
+    }
+    return store_user(dpp, y, false);
+  }
   int DBUser::store_user(const DoutPrefixProvider* dpp, optional_yield y, bool exclusive, RGWUserInfo* old_info)
   {
     int ret = 0;
@@ -322,7 +328,9 @@ namespace rgw::sal {
   {
     int ret = 0;
 
-    Bucket::merge_and_store_attrs(dpp, new_attrs, y);
+    for(auto& it : new_attrs) {
+           attrs[it.first] = it.second;
+    }
 
     /* XXX: handle has_instance_obj like in set_bucket_instance_attrs() */
 
@@ -1753,6 +1761,11 @@ namespace rgw::sal {
     return 0;
   }
 
+  void DBStore::get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit)
+  {
+    return;
+  }
+
   void DBStore::get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota)
   {
     // XXX: Not handled for the first pass 
index 30da100d1084a9a3992bafeb1e9f35677952adce..02d622a0682e219360f9137aff4bb23ba471cfdd 100644 (file)
@@ -114,6 +114,7 @@ protected:
       virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) override;
 
       /* Placeholders */
+      virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) override;
       virtual int load_user(const DoutPrefixProvider* dpp, optional_yield y) override;
       virtual int store_user(const DoutPrefixProvider* dpp, optional_yield y, bool exclusive, RGWUserInfo* old_info = nullptr) override;
       virtual int remove_user(const DoutPrefixProvider* dpp, optional_yield y) override;
@@ -731,6 +732,7 @@ public:
       virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override;
       virtual int register_to_service_map(const DoutPrefixProvider *dpp, const string& daemon_type,
           const map<string, string>& meta) override;
+      virtual void get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit) override;
       virtual void get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota) override;
       virtual int set_buckets_enabled(const DoutPrefixProvider *dpp, vector<rgw_bucket>& buckets, bool enabled) override;
       virtual uint64_t get_new_req_id() override { return 0; }
index be602880d0228469e1f7e2592639c62206046228..638b2270ab05ba72f5f31520d216d482f4c76c8c 100644 (file)
@@ -285,7 +285,9 @@ int RadosUser::read_attrs(const DoutPrefixProvider* dpp, optional_yield y)
 
 int RadosUser::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y)
 {
-  User::merge_and_store_attrs(dpp, new_attrs, y);
+  for(auto& it : new_attrs) {
+         attrs[it.first] = it.second;
+  }
   return store_user(dpp, y, false);
 }
 
@@ -728,7 +730,9 @@ int RadosBucket::check_quota(const DoutPrefixProvider *dpp, RGWQuotaInfo& user_q
 
 int RadosBucket::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y)
 {
-  Bucket::merge_and_store_attrs(dpp, new_attrs, y);
+  for(auto& it : new_attrs) {
+         attrs[it.first] = it.second;
+  }
   return store->ctl()->bucket->set_bucket_instance_attrs(get_info(),
                                new_attrs, &get_info().objv_tracker, y, dpp);
 }
@@ -1213,6 +1217,13 @@ void RadosStore::get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota)
     user_quota = svc()->quota->get_user_quota();
 }
 
+void RadosStore::get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit)
+{
+  bucket_ratelimit = svc()->zone->get_current_period().get_config().bucket_ratelimit;
+  user_ratelimit = svc()->zone->get_current_period().get_config().user_ratelimit;
+  anon_ratelimit = svc()->zone->get_current_period().get_config().anon_ratelimit;
+}
+
 int RadosStore::set_buckets_enabled(const DoutPrefixProvider* dpp, vector<rgw_bucket>& buckets, bool enabled)
 {
     return rados->set_buckets_enabled(buckets, enabled, dpp);
index 981bb2335b21fb5e524706378a1f8d43f60b03d9..4f5ef28988a020f4f4757deb1a9aee9ec8885959 100644 (file)
@@ -411,6 +411,7 @@ class RadosStore : public Store {
     virtual int register_to_service_map(const DoutPrefixProvider *dpp, const std::string& daemon_type,
                                const std::map<std::string, std::string>& meta) override;
     virtual void get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota) override;
+    virtual void get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit) override;
     virtual int set_buckets_enabled(const DoutPrefixProvider* dpp, std::vector<rgw_bucket>& buckets, bool enabled) override;
     virtual uint64_t get_new_req_id() override { return rados->get_new_req_id(); }
     virtual int get_sync_policy_handler(const DoutPrefixProvider* dpp,
index c29224cbeb48ed9850a33becdff67651e599e44d..68bbcb552042b90cf630b7146be021154e56ee4a 100644 (file)
@@ -173,9 +173,13 @@ struct RGWUserAdminOpState {
 
   bool bucket_quota_specified{false};
   bool user_quota_specified{false};
+  bool bucket_ratelimit_specified{false};
+  bool user_ratelimit_specified{false};
 
   RGWQuotaInfo bucket_quota;
   RGWQuotaInfo user_quota;
+  RGWRateLimitInfo user_ratelimit;
+  RGWRateLimitInfo bucket_ratelimit;
 
   // req parameters for listing user
   std::string marker{""};
@@ -339,6 +343,16 @@ struct RGWUserAdminOpState {
     user_quota_specified = true;
   }
 
+  void set_bucket_ratelimit(RGWRateLimitInfo& ratelimit) {
+    bucket_ratelimit = ratelimit;
+    bucket_ratelimit_specified = true;
+  }
+
+  void set_user_ratelimit(RGWRateLimitInfo& ratelimit) {
+    user_ratelimit = ratelimit;
+    user_ratelimit_specified = true;
+  }
+
   void set_mfa_ids(const std::set<std::string>& ids) {
     mfa_ids = ids;
     mfa_ids_specified = true;
index e5ea2d625f8e231dd594563c2dbf5d76e30d21a5..5daffe58a8ac99335b2634af56bd4a1586134000 100644 (file)
@@ -2755,12 +2755,18 @@ void RGWPeriodConfig::dump(Formatter *f) const
 {
   encode_json("bucket_quota", bucket_quota, f);
   encode_json("user_quota", user_quota, f);
+  encode_json("user_ratelimit", user_ratelimit, f);
+  encode_json("bucket_ratelimit", bucket_ratelimit, f);
+  encode_json("anonymous_ratelimit", anon_ratelimit, f);
 }
 
 void RGWPeriodConfig::decode_json(JSONObj *obj)
 {
   JSONDecoder::decode_json("bucket_quota", bucket_quota, obj);
   JSONDecoder::decode_json("user_quota", user_quota, obj);
+  JSONDecoder::decode_json("user_ratelimit", user_ratelimit, obj);
+  JSONDecoder::decode_json("bucket_ratelimit", bucket_ratelimit, obj);
+  JSONDecoder::decode_json("anonymous_ratelimit", anon_ratelimit, obj);
 }
 
 void RGWRegionMap::dump(Formatter *f) const
index 9a0066d9ad152d2be694c99ec341e49763b1cc82..a84d492e1f52c91abed41c0c65d4db8d5edf9434 100644 (file)
@@ -1028,18 +1028,30 @@ struct RGWPeriodConfig
 {
   RGWQuotaInfo bucket_quota;
   RGWQuotaInfo user_quota;
+  RGWRateLimitInfo user_ratelimit;
+  RGWRateLimitInfo bucket_ratelimit;
+  // rate limit unauthenticated user
+  RGWRateLimitInfo anon_ratelimit;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(bucket_quota, bl);
     encode(user_quota, bl);
+    encode(bucket_ratelimit, bl);
+    encode(user_ratelimit, bl);
+    encode(anon_ratelimit, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     decode(bucket_quota, bl);
     decode(user_quota, bl);
+    if (struct_v >= 2) {
+      decode(bucket_ratelimit, bl);
+      decode(user_ratelimit, bl);
+      decode(anon_ratelimit, bl);
+    }
     DECODE_FINISH(bl);
   }
 
index 1fc852247f400f70e04a91ee60a4a3772b8ac945..6784b966ca4451939bc1f2f3dacf2182cda8218e 100644 (file)
     quota set                  set quota params
     quota enable               enable quota
     quota disable              disable quota
+    ratelimit get              get ratelimit params
+    ratelimit set              set ratelimit params
+    ratelimit enable           enable ratelimit
+    ratelimit disable          disable ratelimit
     global quota get           view global quota params
     global quota set           set global quota params
     global quota enable        enable a global quota
     global quota disable       disable a global quota
+    global ratelimit get       view global ratelimit params
+    global ratelimit set       set global ratelimit params
+    global ratelimit enable    enable a ratelimit quota
+    global ratelimit disable   disable a ratelimit quota
     realm create               create a new realm
     realm rm                   remove a realm
     realm get                  show realm info
      --max-size                specify max size (in B/K/M/G/T, negative value to disable)
      --quota-scope             scope of quota (bucket, user)
   
+  Rate limiting options:
+     --max-read-ops            specify max requests per minute for READ ops per RGW (GET and HEAD request methods), 0 means unlimited
+     --max-read-bytes          specify max bytes per minute for READ ops per RGW (GET and HEAD request methods), 0 means unlimited
+     --max-write-ops           specify max requests per minute for WRITE ops per RGW (Not GET or HEAD request methods), 0 means unlimited
+     --max-write-bytes         specify max bytes per minute for WRITE ops per RGW (Not GET or HEAD request methods), 0 means unlimited
+     --ratelimit-scope         scope of rate limiting: bucket, user, anonymous
+                               anonymous can be configured only with global rate limit
+  
   Orphans search options:
      --num-shards              num of shards to use for keeping the temporary scan info
      --orphan-stale-secs       num of seconds to wait before declaring an object to be an orphan (default: 86400)
index 641fd69cd91cfbe3e0ac0cc5d072caf9bf770c76..2df70d88e2ef1b86470b6cb2a25b4633199c391d 100644 (file)
@@ -52,6 +52,16 @@ set(test_rgw_a_src test_rgw_common.cc)
 add_library(test_rgw_a STATIC ${test_rgw_a_src})
 target_link_libraries(test_rgw_a ${rgw_libs})
 
+add_executable(bench_rgw_ratelimit bench_rgw_ratelimit.cc)
+target_link_libraries(bench_rgw_ratelimit ${rgw_libs})
+
+add_executable(bench_rgw_ratelimit_gc bench_rgw_ratelimit_gc.cc )
+target_link_libraries(bench_rgw_ratelimit_gc ${rgw_libs})
+
+add_executable(unittest_rgw_ratelimit test_rgw_ratelimit.cc $<TARGET_OBJECTS:unit-main>)
+target_link_libraries(unittest_rgw_ratelimit ${rgw_libs})
+add_ceph_unittest(unittest_rgw_ratelimit)
+
 # ceph_test_rgw_manifest
 set(test_rgw_manifest_srcs test_rgw_manifest.cc)
 add_executable(ceph_test_rgw_manifest
diff --git a/src/test/rgw/bench_rgw_ratelimit.cc b/src/test/rgw/bench_rgw_ratelimit.cc
new file mode 100644 (file)
index 0000000..6c682fc
--- /dev/null
@@ -0,0 +1,247 @@
+#include "rgw/rgw_ratelimit.h"
+#include "rgw/rgw_common.h"
+#include "random"
+#include <cstdlib>
+#include <string>
+#include <boost/asio.hpp>
+#include <spawn/spawn.hpp>
+#include <boost/asio/steady_timer.hpp>
+#include <chrono>
+#include <mutex>
+#include <unordered_map>
+#include <atomic>
+#include <boost/program_options.hpp>
+
+
+using Executor = boost::asio::io_context::executor_type;
+std::uniform_int_distribution<unsigned int> dist(0, 1);
+std::random_device rd;
+std::default_random_engine rng{rd()};
+std::uniform_int_distribution<unsigned long long> disttenant(2, 100000000);
+struct client_info {
+    uint64_t accepted = 0;
+    uint64_t rejected = 0;
+    uint64_t ops = 0;
+    uint64_t bytes = 0;
+    uint64_t num_retries = 0;
+    std::string tenant;
+};
+
+struct parameters {
+    int64_t req_size = 1;
+    int64_t backend_bandwidth = 1;
+    size_t wait_between_retries_ms = 1;
+    int num_clients = 1;
+};
+std::shared_ptr<std::vector<client_info>> ds = std::make_shared<std::vector<client_info>>(std::vector<client_info>());
+
+std::string method[2] = {"PUT", "GET"};
+void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& yield, boost::asio::io_context& ioctx)
+{
+    auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: ");
+    boost::asio::steady_timer timer(ioctx);
+    int rw = 0; // will always use PUT method as there is no difference
+    std::string methodop(method[rw]);
+    auto req_size = params.req_size;
+    auto backend_bandwidth = params.backend_bandwidth;
+// the 4 * 1024 * 1024 is the RGW default we are sending in a typical environment
+    while (req_size) {
+        if (req_size <= backend_bandwidth) {
+            while (req_size > 0) {
+                if(req_size > 4*1024*1024) {
+                    ratelimit->decrease_bytes(methodop.c_str(),it.tenant, 4*1024*1024, info);
+                    it.bytes += 4*1024*1024;
+                    req_size = req_size - 4*1024*1024;
+                }
+                else {
+                    ratelimit->decrease_bytes(methodop.c_str(),it.tenant, req_size, info);
+                    req_size = 0;
+                }
+            }
+        } else {
+                int64_t total_bytes = 0;
+                while (req_size > 0) {
+                if (req_size >= 4*1024*1024) {
+                    if (total_bytes >= backend_bandwidth)
+                    {
+                        timer.expires_after(std::chrono::seconds(1));
+                        timer.async_wait(yield);
+                        total_bytes = 0;
+                    }
+                    ratelimit->decrease_bytes(methodop.c_str(),it.tenant, 4*1024*1024, info);
+                    it.bytes += 4*1024*1024;
+                    req_size = req_size - 4*1024*1024;
+                    total_bytes += 4*1024*1024;
+                }
+                else {
+                    ratelimit->decrease_bytes(methodop.c_str(),it.tenant, req_size, info);
+                    it.bytes += req_size;
+                    total_bytes += req_size;
+                    req_size = 0;
+                }
+            }
+        }
+    }
+}
+bool simulate_request(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit)
+{
+    boost::asio::io_context context;
+    auto time = ceph::coarse_real_clock::now();
+    int rw = 0; // will always use PUT method as there is no different
+    std::string methodop = method[rw];
+    auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: ");
+    bool to_fail = ratelimit->should_rate_limit(methodop.c_str(), it.tenant, time, &info);
+    if(to_fail)
+    {
+        it.rejected++;
+        it.ops++;
+        return true;
+    }
+    it.accepted++;
+    return false;
+}
+void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx)
+{
+    for (;;)
+    {
+                    bool to_retry = simulate_request(it, info, ratelimit);
+                    while (to_retry && to_run)
+                    {
+                        if (params.wait_between_retries_ms)
+                        {
+                            boost::asio::steady_timer timer(ioctx);
+                            timer.expires_after(std::chrono::milliseconds(params.wait_between_retries_ms));
+                            timer.async_wait(ctx);
+                        }
+                        to_retry = simulate_request(it, info, ratelimit);
+                    }
+                    if (!to_run)
+                    {
+                        return;
+                    }
+                    simulate_transfer(it, &info, ratelimit, params, ctx, ioctx);
+    }
+}
+void simulate_clients(boost::asio::io_context& context, std::string tenant, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, bool& to_run)
+{
+    for (int i = 0; i < params.num_clients; i++)
+    {
+        auto& it = ds->emplace_back(client_info());
+        it.tenant = tenant;
+        int x = ds->size() - 1;
+        spawn::spawn(context,
+                [&to_run ,x, ratelimit, info, params, &context](spawn::yield_context ctx)
+                {
+                    auto& it = ds.get()->operator[](x);
+                    simulate_client(it, info, ratelimit, params, ctx, to_run, context);
+                });
+    }
+}
+int main(int argc, char **argv)
+{
+    int num_ratelimit_classes = 1;
+    int64_t ops_limit = 1;
+    int64_t bw_limit = 1;
+    int thread_count = 512;
+    int runtime = 60;
+    parameters params;
+    try
+    {
+        using namespace boost::program_options;
+        options_description desc{"Options"};
+        desc.add_options()
+        ("help,h", "Help screen")
+        ("num_ratelimit_classes", value<int>()->default_value(1), "how many ratelimit tenants")
+        ("request_size", value<int64_t>()->default_value(1), "what is the request size we are testing if 0, it will be randomized")
+        ("backend_bandwidth", value<int64_t>()->default_value(1), "what is the backend bandwidth, so there will be wait between decrease_bytes")
+        ("wait_between_retries_ms", value<size_t>()->default_value(1), "time in seconds to wait between retries")
+        ("ops_limit", value<int64_t>()->default_value(1), "ops limit for the tenants")
+        ("bw_limit", value<int64_t>()->default_value(1), "bytes per second limit")
+        ("threads", value<int>()->default_value(512), "server's threads count")
+        ("runtime", value<int>()->default_value(60), "For how many seconds the test will run")
+        ("num_clients", value<int>()->default_value(1), "number of clients per tenant to run");
+        variables_map vm;
+        store(parse_command_line(argc, argv, desc), vm);
+        if (vm.count("help")) {
+            std::cout << desc << std::endl;
+            return EXIT_SUCCESS;
+        }
+        num_ratelimit_classes = vm["num_ratelimit_classes"].as<int>();
+        params.req_size = vm["request_size"].as<int64_t>();
+        params.backend_bandwidth = vm["backend_bandwidth"].as<int64_t>();
+        params.wait_between_retries_ms = vm["wait_between_retries_ms"].as<size_t>();
+        params.num_clients = vm["num_clients"].as<int>();
+        ops_limit = vm["ops_limit"].as<int64_t>();
+        bw_limit = vm["bw_limit"].as<int64_t>();
+        thread_count = vm["threads"].as<int>();
+        runtime = vm["runtime"].as<int>();
+    }
+    catch (const boost::program_options::error &ex)
+    {
+        std::cerr << ex.what() << std::endl;
+        return EXIT_FAILURE;
+    }
+    RGWRateLimitInfo info;
+    info.enabled = true;
+    info.max_read_bytes = bw_limit;
+    info.max_write_bytes = bw_limit;
+    info.max_read_ops = ops_limit;
+    info.max_write_ops = ops_limit;
+    std::unique_ptr<CephContext> cct = std::make_unique<CephContext>(CEPH_ENTITY_TYPE_ANY);
+    if (!g_ceph_context)
+    {
+        g_ceph_context = cct.get();
+    }
+    std::shared_ptr<ActiveRateLimiter> ratelimit(new ActiveRateLimiter(g_ceph_context));
+    ratelimit->start();
+    std::vector<std::thread> threads;
+    using Executor = boost::asio::io_context::executor_type;
+    std::optional<boost::asio::executor_work_guard<Executor>> work;
+    threads.reserve(thread_count);
+    boost::asio::io_context context;
+    boost::asio::io_context stopme;
+    work.emplace(boost::asio::make_work_guard(context));
+    // server execution
+    for (int i = 0; i < thread_count; i++) {
+      threads.emplace_back([&]() noexcept {
+        context.run();
+      });
+    }
+    //client execution
+    bool to_run = true;
+    ds->reserve(num_ratelimit_classes*params.num_clients);
+    for (int i = 0; i < num_ratelimit_classes; i++)
+    {
+        unsigned long long tenantid = disttenant(rng);
+        std::string tenantuser = "uuser" + std::to_string(tenantid);
+        simulate_clients(context, tenantuser, info, ratelimit->get_active(), params, to_run);
+    }
+    boost::asio::steady_timer timer_runtime(stopme);
+    timer_runtime.expires_after(std::chrono::seconds(runtime));
+    timer_runtime.wait();
+    work.reset();
+    context.stop();
+    to_run = false;
+
+    for (auto& i : threads)
+    {
+        i.join();
+    }
+    std::unordered_map<std::string,client_info> metrics_by_tenant;
+    for(auto& i : *ds.get())
+    {
+        auto it = metrics_by_tenant.emplace(i.tenant, client_info()).first;
+        std::cout << i.accepted << std::endl;
+        it->second.accepted += i.accepted;
+        it->second.rejected += i.rejected;
+    }
+    // TODO sum the results by tenant
+    for(auto& i : metrics_by_tenant)
+    {
+        std::cout << "Tenant is: " << i.first << std::endl;
+        std::cout << "Simulator finished accepted  sum : " << i.second.accepted << std::endl;
+        std::cout << "Simulator finished rejected  sum : " << i.second.rejected << std::endl;
+    }
+
+    return 0;
+}
diff --git a/src/test/rgw/bench_rgw_ratelimit_gc.cc b/src/test/rgw/bench_rgw_ratelimit_gc.cc
new file mode 100644 (file)
index 0000000..7b07dc2
--- /dev/null
@@ -0,0 +1,52 @@
+#include "rgw/rgw_ratelimit.h"
+#include "rgw/rgw_common.h"
+#include "random"
+#include <cstdlib>
+#include <string>
+#include <chrono>
+#include <boost/program_options.hpp>
+int main(int argc, char **argv)
+{
+    int num_qos_classes = 1;
+    try
+    {
+        using namespace boost::program_options;
+        options_description desc{"Options"};
+        desc.add_options()
+        ("help,h", "Help screen")
+        ("num_qos_classes", value<int>()->default_value(1), "how many qos tenants");
+        variables_map vm;
+        store(parse_command_line(argc, argv, desc), vm);
+        if (vm.count("help")) {
+            std::cout << desc << std::endl;
+            return EXIT_SUCCESS;
+        }
+        num_qos_classes = vm["num_qos_classes"].as<int>();
+    }
+    catch (const boost::program_options::error &ex)
+    {
+        std::cerr << ex.what() << std::endl;
+        return EXIT_FAILURE;
+    }
+    RGWRateLimitInfo info;
+    info.enabled = true;
+    info.max_read_bytes = 0;
+    info.max_write_bytes = 0;
+    info.max_read_ops = 0;
+    info.max_write_ops = 0;
+    std::unique_ptr<CephContext> cct = std::make_unique<CephContext>(CEPH_ENTITY_TYPE_ANY);
+    if (!g_ceph_context)
+    {
+        g_ceph_context = cct.get();
+    }
+    std::shared_ptr<ActiveRateLimiter> ratelimit(new ActiveRateLimiter(g_ceph_context));
+    ratelimit->start();
+    auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: ");
+    for(int i = 0; i < num_qos_classes; i++)
+    {
+        std::string tenant = "uuser" + std::to_string(i);
+        auto time = ceph::coarse_real_clock::now();
+        ratelimit->get_active()->should_rate_limit("PUT", tenant, time, &info);
+    }
+
+}
index 32ec599985d737c6f17dea5eaf29efdc682b5064..501144d5a34d25159dbacf00326de93e62ae1277 100644 (file)
@@ -115,7 +115,9 @@ public:
   virtual int remove_user(const DoutPrefixProvider* dpp, optional_yield y) override {
     return 0;
   }
-
+  virtual int merge_and_store_attrs(const DoutPrefixProvider *dpp, rgw::sal::Attrs& attrs, optional_yield y) override {
+    return 0;
+  }
   virtual ~TestUser() = default;
 };
 
diff --git a/src/test/rgw/test_rgw_ratelimit.cc b/src/test/rgw/test_rgw_ratelimit.cc
new file mode 100644 (file)
index 0000000..875ec83
--- /dev/null
@@ -0,0 +1,376 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <gtest/gtest.h>
+#include "rgw/rgw_ratelimit.h"
+
+
+using namespace std::chrono_literals;
+
+TEST(RGWRateLimit, op_limit_not_enabled)
+{
+  // info.enabled = false, so no limit
+  std::atomic_bool replacing;
+  std::condition_variable cv;
+  RateLimiter ratelimit(replacing, cv);
+  RGWRateLimitInfo info;
+  auto time = ceph::coarse_real_clock::now();
+  std::string key = "uuser123";
+  bool success = ratelimit.should_rate_limit("PUT", key, time, &info);
+  EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimit, reject_op_over_limit)
+{
+  // check that request is being rejected because there are not enough tokens
+  std::atomic_bool replacing;
+  std::condition_variable cv;
+  RateLimiter ratelimit(replacing, cv);
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_ops = 1;
+  auto time = ceph::coarse_real_clock::now();
+  std::string key = "uuser123";
+  bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+  time = ceph::coarse_real_clock::now();
+  success = ratelimit.should_rate_limit("GET", key, time, &info);
+  EXPECT_EQ(true, success);
+}
+TEST(RGWRateLimit, accept_op_after_giveback)
+{
+  // check that giveback is working fine
+  std::atomic_bool replacing;
+  std::condition_variable cv;
+  RateLimiter ratelimit(replacing, cv);
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_ops = 1;
+  auto time = ceph::coarse_real_clock::now();
+  std::string key = "uuser123";
+  bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+  ratelimit.giveback_tokens("GET", key);
+  time = ceph::coarse_real_clock::now();
+  success = ratelimit.should_rate_limit("GET", key, time, &info);
+  EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimit, accept_op_after_refill)
+{
+  // check that tokens are being filled properly
+  std::atomic_bool replacing;
+  std::condition_variable cv;
+  RateLimiter ratelimit(replacing, cv);
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_ops = 1;
+  auto time = ceph::coarse_real_clock::now();
+  std::string key = "uuser123";
+  bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+  time += 61s;
+  success = ratelimit.should_rate_limit("GET", key, time, &info);
+  EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimit, reject_bw_over_limit)
+{
+  // check that a newer request is rejected if there is no enough tokens (bw)
+  std::atomic_bool replacing;
+  std::condition_variable cv;
+  RateLimiter ratelimit(replacing, cv);
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_bytes = 1;
+  auto time = ceph::coarse_real_clock::now();
+  std::string key = "uuser123";
+  bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+  ratelimit.decrease_bytes("GET",key, 2, &info);
+  time = ceph::coarse_real_clock::now();
+  success = ratelimit.should_rate_limit("GET", key, time, &info);
+  EXPECT_EQ(true, success);
+}
+TEST(RGWRateLimit, accept_bw)
+{
+  // check that when there are enough tokens (bw) the request is still being served
+  std::atomic_bool replacing;
+  std::condition_variable cv;
+  RateLimiter ratelimit(replacing, cv);
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_bytes = 2;
+  auto time = ceph::coarse_real_clock::now();
+  std::string key = "uuser123";
+  bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+  ratelimit.decrease_bytes("GET",key, 1, &info);
+  time = ceph::coarse_real_clock::now();
+  success = ratelimit.should_rate_limit("GET", key, time, &info);
+  EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimit, check_bw_debt_at_max_120secs)
+{
+  // check that the bandwidth debt is not larger than 120 seconds
+  std::atomic_bool replacing;
+  std::condition_variable cv;
+  RateLimiter ratelimit(replacing, cv);
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_bytes = 2;
+  auto time = ceph::coarse_real_clock::now();
+  std::string key = "uuser123";
+  bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+  ratelimit.decrease_bytes("GET",key, 100, &info);
+  time += 121s;
+  success = ratelimit.should_rate_limit("GET", key, time, &info);
+  EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimit, check_that_bw_limit_not_affect_ops)
+{
+  // check that high read bytes limit, does not affect ops limit
+  std::atomic_bool replacing;
+  std::condition_variable cv;
+  RateLimiter ratelimit(replacing, cv);
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_ops = 1;
+  info.max_read_bytes = 100000000;
+  auto time = ceph::coarse_real_clock::now();
+  std::string key = "uuser123";
+  bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+  ratelimit.decrease_bytes("GET",key, 10000, &info);
+  time = ceph::coarse_real_clock::now();
+  success = ratelimit.should_rate_limit("GET", key, time, &info);
+  EXPECT_EQ(true, success);
+}
+TEST(RGWRateLimit, read_limit_does_not_affect_writes)
+{
+  // read limit does not affect writes
+  std::atomic_bool replacing;
+  std::condition_variable cv;
+  RateLimiter ratelimit(replacing, cv);
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_ops = 1;
+  info.max_read_bytes = 100000000;
+  auto time = ceph::coarse_real_clock::now();
+  std::string key = "uuser123";
+  bool success = ratelimit.should_rate_limit("PUT", key, time, &info);
+  ratelimit.decrease_bytes("PUT",key, 10000, &info);
+  time = ceph::coarse_real_clock::now();
+  success = ratelimit.should_rate_limit("PUT", key, time, &info);
+  EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimit, write_limit_does_not_affect_reads)
+{
+  // write limit does not affect reads
+  std::atomic_bool replacing;
+  std::condition_variable cv;
+  RateLimiter ratelimit(replacing, cv);
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_write_ops = 1;
+  info.max_write_bytes = 100000000;
+  auto time = ceph::coarse_real_clock::now();
+  std::string key = "uuser123";
+  bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+  ratelimit.decrease_bytes("GET",key, 10000, &info);
+  time = ceph::coarse_real_clock::now();
+  success = ratelimit.should_rate_limit("GET", key, time, &info);
+  EXPECT_EQ(false, success);
+}
+
+TEST(RGWRateLimit, allow_unlimited_access)
+{
+  // 0 values in RGWRateLimitInfo should allow unlimited access
+  std::atomic_bool replacing;
+  std::condition_variable cv;
+  RateLimiter ratelimit(replacing, cv);
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  auto time = ceph::coarse_real_clock::now();
+  std::string key = "uuser123";
+  bool success = ratelimit.should_rate_limit("GET", key, time, &info);
+  EXPECT_EQ(false, success);
+}
+
+TEST(RGWRateLimitGC, NO_GC_AHEAD_OF_TIME)
+{
+  // Test if GC is not starting the replace before getting to map_size * 0.9
+  // Please make sure to change those values when you change the map_size in the code
+
+  std::shared_ptr<ActiveRateLimiter> ratelimit(new ActiveRateLimiter(g_ceph_context));
+  ratelimit->start();
+  auto active = ratelimit->get_active();
+  RGWRateLimitInfo info;
+  auto time = ceph::coarse_real_clock::now();
+  std::string key = "uuser123";
+  active->should_rate_limit("GET", key, time, &info);
+  auto activegc = ratelimit->get_active();
+  EXPECT_EQ(activegc, active);
+}
+TEST(RGWRateLimiterGC, GC_IS_WORKING)
+{
+  // Test if GC is replacing the active RateLimiter
+  // Please make sure to change those values when you change the map_size in the code
+
+  std::shared_ptr<ActiveRateLimiter> ratelimit(new ActiveRateLimiter(g_ceph_context));
+  ratelimit->start();
+  auto active = ratelimit->get_active();
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  auto time = ceph::coarse_real_clock::now();
+  std::string key = "-1";
+  for(int i = 0; i < 2000000; i++)
+  {
+    active->should_rate_limit("GET", key, time, &info);
+    key = std::to_string(i);
+  }
+  auto activegc = ratelimit->get_active();
+  EXPECT_NE(activegc, active);
+}
+  
+  
+TEST(RGWRateLimitEntry, op_limit_not_enabled)
+{
+  // info.enabled = false, so no limit
+  RateLimiterEntry entry;
+  RGWRateLimitInfo info;
+  auto time = ceph::coarse_real_clock::now().time_since_epoch();
+  bool success = entry.should_rate_limit(false, &info, time);
+  EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimitEntry, reject_op_over_limit)
+{
+  // check that request is being rejected because there are not enough tokens
+
+  RGWRateLimitInfo info;
+  RateLimiterEntry entry;
+  info.enabled = true;
+  info.max_read_ops = 1;
+  auto time = ceph::coarse_real_clock::now().time_since_epoch();
+  bool success = entry.should_rate_limit(true, &info, time);
+  time = ceph::coarse_real_clock::now().time_since_epoch();
+  success = entry.should_rate_limit(true, &info, time);
+  EXPECT_EQ(true, success);
+}
+TEST(RGWRateLimitEntry, accept_op_after_giveback)
+{
+  // check that giveback is working fine
+  RGWRateLimitInfo info;
+  RateLimiterEntry entry;
+  info.enabled = true;
+  info.max_read_ops = 1;
+  auto time = ceph::coarse_real_clock::now().time_since_epoch();
+  bool success = entry.should_rate_limit(true,  &info, time);
+  entry.giveback_tokens(true);
+  time = ceph::coarse_real_clock::now().time_since_epoch();
+  success = entry.should_rate_limit(true,  &info, time);
+  EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimitEntry, accept_op_after_refill)
+{
+  // check that tokens are being filled properly
+  RateLimiterEntry entry;
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_ops = 1;
+  auto time = ceph::coarse_real_clock::now().time_since_epoch();
+  bool success = entry.should_rate_limit(true,  &info, time);
+  time += 61s;
+  success = entry.should_rate_limit(true,  &info, time);
+  EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimitEntry, reject_bw_over_limit)
+{
+  // check that a newer request is rejected if there is no enough tokens (bw)
+  RateLimiterEntry entry;
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_bytes = 1;
+  auto time = ceph::coarse_real_clock::now().time_since_epoch();
+  bool success = entry.should_rate_limit(true,  &info, time);
+  entry.decrease_bytes(true, 2, &info);
+  time = ceph::coarse_real_clock::now().time_since_epoch();
+  success = entry.should_rate_limit(true,  &info, time);
+  EXPECT_EQ(true, success);
+}
+TEST(RGWRateLimitEntry, accept_bw)
+{
+  // check that when there are enough tokens (bw) the request is still being served
+  RateLimiterEntry entry;
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_bytes = 2;
+  auto time = ceph::coarse_real_clock::now().time_since_epoch();
+  bool success = entry.should_rate_limit(true,  &info, time);
+  entry.decrease_bytes(true, 1, &info);
+  time = ceph::coarse_real_clock::now().time_since_epoch();
+  success = entry.should_rate_limit(true,  &info, time);
+  EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimitEntry, check_bw_debt_at_max_120secs)
+{
+  // check that the bandwidth debt is not larger than 120 seconds
+  RateLimiterEntry entry;
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_bytes = 2;
+  auto time = ceph::coarse_real_clock::now().time_since_epoch();
+  bool success = entry.should_rate_limit(true,  &info, time);
+  entry.decrease_bytes(true, 100, &info);
+  time += 121s;
+  success = entry.should_rate_limit(true,  &info, time);
+  EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimitEntry, check_that_bw_limit_not_affect_ops)
+{
+  // check that high read bytes limit, does not affect ops limit
+  RateLimiterEntry entry;
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_ops = 1;
+  info.max_read_bytes = 100000000;
+  auto time = ceph::coarse_real_clock::now().time_since_epoch();
+  bool success = entry.should_rate_limit(true,  &info, time);
+  entry.decrease_bytes(true, 10000, &info);
+  time = ceph::coarse_real_clock::now().time_since_epoch();
+  success = entry.should_rate_limit(true,  &info, time);
+  EXPECT_EQ(true, success);
+}
+TEST(RGWRateLimitEntry, read_limit_does_not_affect_writes)
+{
+  // read limit does not affect writes
+  RateLimiterEntry entry;
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_read_ops = 1;
+  info.max_read_bytes = 100000000;
+  auto time = ceph::coarse_real_clock::now().time_since_epoch();
+  bool success = entry.should_rate_limit(false,  &info, time);
+  entry.decrease_bytes(false, 10000, &info);
+  time = ceph::coarse_real_clock::now().time_since_epoch();
+  success = entry.should_rate_limit(false,  &info, time);
+  EXPECT_EQ(false, success);
+}
+TEST(RGWRateLimitEntry, write_limit_does_not_affect_reads)
+{
+  // write limit does not affect reads
+  RateLimiterEntry entry;
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  info.max_write_ops = 1;
+  info.max_write_bytes = 100000000;
+  auto time = ceph::coarse_real_clock::now().time_since_epoch();
+  std::string key = "uuser123";
+  bool success = entry.should_rate_limit(true,  &info, time);
+  entry.decrease_bytes(true, 10000, &info);
+  time = ceph::coarse_real_clock::now().time_since_epoch();
+  success = entry.should_rate_limit(true,  &info, time);
+  EXPECT_EQ(false, success);
+}
+
+TEST(RGWRateLimitEntry, allow_unlimited_access)
+{
+  // 0 values in RGWRateLimitInfo should allow unlimited access (default value)
+  RateLimiterEntry entry;
+  RGWRateLimitInfo info;
+  info.enabled = true;
+  auto time = ceph::coarse_real_clock::now().time_since_epoch();
+  bool success = entry.should_rate_limit(true,  &info, time);
+  EXPECT_EQ(false, success);
+}