git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: add global_seq to disambiguate reset and slow connect
author Sage Weil <sage@newdream.net>
Tue, 1 Jul 2008 05:23:19 +0000 (22:23 -0700)
committer Sage Weil <sage@newdream.net>
Tue, 1 Jul 2008 05:23:19 +0000 (22:23 -0700)
src/include/ceph_fs.h
src/kernel/messenger.c
src/kernel/messenger.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 9d1c5d84ccf697abc952cd53bf31abb97864877b..d1149aff1dc29d1217fdb843120bf0e35b2a7dc2 100644 (file)
@@ -336,8 +336,9 @@ struct ceph_entity_name {
 #define CEPH_MSGR_TAG_READY         1  /* server -> client: ready for messages */
 #define CEPH_MSGR_TAG_RESETSESSION  2  /* server -> client: reset, try again */
 #define CEPH_MSGR_TAG_WAIT          3  /* server -> client: wait for racing incoming connection */
-#define CEPH_MSGR_TAG_RETRY         4  /* server -> client + cseq: try again with higher cseq */
-#define CEPH_MSGR_TAG_CLOSE         5  /* closing pipe */
+#define CEPH_MSGR_TAG_RETRY_SESSION 4  /* server -> client + cseq: try again with higher cseq */
+#define CEPH_MSGR_TAG_RETRY_GLOBAL  5  /* server -> client + gseq: try again with higher gseq */
+#define CEPH_MSGR_TAG_CLOSE         6  /* closing pipe */
 #define CEPH_MSGR_TAG_MSG          10  /* message */
 #define CEPH_MSGR_TAG_ACK          11  /* message ack */
 
index c817988825724daf4245958e9e5f750658e5d410..9f3c61fc585799521e97201ed48fa17b06ea7188 100644 (file)
@@ -16,7 +16,8 @@ int ceph_debug_msgr;
 /* static tag bytes */
 static char tag_ready = CEPH_MSGR_TAG_READY;
 static char tag_reset = CEPH_MSGR_TAG_RESETSESSION;
-static char tag_retry = CEPH_MSGR_TAG_RETRY;
+static char tag_retry_session = CEPH_MSGR_TAG_RETRY_SESSION;
+static char tag_retry_global = CEPH_MSGR_TAG_RETRY_GLOBAL;
 static char tag_wait = CEPH_MSGR_TAG_WAIT;
 static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -614,6 +615,17 @@ static void ceph_fault(struct ceph_connection *con)
 }
 
 
