]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: rewrote connect and accept based on new protocol spec
authorSage Weil <sage@newdream.net>
Wed, 12 Mar 2008 20:41:14 +0000 (13:41 -0700)
committerSage Weil <sage@newdream.net>
Wed, 12 Mar 2008 20:41:14 +0000 (13:41 -0700)
src/include/ceph_fs.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index e06473860e59dba02e421a7e62522e7310b5662c..395d39243b482a3af622c87dfc68ba28c6af3507 100644 (file)
@@ -206,11 +206,13 @@ struct ceph_entity_name {
 #define CEPH_ENTITY_TYPE_CLIENT 4
 #define CEPH_ENTITY_TYPE_ADMIN  5
 
-#define CEPH_MSGR_TAG_READY   1  /* server -> client + cseq: ready for messages */
-#define CEPH_MSGR_TAG_REJECT  2  /* server -> client + cseq: decline socket */
-#define CEPH_MSGR_TAG_MSG     3  /* message */
-#define CEPH_MSGR_TAG_ACK     4  /* message ack */
-#define CEPH_MSGR_TAG_CLOSE   5  /* closing pipe */
+#define CEPH_MSGR_TAG_READY         1  /* server -> client: ready for messages */
+#define CEPH_MSGR_TAG_RESETSESSION  2  /* server -> client: reset, try again */
+#define CEPH_MSGR_TAG_WAIT          3  /* server -> client: wait for racing incoming connection */
+#define CEPH_MSGR_TAG_RETRY         4  /* server -> client + cseq: try again with higher cseq */
+#define CEPH_MSGR_TAG_CLOSE         5  /* closing pipe */
+#define CEPH_MSGR_TAG_MSG          10  /* message */
+#define CEPH_MSGR_TAG_ACK          11  /* message ack */
 
 
 /*
index dd154f99c5f7f5b054db88b6769057124e355f02..bc179786c0a4e72251bfbbe9828200ff5e6f1bd1 100644 (file)
@@ -752,23 +752,6 @@ void Rank::mark_down(entity_addr_t addr)
 #define dout(l)  if (l<=g_conf.debug_ms) *_dout << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this << ")."
 #define derr(l)  if (l<=g_conf.debug_ms) *_derr << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this << ")."
 
-/*
- * we have to be careful about connection races:
- *  A initiates connection
- *  B initiates connection
- *  B accepts A's connection
- *  A rejects B's connection   (or vice-versa)
- * 
- * this is controlled by whether accept uses the new incoming socket
- * as the new pipe.  two cases:
- *  old         new(incoming)
- *  connecting  connecting   -> use socket initiated by lower address
- *  open        connecting 
- *   -> use new socket _only_ if connect_seq matches.  that is, the
- *      peer reconnected subsequent to the current open socket.  if
- *      connect_seq _doesn't_ match, it means that it is an 'old' attempt.
- */
-
 int Rank::Pipe::accept()
 {
   dout(10) << "accept" << dendl;
@@ -804,93 +787,121 @@ int Rank::Pipe::accept()
     }
     dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl;
   }
