]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: lots of fixes, cleanup. reset detection and races both seem to work
authorSage Weil <sage@newdream.net>
Wed, 12 Mar 2008 22:29:57 +0000 (15:29 -0700)
committerSage Weil <sage@newdream.net>
Wed, 12 Mar 2008 22:29:57 +0000 (15:29 -0700)
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h
src/osd/OSD.h
src/start.sh

index bc179786c0a4e72251bfbbe9828200ff5e6f1bd1..f767ec42954afc1682a10e157f51fe252e354d03 100644 (file)
@@ -807,61 +807,88 @@ int Rank::Pipe::accept()
       existing->lock.Lock();
       
       if (peer_cseq < existing->connect_seq) {
-       // old attempt, or we sent READY but they didn't get it.
-       dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
-                << " > " << peer_cseq << ", RETRY" << dendl;
-       existing->lock.Unlock();
-       rank.lock.Unlock();
-       char tag = CEPH_MSGR_TAG_RETRY;
-       if (tcp_write(sd, &tag, 1) < 0)
-         goto fail;
-       continue;
-      }
-      
-      if ((peer_cseq == existing->connect_seq && peer_addr < rank.rank_addr) ||
-         (peer_cseq > existing->connect_seq)) {
-       // connection race, incoming wins; or
-       // reconnect
-       dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
-                << " <= " << peer_cseq << ", replacing" << dendl;
-       assert(existing->state == STATE_CONNECTING ||
-              existing->state == STATE_WAIT);
-       existing->state = STATE_CLOSED;
-       existing->cond.Signal();
-       existing->unregister_pipe();
-
-       // steal queue and out_seq
-       out_seq = existing->out_seq;
-       if (!existing->sent.empty()) {
-         out_seq = existing->sent.front()->get_seq()-1;
-         q.splice(q.begin(), existing->sent);
+       if (peer_cseq == 0) {
+         dout(10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
+         existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
+         goto replace;
+       } else {
+         // old attempt, or we sent READY but they didn't get it.
+         dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
+                  << " > " << peer_cseq << ", RETRY" << dendl;
+         existing->lock.Unlock();
+         rank.lock.Unlock();
+         char tag = CEPH_MSGR_TAG_RETRY;
+         if (tcp_write(sd, &tag, 1) < 0)
+           goto fail;
+         if (tcp_write(sd, (char*)&connect_seq, sizeof(connect_seq)) < 0)
+           goto fail;
+         continue;
        }
-       q.splice(q.end(), existing->q);
-
-       existing->lock.Unlock();
-       break;
       }
-      
+
       if (peer_cseq == existing->connect_seq) {
-       // connection race, our outgoing wins
-       dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
-                << " == " << peer_cseq << ", sending WAIT" << dendl;
-       assert(peer_addr > rank.rank_addr);
-       assert(existing->state == STATE_CONNECTING);
-       existing->lock.Unlock();
+       // connection race
+       if (peer_addr < rank.rank_addr) {
+         // incoming wins
+         dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
+                  << " == " << peer_cseq << ", replacing my attempt" << dendl;
+         assert(existing->state == STATE_CONNECTING ||
+                existing->state == STATE_WAIT);
+         goto replace;
+       } else {
+         // our existing outgoing wins
+         dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
+                  << " == " << peer_cseq << ", sending WAIT" << dendl;
+         assert(peer_addr > rank.rank_addr);
+         assert(existing->state == STATE_CONNECTING); // this will win
+         existing->lock.Unlock();
+         rank.lock.Unlock();
+         
+         char tag = CEPH_MSGR_TAG_WAIT;
+         if (tcp_write(sd, &tag, 1) < 0)
+           goto fail;
+         continue;     
+       }
+      }
+
+      assert(peer_cseq > existing->connect_seq);
+      if (existing->connect_seq == 0) {
+       dout(10) << "accept we reset (peer sent cseq " << peer_cseq 
+                << ", " << existing << ".cseq = " << existing->connect_seq
+                << "), sending RESETSESSION" << dendl;
        rank.lock.Unlock();
-       
-       char tag = CEPH_MSGR_TAG_WAIT;
+       existing->lock.Unlock();
+       char tag = CEPH_MSGR_TAG_RESETSESSION;
        if (tcp_write(sd, &tag, 1) < 0)
          goto fail;
        continue;       
+      } else {
+       // reconnect
+       dout(10) << "accept peer sent cseq " << peer_cseq << " > " << existing->connect_seq << dendl;
+       goto replace;
       }
-
       assert(0);
+
+    replace:
+      dout(10) << "accept replacing " << existing << dendl;
+      existing->state = STATE_CLOSED;
+      existing->cond.Signal();
+      existing->unregister_pipe();
+      
+      // steal queue and out_seq
+      out_seq = existing->out_seq;
+      if (!existing->sent.empty()) {
+       out_seq = existing->sent.front()->get_seq()-1;
+       q.splice(q.begin(), existing->sent);
+      }
+      q.splice(q.end(), existing->q);
+      
+      existing->lock.Unlock();
+      break;
     }
 
     if (peer_cseq > 0) {
-      // we reset, and are opening a new session
+      // we reset, and they are opening a new session
       dout(10) << "accept we reset (peer sent cseq " << peer_cseq << "), sending RESETSESSION" << dendl;
       rank.lock.Unlock();
       char tag = CEPH_MSGR_TAG_RESETSESSION;
@@ -873,10 +900,10 @@ int Rank::Pipe::accept()
       dout(10) << "accept new session" << dendl;
       break;
     }
-    assert(0);
+    assert(0);    
   }
   