+static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
+{      /* Return a fresh global_seq for msgr; gt is a floor the counter is raised to first. */
+       u32 ret;
+       spin_lock(&msgr->global_seq_lock);      /* serializes updates to msgr->global_seq */
+       if (msgr->global_seq < gt)
+               msgr->global_seq = gt;          /* catch up to the caller-supplied value */
+       ret = ++msgr->global_seq;               /* next value after max(previous counter, gt) */
+       spin_unlock(&msgr->global_seq_lock);
+       return ret;
+}
+
 
 /*
  * non-blocking versions
@@ -804,11 +816,15 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
 {
        con->out_kvec[0].iov_base = &msgr->inst.addr;
        con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
-       con->out_connect_seq = cpu_to_le32(con->connect_seq);
-       con->out_kvec[1].iov_base = &con->out_connect_seq;
+       con->global_seq = get_global_seq(msgr, 0);
+       con->out_global_seq = cpu_to_le32(con->global_seq);
+       con->out_kvec[1].iov_base = &con->out_global_seq;
        con->out_kvec[1].iov_len = 4;
-       con->out_kvec_left = 2;
-       con->out_kvec_bytes = sizeof(msgr->inst.addr) + 4;
+       con->out_connect_seq = cpu_to_le32(con->connect_seq);
+       con->out_kvec[2].iov_base = &con->out_connect_seq;
+       con->out_kvec[2].iov_len = 4;
+       con->out_kvec_left = 3;
+       con->out_kvec_bytes = sizeof(msgr->inst.addr) + 8;
        con->out_kvec_cur = con->out_kvec;
        con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
@@ -818,10 +834,12 @@ static void prepare_write_connect_retry(struct ceph_messenger *msgr,
                                        struct ceph_connection *con)
 {
        con->out_connect_seq = cpu_to_le32(con->connect_seq);
-       con->out_kvec[0].iov_base = &con->out_connect_seq;
+       con->out_kvec[0].iov_base = &con->out_global_seq;
        con->out_kvec[0].iov_len = 4;
-       con->out_kvec_left = 1;
-       con->out_kvec_bytes = 4;
+       con->out_kvec[1].iov_base = &con->out_connect_seq;
+       con->out_kvec[1].iov_len = 4;
+       con->out_kvec_left = 2;
+       con->out_kvec_bytes = 8;
        con->out_kvec_cur = con->out_kvec;
        con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
@@ -850,12 +868,12 @@ static void prepare_write_accept_reply(struct ceph_connection *con, char *ptag)
        set_bit(WRITE_PENDING, &con->state);
 }
 
-static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag)
+static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag,
+                                      u32 *pseq)
 {
        con->out_kvec[0].iov_base = ptag;
        con->out_kvec[0].iov_len = 1;
-       con->out_connect_seq = cpu_to_le32(con->connect_seq);
-       con->out_kvec[1].iov_base = &con->out_connect_seq;
+       con->out_kvec[1].iov_base = pseq;
        con->out_kvec[1].iov_len = 4;
        con->out_kvec_left = 2;
        con->out_kvec_bytes = 1 + 4;
@@ -1190,7 +1208,7 @@ static int read_connect_partial(struct ceph_connection *con)
                con->in_base_pos += ret;
        }
 
-       if (con->in_tag == CEPH_MSGR_TAG_RETRY) {
+       if (con->in_tag == CEPH_MSGR_TAG_RETRY_SESSION) {
                /* peers connect_seq */
                to += sizeof(con->in_connect_seq);
                if (con->in_base_pos < to) {
@@ -1204,6 +1222,20 @@ static int read_connect_partial(struct ceph_connection *con)
                        con->in_base_pos += ret;
                }
        }
