]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/Pipe: fix seq handshake on reconnect 120/head
authorSage Weil <sage@inktank.com>
Thu, 21 Mar 2013 04:52:21 +0000 (21:52 -0700)
committerSage Weil <sage@inktank.com>
Thu, 21 Mar 2013 04:52:21 +0000 (21:52 -0700)
We go to the trouble to exchange our seq numbers during the handshake, but
the bit that then avoids resending old messages was broken because we
already requeue_sent() before we get to this point.  Fix it by discarding
queued items (in the high prio slot) that we don't need to resend, and
adjust out_seq as needed.

Drop the optional arg to requeue_sent() now that it is unused.

Signed-off-by: Sage Weil <sage@inktank.com>
src/msg/Pipe.cc
src/msg/Pipe.h

index 1420b63beb805afd96433652db4b47965eb825ca..ae94a6a340c86ed0009fbe7a7a9ba3d249e6537b 100644 (file)
@@ -648,7 +648,7 @@ int Pipe::accept()
       ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
       goto fail_registered;
     }
-    requeue_sent(newly_acked_seq);
+    discard_requeued_up_to(newly_acked_seq);
   }
 
   pipe_lock.Lock();
@@ -1097,7 +1097,7 @@ void Pipe::unregister_pipe()
 }
 
 
-void Pipe::requeue_sent(uint64_t max_acked)
+void Pipe::requeue_sent()
 {
   if (sent.empty())
     return;
@@ -1106,16 +1106,26 @@ void Pipe::requeue_sent(uint64_t max_acked)
   while (!sent.empty()) {
     Message *m = sent.back();
     sent.pop_back();
-    if (m->get_seq() > max_acked) {
-      ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
-          << " (" << m->get_seq() << ")" << dendl;
-      rq.push_front(m);
-      out_seq--;
-    } else {
-      ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
-                         << " <= max_acked " << max_acked << ", discarding" << dendl;
-      m->put();
-    }
+    ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
+                       << " (" << m->get_seq() << ")" << dendl;
+    rq.push_front(m);
+    out_seq--;
+  }
+}
+
+void Pipe::discard_requeued_up_to(uint64_t seq)
+{
+  ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl;
+  list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+  while (!rq.empty()) {
+    Message *m = rq.front();
+    if (m->get_seq() == 0 || m->get_seq() > seq)
+      break;
+    ldout(msgr->cct,10) << "discard_requeued_up_to " << *m << " for resend seq " << out_seq
+                       << " <= " << seq << ", discarding" << dendl;
+    m->put();
+    rq.pop_front();
+    out_seq++;
   }
 }
 
index ce6298d9681162d7cc5c65cec8d4d69c8132ff59..e2a155a603863f31fdcebf40006e327324f213b7 100644 (file)
@@ -268,9 +268,10 @@ class DispatchQueue;
       return m;
     }
 
-    /* Remove all messages from the sent queue. Add those with seq > max_acked
-     * to the highest priority outgoing queue. */
-    void requeue_sent(uint64_t max_acked=0);
+    /// move all messages in the sent list back into the queue at the highest priority.
+    void requeue_sent();
+    /// discard messages requeued by requeued_sent() up to a given seq
+    void discard_requeued_up_to(uint64_t seq);
     void discard_out_queue();
 
     void shutdown_socket() {