]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Pipe: wait for Pipes to finish running, instead of just stop()ing them
authorGreg Farnum <greg@inktank.com>
Mon, 5 May 2014 04:58:04 +0000 (21:58 -0700)
committerGreg Farnum <greg@inktank.com>
Tue, 6 May 2014 18:39:34 +0000 (11:39 -0700)
Add a stop_and_wait() function that, in addition to closing the Pipe and killing
its socket, waits for any fast_dispatch call which is in-progress. Use this in
several parts of the Pipe and SimpleMessenger code where appropriate.

This fixes several races with fast_dispatch and other avenues; here are two:
1) It could be that we grab the lock while the existing pipe is fast_dispatching
and then proceed to dispatch messages ourself, beating it. Instead, wait for
the other pipe. Add a "reader_dispatching" member which tells bus this is
happening, and when re-locking, signal the cond if we're shutting down.

2) It could be that a normally-dispatched Message in the OSD triggers a
mark_down() on the Connection and then clears out the Session
(Connection::priv) pointer, causing a racing fast_dispatch()'ed function to
assert out in the OSD because it requires a valid Session.

Signed-off-by: Greg Farnum <greg@inktank.com>
Reviewed-by: Sage Weil <sage@inktank.com>
src/msg/Pipe.cc
src/msg/Pipe.h
src/msg/SimpleMessenger.cc

index eb748ea3c7fe99703e2c821d19aa8cfc0e9063d4..89b592b3f4af3a918d2cc70b5a30e3c73051b807 100644 (file)
@@ -83,6 +83,7 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
     state(st),
     connection_state(NULL),
     reader_running(false), reader_needs_join(false),
+    reader_dispatching(false),
     writer_running(false),
     in_q(&(r->dispatch_queue)),
     send_keepalive(false),
@@ -642,6 +643,7 @@ int Pipe::accept()
          ++p)
       out_q[p->first].splice(out_q[p->first].begin(), p->second);
   }
+  existing->stop_and_wait();
   existing->pipe_lock.Unlock();
 
  open:
@@ -1390,6 +1392,15 @@ void Pipe::stop()
   shutdown_socket();
 }
 
+void Pipe::stop_and_wait()
+{
+  if (state != STATE_CLOSED)
+    stop();
+  
+  while (reader_running &&
+        reader_dispatching)
+    cond.Wait(pipe_lock);
+}
 
 /* read msgs from socket.
  * also, server.
@@ -1540,9 +1551,13 @@ void Pipe::reader()
         delay_thread->queue(release, m);
       } else {
         if (in_q->can_fast_dispatch(m)) {
+         reader_dispatching = true;
           pipe_lock.Unlock();
           in_q->fast_dispatch(m);
           pipe_lock.Lock();
+         reader_dispatching = false;
+         if (state == STATE_CLOSED) // there might be somebody waiting
+           cond.Signal();
         } else {
           in_q->enqueue(m, m->get_priority(), conn_id);
         }
index 0bd7febae3baf19bb49f1cae98fa0bd6057a7d0e..eef62875df298551fb37033f10378b66ba6e11b8 100644 (file)
@@ -173,6 +173,7 @@ class DispatchQueue;
     utime_t backoff;         // backoff time
 
     bool reader_running, reader_needs_join;
+    bool reader_dispatching; /// reader thread is dispatching without pipe_lock
     bool writer_running;
 
     map<int, list<Message*> > out_q;  // priority queue for outbound msgs
@@ -257,7 +258,11 @@ class DispatchQueue;
     void register_pipe();
     void unregister_pipe();
     void join();
+    /// stop a Pipe by closing its socket and setting it to STATE_CLOSED
     void stop();
+    /// stop() a Pipe if not already done, and wait for it to finish any
+    /// fast_dispatch in progress.
+    void stop_and_wait();
 
     void _send(Message *m) {
       assert(pipe_lock.is_locked());
index ddc0dcaffcc75e1e9f5d253091bd67f39be238e6..4a8711f722daf8ea84b98646a0c506dff468433c 100644 (file)
@@ -571,7 +571,7 @@ void SimpleMessenger::wait()
       Pipe *p = rank_pipe.begin()->second;
       p->unregister_pipe();
       p->pipe_lock.Lock();
-      p->stop();
+      p->stop_and_wait();
       p->pipe_lock.Unlock();
     }
 
@@ -599,7 +599,7 @@ void SimpleMessenger::mark_down_all()
     Pipe *p = *q;
     ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl;
     p->pipe_lock.Lock();
-    p->stop();
+    p->stop_and_wait();
     ConnectionRef con = p->connection_state;
     if (con && con->clear_pipe(p))
       dispatch_queue.queue_reset(con.get());
@@ -614,7 +614,7 @@ void SimpleMessenger::mark_down_all()
     rank_pipe.erase(it);
     p->unregister_pipe();
     p->pipe_lock.Lock();
-    p->stop();
+    p->stop_and_wait();
     ConnectionRef con = p->connection_state;
     if (con && con->clear_pipe(p))
       dispatch_queue.queue_reset(con.get());
@@ -631,7 +631,7 @@ void SimpleMessenger::mark_down(const entity_addr_t& addr)
     ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl;
     p->unregister_pipe();
     p->pipe_lock.Lock();
-    p->stop();
+    p->stop_and_wait();
     if (p->connection_state) {
       // generate a reset event for the caller in this case, even
       // though they asked for it, since this is the addr-based (and
@@ -658,7 +658,7 @@ void SimpleMessenger::mark_down(Connection *con)
     assert(p->msgr == this);
     p->unregister_pipe();
     p->pipe_lock.Lock();
-    p->stop();
+    p->stop_and_wait();
     if (p->connection_state) {
       // do not generate a reset event for the caller in this case,
       // since they asked for it.
@@ -682,7 +682,7 @@ void SimpleMessenger::mark_down_on_empty(Connection *con)
     p->unregister_pipe();
     if (p->out_q.empty()) {
       ldout(cct,1) << "mark_down_on_empty " << con << " -- " << p << " closing (queue is empty)" << dendl;
-      p->stop();
+      p->stop_and_wait();
     } else {
       ldout(cct,1) << "mark_down_on_empty " << con << " -- " << p << " marking (queue is not empty)" << dendl;
       p->close_on_empty = true;