]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: add second per-message throttler to message policy
authorSage Weil <sage@inktank.com>
Thu, 4 Apr 2013 04:30:51 +0000 (21:30 -0700)
committerSage Weil <sage@inktank.com>
Sat, 6 Apr 2013 15:17:01 +0000 (08:17 -0700)
We already have a throttler that lets of limit the amount of memory
consumed by messages from a given source.  Currently this is based only
on the size of the message payload.  Add a second throttler that limits
the number of messages so that we can effectively throttle small requests
as well.

Signed-off-by: Sage Weil <sage@inktank.com>
src/ceph_mon.cc
src/ceph_osd.cc
src/msg/Message.h
src/msg/Messenger.h
src/msg/Pipe.cc
src/msg/SimpleMessenger.h
src/test/mon/test_mon_workloadgen.cc

index 4a4df8942e960d650375d8eb3738876ed605bcba..72354e1887607c9cd2b3aee73b21d90366736603 100644 (file)
@@ -407,15 +407,15 @@ int main(int argc, const char **argv)
   // throttle client traffic
   Throttle *client_throttler = new Throttle(g_ceph_context, "mon_client_bytes",
                                            g_conf->mon_client_bytes);
-  messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, client_throttler);
+  messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, client_throttler, NULL);
 
   // throttle daemon traffic
   // NOTE: actual usage on the leader may multiply by the number of
   // monitors if they forward large update messages from daemons.
   Throttle *daemon_throttler = new Throttle(g_ceph_context, "mon_daemon_bytes",
                                            g_conf->mon_daemon_bytes);
-  messenger->set_policy_throttler(entity_name_t::TYPE_OSD, daemon_throttler);
-  messenger->set_policy_throttler(entity_name_t::TYPE_MDS, daemon_throttler);
+  messenger->set_policy_throttlers(entity_name_t::TYPE_OSD, daemon_throttler, NULL);
+  messenger->set_policy_throttlers(entity_name_t::TYPE_MDS, daemon_throttler, NULL);
 
   cout << "starting " << g_conf->name << " rank " << rank
        << " at " << ipaddr
index 5a90abd6125c475513be3efcee9ff07c7fd25b05..d7735a7a83afc1a79008362d67223faf51606366 100644 (file)
@@ -349,9 +349,9 @@ int main(int argc, const char **argv)
     CEPH_FEATURE_MSG_AUTH;
 
   client_messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0));
-  client_messenger->set_policy_throttler(
-    entity_name_t::TYPE_CLIENT,
-    client_throttler.get());  // default, actually
+  client_messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
+                                         client_throttler.get(),
+                                         NULL);
   client_messenger->set_policy(entity_name_t::TYPE_MON,
                                Messenger::Policy::lossy_client(supported,
                                                               CEPH_FEATURE_UID |
index 1bf28e36f2d33ff443a8ed70dd534c215a4bf6a7..33d26b2e7da2a4476aeff0405224a75b6556c649 100644 (file)
@@ -299,7 +299,10 @@ protected:
 
   // release our size in bytes back to this throttler when our payload
   // is adjusted or when we are destroyed.
-  Throttle *throttler;
+  Throttle *byte_throttler;
+
+  // release a count back to this throttler when we are destroyed
+  Throttle *msg_throttler;
 
   // keep track of how big this message was when we reserved space in
   // the msgr dispatch_throttler, so that we can properly release it
@@ -313,14 +316,16 @@ protected:
 public:
   Message()
     : connection(NULL),
-      throttler(NULL),
+      byte_throttler(NULL),
+      msg_throttler(NULL),
       dispatch_throttle_size(0) {
     memset(&header, 0, sizeof(header));
     memset(&footer, 0, sizeof(footer));
   };
   Message(int t, int version=1, int compat_version=0)
     : connection(NULL),
-      throttler(NULL),
+      byte_throttler(NULL),
+      msg_throttler(NULL),
       dispatch_throttle_size(0) {
     memset(&header, 0, sizeof(header));
     header.type = t;
@@ -340,8 +345,10 @@ protected:
     assert(nref.read() == 0);
     if (connection)
       connection->put();
-    if (throttler)
-      throttler->put(payload.length() + middle.length() + data.length());
+    if (byte_throttler)
+      byte_throttler->put(payload.length() + middle.length() + data.length());
+    if (msg_throttler)
+      msg_throttler->put();
   }
 public:
   Connection *get_connection() { return connection; }
@@ -350,8 +357,10 @@ public:
       connection->put();
     connection = c;
   }
-  void set_throttler(Throttle *t) { throttler = t; }
-  Throttle *get_throttler() { return throttler; }
+  void set_byte_throttler(Throttle *t) { byte_throttler = t; }
+  Throttle *get_byte_throttler() { return byte_throttler; }
+  void set_message_throttler(Throttle *t) { msg_throttler = t; }
+  Throttle *get_message_throttler() { return msg_throttler; }
  
   void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; }
   uint64_t get_dispatch_throttle_size() { return dispatch_throttle_size; }
