void Pipe::DelayedDelivery::discard()
{
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::discard" << dendl;
Mutex::Locker l(delay_lock);
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
}
}
+void Pipe::DelayedDelivery::flush()
+{
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl;
+ Mutex::Locker l(delay_lock);
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front().second;
+ delay_queue.pop_front();
+ pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+ }
+}
+
void *Pipe::DelayedDelivery::entry()
{
Mutex::Locker locker(delay_lock);
// make existing Connection reference us
existing->connection_state->reset_pipe(this);
-
+
+ // flush/queue any existing delayed messages
+ if (existing->delay_thread)
+ existing->delay_thread->flush();
+
// steal incoming queue
uint64_t replaced_conn_id = conn_id;
conn_id = existing->conn_id;
return;
}
+ // queue delayed items immediately
+ if (delay_thread)
+ delay_thread->flush();
+
// requeue sent items
requeue_sent();
ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
return;
- }
+ }
if (state != STATE_CONNECTING) {
if (policy.server) {