]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: various locking fixes
authorSage Weil <sage@newdream.net>
Thu, 13 Nov 2008 01:05:51 +0000 (17:05 -0800)
committerSage Weil <sage@newdream.net>
Thu, 13 Nov 2008 20:45:36 +0000 (12:45 -0800)
Fixes some deadlock problems.

We also avoid the use of newsd, and do a sync join() on the reader
thread when killing a pipe, to avoid leaking sockets.

Also fix a 'bad seq #' error due to the reader not rechecking the
pipe state after retaking the lock after reading some data.

src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 84c2cba7c476796202a24a4addd8610d0804b273..10780eb69913a3136ac44543d2627db537ac91cf 100644 (file)
@@ -47,6 +47,12 @@ static ostream& _prefix() {
 #include "tcp.cc"
 
 
+// help find socket resource leaks
+int sockopen = 0;
+#define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl;
+#define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
+
+
 Rank rank;
 
 #ifdef DARWIN
@@ -72,8 +78,11 @@ void Rank::sigint()
   derr(0) << "got control-c, exiting" << dendl;
   
   // force close listener socket
-  if (accepter.listen_sd >= 0) 
+  if (accepter.listen_sd >= 0) {
     ::close(accepter.listen_sd);
+    accepter.listen_sd = -1;
+    closed_socket();
+  }
 
   // force close all pipe sockets, too
   for (hash_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.begin();
@@ -106,6 +115,7 @@ int Rank::Accepter::bind(int64_t force_nonce)
            << strerror(errno) << dendl;
     return -errno;
   }
+  opened_socket();
 
   int on = 1;
   ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
@@ -194,6 +204,7 @@ void *Rank::Accepter::entry()
     socklen_t slen = sizeof(addr);
     int sd = ::accept(listen_sd, (sockaddr*)&addr, &slen);
     if (sd >= 0) {
+      opened_socket();
       dout(10) << "accepted incoming on sd " << sd << dendl;
       
       // disable Nagle algorithm?
@@ -222,6 +233,7 @@ void *Rank::Accepter::entry()
   if (listen_sd >= 0) {
     ::close(listen_sd);
     listen_sd = -1;
+    closed_socket();
   }
   dout(10) << "accepter stopping" << dendl;
   return 0;
@@ -261,6 +273,7 @@ void Rank::reaper()
     pipes.erase(p);
     p->join();
     dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
+    assert(p->sd < 0);
     delete p;
     dout(10) << "reaper deleted pipe " << p << dendl;
   }
@@ -773,7 +786,11 @@ void Rank::mark_down(entity_addr_t addr)
 #define dout_prefix _pipe_prefix()
 ostream& Rank::Pipe::_pipe_prefix() {
   return *_dout << dbeginl << pthread_self()
-               << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this << ").";
+               << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this
+               << " sd=" << sd
+               << " pgs=" << peer_global_seq
+               << " cs=" << connect_seq
+               << ").";
 }
 
 int Rank::Pipe::accept()
@@ -845,7 +862,7 @@ int Rank::Pipe::accept()
     rc = tcp_read(sd, (char*)&connect, sizeof(connect));
     if (rc < 0) {
       dout(10) << "accept couldn't read connect" << dendl;
-      goto fail;
+      goto fail_unlocked;
     }
     dout(20) << "accept got peer connect_seq " << connect.connect_seq
             << " global_seq " << connect.global_seq
@@ -874,6 +891,9 @@ int Rank::Pipe::accept()
        if (tcp_write(sd, (char*)&gseq, sizeof(gseq)) < 0)
          goto fail;
        continue;
+      } else {
+       dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
+                << " <= " << connect.global_seq << ", looks ok" << dendl;
       }
       
       if (existing->policy.lossy_tx) {
@@ -895,6 +915,10 @@ int Rank::Pipe::accept()
        goto fail;
       }
 
