}
-void SimpleMessenger::Pipe::requeue_sent()
+void SimpleMessenger::Pipe::requeue_sent(uint64_t max_acked)
{
if (sent.empty())
return;
list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
while (!sent.empty()) {
Message *m = sent.back();
- sent.pop_back();
- dout(10) << "requeue_sent " << *m << " for resend seq " << out_seq
- << " (" << m->get_seq() << ")" << dendl;
- rq.push_front(m);
- out_seq--;
+ if (m->get_seq() > max_acked) {
+ sent.pop_back();
+ dout(10) << "requeue_sent " << *m << " for resend seq " << out_seq
+ << " (" << m->get_seq() << ")" << dendl;
+ rq.push_front(m);
+ out_seq--;
+ } else
+ sent.clear();
}
}
return m;
}
- void requeue_sent();
+ /* Remove all messages from the sent queue. Add those with seq > max_acked
+ * to the highest priority outgoing queue. */
+ void requeue_sent(uint64_t max_acked=0);
void discard_queue();
void force_close() {