]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: when both ends support it, exchange in_seq values on reconnect
authorGreg Farnum <gregf@hq.newdream.net>
Mon, 13 Sep 2010 18:42:59 +0000 (11:42 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Mon, 13 Sep 2010 18:52:05 +0000 (11:52 -0700)
to prevent gratuitously re-sending messages.

This adds a new feature "CEPH_FEATURE_RECONNECT_SEQ" which goes into
the default msgr features, as well as a CEPH_MSGR_TAG_SEQ which indicates
this step is being taken and substitutes for CEPH_MSGR_TAG_READY.

src/include/ceph_fs.h
src/include/msgr.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 1b8ee257bb0e1a27bfdabd977716a76fb834a234..8c06e0de1e4341cc5ba569ca49e316fa04fb51f5 100644 (file)
@@ -45,6 +45,7 @@
 #define CEPH_FEATURE_FLOCK          (1<<3)
 #define CEPH_FEATURE_SUBSCRIBE2     (1<<4)
 #define CEPH_FEATURE_MONNAMES       (1<<5)
+#define CEPH_FEATURE_RECONNECT_SEQ  (1<<6)
 
 
 /*
index 680d3d648cac21bd9ed201a6c4bee1651a9df10e..3d94a73b5f30c43b6e47b1c97ef23c9861e5bb7f 100644 (file)
@@ -87,6 +87,7 @@ struct ceph_entity_inst {
 #define CEPH_MSGR_TAG_BADPROTOVER  10  /* bad protocol version */
 #define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */
 #define CEPH_MSGR_TAG_FEATURES      12 /* insufficient features */
+#define CEPH_MSGR_TAG_SEQ           13 /* 64-bit int follows with seen seq number */
 
 
 /*
index 9ce7cafecefd80068959fa72b956becad8c873d6..f0a2449a6adaa51c2be32ff7ed78e9181cac4715 100644 (file)
@@ -608,7 +608,8 @@ int SimpleMessenger::Pipe::accept()
 
   // this should roughly mirror pseudocode at
   //  http://ceph.newdream.net/wiki/Messaging_protocol
-
+  int reply_tag = 0;
+  bool replace = false;
   while (1) {
     rc = tcp_read(sd, (char*)&connect, sizeof(connect));
     if (rc < 0) {
@@ -799,26 +800,19 @@ int SimpleMessenger::Pipe::accept()
   }
   
  replace:
+  replace = true;
+  if (connect.features & CEPH_FEATURE_RECONNECT_SEQ) {
+    reply_tag = CEPH_MSGR_TAG_SEQ;
+  }
   dout(10) << "accept replacing " << existing << dendl;
   existing->stop();
   existing->unregister_pipe();
     
-  // steal queue and out_seq
-  existing->requeue_sent();
-  out_seq = existing->out_seq;
-  in_seq = existing->in_seq;
-  dout(10) << "accept   out_seq " << out_seq << "  in_seq " << in_seq << dendl;
-  for (map<int, list<Message*> >::iterator p = existing->out_q.begin();
-       p != existing->out_q.end();
-       p++)
-    out_q[p->first].splice(out_q[p->first].begin(), p->second);
-  
   //set ourself to take over other Connection, for older messages
   existing->connection_state->clear_pipe();
   existing->connection_state->pipe = get();
   existing->connection_state->put();
   existing->connection_state = NULL;
-  existing->pipe_lock.Unlock();
 
  open:
   // open
@@ -828,7 +822,7 @@ int SimpleMessenger::Pipe::accept()
   dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
 
   // send READY reply
-  reply.tag = CEPH_MSGR_TAG_READY;
+  reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY);
   reply.features = policy.features_supported;
   reply.global_seq = messenger->get_global_seq();
   reply.connect_seq = connect_seq;
@@ -854,6 +848,32 @@ int SimpleMessenger::Pipe::accept()
       goto fail_unlocked;
   }
 
+  if (replace) {
+    uint64_t newly_acked_seq = 0;
+    if (reply_tag == CEPH_MSGR_TAG_SEQ) {
+      if(tcp_write(sd, (char*)&existing->in_seq, sizeof(existing->in_seq)) < 0) {
+        dout(2) << "accept write error on in_seq" << dendl;
+        goto fail_unlocked;
+      }
+      if(tcp_read(sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
+        dout(2) << "accept read error on newly_acked_seq" << dendl;
+        goto fail_unlocked;
+      }
+    }
+    // steal queue and out_seq
+    existing->requeue_sent(newly_acked_seq);
+    out_seq = existing->out_seq;
+    in_seq = existing->in_seq;
+    in_seq_acked = in_seq;
+    dout(10) << "accept out_seq " << out_seq << " in_seq " << in_seq << dendl;
+    for (map<int, list<Message*> >::iterator p = existing->out_q.begin();
+         p != existing->out_q.end();
+         p++)
+      out_q[p->first].splice(out_q[p->first].begin(), p->second);
+
+    existing->pipe_lock.Unlock();
+  }
+
   pipe_lock.Lock();
   if (state != STATE_CLOSED) {
     dout(10) << "accept starting writer, " << "state=" << state << dendl;
@@ -1135,13 +1155,28 @@ int SimpleMessenger::Pipe::connect()
       goto stop_locked;
     }
 
-    if (reply.tag == CEPH_MSGR_TAG_READY) {
+    if (reply.tag == CEPH_MSGR_TAG_READY ||
+        reply.tag == CEPH_MSGR_TAG_SEQ) {
       uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features;
       if (feat_missing) {
        dout(1) << "missing required features " << std::hex << feat_missing << std::dec << dendl;
        goto fail_locked;
       }
 
+      if (reply.tag == CEPH_MSGR_TAG_SEQ) {
+        dout(10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
+        uint64_t newly_acked_seq = 0;
+        if (tcp_read(sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
+          dout(2) << "connect read error on newly_acked_seq" << dendl;
+          goto fail_locked;
+        }
+        handle_ack(newly_acked_seq);
+        if (tcp_write(sd, (char*)&in_seq, sizeof(in_seq)) < 0) {
+          dout(2) << "connect write error on in_seq" << dendl;
+          goto fail_locked;
+        }
+      }
+
       // hooray!
       peer_global_seq = reply.global_seq;
       policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
@@ -1446,16 +1481,7 @@ void SimpleMessenger::Pipe::reader()
        dout(2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
        fault(false, true);
       } else if (state != STATE_CLOSED) {
-       dout(15) << "reader got ack seq " << seq << dendl;
-       // trim sent list
-       while (!sent.empty() &&
-              sent.front()->get_seq() <= seq) {
-         Message *m = sent.front();
-         sent.pop_front();
-         dout(10) << "reader got ack seq " 
-                   << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
-         m->put();
-       }
+        handle_ack(seq);
       }
       continue;
     }
index 93e00fe96fe7c93f0927ea6b98421470fece4b55..0b9b46ee76552f5d4e7e7de60df4b8f7aa1072ab 100644 (file)
@@ -56,7 +56,8 @@ using namespace __gnu_cxx;
   CEPH_FEATURE_NOSRCADDR |      \
   CEPH_FEATURE_SUBSCRIBE2 |     \
   CEPH_FEATURE_MONNAMES |        \
-  CEPH_FEATURE_FLOCK
+  CEPH_FEATURE_FLOCK |           \
+  CEPH_FEATURE_RECONNECT_SEQ
 
 class SimpleMessenger : public Messenger {
 public:
@@ -169,6 +170,20 @@ private:
 
     void was_session_reset();
 
+    /* Trim the sent list up to an acked seq; called from reader() and connect(). */
+    void handle_ack(uint64_t seq) {
+      dout(15) << "reader got ack seq " << seq << dendl;
+      // pop and put() every message at the front of sent whose seq is <= the acked seq
+      while (!sent.empty() &&
+          sent.front()->get_seq() <= seq) {
+        Message *m = sent.front();
+        sent.pop_front();
+        dout(10) << "reader got ack seq "
+            << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
+        m->put();
+      }
+    }
+
     // threads
     class Reader : public Thread {
       Pipe *pipe;