]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
messenger: add the shell of a system to delay incoming Message delivery
authorGreg Farnum <greg@inktank.com>
Wed, 21 Nov 2012 18:54:06 +0000 (10:54 -0800)
committerSage Weil <sage@inktank.com>
Fri, 30 Nov 2012 00:09:44 +0000 (16:09 -0800)
When ms_inject_delay_type matches that of the incoming Connection,
the Pipe sets up a delay queue that it shuttles all Messages through.
This lets us check cleanup and some notification code but doesn't
actually generate any delays.

Signed-off-by: Greg Farnum <greg@inktank.com>
src/common/config_opts.h
src/msg/Pipe.cc
src/msg/Pipe.h

index 8699d789164bdebb15605f27bfad72062cf9d0e3..a067934c2f1f4828c7e05c281980902c0dddb2b3 100644 (file)
@@ -106,6 +106,9 @@ OPTION(ms_bind_port_max, OPT_INT, 7100)
 OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10)
 OPTION(ms_tcp_read_timeout, OPT_U64, 900)
 OPTION(ms_inject_socket_failures, OPT_U64, 0)
+OPTION(ms_inject_delay_type, OPT_STR, "")          // "osd mds mon client" allowed
+OPTION(ms_inject_delay_max, OPT_DOUBLE, 1)         // seconds
+OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1]
 
 OPTION(mon_data, OPT_STR, "/var/lib/ceph/mon/$cluster-$id")
 OPTION(mon_initial_members, OPT_STR, "")    // list of initial cluster mon ids; if specified, need majority to form initial quorum and create new cluster
index e273ad49f2b38e5b1fca6035241b94ddce25d4b2..695be3b3e8e8c9f9791976b578fa4a547e09d0e5 100644 (file)
@@ -54,6 +54,8 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) {
 
 Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
   : reader_thread(this), writer_thread(this),
+    dispatch_thread(NULL), delay_queue(NULL),
+    delay_lock(NULL), delay_cond(NULL), stop_delayed_delivery(true),
     msgr(r),
     conn_id(r->dispatch_queue.get_id()),
     sd(-1), port(0),
@@ -85,6 +87,15 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
   msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
   if (msgr->timeout == 0)
     msgr->timeout = -1;
+
+  if (msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type))
+      != string::npos) {
+    lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
+    dispatch_thread = new DelayedDelivery(this);
+    delay_queue = new std::deque< Message * >();
+    delay_lock = new Mutex("delay_lock");
+    delay_cond = new Cond();
+  }
 }
 
 Pipe::~Pipe()
@@ -94,6 +105,14 @@ Pipe::~Pipe()
   if (connection_state)
     connection_state->put();
   delete session_security;
+  if (dispatch_thread) {
+    delete dispatch_thread;
+    assert(delay_queue->empty());
+    delete delay_queue;
+    assert(!delay_lock->is_locked());
+    delete delay_lock;
+    delete delay_cond;
+  }
 }
 
 void Pipe::handle_ack(uint64_t seq)
@@ -125,6 +144,13 @@ void Pipe::start_reader()
   }
   reader_running = true;
   reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
+  if (dispatch_thread && stop_delayed_delivery) {
+    lsubdout(msgr->cct, ms, 1) << "running delayed dispatch thread on Pipe " << this << dendl;
+    delay_lock->Lock();
+    stop_delayed_delivery = false;
+    dispatch_thread->create();
+    delay_lock->Unlock();
+  }
 }
 
 void Pipe::start_writer()
@@ -150,10 +176,32 @@ void Pipe::join_reader()
 void Pipe::queue_received(Message *m, int priority)
 {
   assert(pipe_lock.is_locked());
+  if (delay_queue) {
+    lsubdout(msgr->cct, ms, 1) << "queuing message " << m << " for delayed delivery" << dendl;
+    Mutex::Locker locker(*delay_lock);
+    delay_queue->push_back(m);
+    delay_cond->Signal();
+    return;
+  }
   in_q->enqueue(m, priority, conn_id);
 }
 
-
+void Pipe::delayed_delivery() {
+  Mutex::Locker locker(*delay_lock);
+  if (delay_queue->empty())
+    lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond because delay queue is empty" << dendl;
+    delay_cond->Wait(*delay_lock);
+  while (!stop_delayed_delivery) {
+    Message *m = delay_queue->front();
+    lsubdout(msgr->cct, ms, 1) << "dequeuing message " << m << " for delayed delivery" << dendl;
+    delay_queue->pop_front();
+    in_q->enqueue(m, m->get_priority(), conn_id);
+    if (delay_queue->empty()) {
+      lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond" << dendl;
+      delay_cond->Wait(*delay_lock);
+    }
+  }
+}
 
 int Pipe::accept()
 {
@@ -1141,6 +1189,12 @@ void Pipe::stop()
   state = STATE_CLOSED;
   cond.Signal();
   shutdown_socket();
+  if (dispatch_thread) {
+    lsubdout(msgr->cct, ms, 1) << "signalling to stop delayed dispatch thread and clear out messages" << dendl;
+    Mutex::Locker locker(*delay_lock);
+    stop_delayed_delivery = true;
+    delay_cond->Signal();
+  }
 }
 
 
index cc92b2f6f8b9ae87743d14b6feff7fc2609e3c7e..648e2e87c9fe8ba77139fac6d33ba09d920ccf9f 100644 (file)
@@ -60,6 +60,29 @@ class DispatchQueue;
     } writer_thread;
     friend class Writer;
 
+    /**
+     * The DelayedDelivery is for injecting delays into Message delivery off
+     * the socket. It is only enabled if delays are requested, and if they
+     * are then it pulls Messages off the DelayQueue and puts them into the
+     * in_q (SimpleMessenger::dispatch_queue).
+     * Please note that this probably has issues with Pipe shutdown and
+     * replacement semantics. I've tried, but no guarantees.
+     */
+    class DelayedDelivery: public Thread {
+      Pipe *pipe;
+    public:
+      DelayedDelivery(Pipe *p) : pipe(p) {}
+      void *entry() { pipe->delayed_delivery(); return 0; }
+    };
+    friend class DelayedDelivery;
+
+    DelayedDelivery *dispatch_thread;
+    // TODO: clean up the delay_queue better on shutdown
+    std::deque< Message * > *delay_queue;
+    Mutex *delay_lock;
+    Cond *delay_cond;
+    bool stop_delayed_delivery;
+
   public:
     Pipe(SimpleMessenger *r, int st, Connection *con);
     ~Pipe();
@@ -185,6 +208,8 @@ class DispatchQueue;
       queue_received(m, m->get_priority());
     }
 
+    void delayed_delivery();
+
     __u32 get_out_seq() { return out_seq; }
 
     bool is_queued() { return !out_q.empty() || keepalive; }
@@ -208,6 +233,13 @@ class DispatchQueue;
         writer_thread.join();
       if (reader_thread.is_started())
         reader_thread.join();
+      if (dispatch_thread && dispatch_thread->is_started()) {
+       delay_lock->Lock();
+       stop_delayed_delivery = true;
+       delay_cond->Signal();
+       delay_lock->Unlock();
+       dispatch_thread->join();
+      }
     }
     void stop();