git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: fix race on Pipe removal from hash
author Sage Weil <sage@inktank.com>
Sun, 23 Dec 2012 17:22:18 +0000 (09:22 -0800)
committer Sage Weil <sage@inktank.com>
Sat, 29 Dec 2012 01:21:00 +0000 (17:21 -0800)
When a pipe is faulting and shutting down, we have to drop pipe_lock to
take msgr lock and then remove the entry.  The Pipe in this case will
have STATE_CLOSED.  Handle this case in all places we do a lookup on
the rank_pipe hash so that we effectively ignore entries that are
CLOSED.

This fixes a race introduced by the previous commit where we won't use
the CLOSED pipe and try to register a new one, but the old one is still
registered.

See bug #3675.

Signed-off-by: Sage Weil <sage@inktank.com>
src/msg/Pipe.cc
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 62db0e8dff9f481c2a3c452f824e9dc2eaf6db1b..4ad659d6607718aaa2fa8b9a7268aac175716d14 100644 (file)
@@ -40,6 +40,7 @@
 ostream& Pipe::_pipe_prefix(std::ostream *_dout) {
   return *_dout << "-- " << msgr->get_myinst().addr << " >> " << peer_addr << " pipe(" << this
                << " sd=" << sd << " :" << port
+               << " s=" << state
                << " pgs=" << peer_global_seq
                << " cs=" << connect_seq
                << " l=" << policy.lossy
@@ -379,8 +380,8 @@ int Pipe::accept()
 
     
     // existing?
-    if (msgr->rank_pipe.count(peer_addr)) {
-      existing = msgr->rank_pipe[peer_addr];
+    existing = msgr->_lookup_pipe(peer_addr);
+    if (existing) {
       existing->pipe_lock.Lock();
 
       if (connect.global_seq < existing->peer_global_seq) {
@@ -601,13 +602,13 @@ int Pipe::accept()
 
   rc = tcp_write((char*)&reply, sizeof(reply));
   if (rc < 0) {
-    goto fail_unlocked;
+    goto fail_registered;
   }
 
   if (reply.authorizer_len) {
     rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
     if (rc < 0) {
-      goto fail_unlocked;
+      goto fail_registered;
     }
   }
 
@@ -615,11 +616,11 @@ int Pipe::accept()
     uint64_t newly_acked_seq = 0;
     if(tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
       ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
-      goto fail_unlocked;
+      goto fail_registered;
     }
     if (tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
       ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
-      goto fail_unlocked;
+      goto fail_registered;
     }
     requeue_sent(newly_acked_seq);
   }
@@ -636,6 +637,8 @@ int Pipe::accept()
 
   return 0;   // success.
 
+ fail_registered:
+  ldout(msgr->cct, 10) << "accept fault after register" << dendl;
  fail_unlocked:
   pipe_lock.Lock();
   if (state != STATE_CLOSED) {
@@ -1001,17 +1004,18 @@ void Pipe::register_pipe()
 {
   ldout(msgr->cct,10) << "register_pipe" << dendl;
   assert(msgr->lock.is_locked());
-  assert(msgr->rank_pipe.count(peer_addr) == 0);
+  Pipe *existing = msgr->_lookup_pipe(peer_addr);
+  assert(existing == NULL);
   msgr->rank_pipe[peer_addr] = this;
 }
 
 void Pipe::unregister_pipe()
 {
   assert(msgr->lock.is_locked());
-  if (msgr->rank_pipe.count(peer_addr) &&
-      msgr->rank_pipe[peer_addr] == this) {
+  hash_map<entity_addr_t,Pipe*>::iterator p = msgr->rank_pipe.find(peer_addr);
+  if (p != msgr->rank_pipe.end() && p->second == this) {
     ldout(msgr->cct,10) << "unregister_pipe" << dendl;
-    msgr->rank_pipe.erase(peer_addr);
+    msgr->rank_pipe.erase(p);
   } else {
     ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl;
   }
index 3d2eb79b37656836e14db64518aa4fabd04cd52f..0e8ed27c7f440d7e0259f743b02d18013695c108 100644 (file)
@@ -111,7 +111,7 @@ int SimpleMessenger::_send_message(Message *m, const entity_inst_t& dest,
   }
 
   lock.Lock();
-  Pipe *pipe = rank_pipe.count(dest.addr) ? rank_pipe[ dest.addr ] : NULL;
+  Pipe *pipe = _lookup_pipe(dest.addr);
   submit_message(m, (pipe ? pipe->connection_state : NULL),
                  dest.addr, dest.name.type(), lazy);
   lock.Unlock();
@@ -366,10 +366,8 @@ Connection *SimpleMessenger::get_connection(const entity_inst_t& dest)
 
   // remote
   while (true) {
-    Pipe *pipe = NULL;
-    hash_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(dest.addr);
-    if (p != rank_pipe.end()) {
-      pipe = p->second;
+    Pipe *pipe = _lookup_pipe(dest.addr);
+    if (pipe) {
       ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl;
     } else {
       pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL);
@@ -446,10 +444,9 @@ int SimpleMessenger::send_keepalive(const entity_inst_t& dest)
     // local?
     if (my_inst.addr != dest_addr) {
       // remote.
-      Pipe *pipe = 0;
-      if (rank_pipe.count( dest_proc_addr )) {
+      Pipe *pipe = _lookup_pipe(dest_proc_addr);
+      if (pipe) {
         // connected?
-        pipe = rank_pipe[ dest_proc_addr ];
        pipe->pipe_lock.Lock();
        ldout(cct,20) << "send_keepalive remote, " << dest_addr << ", have pipe." << dendl;
        pipe->_send_keepalive();
@@ -567,8 +564,8 @@ void SimpleMessenger::mark_down_all()
 void SimpleMessenger::mark_down(const entity_addr_t& addr)
 {
   lock.Lock();
-  if (rank_pipe.count(addr)) {
-    Pipe *p = rank_pipe[addr];
+  Pipe *p = _lookup_pipe(addr);
+  if (p) {
     ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl;
     p->unregister_pipe();
     p->pipe_lock.Lock();
index ca27cae7b8d89ad320929c7e3e5743cb91df895a..d62bb17c1cc362e272c83dc55e2f15ab66bb03a7 100644 (file)
@@ -496,7 +496,12 @@ private:
   /// lock to protect the global_seq
   pthread_spinlock_t global_seq_lock;
 
-  /// hash map of addresses to Pipes
+  /**
+   * hash map of addresses to Pipes
+   *
+   * NOTE: a Pipe* with state CLOSED may still be in the map but is considered
+   * invalid and can be replaced by anyone holding the msgr lock
+   */
   hash_map<entity_addr_t, Pipe*> rank_pipe;
   /// a set of all the Pipes we have which are somehow active
   set<Pipe*>      pipes;
@@ -524,6 +529,16 @@ private:
 
   friend class Pipe;
 
+  Pipe *_lookup_pipe(const entity_addr_t& k) {  // return the usable Pipe registered in rank_pipe for address k, or NULL
+    hash_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(k);
+    if (p == rank_pipe.end())
+      return NULL;  // no entry for k at all
+    // see lock cribbing in Pipe::fault()
+    if (p->second->state == Pipe::STATE_CLOSED)
+      return NULL;  // entry is still in the map but CLOSED: report it as absent (the entry itself is not erased here)
+    return p->second;
+  }
+
 public:
 
   int timeout;