]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Pipe: stop delayed delivery fast_dispatch in stop_and_wait()
authorGreg Farnum <greg@inktank.com>
Mon, 15 Sep 2014 21:03:35 +0000 (14:03 -0700)
committerGreg Farnum <greg@inktank.com>
Fri, 19 Sep 2014 04:13:28 +0000 (21:13 -0700)
If we don't, we can keep fast_dispatching messages after a Pipe has been
mark_down()ed. That breaks things right now.

Fixes: #9295
Signed-off-by: Greg Farnum <greg@inktank.com>
src/msg/Pipe.cc
src/msg/Pipe.h

index c910af87bc8d6e9c29877731ba6e7a523533b619..9dc5c349dbd9b8a0e12afc15a63172ef15135311 100644 (file)
@@ -220,9 +220,19 @@ void *Pipe::DelayedDelivery::entry()
       active_flush = true;
     }
     if (pipe->in_q->can_fast_dispatch(m)) {
-      delay_lock.Unlock();
-      pipe->in_q->fast_dispatch(m);
-      delay_lock.Lock();
+      if (!stop_fast_dispatching_flag) {
+        delay_dispatching = true;
+        delay_lock.Unlock();
+        pipe->in_q->fast_dispatch(m);
+        delay_lock.Lock();
+        delay_dispatching = false;
+        if (stop_fast_dispatching_flag) {
+          // we need to let the stopping thread proceed
+          delay_cond.Signal();
+          delay_lock.Unlock();
+          delay_lock.Lock();
+        }
+      }
     } else {
       pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
     }
@@ -232,6 +242,15 @@ void *Pipe::DelayedDelivery::entry()
   return NULL;
 }
 
+void Pipe::DelayedDelivery::stop_fast_dispatching() {
+  Mutex::Locker l(delay_lock);
+  stop_fast_dispatching_flag = true;
+  // we can't block if we're the delay thread; see Pipe::stop_and_wait()
+  while (delay_dispatching && !am_self())
+    delay_cond.Wait(delay_lock);
+}
+
+
 int Pipe::accept()
 {
   ldout(msgr->cct,10) << "accept" << dendl;
@@ -1407,7 +1426,11 @@ void Pipe::stop_and_wait()
   // dispatch method calls mark_down() on itself, it can block here
   // waiting for the reader_dispatching flag to clear... which will
   // clearly never happen.  Avoid the situation by skipping the wait
-  // if we are marking our *own* connect down.
+  // if we are marking our *own* connect down. Do the same for the
+  // delayed dispatch thread.
+  if (delay_thread) {
+    delay_thread->stop_fast_dispatching();
+  }
   while (reader_running &&
         reader_dispatching &&
         !reader_thread.am_self())
index cc2044c3d44fb857c8bdd3fddf2ff71d66c15a1f..5496b5ce47d657190bea5bb4652bb68547b70696 100644 (file)
@@ -79,13 +79,17 @@ class DispatchQueue;
       int flush_count;
       bool active_flush;
       bool stop_delayed_delivery;
+      bool delay_dispatching; // we are in fast dispatch now
+      bool stop_fast_dispatching_flag; // we need to stop fast dispatching
 
     public:
       DelayedDelivery(Pipe *p)
        : pipe(p),
          delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
          active_flush(false),
-         stop_delayed_delivery(false) { }
+         stop_delayed_delivery(false),
+         delay_dispatching(false),
+         stop_fast_dispatching_flag(false) { }
       ~DelayedDelivery() {
        discard();
       }
@@ -116,6 +120,11 @@ class DispatchQueue;
         Mutex::Locker l(delay_lock);
         pipe = new_owner;
       }
+      /**
+       * We need to stop fast dispatching before we need to stop putting
+       * normal messages into the DispatchQueue.
+       */
+      void stop_fast_dispatching();
     } *delay_thread;
     friend class DelayedDelivery;