]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
messenger: introduce timeouts on pipes.
authorGreg Farnum <gregf@hq.newdream.net>
Fri, 15 Oct 2010 18:21:02 +0000 (11:21 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Fri, 15 Oct 2010 18:21:53 +0000 (11:21 -0700)
This will return read errors on a pipe if it gets no data
for the given period of time (default 15 minutes). In a stateful
session the Connection will hang around and the next write will
initiate standard reconnect, so things keep working but we don't
rack up hundreds of useless threads!

src/config.cc
src/config.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h
src/msg/tcp.cc
src/msg/tcp.h

index 95809e14cbbcb0f514bcf96c2f38f4fb78755465..1925e7777472568b1866b1de126cc5f016708da7 100644 (file)
@@ -353,6 +353,7 @@ static struct config_option config_optionsp[] = {
        OPTION(ms_dispatch_throttle_bytes, 0, OPT_INT, 100 << 20),
        OPTION(ms_bind_ipv6, 0, OPT_BOOL, false),
         OPTION(ms_rwthread_stack_bytes, 0, OPT_INT, 1024 << 10),
+        OPTION(ms_tcp_read_timeout, 0, OPT_LONGLONG, 900),
        OPTION(mon_data, 0, OPT_STR, ""),
        OPTION(mon_tick_interval, 0, OPT_INT, 5),
        OPTION(mon_subscribe_interval, 0, OPT_DOUBLE, 300),
index d04f6be7937d65edbd2bd39640f354938dde84b7..abec2b9d8905190720f4048b73798d06ee09e117 100644 (file)
@@ -155,6 +155,7 @@ struct md_config_t {
   uint64_t ms_dispatch_throttle_bytes;
   bool ms_bind_ipv6;
   uint64_t ms_rwthread_stack_bytes;
+  uint64_t ms_tcp_read_timeout;
 
   // mon
   const char *mon_data;
index ddf65927347a6d7aa6b41deb3f4b302ca25a5d6a..463226777dbf6ddcda01742bfa69a70531a9d5fa 100644 (file)
@@ -559,7 +559,7 @@ int SimpleMessenger::Pipe::accept()
   
   // identify peer
   char banner[strlen(CEPH_BANNER)+1];
-  rc = tcp_read(sd, banner, strlen(CEPH_BANNER));
+  rc = tcp_read(sd, banner, strlen(CEPH_BANNER), messenger->timeout);
   if (rc < 0) {
     dout(10) << "accept couldn't read banner" << dendl;
     state = STATE_CLOSED;
@@ -576,7 +576,7 @@ int SimpleMessenger::Pipe::accept()
     bufferptr tp(sizeof(peer_addr));
     addrbl.push_back(tp);
   }
-  rc = tcp_read(sd, addrbl.c_str(), addrbl.length());
+  rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), messenger->timeout);
   if (rc < 0) {
     dout(10) << "accept couldn't read peer_addr" << dendl;
     state = STATE_CLOSED;
@@ -611,7 +611,7 @@ int SimpleMessenger::Pipe::accept()
   int reply_tag = 0;
   bool replace = false;
   while (1) {
-    rc = tcp_read(sd, (char*)&connect, sizeof(connect));
+    rc = tcp_read(sd, (char*)&connect, sizeof(connect), messenger->timeout);
     if (rc < 0) {
       dout(10) << "accept couldn't read connect" << dendl;
       goto fail_unlocked;
@@ -621,7 +621,7 @@ int SimpleMessenger::Pipe::accept()
     authorizer.clear();
     if (connect.authorizer_len) {
       bp = buffer::create(connect.authorizer_len);
-      if (tcp_read(sd, bp.c_str(), connect.authorizer_len) < 0) {
+      if (tcp_read(sd, bp.c_str(), connect.authorizer_len, messenger->timeout) < 0) {
         dout(10) << "accept couldn't read connect authorizer" << dendl;
         goto fail_unlocked;
       }
@@ -957,7 +957,7 @@ int SimpleMessenger::Pipe::connect()
 
   // verify banner
   // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
-  rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER));
+  rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER), messenger->timeout);
   if (rc < 0) {
     dout(2) << "connect couldn't read banner, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
     goto fail;
@@ -983,7 +983,7 @@ int SimpleMessenger::Pipe::connect()
     bufferptr p(sizeof(paddr) * 2);
     addrbl.push_back(p);
   }
-  rc = tcp_read(sd, addrbl.c_str(), addrbl.length());
+  rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), messenger->timeout);
   if (rc < 0) {
     dout(2) << "connect couldn't read peer addrs, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
     goto fail;
@@ -1069,7 +1069,7 @@ int SimpleMessenger::Pipe::connect()
 
     dout(20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
     ceph_msg_connect_reply reply;
-    if (tcp_read(sd, (char*)&reply, sizeof(reply)) < 0) {
+    if (tcp_read(sd, (char*)&reply, sizeof(reply), messenger->timeout) < 0) {
       dout(2) << "connect read reply " << strerror_r(errno, buf, sizeof(buf)) << dendl;
       goto fail;
     }
@@ -1085,7 +1085,7 @@ int SimpleMessenger::Pipe::connect()
     if (reply.authorizer_len) {
       dout(10) << "reply.authorizer_len=" << reply.authorizer_len << dendl;
       bufferptr bp = buffer::create(reply.authorizer_len);
-      if (tcp_read(sd, bp.c_str(), reply.authorizer_len) < 0) {
+      if (tcp_read(sd, bp.c_str(), reply.authorizer_len, messenger->timeout) < 0) {
         dout(10) << "connect couldn't read connect authorizer_reply" << dendl;
        goto fail;
       }
@@ -1460,7 +1460,7 @@ void SimpleMessenger::Pipe::reader()
     char buf[80];
     char tag = -1;
     dout(20) << "reader reading tag..." << dendl;
-    int rc = tcp_read(sd, (char*)&tag, 1);
+    int rc = tcp_read(sd, (char*)&tag, 1, messenger->timeout);
     if (rc < 0) {
       pipe_lock.Lock();
       dout(2) << "reader couldn't read tag, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
@@ -1478,7 +1478,7 @@ void SimpleMessenger::Pipe::reader()
     if (tag == CEPH_MSGR_TAG_ACK) {
       dout(20) << "reader got ACK" << dendl;
       ceph_le64 seq;
-      int rc = tcp_read( sd, (char*)&seq, sizeof(seq));
+      int rc = tcp_read( sd, (char*)&seq, sizeof(seq), messenger->timeout);
       pipe_lock.Lock();
       if (rc < 0) {
        dout(2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
@@ -1708,12 +1708,12 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
   __u32 header_crc;
   
   if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
-    if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
+    if (tcp_read( sd, (char*)&header, sizeof(header), messenger->timeout ) < 0)
       return -1;
     header_crc = ceph_crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
   } else {
     ceph_msg_header_old oldheader;
-    if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader) ) < 0)
+    if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader), messenger->timeout ) < 0)
       return -1;
     // this is fugly
     memcpy(&header, &oldheader, sizeof(header));
@@ -1765,7 +1765,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
   front_len = header.front_len;
   if (front_len) {
     bufferptr bp = buffer::create(front_len);
-    if (tcp_read( sd, bp.c_str(), front_len ) < 0) 
+    if (tcp_read( sd, bp.c_str(), front_len, messenger->timeout ) < 0)
       goto out_dethrottle;
     front.push_back(bp);
     dout(20) << "reader got front " << front.length() << dendl;
@@ -1775,7 +1775,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
   middle_len = header.middle_len;
   if (middle_len) {
     bufferptr bp = buffer::create(middle_len);
-    if (tcp_read( sd, bp.c_str(), middle_len ) < 0) 
+    if (tcp_read( sd, bp.c_str(), middle_len, messenger->timeout ) < 0)
       goto out_dethrottle;
     middle.push_back(bp);
     dout(20) << "reader got middle " << middle.length() << dendl;
@@ -1792,7 +1792,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
       int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
                     (unsigned)left);
       bufferptr bp = buffer::create(head);
-      if (tcp_read( sd, bp.c_str(), head ) < 0) 
+      if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
        goto out_dethrottle;
       data.push_back(bp);
       left -= head;
@@ -1803,7 +1803,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
     int middle = left & PAGE_MASK;
     if (middle > 0) {
       bufferptr bp = buffer::create_page_aligned(middle);
-      if (tcp_read( sd, bp.c_str(), middle ) < 0) 
+      if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
        goto out_dethrottle;
       data.push_back(bp);
       left -= middle;
@@ -1812,7 +1812,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
 
     if (left) {
       bufferptr bp = buffer::create(left);
-      if (tcp_read( sd, bp.c_str(), left ) < 0) 
+      if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
        goto out_dethrottle;
       data.push_back(bp);
       dout(20) << "reader got data tail " << left << dendl;
@@ -1820,7 +1820,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
   }
 
   // footer
-  if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) 
+  if (tcp_read(sd, (char*)&footer, sizeof(footer), messenger->timeout) < 0)
     goto out_dethrottle;
   
   aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
index 0b9b46ee76552f5d4e7e7de60df4b8f7aa1072ab..b4a0ef334cfa232309e305c0493f1a142f62e58a 100644 (file)
@@ -214,6 +214,7 @@ private:
       out_seq(0), in_seq(0), in_seq_acked(0),
       reader_thread(this), writer_thread(this) {
       connection_state->pipe = get();
+      messenger->timeout = g_conf.ms_tcp_read_timeout * 1000; //convert to ms
     }
     ~Pipe() {
       for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
@@ -564,6 +565,7 @@ private:
   void dispatch_entry();
 
   SimpleMessenger *messenger; //hack to make dout macro work, will fix
+  int timeout;
 
 public:
   SimpleMessenger() :
index b0dbf6073a9a2b984bdfcc35fb8c0a977f9073ea..bab1a7651ff45edbe93bd3f4f56987469008cc40 100644 (file)
@@ -8,14 +8,14 @@
  * tcp crap
  */
 
-int tcp_read(int sd, char *buf, int len) {
+int tcp_read(int sd, char *buf, int len, int timeout) {
   if (sd < 0)
     return -1;
   struct pollfd pfd;
   pfd.fd = sd;
   pfd.events = POLLIN | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR;
   while (len > 0) {
-    if (poll(&pfd, 1, -1) < 0)
+    if (poll(&pfd, 1, timeout) <= 0)
       return -1;
 
     if (!(pfd.revents & POLLIN))
index b1c636cba7ab30fa437771f008e02339080ea361..97ef3a90a86dcb6c55285101dcac256d2d036310 100644 (file)
@@ -25,7 +25,7 @@ inline ostream& operator<<(ostream& out, const sockaddr_storage &ss)
             << buf << ':' << serv;
 }
 
-extern int tcp_read(int sd, char *buf, int len);
+extern int tcp_read(int sd, char *buf, int len, int timeout=0);
 extern int tcp_write(int sd, const char *buf, int len);
 
 inline bool operator==(const sockaddr_in& a, const sockaddr_in& b) {