+      dout(-10) << "accept connect_seq " << connect.connect_seq
+               << " vs existing " << existing->connect_seq
+               << " state " << existing->state << dendl;
+
       if (connect.connect_seq < existing->connect_seq) {
        if (connect.connect_seq == 0) {
          dout(-10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
@@ -1031,6 +1055,8 @@ int Rank::Pipe::accept()
 
 
  fail:
+  rank.lock.Unlock();
+ fail_unlocked:
   lock.Lock();
   state = STATE_CLOSED;
   fault();
@@ -1046,13 +1072,16 @@ int Rank::Pipe::connect()
   if (sd >= 0) {
     ::close(sd);
     sd = -1;
+    closed_socket();
   }
   __u32 cseq = connect_seq;
   __u32 gseq = rank.get_global_seq();
 
+  // stop reader thrad
+  join_reader();
+
   lock.Unlock();
   
-  int newsd;
   char tag = -1;
   int rc;
   struct sockaddr_in myAddr;
@@ -1063,23 +1092,29 @@ int Rank::Pipe::connect()
   entity_addr_t paddr;
 
   // create socket?
-  newsd = ::socket(AF_INET, SOCK_STREAM, 0);
-  if (newsd < 0) {
+  sd = ::socket(AF_INET, SOCK_STREAM, 0);
+  if (sd < 0) {
     dout(-1) << "connect couldn't created socket " << strerror(errno) << dendl;
     assert(0);
     goto fail;
   }
+  opened_socket();
   
   // bind any port
   myAddr.sin_family = AF_INET;
   myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
   myAddr.sin_port = htons( 0 );    
-  rc = ::bind(newsd, (struct sockaddr *) &myAddr, sizeof(myAddr));
-  assert(rc>=0);
+  dout(10) << "binding to " << myAddr << dendl;
+  rc = ::bind(sd, (struct sockaddr *)&myAddr, sizeof(myAddr));
+  if (rc < 0) {
+    dout(2) << "bind error " << myAddr
+            << ", " << errno << ": " << strerror(errno) << dendl;
+    goto fail;
+  }
 
   // connect!
   dout(10) << "connecting to " << peer_addr.ipaddr << dendl;
-  rc = ::connect(newsd, (sockaddr*)&peer_addr.ipaddr, sizeof(peer_addr.ipaddr));
+  rc = ::connect(sd, (sockaddr*)&peer_addr.ipaddr, sizeof(peer_addr.ipaddr));
   if (rc < 0) {
     dout(2) << "connect error " << peer_addr.ipaddr
             << ", " << errno << ": " << strerror(errno) << dendl;
@@ -1089,14 +1124,14 @@ int Rank::Pipe::connect()
   // disable Nagle algorithm?
   if (g_conf.ms_tcp_nodelay) {
     int flag = 1;
-    int r = ::setsockopt(newsd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
+    int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
     if (r < 0) 
       dout(0) << "connect couldn't set TCP_NODELAY: " << strerror(errno) << dendl;
   }
 
   // verify banner
   // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
-  rc = tcp_read(newsd, (char*)&banner, strlen(CEPH_BANNER));
+  rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER));
   if (rc < 0) {
     dout(2) << "connect couldn't read banner, " << strerror(errno) << dendl;
     goto fail;
@@ -1112,18 +1147,18 @@ int Rank::Pipe::connect()
   msg.msg_iov = msgvec;
   msg.msg_iovlen = 1;
   msglen = msgvec[0].iov_len;
-  if (do_sendmsg(newsd, &msg, msglen)) {
+  if (do_sendmsg(sd, &msg, msglen)) {
     dout(2) << "connect couldn't write my banner, " << strerror(errno) << dendl;
     goto fail;
   }
 
   // identify peer
-  rc = tcp_read(newsd, (char*)&paddr, sizeof(paddr));
+  rc = tcp_read(sd, (char*)&paddr, sizeof(paddr));
   if (rc < 0) {
     dout(2) << "connect couldn't read peer addr, " << strerror(errno) << dendl;
     goto fail;
   }
-  dout(20) << "connect read peer addr " << paddr << " on socket " << newsd << dendl;
+  dout(20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
   if (!peer_addr.is_local_to(paddr)) {
     if (paddr.ipaddr.sin_addr.s_addr == 0 &&
        peer_addr.ipaddr.sin_port == paddr.ipaddr.sin_port) {
@@ -1143,7 +1178,7 @@ int Rank::Pipe::connect()
   msg.msg_iov = msgvec;
   msg.msg_iovlen = 1;
   msglen = msgvec[0].iov_len;
-  if (do_sendmsg(newsd, &msg, msglen)) {
+  if (do_sendmsg(sd, &msg, msglen)) {
     dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl;
     goto fail;
   }
@@ -1164,12 +1199,12 @@ int Rank::Pipe::connect()
     msglen = msgvec[0].iov_len;
 
     dout(10) << "connect sending gseq=" << gseq << " cseq=" << cseq << dendl;
-    if (do_sendmsg(newsd, &msg, msglen)) {
+    if (do_sendmsg(sd, &msg, msglen)) {
       dout(2) << "connect couldn't write gseq, cseq, " << strerror(errno) << dendl;
       goto fail;
     }
     dout(20) << "connect wrote (self +) cseq, waiting for tag" << dendl;
-    if (tcp_read(newsd, &tag, 1) < 0) {
+    if (tcp_read(sd, &tag, 1) < 0) {
       dout(2) << "connect read tag, seq, " << strerror(errno) << dendl;
       goto fail;
     }
@@ -1189,7 +1224,7 @@ int Rank::Pipe::connect()
       continue;
     }
     if (tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
-      int rc = tcp_read(newsd, (char*)&gseq, sizeof(gseq));
+      int rc = tcp_read(sd, (char*)&gseq, sizeof(gseq));
       if (rc < 0) {
        dout(0) << "connect got RETRY_GLOBAL tag but couldn't read gseq" << dendl;
        goto fail;
@@ -1205,7 +1240,7 @@ int Rank::Pipe::connect()
       continue;
     }
     if (tag == CEPH_MSGR_TAG_RETRY_SESSION) {
-      int rc = tcp_read(newsd, (char*)&cseq, sizeof(cseq));
+      int rc = tcp_read(sd, (char*)&cseq, sizeof(cseq));
       if (rc < 0) {
        dout(0) << "connect got RETRY_SESSION tag but couldn't read cseq" << dendl;
        goto fail;
@@ -1242,7 +1277,7 @@ int Rank::Pipe::connect()
 
       // read flags
       __u8 flags;
-      if (tcp_read(newsd, (char *)&flags, 1) < 0) {
+      if (tcp_read(sd, (char *)&flags, 1) < 0) {
        dout(2) << "connect read tag, seq, " << strerror(errno) << dendl;
        goto fail;
       }
@@ -1251,7 +1286,6 @@ int Rank::Pipe::connect()
 
       // hooray!
       state = STATE_OPEN;
-      sd = newsd;
       connect_seq = cseq+1;
       first_fault = last_attempt = utime_t();
       dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl;
@@ -1276,8 +1310,6 @@ int Rank::Pipe::connect()
     dout(3) << "connect fault, but state != connecting, stopping" << dendl;
 
  stop_locked:
-  if (newsd >= 0) 
-    ::close(newsd);
   return -1;
 }
 
@@ -1301,11 +1333,16 @@ void Rank::Pipe::unregister_pipe()
   }
 }
 
-void Rank::Pipe::fault(bool onconnect)
+void Rank::Pipe::fault(bool onconnect, bool onread)
 {
   assert(lock.is_locked());
   cond.Signal();
 
+  if (onread && state == STATE_CONNECTING) {
+    dout(10) << "fault already connecting, reader shutting down" << dendl;
+    return;
+  }
+
   if (!onconnect) dout(2) << "fault " << errno << ": " << strerror(errno) << dendl;
 
   if (state == STATE_CLOSED ||
@@ -1314,9 +1351,11 @@ void Rank::Pipe::fault(bool onconnect)
     return;
   }
 
-  if (sd >= 0)
+  if (sd >= 0) {
     ::close(sd);
-  sd = -1;
+    sd = -1;
+    closed_socket();
+  }
 
   // lossy channel?
   if (policy.lossy_tx) {
@@ -1431,9 +1470,11 @@ void Rank::Pipe::stop()
 
   cond.Signal();
   state = STATE_CLOSED;
-  if (sd >= 0)
+  if (sd >= 0) {
     ::close(sd);
-  sd = -1;
+    sd = -1;
+    closed_socket();
+  }
 }
 
 
@@ -1461,12 +1502,12 @@ void Rank::Pipe::reader()
   lock.Lock();
 
   // loop.
-  while (state != STATE_CLOSED) {
+  while (state != STATE_CLOSED &&
+        state != STATE_CONNECTING) {
     assert(lock.is_locked());
 
     // sleep if (re)connecting
-    if (state == STATE_CONNECTING ||
-       state == STATE_STANDBY) {
+    if (state == STATE_STANDBY) {
       dout(20) << "reader sleeping during reconnect|standby" << dendl;
       cond.Wait(lock);
       continue;
@@ -1480,7 +1521,7 @@ void Rank::Pipe::reader()
     if (rc < 0) {
       lock.Lock();
       dout(2) << "reader couldn't read tag, " << strerror(errno) << dendl;
-      fault();
+      fault(false, true);
       continue;
     }
 
@@ -1492,8 +1533,8 @@ void Rank::Pipe::reader()
       lock.Lock();
       if (rc < 0) {
        dout(2) << "reader couldn't read ack seq, " << strerror(errno) << dendl;
-       fault();
-      } else {
+       fault(false, true);
+      } else if (state != STATE_CLOSED) {
        dout(15) << "reader got ack seq " << seq << dendl;
        // trim sent list
        while (!sent.empty() &&
@@ -1514,12 +1555,16 @@ void Rank::Pipe::reader()
       if (!m) {
        derr(2) << "reader read null message, " << strerror(errno) << dendl;
        lock.Lock();
-       fault();
+       fault(false, true);
        continue;
       }
 
       // note received seq#
       lock.Lock();
+      if (state == STATE_CLOSED ||
+         state == STATE_CONNECTING)
+       continue;
+
       if (m->get_seq() <= in_seq) {
        dout(-10) << "reader got old message "
                  << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
@@ -1539,7 +1584,7 @@ void Rank::Pipe::reader()
        derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
                << " for " << *m << " from " << m->get_source() << dendl;
        assert(in_seq == m->get_seq()); // for now!
-       fault();
+       fault(false, true);
        delete m;
        continue;
       }
@@ -1600,7 +1645,7 @@ void Rank::Pipe::reader()
     else {
       dout(0) << "reader bad tag " << (int)tag << dendl;
       lock.Lock();
-      fault();
+      fault(false, true);
     }
   }
 
@@ -1608,13 +1653,18 @@ void Rank::Pipe::reader()
   // reap?
   bool reap = false;
   reader_running = false;
-  if (!writer_running) reap = true;
+  if (!writer_running)
+    reap = true;
 
   lock.Unlock();
 
   if (reap) {
     dout(10) << "reader queueing for reap" << dendl;
-    if (sd >= 0) ::close(sd);
+    if (sd >= 0) {
+      ::close(sd);
+      sd = -1;
+      closed_socket();
+    }
     rank.lock.Lock();
     {
       rank.pipe_reap_queue.push_back(this);
@@ -1730,7 +1780,11 @@ void Rank::Pipe::writer()
   
   if (reap) {
     dout(10) << "writer queueing for reap" << dendl;
-    if (sd >= 0) ::close(sd);
+    if (sd >= 0) {
+      ::close(sd);
+      sd = -1;
+      closed_socket();
+    }
     rank.lock.Lock();
     {
       rank.pipe_reap_queue.push_back(this);
index a2f5643d6e0858c2fcfb2712c0f15e3ddd477c78..6e5ba12cc807c65033b24d68d42d7ac45393c409 100644 (file)
@@ -152,7 +152,7 @@ private:
     int do_sendmsg(int sd, struct msghdr *msg, int len);
     int write_ack(unsigned s);
 
-    void fault(bool silent=false);
+    void fault(bool silent=false, bool reader=false);
     void fail();
 
     void was_session_reset();
@@ -195,6 +195,15 @@ private:
       writer_running = true;
       writer_thread.create();
     }
+    void join_reader() {
+      if (!reader_running)
+       return;
+      cond.Signal();
+      reader_thread.kill(SIGUSR1);
+      lock.Unlock();
+      reader_thread.join();
+      lock.Lock();
+    }
 
     // public constructors
     static const Pipe& Server(int s);