ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
goto fail_registered;
}
- requeue_sent(newly_acked_seq);
+ discard_requeued_up_to(newly_acked_seq);
}
pipe_lock.Lock();
}
-void Pipe::requeue_sent(uint64_t max_acked)
+void Pipe::requeue_sent()
{
if (sent.empty())
return;
while (!sent.empty()) {
Message *m = sent.back();
sent.pop_back();
- if (m->get_seq() > max_acked) {
- ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
- << " (" << m->get_seq() << ")" << dendl;
- rq.push_front(m);
- out_seq--;
- } else {
- ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
- << " <= max_acked " << max_acked << ", discarding" << dendl;
- m->put();
- }
+ ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
+ << " (" << m->get_seq() << ")" << dendl;
+ rq.push_front(m);
+ out_seq--;
+ }
+}
+
+void Pipe::discard_requeued_up_to(uint64_t seq)
+{
+ ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl;
+ list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+ while (!rq.empty()) {
+ Message *m = rq.front();
+ if (m->get_seq() == 0 || m->get_seq() > seq)
+ break;
+ ldout(msgr->cct,10) << "discard_requeued_up_to " << *m << " for resend seq " << out_seq
+ << " <= " << seq << ", discarding" << dendl;
+ m->put();
+ rq.pop_front();
+ out_seq++;
}
}
return m;
}
- /* Remove all messages from the sent queue. Add those with seq > max_acked
- * to the highest priority outgoing queue. */
- void requeue_sent(uint64_t max_acked=0);
+ /// move all messages in the sent list back into the queue at the highest priority.
+ void requeue_sent();
+ /// discard messages requeued by requeued_sent() up to a given seq
+ void discard_requeued_up_to(uint64_t seq);
void discard_out_queue();
void shutdown_socket() {