]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/Pipe: use state_closed atomic_t for _lookup_pipe
authorSage Weil <sage@inktank.com>
Sat, 29 Dec 2012 01:20:43 +0000 (17:20 -0800)
committerSage Weil <sage@inktank.com>
Sat, 29 Dec 2012 01:21:01 +0000 (17:21 -0800)
We shouldn't look at Pipe::state in SimpleMessenger::_lookup_pipe() without
holding pipe_lock.  Instead, use an atomic that we set to non-zero only
when transitioning to the terminal STATE_CLOSED state.

Signed-off-by: Sage Weil <sage@inktank.com>
src/msg/Pipe.cc
src/msg/Pipe.h
src/msg/SimpleMessenger.h

index e0e0ca01738fa7e513aa3b762910a514630f064e..8e0e0dbfa918178ad34f08d79b71a04b5d3c7ede 100644 (file)
@@ -220,6 +220,7 @@ int Pipe::accept()
   if (rc < 0) {
     ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
     state = STATE_CLOSED;
+    state_closed.set(1);
     return -1;
   }
 
@@ -237,6 +238,7 @@ int Pipe::accept()
     char buf[80];
     ldout(msgr->cct,0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
     state = STATE_CLOSED;
+    state_closed.set(1);
     return -1;
   }
   ::encode(socket_addr, addrs);
@@ -245,6 +247,7 @@ int Pipe::accept()
   if (rc < 0) {
     ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
     state = STATE_CLOSED;
+    state_closed.set(1);
     return -1;
   }
 
@@ -255,12 +258,14 @@ int Pipe::accept()
   if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
     ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
     state = STATE_CLOSED;
+    state_closed.set(1);
     return -1;
   }
   if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
     banner[strlen(CEPH_BANNER)] = 0;
     ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
     state = STATE_CLOSED;
+    state_closed.set(1);
     return -1;
   }
   bufferlist addrbl;
@@ -271,6 +276,7 @@ int Pipe::accept()
   if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
     ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
     state = STATE_CLOSED;
+    state_closed.set(1);
     return -1;
   }
   {
@@ -652,12 +658,14 @@ int Pipe::accept()
   if (state != STATE_CLOSED) {
     bool queued = is_queued();
     ldout(msgr->cct, 10) << "  queued = " << (int)queued << dendl;
-    if (queued)
+    if (queued) {
       state = policy.server ? STATE_STANDBY : STATE_CONNECTING;
-    else if (replaced)
+    } else if (replaced) {
       state = STATE_STANDBY;
-    else
+    } else {
       state = STATE_CLOSED;
+      state_closed.set(1);
+    }
     fault();
     if (queued || replaced)
       start_writer();
@@ -677,6 +685,7 @@ int Pipe::accept()
 
   pipe_lock.Lock();
   state = STATE_CLOSED;
+  state_closed.set(1);
   fault();
   pipe_lock.Unlock();
   return -1;
@@ -1228,6 +1237,7 @@ void Pipe::stop()
   ldout(msgr->cct,10) << "stop" << dendl;
   assert(pipe_lock.is_locked());
   state = STATE_CLOSED;
+  state_closed.set(1);
   cond.Signal();
   shutdown_socket();
 }
@@ -1349,10 +1359,12 @@ void Pipe::reader()
     else if (tag == CEPH_MSGR_TAG_CLOSE) {
       ldout(msgr->cct,20) << "reader got CLOSE" << dendl;
       pipe_lock.Lock();
-      if (state == STATE_CLOSING)
+      if (state == STATE_CLOSING) {
        state = STATE_CLOSED;
-      else
+       state_closed.set(1);
+      } else {
        state = STATE_CLOSING;
+      }
       cond.Signal();
       break;
     }
@@ -1401,6 +1413,7 @@ void Pipe::writer()
       ldout(msgr->cct,20) << "writer writing CLOSE tag" << dendl;
       char tag = CEPH_MSGR_TAG_CLOSE;
       state = STATE_CLOSED;
+      state_closed.set(1);
       pipe_lock.Unlock();
       if (sd) {
        int r = ::write(sd, &tag, 1);
index 28b4864c33ca40307c04962aec8b70eaa67ba0da..8f3ba641cf6779780192b7e853bdad6d3710c582 100644 (file)
@@ -142,6 +142,7 @@ class DispatchQueue;
     
     Mutex pipe_lock;
     int state;
+    atomic_t state_closed; // non-zero iff state = STATE_CLOSED
 
     // session_security handles any signatures or encryptions required for this pipe's msgs. PLR
 
index d62bb17c1cc362e272c83dc55e2f15ab66bb03a7..fb392e8f74165e5f13150b8b6ac55f1befe83246 100644 (file)
@@ -534,7 +534,7 @@ private:
     if (p == rank_pipe.end())
       return NULL;
     // see lock cribbing in Pipe::fault()
-    if (p->second->state == Pipe::STATE_CLOSED)
+    if (p->second->state_closed.read())
       return NULL;
     return p->second;
   }