From 82f8bcddb5fa09913eb477ee26c71d6b4bb8d97c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 28 Dec 2012 17:20:43 -0800 Subject: [PATCH] msg/Pipe: use state_closed atomic_t for _lookup_pipe We shouldn't look at Pipe::state in SimpleMessenger::_lookup_pipe() without holding pipe_lock. Instead, use an atomic that we set to non-zero only when transitioning to the terminal STATE_CLOSED state. Signed-off-by: Sage Weil --- src/msg/Pipe.cc | 23 ++++++++++++++++++----- src/msg/Pipe.h | 1 + src/msg/SimpleMessenger.h | 2 +- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index e0e0ca01738fa..8e0e0dbfa9181 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -220,6 +220,7 @@ int Pipe::accept() if (rc < 0) { ldout(msgr->cct,10) << "accept couldn't write banner" << dendl; state = STATE_CLOSED; + state_closed.set(1); return -1; } @@ -237,6 +238,7 @@ int Pipe::accept() char buf[80]; ldout(msgr->cct,0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl; state = STATE_CLOSED; + state_closed.set(1); return -1; } ::encode(socket_addr, addrs); @@ -245,6 +247,7 @@ int Pipe::accept() if (rc < 0) { ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl; state = STATE_CLOSED; + state_closed.set(1); return -1; } @@ -255,12 +258,14 @@ int Pipe::accept() if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) { ldout(msgr->cct,10) << "accept couldn't read banner" << dendl; state = STATE_CLOSED; + state_closed.set(1); return -1; } if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { banner[strlen(CEPH_BANNER)] = 0; ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl; state = STATE_CLOSED; + state_closed.set(1); return -1; } bufferlist addrbl; @@ -271,6 +276,7 @@ int Pipe::accept() if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) { ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl; state = STATE_CLOSED; + state_closed.set(1); return -1; } { @@ -652,12 +658,14 @@ int Pipe::accept() if (state != STATE_CLOSED) { bool queued = is_queued(); ldout(msgr->cct, 10) << " queued = " << (int)queued << dendl; - if (queued) + if (queued) { state = policy.server ? STATE_STANDBY : STATE_CONNECTING; - else if (replaced) + } else if (replaced) { state = STATE_STANDBY; - else + } else { state = STATE_CLOSED; + state_closed.set(1); + } fault(); if (queued || replaced) start_writer(); @@ -677,6 +685,7 @@ int Pipe::accept() pipe_lock.Lock(); state = STATE_CLOSED; + state_closed.set(1); fault(); pipe_lock.Unlock(); return -1; @@ -1228,6 +1237,7 @@ void Pipe::stop() ldout(msgr->cct,10) << "stop" << dendl; assert(pipe_lock.is_locked()); state = STATE_CLOSED; + state_closed.set(1); cond.Signal(); shutdown_socket(); } @@ -1349,10 +1359,12 @@ void Pipe::reader() else if (tag == CEPH_MSGR_TAG_CLOSE) { ldout(msgr->cct,20) << "reader got CLOSE" << dendl; pipe_lock.Lock(); - if (state == STATE_CLOSING) + if (state == STATE_CLOSING) { state = STATE_CLOSED; - else + state_closed.set(1); + } else { state = STATE_CLOSING; + } cond.Signal(); break; } @@ -1401,6 +1413,7 @@ void Pipe::writer() ldout(msgr->cct,20) << "writer writing CLOSE tag" << dendl; char tag = CEPH_MSGR_TAG_CLOSE; state = STATE_CLOSED; + state_closed.set(1); pipe_lock.Unlock(); if (sd) { int r = ::write(sd, &tag, 1); diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index 28b4864c33ca4..8f3ba641cf677 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -142,6 +142,7 @@ class DispatchQueue; Mutex pipe_lock; int state; + atomic_t state_closed; // non-zero iff state = STATE_CLOSED // session_security handles any signatures or encryptions required for this pipe's msgs. PLR diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index d62bb17c1cc36..fb392e8f74165 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -534,7 +534,7 @@ private: if (p == rank_pipe.end()) return NULL; // see lock cribbing in Pipe::fault() - if (p->second->state == Pipe::STATE_CLOSED) + if (p->second->state_closed.read()) return NULL; return p->second; } -- 2.39.5