]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: release bytes reserved from throttlers in failure paths
authorSage Weil <sage@newdream.net>
Wed, 30 Jun 2010 19:08:18 +0000 (12:08 -0700)
committerSage Weil <sage@newdream.net>
Wed, 30 Jun 2010 19:09:31 +0000 (12:09 -0700)
If we don't release those bytes, the throttler count eventually fills up
with bytes we were going to read but didn't (due to socket errors, etc)
until we can't read anything.

Signed-off-by: Sage Weil <sage@newdream.net>
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 937f1764f94fc19e839353bf5dae68ded19e4dfe..1c65cb621e10cd62d451d4c9c7b77f583b69854d 100644 (file)
@@ -1500,12 +1500,12 @@ void SimpleMessenger::Pipe::reader()
 
     else if (tag == CEPH_MSGR_TAG_MSG) {
       dout(20) << "reader got MSG" << dendl;
-      Message *m = read_message();
+      Message *m = 0;
+      int r = read_message(&m);
 
       pipe_lock.Lock();
       
       if (!m) {
-       derr(2) << "reader read null message, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
        fault(false, true);
        continue;
       }
@@ -1701,8 +1701,9 @@ void SimpleMessenger::Pipe::unlock_maybe_reap()
 }
 
 
-Message *SimpleMessenger::Pipe::read_message()
+int SimpleMessenger::Pipe::read_message(Message **pm)
 {
+  int ret = -1;
   // envelope
   //dout(10) << "receiver.read_message from sd " << sd  << dendl;
   
@@ -1712,12 +1713,12 @@ Message *SimpleMessenger::Pipe::read_message()
   
   if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
     if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
-      return 0;
+      return -1;
     header_crc = ceph_crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
   } else {
     ceph_msg_header_old oldheader;
     if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader) ) < 0)
-      return 0;
+      return -1;
     // this is fugly
     memcpy(&header, &oldheader, sizeof(header));
     header.src = oldheader.src.name;
@@ -1736,51 +1737,58 @@ Message *SimpleMessenger::Pipe::read_message()
   // verify header crc
   if (header_crc != header.crc) {
     dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
-    return 0;
+    return -1;
   }
-  dout(10) << "getting message bytes now, currently using "
-          << messenger->message_throttler.get_current() << "/"
-          << messenger->message_throttler.get_max() << dendl;
-  uint64_t message_size = header.front_len  + header.middle_len
-    + header.data_len;
+
+  bufferlist front, middle, data;
+  int front_len, middle_len;
+  unsigned data_len, data_off;
+  int aborted;
+  Message *message;
+
+  uint64_t message_size = header.front_len + header.middle_len + header.data_len;
   if (message_size) {
-    if (policy.throttler)
+    if (policy.throttler) {
+      dout(10) << "reader wants " << message_size << " from policy throttler "
+              << policy.throttler->get_current() << "/"
+              << policy.throttler->get_max() << dendl;
       policy.throttler->get(message_size);
+    }
 
     // throttle total bytes waiting for dispatch.  do this _after_ the
     // policy throttle, as this one does not deadlock (unless dispatch
     // blocks indefinitely, which it shouldn't).  in contrast, the
     // policy throttle carries for the lifetime of the message.
+    dout(10) << "reader wants " << message_size << " from dispatch throttler "
+            << messenger->message_throttler.get_current() << "/"
+            << messenger->message_throttler.get_max() << dendl;
     messenger->message_throttler.get(message_size);
   }
 
   // read front
-  bufferlist front;
-  int front_len = header.front_len;
+  front_len = header.front_len;
   if (front_len) {
     bufferptr bp = buffer::create(front_len);
     if (tcp_read( sd, bp.c_str(), front_len ) < 0) 
-      return 0;
+      goto out_dethrottle;
     front.push_back(bp);
     dout(20) << "reader got front " << front.length() << dendl;
   }
 
   // read middle
-  bufferlist middle;
-  int middle_len = header.middle_len;
+  middle_len = header.middle_len;
   if (middle_len) {
     bufferptr bp = buffer::create(middle_len);
     if (tcp_read( sd, bp.c_str(), middle_len ) < 0) 
-      return 0;
+      goto out_dethrottle;
     middle.push_back(bp);
     dout(20) << "reader got middle " << middle.length() << dendl;
   }
 
 
   // read data
