]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: add message throttling; with a default max of 100MB waiting for dispatch
authorGreg Farnum <gregf@hq.newdream.net>
Wed, 28 Apr 2010 20:56:43 +0000 (13:56 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Wed, 28 Apr 2010 20:56:43 +0000 (13:56 -0700)
src/common/Throttle.h
src/config.cc
src/config.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 58cacddb4ae6ef55fbb77914bd565fa05f0057e0..d2805f73fb1abb1c526e2953bb577fa1a7a48fd7 100644 (file)
@@ -42,6 +42,8 @@ public:
     return count;
   }
 
+  __u64 get_max() { return max; }
+
   bool wait(__u64 m = 0) {
     Mutex::Locker l(lock);
     if (m)
index 3cb17f8b1be90dfc1ce925e5230c2abfb4218992..3204dfac5869c8bfec09d3edee17f63680364d7e 100644 (file)
@@ -354,6 +354,7 @@ static struct config_option config_optionsp[] = {
        OPTION(ms_die_on_failure, 0, OPT_BOOL, false),
        OPTION(ms_nocrc, 0, OPT_BOOL, false),
        OPTION(ms_die_on_bad_msg, 0, OPT_BOOL, false),
+       OPTION(ms_waiting_message_bytes, 0, OPT_INT, 104857600),
        OPTION(mon_data, 0, OPT_STR, ""),
        OPTION(mon_tick_interval, 0, OPT_INT, 5),
        OPTION(mon_subscribe_interval, 0, OPT_DOUBLE, 300),
index d9c651e2a5ef2d4ad6580fa45560d0a9e534b3b2..8595628ed702570b1748fd05e7bd8e6f99727014 100644 (file)
@@ -135,6 +135,7 @@ struct md_config_t {
   bool ms_die_on_failure;
   bool ms_nocrc;
   bool ms_die_on_bad_msg;
+  __u64 ms_waiting_message_bytes;
 
   // mon
   const char *mon_data;
index d9df0cf146d88002d01fbef15aca4caccc717b59..ed1a6513b6e6f7e87a3ef73cb42eb1f88cee8b6c 100644 (file)
@@ -286,7 +286,7 @@ void SimpleMessenger::dispatch_entry()
       dispatch_queue.qlen_lock.unlock();
 
       pipe->pipe_lock.Unlock(); // done with the pipe's message queue now
-      {
+       {
        if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) {
          dispatch_queue.lock.Lock();
          Connection *con = dispatch_queue.remote_reset_q.front();
@@ -309,6 +309,8 @@ void SimpleMessenger::dispatch_entry()
          ms_deliver_handle_reset(con);
          con->put();
        } else {
+         ceph_msg_header& header = m->get_header();
+         int msize = header.front_len + header.middle_len + header.data_len;
          dout(1) << "<== " << m->get_source_inst()
                  << " " << m->get_seq()
                  << " ==== " << *m
@@ -319,6 +321,7 @@ void SimpleMessenger::dispatch_entry()
                  << " " << m 
                  << dendl;
          ms_deliver_dispatch(m);
+         message_throttler.put(msize);
          dout(20) << "done calling dispatch on " << m << dendl;
        }
       }
@@ -1730,7 +1733,11 @@ Message *SimpleMessenger::Pipe::read_message()
     dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
     return 0;
   }
-
+  dout(1) << "getting message bytes now, currently using "
+          << messenger->message_throttler.get_current() << "/"
+          << messenger->message_throttler.get_max() << dendl;
+  messenger->message_throttler.get(header.front_len  +
+                                  header.middle_len + header.data_len);
   // read front
   bufferlist front;
   int front_len = header.front_len;
index 36237f93f1ef7e6d84de6ecb51825382d492d109..c54123f928984808546ddf32a7607c94cf167c0b 100644 (file)
@@ -29,6 +29,7 @@ using namespace __gnu_cxx;
 #include "include/Spinlock.h"
 #include "common/Cond.h"
 #include "common/Thread.h"
+#include "common/Throttle.h"
 
 #include "Messenger.h"
 #include "Message.h"
@@ -420,6 +421,7 @@ private:
   Cond  wait_cond;  // for wait()
   bool started;
   bool did_bind;
+  Throttle message_throttler;
 
   // where i listen
   bool need_addr;
@@ -503,7 +505,8 @@ public:
   SimpleMessenger() :
     Messenger(entity_name_t()),
     accepter(this),
-    lock("SimpleMessenger::lock"), started(false), did_bind(false), need_addr(true),
+    lock("SimpleMessenger::lock"), started(false), did_bind(false),
+    message_throttler(g_conf.ms_waiting_message_bytes), need_addr(true),
     destination_stopped(true), my_type(-1),
     global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
     dispatch_thread(this), messenger(this) {