From 2d5d3097c3998add1061ce253104154d72879237 Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Sun, 4 May 2014 21:58:04 -0700 Subject: [PATCH] Pipe: wait for Pipes to finish running, instead of just stop()ing them Add a stop_and_wait() function that, in addition to closing the Pipe and killing its socket, waits for any fast_dispatch call which is in-progress. Use this in several parts of the Pipe and SimpleMessenger code where appropriate. This fixes several races with fast_dispatch and other avenues; here are two: 1) It could be that we grab the lock while the existing pipe is fast_dispatching and then proceed to dispatch messages ourselves, beating it. Instead, wait for the other pipe. Add a "reader_dispatching" member which tells us this is happening, and when re-locking, signal the cond if we're shutting down. 2) It could be that a normally-dispatched Message in the OSD triggers a mark_down() on the Connection and then clears out the Session (Connection::priv) pointer, causing a racing fast_dispatch()'ed function to assert out in the OSD because it requires a valid Session. 
Signed-off-by: Greg Farnum Reviewed-by: Sage Weil --- src/msg/Pipe.cc | 15 +++++++++++++++ src/msg/Pipe.h | 5 +++++ src/msg/SimpleMessenger.cc | 12 ++++++------ 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index eb748ea3c7fe9..89b592b3f4af3 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -83,6 +83,7 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con) state(st), connection_state(NULL), reader_running(false), reader_needs_join(false), + reader_dispatching(false), writer_running(false), in_q(&(r->dispatch_queue)), send_keepalive(false), @@ -642,6 +643,7 @@ int Pipe::accept() ++p) out_q[p->first].splice(out_q[p->first].begin(), p->second); } + existing->stop_and_wait(); existing->pipe_lock.Unlock(); open: @@ -1390,6 +1392,15 @@ void Pipe::stop() shutdown_socket(); } +void Pipe::stop_and_wait() +{ + if (state != STATE_CLOSED) + stop(); + + while (reader_running && + reader_dispatching) + cond.Wait(pipe_lock); +} /* read msgs from socket. * also, server. 
@@ -1540,9 +1551,13 @@ void Pipe::reader() delay_thread->queue(release, m); } else { if (in_q->can_fast_dispatch(m)) { + reader_dispatching = true; pipe_lock.Unlock(); in_q->fast_dispatch(m); pipe_lock.Lock(); + reader_dispatching = false; + if (state == STATE_CLOSED) // there might be somebody waiting + cond.Signal(); } else { in_q->enqueue(m, m->get_priority(), conn_id); } diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index 0bd7febae3baf..eef62875df298 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -173,6 +173,7 @@ class DispatchQueue; utime_t backoff; // backoff time bool reader_running, reader_needs_join; + bool reader_dispatching; /// reader thread is dispatching without pipe_lock bool writer_running; map > out_q; // priority queue for outbound msgs @@ -257,7 +258,11 @@ class DispatchQueue; void register_pipe(); void unregister_pipe(); void join(); + /// stop a Pipe by closing its socket and setting it to STATE_CLOSED void stop(); + /// stop() a Pipe if not already done, and wait for it to finish any + /// fast_dispatch in progress. 
+ void stop_and_wait(); void _send(Message *m) { assert(pipe_lock.is_locked()); diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index ddc0dcaffcc75..4a8711f722daf 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -571,7 +571,7 @@ void SimpleMessenger::wait() Pipe *p = rank_pipe.begin()->second; p->unregister_pipe(); p->pipe_lock.Lock(); - p->stop(); + p->stop_and_wait(); p->pipe_lock.Unlock(); } @@ -599,7 +599,7 @@ void SimpleMessenger::mark_down_all() Pipe *p = *q; ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl; p->pipe_lock.Lock(); - p->stop(); + p->stop_and_wait(); ConnectionRef con = p->connection_state; if (con && con->clear_pipe(p)) dispatch_queue.queue_reset(con.get()); @@ -614,7 +614,7 @@ void SimpleMessenger::mark_down_all() rank_pipe.erase(it); p->unregister_pipe(); p->pipe_lock.Lock(); - p->stop(); + p->stop_and_wait(); ConnectionRef con = p->connection_state; if (con && con->clear_pipe(p)) dispatch_queue.queue_reset(con.get()); @@ -631,7 +631,7 @@ void SimpleMessenger::mark_down(const entity_addr_t& addr) ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl; p->unregister_pipe(); p->pipe_lock.Lock(); - p->stop(); + p->stop_and_wait(); if (p->connection_state) { // generate a reset event for the caller in this case, even // though they asked for it, since this is the addr-based (and @@ -658,7 +658,7 @@ void SimpleMessenger::mark_down(Connection *con) assert(p->msgr == this); p->unregister_pipe(); p->pipe_lock.Lock(); - p->stop(); + p->stop_and_wait(); if (p->connection_state) { // do not generate a reset event for the caller in this case, // since they asked for it. 
@@ -682,7 +682,7 @@ void SimpleMessenger::mark_down_on_empty(Connection *con) p->unregister_pipe(); if (p->out_q.empty()) { ldout(cct,1) << "mark_down_on_empty " << con << " -- " << p << " closing (queue is empty)" << dendl; - p->stop(); + p->stop_and_wait(); } else { ldout(cct,1) << "mark_down_on_empty " << con << " -- " << p << " marking (queue is not empty)" << dendl; p->close_on_empty = true; -- 2.39.5