]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/Pipe: refactor msgr delays
authorSage Weil <sage@inktank.com>
Tue, 27 Nov 2012 23:36:11 +0000 (15:36 -0800)
committerSage Weil <sage@inktank.com>
Fri, 30 Nov 2012 00:09:45 +0000 (16:09 -0800)
- move all delay state into a single class
- create thread once and only once per Pipe
- adjust debug levels
- discard messages at the appropriate times

Signed-off-by: Sage Weil <sage@inktank.com>
src/msg/Pipe.cc
src/msg/Pipe.h

index 609df8e7dd26597fe6de6cecbd85e6e67d6e46a9..34ded27d77fc1ebbc2248c110e87fba3f695b8a5 100644 (file)
@@ -54,8 +54,7 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) {
 
 Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
   : reader_thread(this), writer_thread(this),
-    dispatch_thread(NULL), delay_queue(NULL), delay_until(NULL),
-    delay_lock(NULL), delay_cond(NULL), stop_delayed_delivery(true),
+    delay_thread(NULL),
     msgr(r),
     conn_id(r->dispatch_queue.get_id()),
     sd(-1), port(0),
@@ -96,16 +95,7 @@ Pipe::~Pipe()
   if (connection_state)
     connection_state->put();
   delete session_security;
-  if (dispatch_thread) {
-    delete dispatch_thread;
-    assert(delay_queue->empty());
-    delete delay_queue;
-    assert(delay_until->empty());
-    delete delay_until;
-    assert(!delay_lock->is_locked());
-    delete delay_lock;
-    delete delay_cond;
-  }
+  delete delay_thread;
 }
 
 void Pipe::handle_ack(uint64_t seq)
@@ -137,27 +127,15 @@ void Pipe::start_reader()
   }
   reader_running = true;
   reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
-  if (!dispatch_thread &&
-      msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type))
-      != string::npos) {
+  if (!delay_thread &&
+      msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type)) != string::npos) {
     lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
-    dispatch_thread = new DelayedDelivery(this);
-    delay_queue = new std::deque< Message * >();
-    delay_until = new std::deque< utime_t>();
-    delay_lock = new Mutex("delay_lock");
-    delay_cond = new Cond();
+    delay_thread = new DelayedDelivery(this);
+    delay_thread->create();
   } else
     lsubdout(msgr->cct, ms, 1) << "Pipe " << this << " peer is " << ceph_entity_type_name(connection_state->peer_type)
                               << "; NOT injecting delays because it does not match "
                               << msgr->cct->_conf->ms_inject_delay_type << dendl;
-
-  if (dispatch_thread && stop_delayed_delivery) {
-    lsubdout(msgr->cct, ms, 1) << "running delayed dispatch thread on Pipe " << this << dendl;
-    delay_lock->Lock();
-    stop_delayed_delivery = false;
-    dispatch_thread->create();
-    delay_lock->Unlock();
-  }
 }
 
 void Pipe::start_writer()
@@ -183,44 +161,45 @@ void Pipe::join_reader()
 void Pipe::queue_received(Message *m, int priority)
 {
   assert(pipe_lock.is_locked());
-  if (delay_queue) {
-    lsubdout(msgr->cct, ms, 1) << "queuing message " << m << " for delayed delivery" << dendl;
-    Mutex::Locker locker(*delay_lock);
-    delay_queue->push_back(m);
-    utime_t delay = ceph_clock_now(msgr->cct);
-    delay += 1.0;
-    delay_until->push_back(delay);
-    delay_cond->Signal();
+
+  if (delay_thread) {
+    utime_t release;
+    if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
+      release = m->get_recv_stamp();
+      release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+      lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
+    }
+    delay_thread->queue(release, m);
     return;
   }
+
   in_q->enqueue(m, priority, conn_id);
 }
 
