]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Messenger,DispatchQueue: add ms_fast_dispatch mechanism
authorSamuel Just <sam.just@inktank.com>
Thu, 7 Nov 2013 21:46:08 +0000 (13:46 -0800)
committerGreg Farnum <greg@inktank.com>
Mon, 5 May 2014 22:29:15 +0000 (15:29 -0700)
This adds a Dispatcher interface allowing the implementation
to accept ms_fast_dispatch calls for some messages without
going through the DispatchQueue. To support that, we also add
1) new synchronous notifications on connect and accept events
2) a fast_preprocess mechanism

Signed-off-by: Samuel Just <sam.just@inktank.com>
Signed-off-by: Greg Farnum <greg@inktank.com>
src/msg/DispatchQueue.cc
src/msg/DispatchQueue.h
src/msg/Dispatcher.h
src/msg/Messenger.h
src/msg/Pipe.cc

index 9489a40490dd0b1faedca23bbd2e88c92c447488..d72d1cd1b16609b46244f9cdb9b9b1a9f8ef64e4 100644 (file)
@@ -60,17 +60,35 @@ void DispatchQueue::post_dispatch(Message *m, uint64_t msize)
   ldout(cct,20) << "done calling dispatch on " << m << dendl;
 }
 
+bool DispatchQueue::can_fast_dispatch(Message *m)
+{
+  return msgr->ms_can_fast_dispatch(m);
+}
+
+void DispatchQueue::fast_dispatch(Message *m)
+{
+  uint64_t msize = pre_dispatch(m);
+  msgr->ms_fast_dispatch(m);
+  post_dispatch(m, msize);
+}
+
+void DispatchQueue::fast_preprocess(Message *m)
+{
+  msgr->ms_fast_preprocess(m);
+}
+
 void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
 {
+
   Mutex::Locker l(lock);
   ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
   add_arrival(m);
   if (priority >= CEPH_MSG_PRIO_LOW) {
     mqueue.enqueue_strict(
-      id, priority, QueueItem(m));
+        id, priority, QueueItem(m));
   } else {
     mqueue.enqueue(
-      id, priority, m->get_cost(), QueueItem(m));
+        id, priority, m->get_cost(), QueueItem(m));
   }
   cond.Signal();
 }
index 5b0decdd984b832d933b3d9f91f063111f9fbf02..19bfe40edd4d707308d75f139915ca6d2763ca6e 100644 (file)
@@ -161,6 +161,9 @@ class DispatchQueue {
     cond.Signal();
   }
 
+  bool can_fast_dispatch(Message *m);
+  void fast_dispatch(Message *m);
+  void fast_preprocess(Message *m);
   void enqueue(Message *m, int priority, uint64_t id);
   void discard_queue(uint64_t id);
   uint64_t get_id() {
index 9f837cd6fdbc7fc1f0d65a1db03b9672613118ce..864e9e1ed2891209f4dd796d5845add3ca14e5ec 100644 (file)
@@ -30,18 +30,89 @@ public:
   }
   virtual ~Dispatcher() { }
 
-  // how i receive messages
+  /**
+   * The Messenger calls this function to query if you are capable
+   * of "fast dispatch"ing a message. Indicating that you can fast
+   * dispatch it requires that you:
+   * 1) Handle the Message quickly and without taking long-term contended
+   * locks. (This function is likely to be called in-line with message
+   * receipt.)
+   * 2) Be able to accept the Message even if you have not yet received
+   * an ms_handle_accept() notification for the Connection it is associated
+   * with, and even if you *have* called mark_down() or received an
+   * ms_handle_reset() (or similar) call on the Connection. You will
+   * not receive more than one dead "message" (and should generally be
+   * prepared for that circumstance anyway, since the normal dispatch can begin,
+   * then trigger Connection failure before it's percolated through your system).
+   * We provide ms_handle_fast_[connect|accept] calls if you need them, under
+   * similar speed and state constraints as fast_dispatch itself.
+   * 3) Be able to make a determination on fast_dispatch without relying
+   * on particular system state -- the ms_can_fast_dispatch() call might
+   * be called multiple times on a single message; the state might change between
+   * calling ms_can_fast_dispatch and ms_fast_dispatch; etc.
+   *
+   * @param m The message we want to fast dispatch.
+   * @returns True if the message can be fast dispatched; false otherwise.
+   */
+  virtual bool ms_can_fast_dispatch(Message *m) const { return false;}
+  /**
+   * This function determines if a dispatcher is included in the
+   * list of fast-dispatch capable Dispatchers.
+   * @returns True if the Dispatcher can handle any messages via
+   * fast dispatch; false otherwise.
+   */
+  virtual bool ms_can_fast_dispatch_any() const { return false; }
+  /**
+   * Perform a "fast dispatch" on a given message. See
+   * ms_can_fast_dispatch() for the requirements.
+   *
+   * @param m The Message to fast dispatch.
+   */
+  virtual void ms_fast_dispatch(Message *m) { assert(0); }
+  /**
+   * Let the Dispatcher preview a Message before it is dispatched. This
+   * function is called on *every* Message, prior to the fast/regular dispatch
+   * decision point, but it is only used on fast-dispatch capable systems. An
+   * implementation of ms_fast_preprocess must be essentially lock-free in the
+   * same way as the ms_fast_dispatch function is (in particular, ms_fast_preprocess
+   * may be called while the Messenger holds internal locks that prevent progress from
+   * other threads, so any locks it takes must be at the very bottom of the hierarchy).
+   * Messages are delivered in receipt order within a single Connection, but there are
+   * no guarantees across Connections. This makes it useful for some limited
+   * coordination between Messages which can be fast_dispatch'ed and those which must
+   * go through normal dispatch.
+   *
+   * @param m A message which has been received
+   */
+  virtual void ms_fast_preprocess(Message *m) {}
+  /**
+   * The Messenger calls this function to deliver a single message.
+   *
+   * @param m The message being delivered. You (the Dispatcher)
+   * are given a single reference count on it.
+   */
   virtual bool ms_dispatch(Message *m) = 0;
 
   /**
-   * This function will be called whenever a new Connection is made to the
-   * Messenger.
+   * This function will be called whenever a Connection is newly-created
+   * or reconnects in the Messenger.
    *
    * @param con The new Connection which has been established. You are not
    * granted a reference to it -- take one if you need one!
    */
   virtual void ms_handle_connect(Connection *con) { };
 
