]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: Document recv_stamp and add a dispatch_stamp and throttle_wait.
authorGreg Farnum <gregory.farnum@dreamhost.com>
Thu, 12 Jan 2012 20:42:21 +0000 (12:42 -0800)
committerGreg Farnum <gregory.farnum@dreamhost.com>
Tue, 31 Jan 2012 23:41:40 +0000 (15:41 -0800)
Signed-off-by: Greg Farnum <gregory.farnum@dreamhost.com>
src/msg/Message.h
src/msg/Messenger.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index f37a88424ffcb512d37330f96a770eff5f048d6d..5a863ed17530ac7013460d8058fcd98f55e75713 100644 (file)
@@ -272,7 +272,16 @@ protected:
   bufferlist       middle;   // "middle" unaligned blob
   bufferlist       data;     // data payload (page-alignment will be preserved where possible)
   
+  /* recv_stamp is set when the Messenger starts reading the
+   * Message off the wire */
   utime_t recv_stamp;
+  /* dispatch_stamp is set when the Messenger starts calling dispatch() on
+   * its endpoints */
+  utime_t dispatch_stamp;
+  /* throttle_wait is the amount of time spent waiting on throttlers between
+   * message receipt and message dispatch*/
+  utime_t throttle_wait;
+
   Connection *connection;
 
   // release our size in bytes back to this throttler when our payload
@@ -380,7 +389,11 @@ public:
   off_t get_data_len() { return data.length(); }
 
   void set_recv_stamp(utime_t t) { recv_stamp = t; }
-  utime_t get_recv_stamp() { return recv_stamp; }
+  const utime_t& get_recv_stamp() { return recv_stamp; }
+  void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; }
+  const utime_t& get_dispatch_stamp() { return dispatch_stamp; }
+  void set_throttle_wait(utime_t t) { throttle_wait = t; }
+  const utime_t& get_throttle_wait() { return throttle_wait; }
 
   void calc_header_crc() {
     header.crc = ceph_crc32c_le(0, (unsigned char*)&header,
index 2e257084b75422fcd267ad2388fd764582d6c91a..5994dce72419aeb1a3bd2b50acb6cd8369f32971 100644 (file)
@@ -96,6 +96,7 @@ protected:
 
   // dispatch incoming messages
   void ms_deliver_dispatch(Message *m) {
+    m->set_dispatch_stamp(ceph_clock_now(cct));
     for (list<Dispatcher*>::iterator p = dispatchers.begin();
         p != dispatchers.end();
         p++)
index 8706699f1623d5f114e14829c29384d088870999..770700045470d9a72849b804d001cfb233ca8d80 100644 (file)
@@ -1868,6 +1868,8 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
   unsigned data_len, data_off;
   int aborted;
   Message *message;
+  utime_t recv_stamp = ceph_clock_now(msgr->cct);
+  bool waited_on_throttle = false;
 
   uint64_t message_size = header.front_len + header.middle_len + header.data_len;
   if (message_size) {
@@ -1875,7 +1877,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
       ldout(msgr->cct,10) << "reader wants " << message_size << " from policy throttler "
               << policy.throttler->get_current() << "/"
               << policy.throttler->get_max() << dendl;
-      policy.throttler->get(message_size);
+      waited_on_throttle = policy.throttler->get(message_size);
     }
 
     // throttle total bytes waiting for dispatch.  do this _after_ the
@@ -1885,7 +1887,12 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
     ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
             << msgr->dispatch_throttler.get_current() << "/"
             << msgr->dispatch_throttler.get_max() << dendl;
-    msgr->dispatch_throttler.get(message_size);
+    waited_on_throttle |= msgr->dispatch_throttler.get(message_size);
+  }
+
+  utime_t throttle_wait;
+  if (waited_on_throttle) {
+    throttle_wait = ceph_clock_now(msgr->cct) - recv_stamp;
   }
 
   // read front
@@ -1993,6 +2000,9 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
   // by messages entering the dispatch queue through other paths.
   message->set_dispatch_throttle_size(message_size);
 
+  message->set_recv_stamp(recv_stamp);
+  message->set_throttle_wait(throttle_wait);
+
   *pm = message;
   return 0;
 
index 54df956b84179ada7a7feea8f19251189de7aa47..9db3f39b3dd9515ef088d31393a7c81ce70dae3f 100644 (file)
@@ -288,8 +288,6 @@ private:
     void queue_received(Message *m, int priority);
     
     void queue_received(Message *m) {
-      m->set_recv_stamp(ceph_clock_now(msgr->cct));
-
       // this is just to make sure that a changeset is working
       // properly; if you start using the refcounting more and have
       // multiple people hanging on to a message, ditch the assert!