@@ -369,39 +378,48 @@ public:
    */
 
   void clear_payload() {
-    if (throttler) throttler->put(payload.length() + middle.length());
+    if (byte_throttler)
+      byte_throttler->put(payload.length() + middle.length());
     payload.clear();
     middle.clear();
   }
   void clear_data() {
-    if (throttler) throttler->put(data.length());
+    if (byte_throttler)
+      byte_throttler->put(data.length());
     data.clear();
   }
 
   bool empty_payload() { return payload.length() == 0; }
   bufferlist& get_payload() { return payload; }
   void set_payload(bufferlist& bl) {
-    if (throttler) throttler->put(payload.length());
+    if (byte_throttler)
+      byte_throttler->put(payload.length());
     payload.claim(bl);
-    if (throttler) throttler->take(payload.length());
+    if (byte_throttler)
+      byte_throttler->take(payload.length());
   }
 
   void set_middle(bufferlist& bl) {
-    if (throttler) throttler->put(payload.length());
+    if (byte_throttler)
+      byte_throttler->put(payload.length());
     middle.claim(bl);
-    if (throttler) throttler->take(payload.length());
+    if (byte_throttler)
+      byte_throttler->take(payload.length());
   }
   bufferlist& get_middle() { return middle; }
 
   void set_data(const bufferlist &d) {
-    if (throttler) throttler->put(data.length());
+    if (byte_throttler)
+      byte_throttler->put(data.length());
     data = d;
-    if (throttler) throttler->take(data.length());
+    if (byte_throttler)
+      byte_throttler->take(data.length());
   }
 
   bufferlist& get_data() { return data; }
   void claim_data(bufferlist& bl) {
-    if (throttler) throttler->put(data.length());
+    if (byte_throttler)
+      byte_throttler->put(data.length());
     bl.claim(data);
   }
   off_t get_data_len() { return data.length(); }
index 7205940c118e44c9e5f3d931c5f6918ab2221bed..b08fdaa7f300143faed0a3f3f2a7a9f54c6009be 100644 (file)
@@ -74,7 +74,8 @@ public:
      *  the associated Connection(s). When reading in a new Message, the Messenger
      *  will call throttler->throttle() for the size of the new Message.
      */
-    Throttle *throttler;
+    Throttle *throttler_bytes;
+    Throttle *throttler_messages;
 
     /// Specify features supported locally by the endpoint.
     uint64_t features_supported;
@@ -82,12 +83,16 @@ public:
     uint64_t features_required;
 
     Policy()
-      : lossy(false), server(false), standby(false), resetcheck(true), throttler(NULL),
+      : lossy(false), server(false), standby(false), resetcheck(true),
+       throttler_bytes(NULL),
+       throttler_messages(NULL),
        features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
        features_required(0) {}
   private:
     Policy(bool l, bool s, bool st, bool r, uint64_t sup, uint64_t req)
-      : lossy(l), server(s), standby(st), resetcheck(r), throttler(NULL),
+      : lossy(l), server(s), standby(st), resetcheck(r),
+       throttler_bytes(NULL),
+       throttler_messages(NULL),
        features_supported(sup | CEPH_FEATURES_SUPPORTED_DEFAULT),
        features_required(req) {}
 
@@ -266,7 +271,7 @@ public:
    * ownership of this pointer, but you must not destroy it before
    * you destroy the Messenger.
    */
