]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: change the delay queue flushing semantics
authorGreg Farnum <greg@inktank.com>
Thu, 10 Apr 2014 23:32:56 +0000 (16:32 -0700)
committerGreg Farnum <greg@inktank.com>
Mon, 5 May 2014 22:29:16 +0000 (15:29 -0700)
Since we're doing fast_dispatch out of the delay queue, we don't want to
flush while holding the pipe lock. Instead, make flush set it up for instant
delivery, and steal the delay queue when replacing pipes. If we're shutting
down a pipe, wait until flushing has completed before doing so.

Signed-off-by: Greg Farnum <greg@inktank.com>
src/msg/Pipe.cc
src/msg/Pipe.h

index fd2fbcef70ee46f40391ab7df4797dbe0803ccd2..eb748ea3c7fe99703e2c821d19aa8cfc0e9063d4 100644 (file)
@@ -191,17 +191,8 @@ void Pipe::DelayedDelivery::flush()
 {
   lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl;
   Mutex::Locker l(delay_lock);
-  while (!delay_queue.empty()) {
-    Message *m = delay_queue.front().second;
-    delay_queue.pop_front();
-    if (pipe->in_q->can_fast_dispatch(m)) {
-      delay_lock.Unlock();
-      pipe->in_q->fast_dispatch(m);
-      delay_lock.Lock();
-    } else {
-      pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
-    }
-  }
+  flush_count = delay_queue.size();
+  delay_cond.Signal();
 }
 
 void *Pipe::DelayedDelivery::entry()
@@ -218,14 +209,19 @@ void *Pipe::DelayedDelivery::entry()
     utime_t release = delay_queue.front().first;
     Message *m = delay_queue.front().second;
     string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type;
-    if (release > ceph_clock_now(pipe->msgr->cct) &&
-       (delay_msg_type.empty() || m->get_type_name() == delay_msg_type)) {
+    if (!flush_count &&
+        (release > ceph_clock_now(pipe->msgr->cct) &&
+         (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
       lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
       delay_cond.WaitUntil(delay_lock, release);
       continue;
     }
     lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
     delay_queue.pop_front();
+    if (flush_count > 0) {
+      --flush_count;
+      active_flush = true;
+    }
     if (pipe->in_q->can_fast_dispatch(m)) {
       delay_lock.Unlock();
       pipe->in_q->fast_dispatch(m);
@@ -233,6 +229,7 @@ void *Pipe::DelayedDelivery::entry()
     } else {
       pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
     }
+    active_flush = false;
   }
   lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl;
   return NULL;
@@ -611,7 +608,7 @@ int Pipe::accept()
     if (existing->connection_state->clear_pipe(existing))
       msgr->dispatch_queue.queue_reset(existing->connection_state.get());
   } else {
-    // queue a reset on the old connection
+    // queue a reset on the new connection, which we're dumping for the old
     msgr->dispatch_queue.queue_reset(connection_state.get());
 
     // drop my Connection, and take a ref to the existing one. do not
@@ -622,9 +619,12 @@ int Pipe::accept()
     // make existing Connection reference us
     connection_state->reset_pipe(this);
 
-    // flush/queue any existing delayed messages
-    if (existing->delay_thread)
-      existing->delay_thread->flush();
+    if (existing->delay_thread) {
+      existing->delay_thread->steal_for_pipe(this);
+      delay_thread = existing->delay_thread;
+      existing->delay_thread = NULL;
+      delay_thread->flush();
+    }
 
     // steal incoming queue
     uint64_t replaced_conn_id = conn_id;
@@ -1754,6 +1754,9 @@ void Pipe::unlock_maybe_reap()
   if (!reader_running && !writer_running) {
     shutdown_socket();
     pipe_lock.Unlock();
+    if (delay_thread && delay_thread->is_flushing()) {
+      delay_thread->wait_for_flush();
+    }
     msgr->queue_reap(this);
   } else {
     pipe_lock.Unlock();
index 468a6a52f2c013b98cb05f3efa7bad6971e28ea0..0bd7febae3baf19bb49f1cae98fa0bd6057a7d0e 100644 (file)
@@ -75,12 +75,15 @@ class DispatchQueue;
       std::deque< pair<utime_t,Message*> > delay_queue;
       Mutex delay_lock;
       Cond delay_cond;
+      int flush_count;
+      bool active_flush;
       bool stop_delayed_delivery;
 
     public:
       DelayedDelivery(Pipe *p)
        : pipe(p),
-         delay_lock("Pipe::DelayedDelivery::delay_lock"),
+         delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
+         active_flush(false),
          stop_delayed_delivery(false) { }
       ~DelayedDelivery() {
        discard();
@@ -93,12 +96,25 @@ class DispatchQueue;
       }
       void discard();
       void flush();
+      bool is_flushing() {
+        Mutex::Locker l(delay_lock);
+        return flush_count > 0 || active_flush;
+      }
+      void wait_for_flush() {
+        Mutex::Locker l(delay_lock);
+        while (flush_count > 0 || active_flush)
+          delay_cond.Wait(delay_lock);
+      }
       void stop() {
        delay_lock.Lock();
        stop_delayed_delivery = true;
        delay_cond.Signal();
        delay_lock.Unlock();
       }
+      void steal_for_pipe(Pipe *new_owner) {
+        Mutex::Locker l(delay_lock);
+        pipe = new_owner;
+      }
     } *delay_thread;
     friend class DelayedDelivery;