]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: reworked accept a bit to more closely mirror pseudocode in wiki
authorSage Weil <sage@newdream.net>
Tue, 18 Mar 2008 22:49:13 +0000 (15:49 -0700)
committerSage Weil <sage@newdream.net>
Tue, 18 Mar 2008 22:49:13 +0000 (15:49 -0700)
src/msg/SimpleMessenger.cc

index 803dd1b48e7565939e4cf652c759acb1ab26e173..ca1df55cc3e56a7985a64ffc1b26ed3b69a55511 100644 (file)
@@ -789,7 +789,11 @@ int Rank::Pipe::accept()
   }
   
   __u32 peer_cseq;
+  Pipe *existing = 0;
   
+  // this should roughly mirror pseudocode at
+  //  http://ceph.newdream.net/wiki/Messaging_protocol
+
   while (1) {
     rc = tcp_read(sd, (char*)&peer_cseq, sizeof(peer_cseq));
     if (rc < 0) {
@@ -802,7 +806,7 @@ int Rank::Pipe::accept()
 
     // existing?
     if (rank.rank_pipe.count(peer_addr)) {
-      Pipe *existing = rank.rank_pipe[peer_addr];
+      existing = rank.rank_pipe[peer_addr];
       existing->lock.Lock();
       
       if (peer_cseq < existing->connect_seq) {
@@ -814,7 +818,7 @@ int Rank::Pipe::accept()
          // old attempt, or we sent READY but they didn't get it.
          dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
                   << " > " << peer_cseq << ", RETRY" << dendl;
-         connect_seq = existing->connect_seq;
+         connect_seq = existing->connect_seq;  // so we can send it below..
          existing->lock.Unlock();
          rank.lock.Unlock();
          char tag = CEPH_MSGR_TAG_RETRY;
@@ -864,30 +868,13 @@ int Rank::Pipe::accept()
        continue;       
       } else {
        // reconnect
-       dout(10) << "accept peer sent cseq " << peer_cseq << " > " << existing->connect_seq << dendl;
+       dout(10) << "accept peer sent cseq " << peer_cseq
+                << " > " << existing->connect_seq << dendl;
        goto replace;
       }
       assert(0);
-
-    replace:
-      dout(10) << "accept replacing " << existing << dendl;
-      existing->state = STATE_CLOSED;
-      existing->cond.Signal();
-      existing->unregister_pipe();
-      
-      // steal queue and out_seq
-      out_seq = existing->out_seq;
-      if (!existing->sent.empty()) {
-       out_seq = existing->sent.front()->get_seq()-1;
-       q.splice(q.begin(), existing->sent);
-      }
-      q.splice(q.end(), existing->q);
-      
-      existing->lock.Unlock();
-      break;
-    }
-
-    if (peer_cseq > 0) {
+    } // existing
+    else if (peer_cseq > 0) {
       // we reset, and they are opening a new session
       dout(10) << "accept we reset (peer sent cseq " << peer_cseq << "), sending RESETSESSION" << dendl;
       rank.lock.Unlock();
@@ -898,11 +885,28 @@ int Rank::Pipe::accept()
     } else {
       // new session
       dout(10) << "accept new session" << dendl;
-      break;
+      goto open;
     }
     assert(0);    
   }
   
+ replace:
+  dout(10) << "accept replacing " << existing << dendl;
+  existing->state = STATE_CLOSED;
+  existing->cond.Signal();
+  existing->unregister_pipe();
+  
+  // steal queue and out_seq
+  out_seq = existing->out_seq;
+  if (!existing->sent.empty()) {
+    out_seq = existing->sent.front()->get_seq()-1;
+    q.splice(q.begin(), existing->sent);
+  }
+  q.splice(q.end(), existing->q);
+  
+  existing->lock.Unlock();
+
+ open:
   // open
   register_pipe();
   rank.lock.Unlock();