-  bufferlist data;
-  unsigned data_len = le32_to_cpu(header.data_len);
-  unsigned data_off = le32_to_cpu(header.data_off);
+  data_len = le32_to_cpu(header.data_len);
+  data_off = le32_to_cpu(header.data_off);
   if (data_len) {
     int left = data_len;
     if (data_off & ~PAGE_MASK) {
@@ -1789,7 +1797,7 @@ Message *SimpleMessenger::Pipe::read_message()
                     (unsigned)left);
       bufferptr bp = buffer::create(head);
       if (tcp_read( sd, bp.c_str(), head ) < 0) 
-       return 0;
+       goto out_dethrottle;
       data.push_back(bp);
       left -= head;
       dout(20) << "reader got data head " << head << dendl;
@@ -1800,7 +1808,7 @@ Message *SimpleMessenger::Pipe::read_message()
     if (middle > 0) {
       bufferptr bp = buffer::create_page_aligned(middle);
       if (tcp_read( sd, bp.c_str(), middle ) < 0) 
-       return 0;
+       goto out_dethrottle;
       data.push_back(bp);
       left -= middle;
       dout(20) << "reader got data page-aligned middle " << middle << dendl;
@@ -1809,7 +1817,7 @@ Message *SimpleMessenger::Pipe::read_message()
     if (left) {
       bufferptr bp = buffer::create(left);
       if (tcp_read( sd, bp.c_str(), left ) < 0) 
-       return 0;
+       goto out_dethrottle;
       data.push_back(bp);
       dout(20) << "reader got data tail " << left << dendl;
     }
@@ -1817,25 +1825,48 @@ Message *SimpleMessenger::Pipe::read_message()
 
   // footer
   if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) 
-    return 0;
+    goto out_dethrottle;
   
-  int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
+  aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
   dout(10) << "aborted = " << aborted << dendl;
   if (aborted) {
     dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
            << " byte message.. ABORTED" << dendl;
     // MEH FIXME 
-    Message *m = new MGenericMessage(CEPH_MSG_PING);
+    *pm = new MGenericMessage(CEPH_MSG_PING);
     header.type = CEPH_MSG_PING;
-    m->set_header(header);
-    return m;
+    (*pm)->set_header(header);
+    ret = 0;
+    goto out_dethrottle;
   }
 
   dout(20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
           << " byte message" << dendl;
-  Message *message = decode_message(header, footer, front, middle, data);
+  message = decode_message(header, footer, front, middle, data);
   message->set_throttler(policy.throttler);
-  return message;
+  *pm = message;
+  return 0;
+
+ out_dethrottle:
+  // release bytes reserved from the throttlers on failure
+  if (message_size) {
+    if (policy.throttler) {
+      dout(10) << "reader releasing " << message_size << " from policy throttler "
+              << policy.throttler->get_current() << "/"
+              << policy.throttler->get_max() << dendl;
+      policy.throttler->put(message_size);
+    }
+
+    // throttle total bytes waiting for dispatch.  do this _after_ the
+    // policy throttle, as this one does not deadlock (unless dispatch
+    // blocks indefinitely, which it shouldn't).  in contrast, the
+    // policy throttle carries for the lifetime of the message.
+    dout(10) << "reader releasing " << message_size << " from dispatch throttler "
+            << messenger->message_throttler.get_current() << "/"
+            << messenger->message_throttler.get_max() << dendl;
+    messenger->message_throttler.put(message_size);
+  }
+  return ret;
 }
 
 
index 023f16b6bc4ba45d253c697196175efc8d8a1cf3..3edfd7ae9b3d0a509783d836b5375bfaff26b6be 100644 (file)
@@ -149,7 +149,7 @@ private:
     void writer();
     void unlock_maybe_reap();
 
-    Message *read_message();
+    int read_message(Message **pm);
     int write_message(Message *m);
     int do_sendmsg(int sd, struct msghdr *msg, int len, bool more=false);
     int write_ack(uint64_t s);