]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/Pipe: flush delayed messages when stealing/failing pipes
authorSage Weil <sage@inktank.com>
Sat, 1 Dec 2012 04:23:52 +0000 (20:23 -0800)
committerSage Weil <sage@inktank.com>
Sat, 1 Dec 2012 20:45:22 +0000 (12:45 -0800)
If we are failing a pipe, flush the incoming messages before we try to
reconnect.  Similarly, flush queued messages on an existing pipe beore we
replace it.  This ensures that when we get a socket failure and reconnect
the delayed messages are handled in the normal fashion.

Specifically, it fixes a situation like:

 - read msg, update in_seq etc.
 - delay msg
 - pipe faults
 - peer reconnects, we replace existing pipe, discard delayed msgs
 - peer resends msgs
 - we discard, because they are < in_seq

Signed-off-by: Sage Weil <sage@inktank.com>
src/msg/Pipe.cc
src/msg/Pipe.h

index 2bbaf0bc17f6c04d977e26f1732aa0e10428bfa3..1ebf2854473beaadf62ada7c88549e95514eed71 100644 (file)
@@ -160,6 +160,7 @@ void Pipe::join_reader()
 
 void Pipe::DelayedDelivery::discard()
 {
+  lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::discard" << dendl;
   Mutex::Locker l(delay_lock);
   while (!delay_queue.empty()) {
     Message *m = delay_queue.front().second;
@@ -169,6 +170,17 @@ void Pipe::DelayedDelivery::discard()
   }
 }
 
+void Pipe::DelayedDelivery::flush()
+{
+  lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl;
+  Mutex::Locker l(delay_lock);
+  while (!delay_queue.empty()) {
+    Message *m = delay_queue.front().second;
+    delay_queue.pop_front();
+    pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+  }
+}
+
 void *Pipe::DelayedDelivery::entry()
 {
   Mutex::Locker locker(delay_lock);
@@ -543,7 +555,11 @@ int Pipe::accept()
 
     // make existing Connection reference us
     existing->connection_state->reset_pipe(this);
+
+    // flush/queue any existing delayed messages
+    if (existing->delay_thread)
+      existing->delay_thread->flush();
+
     // steal incoming queue
     uint64_t replaced_conn_id = conn_id;
     conn_id = existing->conn_id;
@@ -1113,6 +1129,10 @@ void Pipe::fault(bool onread)
     return;
   }
 
+  // queue delayed items immediately
+  if (delay_thread)
+    delay_thread->flush();
+
   // requeue sent items
   requeue_sent();
 
@@ -1120,7 +1140,7 @@ void Pipe::fault(bool onread)
     ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
     state = STATE_STANDBY;
     return;
-  } 
+  }
 
   if (state != STATE_CONNECTING) {
     if (policy.server) {
index 30fb7eede11322842bd3e044edb47e670de190ac..1bcc8263f4a6934df0c8d15fa78e22a6cc29e201 100644 (file)
@@ -90,6 +90,7 @@ class DispatchQueue;
        delay_cond.Signal();
       }
       void discard();
+      void flush();
       void stop() {
        delay_lock.Lock();
        stop_delayed_delivery = true;