]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: only close socket on reconnect or shutdown
authorSage Weil <sage@newdream.net>
Fri, 12 Nov 2010 21:09:24 +0000 (13:09 -0800)
committerSage Weil <sage@newdream.net>
Fri, 12 Nov 2010 22:55:11 +0000 (14:55 -0800)
We can't modify 'sd' or (more importnatly) close sd while any other thread
might be using it, or else we might race with an open and they might end
up using someone else's fd.

Take care to _only_ close(sd) in connect(), when the reader thread is
stopped, or when reaping the connection.

Signed-off-by: Sage Weil <sage@newdream.net>
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 748373ff784201d93e56d35f18922dd798b0308f..2da27e166cb50d85ecc4e67d68eabb5c13d3ff41 100644 (file)
@@ -45,14 +45,6 @@ static ostream& _prefix(SimpleMessenger *messenger) {
 
 #include "tcp.cc"
 
-
-// help find socket resource leaks
-//static int sockopen = 0;
-#define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl;
-#define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
-
-
-
 /********************************************
  * Accepter
  */
@@ -80,7 +72,6 @@ int SimpleMessenger::Accepter::bind(int64_t force_nonce, entity_addr_t &bind_add
         << strerror_r(errno, buf, sizeof(buf)) << std::endl;
     return -errno;
   }
-  opened_socket();
 
   // reuse addr+port when possible
   int on = 1;
@@ -201,7 +192,6 @@ void *SimpleMessenger::Accepter::entry()
     int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
     if (sd >= 0) {
       errors = 0;
-      opened_socket();
       dout(10) << "accepted incoming on sd " << sd << dendl;
       
       // disable Nagle algorithm?
@@ -236,7 +226,6 @@ void *SimpleMessenger::Accepter::entry()
   if (listen_sd >= 0) {
     ::close(listen_sd);
     listen_sd = -1;
-    closed_socket();
   }
   dout(10) << "accepter stopping" << dendl;
   return 0;
@@ -911,11 +900,6 @@ int SimpleMessenger::Pipe::connect()
   dout(10) << "connect " << connect_seq << dendl;
   assert(pipe_lock.is_locked());
 
-  if (sd >= 0) {
-    ::close(sd);
-    sd = -1;
-    closed_socket();
-  }
   __u32 cseq = connect_seq;
   __u32 gseq = messenger->get_global_seq();
 
@@ -935,6 +919,10 @@ int SimpleMessenger::Pipe::connect()
   AuthAuthorizer *authorizer = NULL;
   bufferlist addrbl, myaddrbl;
 
+  // close old socket.  this is safe because we stopped the reader thread above.
+  if (sd >= 0)
+    ::close(sd);
+
   // create socket?
   sd = ::socket(peer_addr.get_family(), SOCK_STREAM, 0);
   if (sd < 0) {
@@ -943,7 +931,6 @@ int SimpleMessenger::Pipe::connect()
     assert(0);
     goto fail;
   }
-  opened_socket();
 
   char buf[80];
 
@@ -1345,11 +1332,7 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
     return;
   }
 
-  if (sd >= 0) {
-    ::close(sd);
-    sd = -1;
-    closed_socket();
-  }
+  shutdown_socket();
 
   // lossy channel?
   if (policy.lossy) {
@@ -1435,11 +1418,7 @@ void SimpleMessenger::Pipe::stop()
   assert(pipe_lock.is_locked());
   state = STATE_CLOSED;
   cond.Signal();
-  if (sd >= 0) {
-    ::shutdown(sd, SHUT_RDWR);
-    ::close(sd);
-    sd = -1;
-  }
+  shutdown_socket();
 }
 
 
@@ -1578,7 +1557,6 @@ void SimpleMessenger::Pipe::writer()
   char buf[80];
 
   pipe_lock.Lock();
-
   while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
     dout(10) << "writer: state = " << state << " policy.server=" << policy.server << dendl;
 
@@ -1688,15 +1666,8 @@ void SimpleMessenger::Pipe::writer()
 void SimpleMessenger::Pipe::unlock_maybe_reap()
 {
   if (!reader_running && !writer_running) {
-    // close
-    if (sd >= 0) {
-      ::close(sd);
-      sd = -1;
-      closed_socket();
-    }
-
+    shutdown_socket();
     pipe_lock.Unlock();
-
     messenger->queue_reap(this);
   } else {
     pipe_lock.Unlock();
@@ -2168,8 +2139,9 @@ void SimpleMessenger::reaper()
     assert(pipes.count(p));
     pipes.erase(p);
     p->join();
+    if (p->sd >= 0)
+      ::close(p->sd);
     dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
-    assert(p->sd < 0);
     if (p->connection_state)
       p->connection_state->clear_pipe();
     p->put();
index b1a0415aa09ebbece0150692d72a4248e6518e7a..88aefefe62166ed79467ecfb9ab9df43c453d391 100644 (file)
@@ -205,7 +205,8 @@ private:
   public:
     Pipe(SimpleMessenger *r, int st) : 
       messenger(r),
-      sd(-1), peer_type(-1),
+      sd(-1),
+      peer_type(-1),
       pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
       state(st), 
       connection_state(new Connection),
@@ -381,8 +382,9 @@ private:
     void requeue_sent(uint64_t max_acked=0);
     void discard_queue();
 
-    void force_close() {
-      if (sd >= 0) ::close(sd);
+    void shutdown_socket() {
+      if (sd >= 0)
+        ::shutdown(sd, SHUT_RDWR);
     }
   };