active_flush = true;
}
if (pipe->in_q->can_fast_dispatch(m)) {
- delay_lock.Unlock();
- pipe->in_q->fast_dispatch(m);
- delay_lock.Lock();
+ if (!stop_fast_dispatching_flag) {
+ delay_dispatching = true;
+ delay_lock.Unlock();
+ pipe->in_q->fast_dispatch(m);
+ delay_lock.Lock();
+ delay_dispatching = false;
+ if (stop_fast_dispatching_flag) {
+ // we need to let the stopping thread proceed
+ delay_cond.Signal();
+ delay_lock.Unlock();
+ delay_lock.Lock();
+ }
+ }
} else {
pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
}
return NULL;
}
+void Pipe::DelayedDelivery::stop_fast_dispatching() {
+ Mutex::Locker l(delay_lock);
+ stop_fast_dispatching_flag = true;
+ // we can't block if we're the delay thread; see Pipe::stop_and_wait()
+ while (delay_dispatching && !am_self())
+ delay_cond.Wait(delay_lock);
+}
+
+
int Pipe::accept()
{
ldout(msgr->cct,10) << "accept" << dendl;
// dispatch method calls mark_down() on itself, it can block here
// waiting for the reader_dispatching flag to clear... which will
// clearly never happen. Avoid the situation by skipping the wait
- // if we are marking our *own* connect down.
+ // if we are marking our *own* connect down. Do the same for the
+ // delayed dispatch thread.
+ if (delay_thread) {
+ delay_thread->stop_fast_dispatching();
+ }
while (reader_running &&
reader_dispatching &&
!reader_thread.am_self())
int flush_count;
bool active_flush;
bool stop_delayed_delivery;
+ bool delay_dispatching; // we are in fast dispatch now
+ bool stop_fast_dispatching_flag; // we need to stop fast dispatching
public:
DelayedDelivery(Pipe *p)
: pipe(p),
delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
active_flush(false),
- stop_delayed_delivery(false) { }
+ stop_delayed_delivery(false),
+ delay_dispatching(false),
+ stop_fast_dispatching_flag(false) { }
~DelayedDelivery() {
discard();
}
Mutex::Locker l(delay_lock);
pipe = new_owner;
}
+ /**
+ * We need to stop fast dispatching before we need to stop putting
+ * normal messages into the DispatchQueue.
+ */
+ void stop_fast_dispatching();
} *delay_thread;
friend class DelayedDelivery;