-  virtual void set_policy_throttler(int type, Throttle *t) = 0;
+  virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0;
   /**
    * Set the default send priority
    *
index ae94a6a340c86ed0009fbe7a7a9ba3d249e6537b..343f975225b787bfa208af586834f7f4821b01a2 100644 (file)
@@ -1661,14 +1661,20 @@ int Pipe::read_message(Message **pm)
   Message *message;
   utime_t recv_stamp = ceph_clock_now(msgr->cct);
 
+  if (policy.throttler_messages) {
+    ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler "
+                       << policy.throttler_messages->get_current() << "/"
+                       << policy.throttler_messages->get_max() << dendl;
+    policy.throttler_messages->get();
+  }
+
   uint64_t message_size = header.front_len + header.middle_len + header.data_len;
   if (message_size) {
-    bool waited_on_throttle = false;
-    if (policy.throttler) {
-      ldout(msgr->cct,10) << "reader wants " << message_size << " from policy throttler "
-              << policy.throttler->get_current() << "/"
-              << policy.throttler->get_max() << dendl;
-      waited_on_throttle = policy.throttler->get(message_size);
+    if (policy.throttler_bytes) {
+      ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler "
+              << policy.throttler_bytes->get_current() << "/"
+              << policy.throttler_bytes->get_max() << dendl;
+      policy.throttler_bytes->get(message_size);
     }
 
     // throttle total bytes waiting for dispatch.  do this _after_ the
@@ -1678,7 +1684,7 @@ int Pipe::read_message(Message **pm)
     ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
             << msgr->dispatch_throttler.get_current() << "/"
             << msgr->dispatch_throttler.get_max() << dendl;
-    waited_on_throttle |= msgr->dispatch_throttler.get(message_size);
+    msgr->dispatch_throttler.get(message_size);
   }
 
   utime_t throttle_stamp = ceph_clock_now(msgr->cct);
@@ -1807,7 +1813,8 @@ int Pipe::read_message(Message **pm)
     } 
   }
 
-  message->set_throttler(policy.throttler);
+  message->set_byte_throttler(policy.throttler_bytes);
+  message->set_message_throttler(policy.throttler_messages);
 
   // store reservation size in message, so we don't get confused
   // by messages entering the dispatch queue through other paths.
@@ -1822,12 +1829,18 @@ int Pipe::read_message(Message **pm)
 
  out_dethrottle:
   // release bytes reserved from the throttlers on failure
+  if (policy.throttler_messages) {
+    ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler "
+                       << policy.throttler_messages->get_current() << "/"
+                       << policy.throttler_messages->get_max() << dendl;
+    policy.throttler_messages->put();
+  }
   if (message_size) {
-    if (policy.throttler) {
-      ldout(msgr->cct,10) << "reader releasing " << message_size << " to policy throttler "
-              << policy.throttler->get_current() << "/"
-              << policy.throttler->get_max() << dendl;
-      policy.throttler->put(message_size);
+    if (policy.throttler_bytes) {
+      ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler "
+                         << policy.throttler_bytes->get_current() << "/"
+                         << policy.throttler_bytes->get_max() << dendl;
+      policy.throttler_bytes->put(message_size);
     }
 
     msgr->dispatch_throttle_release(message_size);
index cc946e3d25a182de4c996b12c9cd52ebf44aed4b..d837a4496ae3fafc426dd6a3424cf325c360be79 100644 (file)
@@ -162,12 +162,15 @@ public:
    * ownership of this pointer, but you must not destroy it before
    * you destroy SimpleMessenger.
    */
-  void set_policy_throttler(int type, Throttle *t) {
+  void set_policy_throttlers(int type, Throttle *byte_throttle, Throttle *msg_throttle) {
     Mutex::Locker l(policy_lock);
-    if (policy_map.count(type))
-      policy_map[type].throttler = t;
-    else
-      default_policy.throttler = t;
+    if (policy_map.count(type)) {
+      policy_map[type].throttler_bytes = byte_throttle;
+      policy_map[type].throttler_messages = msg_throttle;
+    } else {
+      default_policy.throttler_bytes = byte_throttle;
+      default_policy.throttler_messages = msg_throttle;
+    }
   }
   /**
    * Bind the SimpleMessenger to a specific address. If bind_addr
index 216e6288b1f44da79443dbfe1db443153053bfc3..07f999180a39d766b8f986ef31a6da1ad663dcaf 100644 (file)
@@ -366,8 +366,8 @@ class OSDStub : public TestStub
 
     messenger->set_default_policy(
        Messenger::Policy::stateless_server(supported, 0));
-    messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT,
-       &throttler);
+    messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
+                                   &throttler, NULL);
     messenger->set_policy(entity_name_t::TYPE_MON,
        Messenger::Policy::lossy_client(supported, CEPH_FEATURE_UID |
          CEPH_FEATURE_PGID64 |