-  // okay!
+  // open
   register_pipe();
   rank.lock.Unlock();
 
@@ -964,7 +991,7 @@ int Rank::Pipe::connect()
     dout(2) << "connect couldn't read peer addr, " << strerror(errno) << dendl;
     goto fail;
   }
-  dout(20) << "connect read peer addr " << paddr << dendl;
+  dout(20) << "connect read peer addr " << paddr << " on socket " << newsd << dendl;
   if (!peer_addr.is_local_to(paddr)) {
     dout(0) << "connect peer identifies itself as " 
            << paddr << "... wrong node!" << dendl;
@@ -1001,15 +1028,8 @@ int Rank::Pipe::connect()
       }
 
       dout(0) << "connect got RESETSESSION" << dendl;
-      report_failures();
-      for (unsigned i=0; i<rank.local.size(); i++) 
-       if (rank.local[i] && rank.local[i]->get_dispatcher())
-         rank.local[i]->queue_remote_reset(peer_addr, last_dest_name);
-      // renumber outgoing seqs
-      out_seq = 0;
-      for (list<Message*>::iterator p = q.begin(); p != q.end(); p++)
-       (*p)->set_seq(++out_seq);
-      in_seq = 0;
+      was_session_reset();
+      continue;
     }
     if (tag == CEPH_MSGR_TAG_RETRY) {
       int rc = tcp_read(newsd, (char*)&cseq, sizeof(cseq));
@@ -1121,6 +1141,10 @@ void Rank::Pipe::fault(bool onconnect)
     dout(10) << "fault already closed" << dendl;
     return;
   }
+
+  ::close(sd);
+  sd = -1;
+
   if (q.empty()) {
     if (state == STATE_CLOSING || onconnect) {
       dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
@@ -1129,8 +1153,6 @@ void Rank::Pipe::fault(bool onconnect)
       dout(0) << "fault nothing to send, going to standby" << dendl;
       state = STATE_STANDBY;
     }
-    ::close(sd);
-    sd = -1;
     return;
   } 
 
@@ -1177,6 +1199,23 @@ void Rank::Pipe::fail()
       rank.local[i]->queue_reset(peer_addr, last_dest_name);
 }
 
+void Rank::Pipe::was_session_reset()
+{
+  dout(10) << "was_reset_session" << dendl;
+  report_failures();
+  for (unsigned i=0; i<rank.local.size(); i++) 
+    if (rank.local[i] && rank.local[i]->get_dispatcher())
+      rank.local[i]->queue_remote_reset(peer_addr, last_dest_name);
+
+  // renumber outgoing seqs
+  out_seq = 0;
+  for (list<Message*>::iterator p = q.begin(); p != q.end(); p++)
+    (*p)->set_seq(++out_seq);
+
+  in_seq = 0;
+  connect_seq = 0;
+}
+
 void Rank::Pipe::report_failures()
 {
   // report failures
@@ -1397,7 +1436,7 @@ void Rank::Pipe::writer()
 {
   lock.Lock();
 
-  while (state != STATE_CLOSED && state != STATE_WAIT) {
+  while (state != STATE_CLOSED) { // && state != STATE_WAIT) {
     // standby?
     if (!q.empty() && state == STATE_STANDBY)
       state = STATE_CONNECTING;
@@ -1419,7 +1458,7 @@ void Rank::Pipe::writer()
       continue;
     }
 
-    if (state != STATE_CONNECTING &&
+    if (state != STATE_CONNECTING && state != STATE_WAIT &&
        (!q.empty() || in_seq > in_seq_acked)) {
 
       // send ack?
index 7c866fcbed016a67bcd38dd28bd595198f4f5321..06eb75f915a311147d67d62e020e56c7b2d432fc 100644 (file)
@@ -130,6 +130,8 @@ private:
     void fault(bool silent=false);
     void fail();
 
+    void was_session_reset();
+
     void report_failures();
 
     // threads
index 295fe8d7d8f0fe8f1b4abe27b7d225a236f0863b..5cd010edcf45b8b07315836b8df20e921015413b 100644 (file)
@@ -377,6 +377,7 @@ private:
   void handle_sub_op_reply(class MOSDSubOpReply *m);
 
   void force_remount();
+
 };
 
 #endif
index 4466874441d53ad1d8d25dd7f3d0d8737e253afc..3e0b91574804cb6bdd8d83aa45da341d11fe100f 100755 (executable)
@@ -29,7 +29,7 @@ $CEPH_BIN/mkmonfs --clobber mondata/mon0 --mon 0 --monmap .ceph_monmap
 ARGS="-d --bind $IP -o out --debug_ms 1"
 
 # start monitor
-$CEPH_BIN/cmon $ARGS mondata/mon0 --debug_mon 10 --debug_ms 1
+$CEPH_BIN/cmon $ARGS mondata/mon0 --debug_mon 10
 
 # build and inject an initial osd map
 $CEPH_BIN/osdmaptool --clobber --createsimple .ceph_monmap 4 --print .ceph_osdmap