-void Pipe::delayed_delivery() {
-  Mutex::Locker locker(*delay_lock);
+void *Pipe::DelayedDelivery::entry()
+{
+  Mutex::Locker locker(delay_lock);
+  lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry start" << dendl;
+
   while (!stop_delayed_delivery) {
-    if (delay_queue->empty()) {
-      lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond because delay queue is empty" << dendl;
-      delay_cond->Wait(*delay_lock);
+    if (delay_queue.empty()) {
+      lgeneric_subdout(pipe->msgr->cct, ms, 30) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl;
+      delay_cond.Wait(delay_lock);
       continue;
     }
-    if (delay_until->front() > ceph_clock_now(msgr->cct)) {
-      lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond until message " << delay_queue->front()
-                                << " delay passes" << dendl;
-      delay_cond->WaitUntil(*delay_lock, delay_until->front());
+    utime_t release = delay_queue.front().first;
+    if (release > ceph_clock_now(pipe->msgr->cct)) {
+      lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
+      delay_cond.WaitUntil(delay_lock, release);
       continue;
     }
-    Message *m = delay_queue->front();
-    lsubdout(msgr->cct, ms, 1) << "dequeuing message " << m << " for delivery because delay until "
-                              << delay_until->front() << " has passed" << dendl;
-    delay_queue->pop_front();
-    delay_until->pop_front();
-    in_q->enqueue(m, m->get_priority(), conn_id);
-    if (delay_queue->empty()) {
-      lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond" << dendl;
-      delay_cond->Wait(*delay_lock);
-    }
+    Message *m = delay_queue.front().second;
+    lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
+    delay_queue.pop_front();
+    pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
   }
+  lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl;
+  return NULL;
 }
 
 int Pipe::accept()
@@ -1125,6 +1104,8 @@ void Pipe::fault(bool onread)
     msgr->lock.Unlock();
 
     in_q->discard_queue(conn_id);
+    if (delay_thread)
+      delay_thread->discard();
     discard_out_queue();
 
     // disconnect from Connection, and mark it failed.  future messages
@@ -1190,6 +1171,8 @@ void Pipe::was_session_reset()
 
   ldout(msgr->cct,10) << "was_session_reset" << dendl;
   in_q->discard_queue(conn_id);
+  if (delay_thread)
+    delay_thread->discard();
   discard_out_queue();
 
   msgr->dispatch_queue.queue_remote_reset(connection_state);
@@ -1209,17 +1192,6 @@ void Pipe::stop()
   state = STATE_CLOSED;
   cond.Signal();
   shutdown_socket();
-  if (dispatch_thread) {
-    lsubdout(msgr->cct, ms, 1) << "signalling to stop delayed dispatch thread and clear out messages" << dendl;
-    Mutex::Locker locker(*delay_lock);
-    stop_delayed_delivery = true;
-    while (!delay_queue->empty()) {
-      delay_queue->front()->put();
-      delay_queue->pop_front();
-      delay_until->pop_front();
-    }
-    delay_cond->Signal();
-  }
 }
 
 
index 58df8e76d24e708b6827f3fc9864092222d64d75..8b09ec0f27d758ddd7a128f7b23e43ae70a9ce87 100644 (file)
@@ -70,20 +70,41 @@ class DispatchQueue;
      */
     class DelayedDelivery: public Thread {
       Pipe *pipe;
+      std::deque< pair<utime_t,Message*> > delay_queue;
+      Mutex delay_lock;
+      Cond delay_cond;
+      bool stop_delayed_delivery;
+
     public:
-      DelayedDelivery(Pipe *p) : pipe(p) {}
-      void *entry() { pipe->delayed_delivery(); return 0; }
-    };
+      DelayedDelivery(Pipe *p)
+       : pipe(p),
+         delay_lock("Pipe::DelayedDelivery::delay_lock"),
+         stop_delayed_delivery(false) { }
+      ~DelayedDelivery() {
+       discard();
+      }
+      void *entry();
+      void queue(utime_t release, Message *m) {
+       Mutex::Locker l(delay_lock);
+       delay_queue.push_back(make_pair(release, m));
+       delay_cond.Signal();
+      }
+      void discard() {
+       Mutex::Locker l(delay_lock);
+       while (!delay_queue.empty()) {
+         delay_queue.front().second->put();
+         delay_queue.pop_front();
+       }
+      }
+      void stop() {
+       delay_lock.Lock();
+       stop_delayed_delivery = true;
+       delay_cond.Signal();
+       delay_lock.Unlock();
+      }
+    } *delay_thread;
     friend class DelayedDelivery;
 
-    DelayedDelivery *dispatch_thread;
-    // TODO: clean up the delay_queue better on shutdown
-    std::deque< Message * > *delay_queue;
-    std::deque< utime_t > *delay_until;
-    Mutex *delay_lock;
-    Cond *delay_cond;
-    bool stop_delayed_delivery;
-
   public:
     Pipe(SimpleMessenger *r, int st, Connection *con);
     ~Pipe();
@@ -234,12 +255,9 @@ class DispatchQueue;
         writer_thread.join();
       if (reader_thread.is_started())
         reader_thread.join();
-      if (dispatch_thread && dispatch_thread->is_started()) {
-       delay_lock->Lock();
-       stop_delayed_delivery = true;
-       delay_cond->Signal();
-       delay_lock->Unlock();
-       dispatch_thread->join();
+      if (delay_thread) {
+       delay_thread->stop();
+       delay_thread->join();
       }
     }
     void stop();