+  /**
+   * This function will be called synchronously whenever a Connection is
+   * newly-created or reconnects in the Messenger, if you support fast
+   * dispatch. It is guaranteed to be called before any messages are
+   * dispatched.
+   *
+   * @param con The new Connection which has been established. You are not
+   * granted a reference to it -- take one if you need one!
+   */
+  virtual void ms_handle_fast_connect(Connection *con) { };
+
   /**
    * Callback indicating we have accepted an incoming connection.
    *
@@ -49,6 +120,15 @@ public:
    */
   virtual void ms_handle_accept(Connection *con) { };
 
+  /**
+   * Callback indicating we have accepted an incoming connection, if you
+   * support fast dispatch. It is guaranteed to be called before any messages
+   * are dispatched.
+   *
+   * @param con The (new or existing) Connection associated with the session
+   */
+  virtual void ms_handle_fast_accept(Connection *con) { };
+
   /*
    * this indicates that the ordered+reliable delivery semantics have 
    * been violated.  Messages may have been lost due to a fault
index 42feaf227dfd84a69df793f88ebe43d55ce44aeb..46632718a1215eb01d0abc1a79ad991a9237ebf4 100644 (file)
@@ -39,6 +39,7 @@ class Timer;
 class Messenger {
 private:
   list<Dispatcher*> dispatchers;
+  list <Dispatcher*> fast_dispatchers;
 
 protected:
   /// the "name" of the local daemon. eg client.99
@@ -301,6 +302,8 @@ public:
   void add_dispatcher_head(Dispatcher *d) { 
     bool first = dispatchers.empty();
     dispatchers.push_front(d);
+    if (d->ms_can_fast_dispatch_any())
+      fast_dispatchers.push_front(d);
     if (first)
       ready();
   }
@@ -314,6 +317,8 @@ public:
   void add_dispatcher_tail(Dispatcher *d) { 
     bool first = dispatchers.empty();
     dispatchers.push_back(d);
+    if (d->ms_can_fast_dispatch_any())
+      fast_dispatchers.push_back(d);
     if (first)
       ready();
   }
@@ -571,6 +576,50 @@ protected:
    * @{
    */
 public:
+  /**
+   * Determine whether a message can be fast-dispatched. We will
+   * query each Dispatcher in sequence to determine if they are
+   * capable of handling a particular message via "fast dispatch".
+   *
+   * @param m The Message we are testing.
+   */
+  bool ms_can_fast_dispatch(Message *m) {
+    for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
+        p != fast_dispatchers.end();
+        ++p) {
+      if ((*p)->ms_can_fast_dispatch(m))
+       return true;
+    }
+    return false;
+  }
+
+  /**
+   * Deliver a single Message via "fast dispatch".
+   *
+   * @param m The Message we are fast dispatching. We take ownership
+   * of one reference to it.
+   */
+  void ms_fast_dispatch(Message *m) {
+    for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
+        p != fast_dispatchers.end();
+        ++p) {
+      if ((*p)->ms_can_fast_dispatch(m)) {
+       (*p)->ms_fast_dispatch(m);
+       return;
+      }
+    }
+    assert(0);
+  }
+  /**
+   *
+   */
+  void ms_fast_preprocess(Message *m) {
+    for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
+        p != fast_dispatchers.end();
+        ++p) {
+      (*p)->ms_fast_preprocess(m);
+    }
+  }
   /**
    *  Deliver a single Message. Send it to each Dispatcher
    *  in sequence until one of them handles it.
@@ -594,7 +643,8 @@ public:
   }
   /**
    * Notify each Dispatcher of a new Connection. Call
-   * this function whenever a new Connection is initiated.
+   * this function whenever a new Connection is initiated or
+   * reconnects.
    *
    * @param con Pointer to the new Connection.
    */
