We originally blocked in mark_down for fast dispatch threads
to complete to avoid various races in the code. Most of these
were in the OSD itself, where we were not prepared to get
messges on connections that had no attached session. Since
then, the OSD checks have been cleaned up to handle this.
There were other races we were worried about too, but the
details have been lost in the depths of time.
Instead, take the other route: make mark_down never block on
dispatch. This lets us remove the special case that
was added in order to cope with fast dispatch calling
mark_down on itself.
Now, the only stop_and_wait() user is the shutdown sequence.
Signed-off-by: Sage Weil <sage@redhat.com>
(cherry picked from commit
00907e032011b9d2acd16ea588555cf379830814)
void Pipe::DelayedDelivery::stop_fast_dispatching() {
Mutex::Locker l(delay_lock);
stop_fast_dispatching_flag = true;
- // we can't block if we're the delay thread; see Pipe::stop_and_wait()
- while (delay_dispatching && !am_self())
+ while (delay_dispatching)
delay_cond.Wait(delay_lock);
}
t.sleep();
}
- // HACK: we work around an annoying deadlock here. If the fast
- // dispatch method calls mark_down() on itself, it can block here
- // waiting for the reader_dispatching flag to clear... which will
- // clearly never happen. Avoid the situation by skipping the wait
- // if we are marking our *own* connect down. Do the same for the
- // delayed dispatch thread.
if (delay_thread) {
delay_thread->stop_fast_dispatching();
}
while (reader_running &&
- reader_dispatching &&
- !reader_thread.am_self())
+ reader_dispatching)
cond.Wait(pipe_lock);
}
Pipe *p = *q;
ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl;
p->pipe_lock.Lock();
- p->stop_and_wait();
+ p->stop();
PipeConnectionRef con = p->connection_state;
if (con && con->clear_pipe(p))
dispatch_queue.queue_reset(con.get());
rank_pipe.erase(it);
p->unregister_pipe();
p->pipe_lock.Lock();
- p->stop_and_wait();
+ p->stop();
PipeConnectionRef con = p->connection_state;
if (con && con->clear_pipe(p))
dispatch_queue.queue_reset(con.get());
ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl;
p->unregister_pipe();
p->pipe_lock.Lock();
- p->stop_and_wait();
+ p->stop();
if (p->connection_state) {
// generate a reset event for the caller in this case, even
// though they asked for it, since this is the addr-based (and
assert(p->msgr == this);
p->unregister_pipe();
p->pipe_lock.Lock();
- p->stop_and_wait();
+ p->stop();
if (p->connection_state) {
// do not generate a reset event for the caller in this case,
// since they asked for it.