From: Sage Weil Date: Thu, 21 Mar 2013 04:52:21 +0000 (-0700) Subject: msg/Pipe: fix seq handshake on reconnect X-Git-Tag: v0.62~197^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=541cd3c64be0dfa04e8a2df39422e0eb9541a428;p=ceph.git msg/Pipe: fix seq handshake on reconnect We go to the trouble to exchange our seq numbers during the handshake, but the bit that then avoids resending old messages was broken because we already requeue_sent() before we get to this point. Fix it by discarding queued items (in the high prio slot) that we don't need to resend, and adjust out_seq as needed. Drop the optional arg to requeue_sent() now that it is unused. Signed-off-by: Sage Weil --- diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index 1420b63beb8..ae94a6a340c 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -648,7 +648,7 @@ int Pipe::accept() ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl; goto fail_registered; } - requeue_sent(newly_acked_seq); + discard_requeued_up_to(newly_acked_seq); } pipe_lock.Lock(); @@ -1097,7 +1097,7 @@ void Pipe::unregister_pipe() } -void Pipe::requeue_sent(uint64_t max_acked) +void Pipe::requeue_sent() { if (sent.empty()) return; @@ -1106,16 +1106,26 @@ void Pipe::requeue_sent(uint64_t max_acked) while (!sent.empty()) { Message *m = sent.back(); sent.pop_back(); - if (m->get_seq() > max_acked) { - ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq - << " (" << m->get_seq() << ")" << dendl; - rq.push_front(m); - out_seq--; - } else { - ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq - << " <= max_acked " << max_acked << ", discarding" << dendl; - m->put(); - } + ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq + << " (" << m->get_seq() << ")" << dendl; + rq.push_front(m); + out_seq--; + } +} + +void Pipe::discard_requeued_up_to(uint64_t seq) +{ + ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl; + list& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; + while (!rq.empty()) { + Message *m = rq.front(); + if (m->get_seq() == 0 || m->get_seq() > seq) + break; + ldout(msgr->cct,10) << "discard_requeued_up_to " << *m << " for resend seq " << out_seq + << " <= " << seq << ", discarding" << dendl; + m->put(); + rq.pop_front(); + out_seq++; } } diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index ce6298d9681..e2a155a6038 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -268,9 +268,10 @@ class DispatchQueue; return m; } - /* Remove all messages from the sent queue. Add those with seq > max_acked - * to the highest priority outgoing queue. */ - void requeue_sent(uint64_t max_acked=0); + /// move all messages in the sent list back into the queue at the highest priority. + void requeue_sent(); + /// discard messages requeued by requeued_sent() up to a given seq + void discard_requeued_up_to(uint64_t seq); void discard_out_queue(); void shutdown_socket() {