+       if (con->in_tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
+               /* peers global_seq */
+               to += sizeof(con->in_global_seq);
+               if (con->in_base_pos < to) {
+                       int left = to - con->in_base_pos;
+                       int have = sizeof(con->in_global_seq) - left;
+                       ret = ceph_tcp_recvmsg(con->sock,
+                                              (char *)&con->in_global_seq +
+                                              have, left);
+                       if (ret <= 0)
+                               goto out;
+                       con->in_base_pos += ret;
+               }
+       }
        ret = 1;
 out:
        dout(20, "read_connect_partial %p end at %d ret %d\n", con,
@@ -1259,7 +1291,7 @@ static int process_connect(struct ceph_connection *con)
                prepare_read_connect(con);
                con->msgr->peer_reset(con->msgr->parent, &con->peer_name);
                break;
-       case CEPH_MSGR_TAG_RETRY:
+       case CEPH_MSGR_TAG_RETRY_SESSION:
                dout(10,
                     "process_connect got RETRY my seq = %u, peer_seq = %u\n",
                     le32_to_cpu(con->out_connect_seq),
@@ -1268,6 +1300,16 @@ static int process_connect(struct ceph_connection *con)
                prepare_write_connect_retry(con->msgr, con);
                prepare_read_connect(con);
                break;
+       case CEPH_MSGR_TAG_RETRY_GLOBAL:
+               dout(10,
+                    "process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n",
+                    con->global_seq, le32_to_cpu(con->in_global_seq));
+               con->global_seq =
+                       get_global_seq(con->msgr,
+                                      le32_to_cpu(con->in_global_seq));
+               prepare_write_connect_retry(con->msgr, con);
+               prepare_read_connect(con);
+               break;
        case CEPH_MSGR_TAG_WAIT:
                dout(10, "process_connect peer connecting WAIT\n");
                set_bit(WAIT, &con->state);
@@ -1307,11 +1349,24 @@ static int read_accept_partial(struct ceph_connection *con)
                con->in_base_pos += ret;
        }
 
+       /* global_seq */
+       to += sizeof(con->in_global_seq);
+       while (con->in_base_pos < to) {
+               int left = to - con->in_base_pos;
+               int have = sizeof(u32) - left;
+               ret = ceph_tcp_recvmsg(con->sock,
+                                      (char *)&con->in_global_seq + have,
+                                      left);
+               if (ret <= 0)
+                       return ret;
+               con->in_base_pos += ret;
+       }
+
        /* connect_seq */
        to += sizeof(con->in_connect_seq);
        while (con->in_base_pos < to) {
                int left = to - con->in_base_pos;
-               int have = sizeof(con->peer_addr) - left;
+               int have = sizeof(u32) - left;
                ret = ceph_tcp_recvmsg(con->sock,
                                       (char *)&con->in_connect_seq + have,
                                       left);
@@ -1358,7 +1413,8 @@ static void process_accept(struct ceph_connection *con)
 {
        struct ceph_connection *existing;
        struct ceph_messenger *msgr = con->msgr;
-       __u32 peer_cseq = le32_to_cpu(con->in_connect_seq);
+       u32 peer_gseq = le32_to_cpu(con->in_global_seq);
+       u32 peer_cseq = le32_to_cpu(con->in_connect_seq);
 
        /* do we have an existing connection for this peer? */
        if (radix_tree_preload(GFP_NOFS) < 0) {
@@ -1368,7 +1424,15 @@ static void process_accept(struct ceph_connection *con)
        spin_lock(&msgr->con_lock);
        existing = __get_connection(msgr, &con->peer_addr);
        if (existing) {
-               if (test_bit(LOSSY, &existing->state)) {
+               if (peer_gseq < existing->global_seq) {
+                       /* retry_global */
+                       con->global_seq = existing->global_seq;
+                       con->out_global_seq =
+                               cpu_to_le32(con->global_seq);
+                       prepare_write_accept_retry(con,
+                                                  &tag_retry_global,
+                                                  &con->out_global_seq);
+               } else if (test_bit(LOSSY, &existing->state)) {
                        dout(20, "process_accept replacing existing LOSSY %p\n",
                             existing);
                        reset_connection(existing);
@@ -1385,7 +1449,11 @@ static void process_accept(struct ceph_connection *con)
                                /* old attempt or peer didn't get the READY */
                                /* send retry with peers connect seq */
                                con->connect_seq = existing->connect_seq;
-                               prepare_write_accept_retry(con, &tag_retry);
+                               con->out_connect_seq =
+                                       cpu_to_le32(con->connect_seq);
+                               prepare_write_accept_retry(con,
+                                       &tag_retry_session,
+                                       &con->out_connect_seq);
                        }
                } else if (peer_cseq == existing->connect_seq &&
                           (test_bit(CONNECTING, &existing->state) ||
@@ -1419,6 +1487,7 @@ static void process_accept(struct ceph_connection *con)
        } else {
                dout(20, "process_accept no existing connection, opening\n");
                __register_connection(msgr, con);
+               con->global_seq = peer_gseq;
                con->connect_seq = peer_cseq + 1;
                prepare_write_accept_reply(con, &tag_ready);
        }
@@ -1629,6 +1698,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
        INIT_LIST_HEAD(&msgr->con_all);
        INIT_LIST_HEAD(&msgr->con_accepting);
        INIT_RADIX_TREE(&msgr->con_tree, GFP_ATOMIC);
+       spin_lock_init(&msgr->global_seq_lock);
 
        msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
        if (!msgr->zero_page) {
index 3ccc80eaa6cac8866e61a09fe29b3090aaf0e5ff..33c4a250a3b86917b07ca043301e9b19b8aa50bb 100644 (file)
@@ -51,6 +51,8 @@ struct ceph_messenger {
        struct list_head con_accepting;  /* accepting */
        struct radix_tree_root con_tree; /*  established */
        struct page *zero_page;
+       u32 global_seq;
+       spinlock_t global_seq_lock;
 };
 
 struct ceph_msg {
@@ -101,8 +103,9 @@ struct ceph_connection {
 
        struct ceph_entity_addr peer_addr; /* peer address */
        struct ceph_entity_name peer_name; /* peer name */
-       __u32 connect_seq;
+       __u32 connect_seq, global_seq;
        __le32 in_connect_seq, out_connect_seq;
+       __le32 in_global_seq, out_global_seq;
        __u32 out_seq;               /* last message queued for send */
        __u32 in_seq, in_seq_acked;  /* last message received, acked */
 
index ac73f74a0c1ab04c0b3d07110eef325ca3ea58a6..a2e218ed986e6976dd9e70895bc08eb3310f03c4 100644 (file)
@@ -770,13 +770,18 @@ int Rank::Pipe::accept()
     dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl;
   }
   
-  __u32 peer_cseq;
+  __u32 peer_gseq, peer_cseq;
   Pipe *existing = 0;
   
   // this should roughly mirror pseudocode at
   //  http://ceph.newdream.net/wiki/Messaging_protocol
 
   while (1) {
+    rc = tcp_read(sd, (char*)&peer_gseq, sizeof(peer_gseq));
+    if (rc < 0) {
+      dout(10) << "accept couldn't read connect peer_gseq" << dendl;
+      goto fail;
+    }
     rc = tcp_read(sd, (char*)&peer_cseq, sizeof(peer_cseq));
     if (rc < 0) {
       dout(10) << "accept couldn't read connect peer_seq" << dendl;
@@ -790,6 +795,20 @@ int Rank::Pipe::accept()
     if (rank.rank_pipe.count(peer_addr)) {
       existing = rank.rank_pipe[peer_addr];
       existing->lock.Lock();
+
+      if (peer_gseq < existing->peer_global_seq) {
+       dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
+                << " > " << peer_gseq << ", RETRY_GLOBAL" << dendl;
+       __u32 gseq = existing->peer_global_seq;  // so we can send it below..
+       existing->lock.Unlock();
+       rank.lock.Unlock();
+       char tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
+       if (tcp_write(sd, &tag, 1) < 0)
+         goto fail;
+       if (tcp_write(sd, (char*)&gseq, sizeof(gseq)) < 0)
+         goto fail;
+       continue;
+      }
       
       if (existing->policy.is_lossy()) {
        dout(-10) << "accept replacing existing (lossy) channel" << dendl;
@@ -798,41 +817,21 @@ int Rank::Pipe::accept()
       }
 
       if (peer_cseq < existing->connect_seq) {
-       if (false &&
-           /*
-            * FIXME: protocol spec is flawed here.  we can't
-            * distinguish between a remote reset or a slow remote
-            * connect race (where the remote connect arrives _after_
-            * our outgoing connection gets a READY reply).
-            *
-            * BUT, this doesn't happen in practice, yet.  the "reset"
-            * case comes up in two situations:
-            *
-            * - mds resets connection to client.  it should _never_
-            * talk to that client after that, unless the client
-            * initiates the connection.
-            *
-            * - mon restarts.  it'll talk to the client.  but, the client
-            * doesn't need the peer_reset calback in that case.  faling into the 
-            * RETRY case is harmless.
-            *
-            * blah!
-            */
-           peer_cseq == 0) {
+       if (peer_cseq == 0) {
          dout(10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
          existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
          goto replace;
        } else {
          // old attempt, or we sent READY but they didn't get it.
          dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
-                  << " > " << peer_cseq << ", RETRY" << dendl;
-         connect_seq = existing->connect_seq;  // so we can send it below..
+                  << " > " << peer_cseq << ", RETRY_SESSION" << dendl;
+         __u32 cseq = existing->connect_seq;  // so we can send it below..
          existing->lock.Unlock();
          rank.lock.Unlock();
-         char tag = CEPH_MSGR_TAG_RETRY;
+         char tag = CEPH_MSGR_TAG_RETRY_SESSION;
          if (tcp_write(sd, &tag, 1) < 0)
            goto fail;
-         if (tcp_write(sd, (char*)&connect_seq, sizeof(connect_seq)) < 0)
+         if (tcp_write(sd, (char*)&cseq, sizeof(cseq)) < 0)
            goto fail;
          continue;
        }
@@ -864,6 +863,7 @@ int Rank::Pipe::accept()
       }
 
       assert(peer_cseq > existing->connect_seq);
+      assert(peer_gseq > existing->peer_global_seq);
       if (existing->connect_seq == 0) {
        dout(10) << "accept we reset (peer sent cseq " << peer_cseq 
                 << ", " << existing << ".cseq = " << existing->connect_seq
@@ -921,6 +921,7 @@ int Rank::Pipe::accept()
   rank.lock.Unlock();
 
   connect_seq = peer_cseq + 1;
+  peer_global_seq = peer_gseq;
   dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
 
   // send READY
@@ -954,6 +955,7 @@ int Rank::Pipe::connect()
     sd = -1;
   }
   __u32 cseq = connect_seq;
+  __u32 gseq = rank.get_global_seq();
 
   lock.Unlock();
   
@@ -1021,15 +1023,27 @@ int Rank::Pipe::connect()
   memset(&msg, 0, sizeof(msg));
   msgvec[0].iov_base = (char*)&rank.rank_addr;
   msgvec[0].iov_len = sizeof(rank.rank_addr);
-  msgvec[1].iov_base = (char*)&cseq;
-  msgvec[1].iov_len = sizeof(cseq);
   msg.msg_iov = msgvec;
-  msg.msg_iovlen = 2;
-  msglen = msgvec[0].iov_len + msgvec[1].iov_len;
+  msg.msg_iovlen = 1;
+  msglen = msgvec[0].iov_len;
+  if (do_sendmsg(newsd, &msg, msglen)) {
+    dout(2) << "connect couldn't write self addr, " << strerror(errno) << dendl;
+    goto fail;
+  }
 
   while (1) {
+    memset(&msg, 0, sizeof(msg));
+    msgvec[0].iov_base = (char*)&gseq;
+    msgvec[0].iov_len = sizeof(gseq);
+    msgvec[1].iov_base = (char*)&cseq;
+    msgvec[1].iov_len = sizeof(cseq);
+    msg.msg_iov = msgvec;
+    msg.msg_iovlen = 2;
+    msglen = msgvec[0].iov_len + msgvec[1].iov_len;
+
+    dout(10) << "connect sending gseq " << gseq << " cseq " << cseq << dendl;
     if (do_sendmsg(newsd, &msg, msglen)) {
-      dout(2) << "connect couldn't write self, seq, " << strerror(errno) << dendl;
+      dout(2) << "connect couldn't write gseq, cseq, " << strerror(errno) << dendl;
       goto fail;
     }
     dout(20) << "connect wrote (self +) cseq, waiting for tag" << dendl;
@@ -1048,34 +1062,40 @@ int Rank::Pipe::connect()
 
       dout(0) << "connect got RESETSESSION" << dendl;
       was_session_reset();
+      lock.Unlock();
+      continue;
+    }
+    if (tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
+      int rc = tcp_read(newsd, (char*)&gseq, sizeof(gseq));
+      if (rc < 0) {
+       dout(0) << "connect got RETRY_GLOBAL tag but couldn't read gseq" << dendl;
+       goto fail;
+      }
+      lock.Lock();
+      if (state != STATE_CONNECTING) {
+       dout(0) << "connect got RETRY_GLOBAL, but connection race or something, failing" << dendl;
+       goto stop_locked;
+      }
+      gseq = rank.get_global_seq(gseq);
+      dout(10) << "connect got RETRY_GLOBAL " << gseq << dendl;
+      lock.Unlock();
       continue;
     }
-    if (tag == CEPH_MSGR_TAG_RETRY) {
+    if (tag == CEPH_MSGR_TAG_RETRY_SESSION) {
       int rc = tcp_read(newsd, (char*)&cseq, sizeof(cseq));
       if (rc < 0) {
-       dout(0) << "connect got RETRY tag but couldn't read cseq" << dendl;
+       dout(0) << "connect got RETRY_SESSION tag but couldn't read cseq" << dendl;
        goto fail;
       }
       lock.Lock();
       if (state != STATE_CONNECTING) {
-       dout(0) << "connect got RETRY, but connection race or something, failing" << dendl;
+       dout(0) << "connect got RETRY_SESSION, but connection race or something, failing" << dendl;
        goto stop_locked;
       }
       assert(cseq > connect_seq);
-      dout(10) << "connect got RETRY " << connect_seq << " -> " << cseq << dendl;
+      dout(10) << "connect got RETRY_SESSION " << connect_seq << " -> " << cseq << dendl;
       connect_seq = cseq;
-    }
-
-    if (tag == CEPH_MSGR_TAG_RESETSESSION ||
-       tag == CEPH_MSGR_TAG_RETRY) {
-      // retry
       lock.Unlock();
-      memset(&msg, 0, sizeof(msg));
-      msgvec[0].iov_base = (char*)&cseq;
-      msgvec[0].iov_len = sizeof(cseq);
-      msg.msg_iov = msgvec;
-      msg.msg_iovlen = 1;
-      msglen = msgvec[0].iov_len;
       continue;
     }
 
index acaf6dbd24eb05606a920ff4ffcfe274544ea16a..a579967f57432db3178b82e72fc27cda18d36a67 100644 (file)
@@ -125,7 +125,7 @@ private:
     list<Message*> sent;
     Cond cond;
     
-    __u32 connect_seq;
+    __u32 connect_seq, peer_global_seq;
     __u32 out_seq;
     __u32 in_seq, in_seq_acked;
     
@@ -169,7 +169,7 @@ private:
       sd(-1),
       state(st), 
       reader_running(false), writer_running(false),
-      connect_seq(0),
+      connect_seq(0), peer_global_seq(0),
       out_seq(0), in_seq(0), in_seq_acked(0),
       reader_thread(this), writer_thread(this) { }
     //~Pipe() { cout << "destructor on " << this << std::endl; }
@@ -343,7 +343,10 @@ private:
 
   set<Pipe*>      pipes;
   list<Pipe*>     pipe_reap_queue;
-        
+  
+  Mutex global_seq_lock;
+  __u32 global_seq;
+      
   Pipe *connect_rank(const entity_addr_t& addr, const Policy& p);
 
   const entity_addr_t &get_rank_addr() { return rank_addr; }
@@ -363,6 +366,13 @@ public:
   int start(bool nodaemon = false);
   void wait();
 
+  __u32 get_global_seq(__u32 old=0) {  // next global_seq; 'old' is a floor the counter is raised to first
+    Mutex::Locker l(global_seq_lock);  // NOTE(review): presumably a scoped lock released on return -- confirm
+    if (old > global_seq)
+      global_seq = old;                // catch up to the caller-supplied value
+    return ++global_seq;               // pre-increment: next value after max(previous counter, old)
+  }
+
   EntityMessenger *register_entity(entity_name_t addr);
   void rename_entity(EntityMessenger *ms, entity_name_t newaddr);
   void unregister_entity(EntityMessenger *ms);