]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: enable fast_dispatch on local connections
authorGreg Farnum <greg@inktank.com>
Fri, 4 Apr 2014 20:55:54 +0000 (13:55 -0700)
committerGreg Farnum <greg@inktank.com>
Mon, 5 May 2014 22:29:15 +0000 (15:29 -0700)
We do two things:
1) Call ms_handle_fast_connect() when setting up the local connection, so
the Dispatcher can set up any state it needs
2)Move local_delivery into a separate thread from the sender's. fast_dispatch
makes this entirely necessary since otherwise we're dipping back in to the
Dispatcher while holding whatever locks it held when it sent the Message.

Implementation starts with a thread and a list of messages to process and
proceeds as you'd expect from that.

Signed-off-by: Greg Farnum <greg@inktank.com>
src/msg/DispatchQueue.cc
src/msg/DispatchQueue.h
src/msg/SimpleMessenger.cc

index d72d1cd1b16609b46244f9cdb9b9b1a9f8ef64e4..12c48465ed3f633088af7adf060dcfac5212e005 100644 (file)
@@ -95,18 +95,48 @@ void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
 
 void DispatchQueue::local_delivery(Message *m, int priority)
 {
-  Mutex::Locker l(lock);
   m->set_connection(msgr->local_connection.get());
   m->set_recv_stamp(ceph_clock_now(msgr->cct));
-  add_arrival(m);
-  if (priority >= CEPH_MSG_PRIO_LOW) {
-    mqueue.enqueue_strict(
-      0, priority, QueueItem(m));
-  } else {
-    mqueue.enqueue(
-      0, priority, m->get_cost(), QueueItem(m));
+  Mutex::Locker l(local_delivery_lock);
+  if (local_messages.empty())
+    local_delivery_cond.Signal();
+  local_messages.push_back(make_pair(m, priority));
+  return;
+}
+
+void DispatchQueue::run_local_delivery()
+{
+  local_delivery_lock.Lock();
+  while (true) {
+    if (stop_local_delivery)
+      break;
+    if (local_messages.empty()) {
+      local_delivery_cond.Wait(local_delivery_lock);
+      continue;
+    }
+    pair<Message *, int> mp = local_messages.front();
+    local_messages.pop_front();
+    local_delivery_lock.Unlock();
+    Message *m = mp.first;
+    int priority = mp.second;
+    fast_preprocess(m);
+    if (can_fast_dispatch(m)) {
+      fast_dispatch(m);
+    } else {
+      Mutex::Locker l(lock);
+      add_arrival(m);
+      if (priority >= CEPH_MSG_PRIO_LOW) {
+        mqueue.enqueue_strict(
+            0, priority, QueueItem(m));
+      } else {
+        mqueue.enqueue(
+            0, priority, m->get_cost(), QueueItem(m));
+      }
+      cond.Signal();
+    }
+    local_delivery_lock.Lock();
   }
-  cond.Signal();
+  local_delivery_lock.Unlock();
 }
 
 /*
@@ -188,15 +218,23 @@ void DispatchQueue::start()
   assert(!stop);
   assert(!dispatch_thread.is_started());
   dispatch_thread.create();
+  local_delivery_thread.create();
 }
 
 void DispatchQueue::wait()
 {
+  local_delivery_thread.join();
   dispatch_thread.join();
 }
 
 void DispatchQueue::shutdown()
 {
+  // stop my local delivery thread
+  local_delivery_lock.Lock();
+  stop_local_delivery = true;
+  local_delivery_cond.Signal();
+  local_delivery_lock.Unlock();
+
   // stop my dispatch thread
   lock.Lock();
   stop = true;
index 19bfe40edd4d707308d75f139915ca6d2763ca6e..5fe17dcf5936723b36c24919ed35ffeaafcfa7d4 100644 (file)
@@ -106,12 +106,27 @@ class DispatchQueue {
     }
   } dispatch_thread;
 
+  Mutex local_delivery_lock;
+  Cond local_delivery_cond;
+  bool stop_local_delivery;
+  list<pair<Message *, int> > local_messages;
+  class LocalDeliveryThread : public Thread {
+    DispatchQueue *dq;
+  public:
+    LocalDeliveryThread(DispatchQueue *dq) : dq(dq) {}
+    void *entry() {
+      dq->run_local_delivery();
+      return 0;
+    }
+  } local_delivery_thread;
+
   uint64_t pre_dispatch(Message *m);
   void post_dispatch(Message *m, uint64_t msize);
 
   public:
   bool stop;
   void local_delivery(Message *m, int priority);
+  void run_local_delivery();
 
   double get_max_age(utime_t now);
 
@@ -183,6 +198,9 @@ class DispatchQueue {
             cct->_conf->ms_pq_min_cost),
       next_pipe_id(1),
       dispatch_thread(this),
+      local_delivery_lock("SimpleMessenger::DispatchQueue::local_delivery_lock"),
+      stop_local_delivery(false),
+      local_delivery_thread(this),
       stop(false)
     {}
 };
index 2070fe591240276bac16c2c51bc0e89627d3db83..b612fcfe0e6285445cd645f91ffc5bab1c94ff24 100644 (file)
@@ -731,4 +731,5 @@ void SimpleMessenger::init_local_connection()
 {
   local_connection->peer_addr = my_inst.addr;
   local_connection->peer_type = my_type;
+  ms_deliver_handle_fast_connect(local_connection.get());
 }