-
-  __u32 cseq;
-  rc = tcp_read(sd, (char*)&cseq, sizeof(cseq));
-  if (rc < 0) {
-    dout(10) << "accept couldn't read connect seq" << dendl;
-    state = STATE_CLOSED;
-    return -1;
-  }
-
-  dout(20) << "accept got connect_seq " << cseq << dendl;
-
-  __u32 myseq = connect_seq = 1;
+  
+  __u32 peer_cseq;
+  connect_seq = 0;
+  
+  while (1) {
+    rc = tcp_read(sd, (char*)&peer_cseq, sizeof(peer_cseq));
+    if (rc < 0) {
+      dout(10) << "accept couldn't read connect peer_seq" << dendl;
+      goto fail;
+    }
+    dout(20) << "accept got peer_connect_seq " << peer_cseq << dendl;
     
-  // register pipe.
-  rank.lock.Lock();
-  {
-    if (rank.rank_pipe.count(peer_addr) == 0) {
-      dout(10) << "accept new peer " << peer_addr << dendl;
-      register_pipe();
-    } else {
-      // hmm!
-      Pipe *other = rank.rank_pipe[peer_addr];
-      other->lock.Lock();
-
-      dout(10) << "accept got connect_seq " << cseq 
-              << ", existing pipe connect_seq " << other->connect_seq
-              << " state " << other->state
-              << dendl;
-
-      // if open race, low addr's pipe "wins".
-      // otherwise, look at connect_seq
-      if ((other->state == STATE_CONNECTING && peer_addr < rank.rank_addr) ||
-         (other->state == STATE_OPEN && cseq >= other->connect_seq)) {
-       dout(10) << "accept already had pipe " << other
-                << ", but switching to this new one" << dendl;
-       // switch to this new Pipe
-       other->state = STATE_CLOSED;
-       assert(q.empty());
-       other->cond.Signal();
-       other->unregister_pipe();
-       register_pipe();
+    rank.lock.Lock();
+
+    // existing?
+    if (rank.rank_pipe.count(peer_addr)) {
+      Pipe *existing = rank.rank_pipe[peer_addr];
+      existing->lock.Lock();
+      
+      if (peer_cseq < existing->connect_seq) {
+       // old attempt, or we sent READY but they didn't get it.
+       dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
+                << " > " << peer_cseq << ", RETRY" << dendl;
+       existing->lock.Unlock();
+       rank.lock.Unlock();
+       char tag = CEPH_MSGR_TAG_RETRY;
+       if (tcp_write(sd, &tag, 1) < 0)
+         goto fail;
+       continue;
+      }
+      
+      if ((peer_cseq == existing->connect_seq && peer_addr < rank.rank_addr) ||
+         (peer_cseq > existing->connect_seq)) {
+       // connection race, incoming wins; or
+       // reconnect
+       dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
+                << " <= " << peer_cseq << ", replacing" << dendl;
+       assert(existing->state == STATE_CONNECTING ||
+              existing->state == STATE_WAIT);
+       existing->state = STATE_CLOSED;
+       existing->cond.Signal();
+       existing->unregister_pipe();
 
        // steal queue and out_seq
-       out_seq = other->out_seq;
-       if (!other->sent.empty()) {
-         out_seq = other->sent.front()->get_seq()-1;
-         q.splice(q.begin(), other->sent);
+       out_seq = existing->out_seq;
+       if (!existing->sent.empty()) {
+         out_seq = existing->sent.front()->get_seq()-1;
+         q.splice(q.begin(), existing->sent);
        }
-       q.splice(q.end(), other->q);
-      } 
-      else {
-       dout(10) << "accept already had pipe " << other
-                << ", closing this one" << dendl;
-       myseq = other->connect_seq;
-       state = STATE_CLOSED;
+       q.splice(q.end(), existing->q);
+
+       existing->lock.Unlock();
+       break;
       }
-      other->lock.Unlock();
-    } 
+      
+      if (peer_cseq == existing->connect_seq) {
+       // connection race, our outgoing wins
+       dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
+                << " == " << peer_cseq << ", sending WAIT" << dendl;
+       assert(peer_addr > rank.rank_addr);
+       assert(existing->state == STATE_CONNECTING);
+       existing->lock.Unlock();
+       rank.lock.Unlock();
+       
+       char tag = CEPH_MSGR_TAG_WAIT;
+       if (tcp_write(sd, &tag, 1) < 0)
+         goto fail;
+       continue;       
+      }
+
+      assert(0);
+    }
+
+    if (peer_cseq > 0) {
+      // we reset, and are opening a new session
+      dout(10) << "accept we reset (peer sent cseq " << peer_cseq << "), sending RESETSESSION" << dendl;
+      rank.lock.Unlock();
+      char tag = CEPH_MSGR_TAG_RESETSESSION;
+      if (tcp_write(sd, &tag, 1) < 0)
+       goto fail;
+      continue;        
+    } else {
+      // new session
+      dout(10) << "accept new session" << dendl;
+      break;
+    }
+    assert(0);
   }
+  
+  // okay!
+  register_pipe();
   rank.lock.Unlock();
 
-  char tag;
-  if (state == STATE_CLOSED) {
-    dout(10) << "accept closed, sending REJECT tag" << dendl;
-    tag = CEPH_MSGR_TAG_REJECT;
-  } else {
-    dout(10) << "accept sending READY tag" << dendl;
-    tag = CEPH_MSGR_TAG_READY;
-    state = STATE_OPEN;
-    kick_reader_on_join = true;
-  }
+  connect_seq = peer_cseq + 1;
+  dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
 
-  if (tcp_write(sd, &tag, 1) < 0 ||
-      tcp_write(sd, (char*)&myseq, sizeof(myseq)) < 0) {
-    dout(2) << "accept couldn't send initial tag+seq: "
-           << strerror(errno) << dendl;
-    fault();
+  // send READY
+  { 
+    char tag = CEPH_MSGR_TAG_READY;
+    if (tcp_write(sd, &tag, 1) < 0) 
+      goto fail;
   }
 
   if (state != STATE_CLOSED) {
-    dout(10) << "accept starting writer, "
-            << "state=" << state << dendl;
+    dout(10) << "accept starting writer, " << "state=" << state << dendl;
     start_writer();
   }
-
   dout(20) << "accept done" << dendl;
   return 0;   // success.
+
+
+ fail:
+  state = STATE_CLOSED;
+  fault();
+  return -1;
 }
 
 int Rank::Pipe::connect()
@@ -903,10 +914,9 @@ int Rank::Pipe::connect()
     sd = -1;
   }
   __u32 cseq = connect_seq;
-  __u32 rseq;
 
   lock.Unlock();
-
+  
   int newsd;
   char tag = -1;
   int rc;
@@ -915,7 +925,7 @@ int Rank::Pipe::connect()
   struct msghdr msg;
   struct iovec msgvec[2];
   int msglen;
-
+  
   // create socket?
   newsd = ::socket(AF_INET, SOCK_STREAM, 0);
   if (newsd < 0) {
@@ -928,7 +938,6 @@ int Rank::Pipe::connect()
   myAddr.sin_family = AF_INET;
   myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
   myAddr.sin_port = htons( 0 );    
-  
   rc = ::bind(newsd, (struct sockaddr *) &myAddr, sizeof(myAddr));
   assert(rc>=0);
 
@@ -962,7 +971,7 @@ int Rank::Pipe::connect()
     goto fail;
   }
 
-  // identify myself, and send open seq
+  // identify myself, and send initial cseq
   memset(&msg, 0, sizeof(msg));
   msgvec[0].iov_base = (char*)&rank.rank_addr;
   msgvec[0].iov_len = sizeof(rank.rank_addr);
@@ -972,47 +981,26 @@ int Rank::Pipe::connect()
   msg.msg_iovlen = 2;
   msglen = msgvec[0].iov_len + msgvec[1].iov_len;
 
-  if (do_sendmsg(newsd, &msg, msglen)) {
-    dout(2) << "connect couldn't write self, seq, " << strerror(errno) << dendl;
-    goto fail;
-  }
-
-  dout(20) << "connect wrote self, seq, waiting for tag" << dendl;
-
-  // wait for tag
-  if (tcp_read(newsd, &tag, 1) < 0 ||
-      tcp_read(newsd, (char*)&rseq, sizeof(rseq)) < 0) {
-    dout(2) << "connect read tag, seq, " << strerror(errno) << dendl;
-    goto fail;
-  }
-
-  dout(20) << "connect got initial tag " << (int)tag << " + seq " << rseq << dendl;
+  while (1) {
+    if (do_sendmsg(newsd, &msg, msglen)) {
+      dout(2) << "connect couldn't write self, seq, " << strerror(errno) << dendl;
+      goto fail;
+    }
+    dout(20) << "connect wrote (self +) cseq, waiting for tag" << dendl;
+    if (tcp_read(newsd, &tag, 1) < 0) {
+      dout(2) << "connect read tag, seq, " << strerror(errno) << dendl;
+      goto fail;
+    }
+    dout(20) << "connect got tag " << (int)tag << dendl;
 
-  lock.Lock();
+    if (tag == CEPH_MSGR_TAG_RESETSESSION) {
+      lock.Lock();
+      if (state != STATE_CONNECTING) {
+       dout(0) << "connect got RESETSESSION but no longer connecting" << dendl;
+       goto stop_locked;
+      }
 
-  // FINISH
-  if (state != STATE_CONNECTING) {
-    dout(2) << "connect hmm, race durring connect(), not connecting anymore, failing" << dendl;
-    goto fail_locked;  // hmm!
-  }
-  if (tag == CEPH_MSGR_TAG_REJECT) {
-    if (connect_seq != rseq) {
-      dout(0) << "connect got REJECT, old connect_seq was " << connect_seq
-             << ", taking new " << rseq << dendl;
-      connect_seq = rseq;
-    } else {
-      dout(10) << "connect got REJECT, connection race (harmless), connect_seq=" << connect_seq << dendl;
-    }
-    goto fail_locked;
-  }
-  assert(tag == CEPH_MSGR_TAG_READY);
-  state = STATE_OPEN;
-  this->sd = newsd;
-  connect_seq++;
-  if (rseq != connect_seq) {
-    dout(0) << "connect REMOTE RESET: my seq = " << connect_seq << ", remote seq = " << rseq << dendl;
-    if (rseq < connect_seq) {
-      connect_seq = rseq;
+      dout(0) << "connect got RESETSESSION" << dendl;
       report_failures();
       for (unsigned i=0; i<rank.local.size(); i++) 
        if (rank.local[i] && rank.local[i]->get_dispatcher())
@@ -1022,25 +1010,83 @@ int Rank::Pipe::connect()
       for (list<Message*>::iterator p = q.begin(); p != q.end(); p++)
        (*p)->set_seq(++out_seq);
       in_seq = 0;
-    } else {
-      dout(0) << "WTF" << dendl;
-      assert(0);
     }
-  }
-  first_fault = last_attempt = utime_t();
-  dout(20) << "connect success " << connect_seq << dendl;
+    if (tag == CEPH_MSGR_TAG_RETRY) {
+      int rc = tcp_read(newsd, (char*)&cseq, sizeof(cseq));
+      if (rc < 0) {
+       dout(0) << "connect got RETRY tag but couldn't read cseq" << dendl;
+       goto fail;
+      }
+      lock.Lock();
+      if (state != STATE_CONNECTING) {
+       dout(0) << "connect got RETRY, but connection race or something, failing" << dendl;
+       goto stop_locked;
+      }
+      assert(cseq > connect_seq);
+      dout(10) << "connect got RETRY " << connect_seq << " -> " << cseq << dendl;
+      connect_seq = cseq;
+    }
 
-  if (!reader_running) {
-    dout(20) << "connect starting reader" << dendl;
-    start_reader();
+    if (tag == CEPH_MSGR_TAG_RESETSESSION ||
+       tag == CEPH_MSGR_TAG_RETRY) {
+      // retry
+      lock.Unlock();
+      memset(&msg, 0, sizeof(msg));
+      msgvec[0].iov_base = (char*)&cseq;
+      msgvec[0].iov_len = sizeof(cseq);
+      msg.msg_iov = msgvec;
+      msg.msg_iovlen = 1;
+      msglen = msgvec[0].iov_len;
+      continue;
+    }
+
+    if (tag == CEPH_MSGR_TAG_WAIT) {
+      lock.Lock();
+      if (state == STATE_CONNECTING) {
+       dout(3) << "connect got WAIT (connection race), will wait" << dendl;
+       state = STATE_WAIT;
+      } else {
+       dout(3) << "connect got WAIT (connection race), and lo, the wait is already over" << dendl;
+      }
+      goto stop_locked;
+    }
+
+    if (tag == CEPH_MSGR_TAG_READY) {
+      lock.Lock();
+      if (state != STATE_CONNECTING) {
+       dout(3) << "connect got READY but no longer connecting?" << dendl;
+       goto stop_locked;
+      }
+
+      // hooray!
+      state = STATE_OPEN;
+      sd = newsd;
+      connect_seq = cseq+1;
+      first_fault = last_attempt = utime_t();
+      dout(20) << "connect success " << connect_seq << dendl;
+
+      if (!reader_running) {
+       dout(20) << "connect starting reader" << dendl;
+       start_reader();
+      }
+      return 0;
+    }
+    
+    // protocol error
+    dout(0) << "connect got bad tag " << (int)tag << dendl;
+    goto fail;
   }
-  return 0;
 
  fail:
   lock.Lock();
- fail_locked:
-  if (newsd >= 0) ::close(newsd);
-  fault(tag == CEPH_MSGR_TAG_REJECT); // quiet if reject (not socket error)
+  if (state == STATE_CONNECTING)
+    fault();
+  else
+    dout(3) << "connect fault, but state != connecting, stopping" << dendl;
+
+ stop_locked:
+  if (newsd >= 0) 
+    ::close(newsd);
   return -1;
 }
 
@@ -1296,8 +1342,12 @@ void Rank::Pipe::reader()
     else if (tag == CEPH_MSGR_TAG_CLOSE) {
       dout(20) << "reader got CLOSE" << dendl;
       lock.Lock();
-      fault(true);  // treat as a fault; i.e. reconnect|close
-      continue;
+      if (state == STATE_CLOSING)
+       state = STATE_CLOSED;
+      else
+       state = STATE_CLOSING;
+      cond.Signal();
+      break;
     }
     else {
       dout(0) << "reader bad tag " << (int)tag << dendl;
@@ -1324,6 +1374,8 @@ void Rank::Pipe::reader()
     }
     rank.lock.Unlock();
   }
+
+  dout(10) << "reader done" << dendl;
 }
 
 /*
@@ -1345,7 +1397,7 @@ void Rank::Pipe::writer()
 {
   lock.Lock();
 
-  while (state != STATE_CLOSED) {
+  while (state != STATE_CLOSED && state != STATE_WAIT) {
     // standby?
     if (!q.empty() && state == STATE_STANDBY)
       state = STATE_CONNECTING;
@@ -1359,11 +1411,11 @@ void Rank::Pipe::writer()
     if (state == STATE_CLOSING) {
       // write close tag
       dout(20) << "writer writing CLOSE tag" << dendl;
-      char c = CEPH_MSGR_TAG_CLOSE;
+      char tag = CEPH_MSGR_TAG_CLOSE;
+      state = STATE_CLOSED;
       lock.Unlock();
-      if (sd) ::write(sd, &c, 1);
+      if (sd) ::write(sd, &tag, 1);
       lock.Lock();
-      state = STATE_CLOSED;
       continue;
     }
 
@@ -1442,6 +1494,8 @@ void Rank::Pipe::writer()
     }
     rank.lock.Unlock();
   }
+
+  dout(10) << "writer done" << dendl;
 }
 
 
@@ -1517,10 +1571,11 @@ Message *Rank::Pipe::read_message()
   }
   
   // unmarshall message
+  dout(20) << "reader got " << front.length() << " + " << data.length() << " byte message from " 
+           << env.src << dendl;
+
   Message *m = decode_message(env, front, data);
 
-  dout(20) << "reader got " << front.length() << " + " << data.length() << " byte message from " 
-           << m->get_source() << dendl;
   
   return m;
 }
index 01fb09d577b8986ce2a17b33bbdcbd0e6d21fee8..7c866fcbed016a67bcd38dd28bd595198f4f5321 100644 (file)
@@ -87,9 +87,8 @@ private:
       STATE_OPEN,
       STATE_STANDBY,
       STATE_CLOSED,
-      STATE_CLOSING
-      //STATE_GOTCLOSE,  // got (but haven't sent) a close
-      //STATE_SENTCLOSE  // sent (but haven't got) a close
+      STATE_CLOSING,
+      STATE_WAIT       // just wait for racing connection
     };
 
     int sd;
@@ -107,7 +106,6 @@ private:
     utime_t last_attempt;  // time of last reconnect attempt
 
     bool reader_running;
-    bool kick_reader_on_join;
     bool writer_running;
 
     list<Message*> q;
@@ -155,7 +153,7 @@ private:
     Pipe(int st) : 
       sd(-1),
       state(st), 
-      reader_running(false), kick_reader_on_join(false), writer_running(false),
+      reader_running(false), writer_running(false),
       connect_seq(0),
       out_seq(0), in_seq(0), in_seq_acked(0),
       reader_thread(this), writer_thread(this) { }
@@ -181,10 +179,7 @@ private:
     void dirty_close();
     void join() {
       if (writer_thread.is_started()) writer_thread.join();
-      if (reader_thread.is_started()) {
-       //if (kick_reader_on_join) reader_thread.kill(SIGUSR1);
-       reader_thread.join();
-      }
+      if (reader_thread.is_started()) reader_thread.join();
     }
     void stop();