From: Greg Farnum Date: Mon, 15 Sep 2014 21:03:35 +0000 (-0700) Subject: Pipe: stop delayed delivery fast_dispatch in stop_and_wait() X-Git-Tag: v0.86~31^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=681a75488c8365ff101aae1588f082f0b7f36ceb;p=ceph.git Pipe: stop delayed delivery fast_dispatch in stop_and_wait() If we don't, we can keep fast_dispatching messages after a Pipe has been mark_down()ed. That breaks things right now. Fixes: #9295 Signed-off-by: Greg Farnum --- diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index c910af87bc8..9dc5c349dbd 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -220,9 +220,19 @@ void *Pipe::DelayedDelivery::entry() active_flush = true; } if (pipe->in_q->can_fast_dispatch(m)) { - delay_lock.Unlock(); - pipe->in_q->fast_dispatch(m); - delay_lock.Lock(); + if (!stop_fast_dispatching_flag) { + delay_dispatching = true; + delay_lock.Unlock(); + pipe->in_q->fast_dispatch(m); + delay_lock.Lock(); + delay_dispatching = false; + if (stop_fast_dispatching_flag) { + // we need to let the stopping thread proceed + delay_cond.Signal(); + delay_lock.Unlock(); + delay_lock.Lock(); + } + } } else { pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); } @@ -232,6 +242,15 @@ void *Pipe::DelayedDelivery::entry() return NULL; } +void Pipe::DelayedDelivery::stop_fast_dispatching() { + Mutex::Locker l(delay_lock); + stop_fast_dispatching_flag = true; + // we can't block if we're the delay thread; see Pipe::stop_and_wait() + while (delay_dispatching && !am_self()) + delay_cond.Wait(delay_lock); +} + + int Pipe::accept() { ldout(msgr->cct,10) << "accept" << dendl; @@ -1407,7 +1426,11 @@ void Pipe::stop_and_wait() // dispatch method calls mark_down() on itself, it can block here // waiting for the reader_dispatching flag to clear... which will // clearly never happen. Avoid the situation by skipping the wait - // if we are marking our *own* connect down. + // if we are marking our *own* connect down. Do the same for the + // delayed dispatch thread. + if (delay_thread) { + delay_thread->stop_fast_dispatching(); + } while (reader_running && reader_dispatching && !reader_thread.am_self()) diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index cc2044c3d44..5496b5ce47d 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -79,13 +79,17 @@ class DispatchQueue; int flush_count; bool active_flush; bool stop_delayed_delivery; + bool delay_dispatching; // we are in fast dispatch now + bool stop_fast_dispatching_flag; // we need to stop fast dispatching public: DelayedDelivery(Pipe *p) : pipe(p), delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0), active_flush(false), - stop_delayed_delivery(false) { } + stop_delayed_delivery(false), + delay_dispatching(false), + stop_fast_dispatching_flag(false) { } ~DelayedDelivery() { discard(); } @@ -116,6 +120,11 @@ class DispatchQueue; Mutex::Locker l(delay_lock); pipe = new_owner; } + /** + * We need to stop fast dispatching before we need to stop putting + * normal messages into the DispatchQueue. + */ + void stop_fast_dispatching(); } *delay_thread; friend class DelayedDelivery;