@@ -605,6 +655,20 @@ public:
       (*p)->ms_handle_connect(con);
   }
 
+  /**
+   * Notify each fast Dispatcher of a new Connection. Call
+   * this function whenever a new Connection is initiated or
+   * reconnects.
+   *
+   * @param con Pointer to the new Connection.
+   */
+  void ms_deliver_handle_fast_connect(Connection *con) {
+    for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
+         p != fast_dispatchers.end();
+         ++p)
+      (*p)->ms_handle_fast_connect(con);
+  }
+
   /**
    * Notify each Dispatcher of a new incomming Connection. Call
    * this function whenever a new Connection is accepted.
@@ -618,6 +682,19 @@ public:
       (*p)->ms_handle_accept(con);
   }
 
+  /**
+   * Notify each fast Dispatcher of a new incoming Connection. Call
+   * this function whenever a new Connection is accepted.
+   *
+   * @param con Pointer to the new Connection.
+   */
+  void ms_deliver_handle_fast_accept(Connection *con) {
+    for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
+         p != fast_dispatchers.end();
+         ++p)
+      (*p)->ms_handle_fast_accept(con);
+  }
+
   /**
    * Notify each Dispatcher of a Connection which may have lost
    * Messages. Call this function whenever you detect that a lossy Connection
index 0693c09f12211a04d63fe4f4d9cd1db89e32f328..fd2fbcef70ee46f40391ab7df4797dbe0803ccd2 100644 (file)
@@ -194,7 +194,13 @@ void Pipe::DelayedDelivery::flush()
   while (!delay_queue.empty()) {
     Message *m = delay_queue.front().second;
     delay_queue.pop_front();
-    pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+    if (pipe->in_q->can_fast_dispatch(m)) {
+      delay_lock.Unlock();
+      pipe->in_q->fast_dispatch(m);
+      delay_lock.Lock();
+    } else {
+      pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+    }
   }
 }
 
@@ -220,7 +226,13 @@ void *Pipe::DelayedDelivery::entry()
     }
     lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
     delay_queue.pop_front();
-    pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+    if (pipe->in_q->can_fast_dispatch(m)) {
+      delay_lock.Unlock();
+      pipe->in_q->fast_dispatch(m);
+      delay_lock.Lock();
+    } else {
+      pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+    }
   }
   lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl;
   return NULL;
@@ -662,6 +674,7 @@ int Pipe::accept()
 
   // notify
   msgr->dispatch_queue.queue_accept(connection_state.get());
+  msgr->ms_deliver_handle_fast_accept(connection_state.get());
 
   // ok!
   if (msgr->dispatch_queue.stop)
@@ -1107,6 +1120,7 @@ int Pipe::connect()
       }
 
       msgr->dispatch_queue.queue_connect(connection_state.get());
+      msgr->ms_deliver_handle_fast_connect(connection_state.get());
       
       if (!reader_running) {
        ldout(msgr->cct,20) << "connect starting reader" << dendl;
@@ -1514,19 +1528,26 @@ void Pipe::reader()
       ldout(msgr->cct,10) << "reader got message "
               << m->get_seq() << " " << m << " " << *m
               << dendl;
+      in_q->fast_preprocess(m);
 
       if (delay_thread) {
-       utime_t release;
-       if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
-         release = m->get_recv_stamp();
-         release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
-         lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
-       }
-       delay_thread->queue(release, m);
+        utime_t release;
+        if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
+          release = m->get_recv_stamp();
+          release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+          lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
+        }
+        delay_thread->queue(release, m);
       } else {
-       in_q->enqueue(m, m->get_priority(), conn_id);
+        if (in_q->can_fast_dispatch(m)) {
+          pipe_lock.Unlock();
+          in_q->fast_dispatch(m);
+          pipe_lock.Lock();
+        } else {
+          in_q->enqueue(m, m->get_priority(), conn_id);
+        }
       }
-    } 
+    }
     
     else if (tag == CEPH_MSGR_TAG_CLOSE) {
       ldout(msgr->cct,20) << "reader got CLOSE" << dendl;