]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: add thread to progress queues outside dispatch
authorJohn Spray <john.spray@redhat.com>
Wed, 27 Aug 2014 21:36:10 +0000 (22:36 +0100)
committerJohn Spray <john.spray@redhat.com>
Tue, 2 Sep 2014 13:06:25 +0000 (14:06 +0100)
This speeds up processing of queued waiters.

Fixes: #9252
Signed-off-by: John Spray <john.spray@redhat.com>
src/mds/Beacon.cc
src/mds/MDS.cc
src/mds/MDS.h

index 83a7efd1b1a8cb3fb88908e0f72b2ed9a8fbf808..7ccbe0e2aa4ce9212e333a558481473b3ec17cb7 100644 (file)
@@ -79,10 +79,9 @@ bool Beacon::ms_dispatch(Message *m)
     if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
       handle_mds_beacon(static_cast<MMDSBeacon*>(m));
     }
+    return true;
   }
 
-  // Let message fall through to MDS so that we get the execution of
-  // his finished_queue and waiting_for_nolaggy stuff.
   return false;
 }
 
index 53a8cfd42baca0526a0d2283ad86b4d4b907e2c8..2c286db0d6ab242126979cf547902af710d8bf98 100644 (file)
@@ -104,7 +104,10 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) :
   log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
   op_tracker(cct, m->cct->_conf->mds_enable_op_tracker),
   finisher(cct),
-  sessionmap(this), asok_hook(NULL) {
+  sessionmap(this),
+  progress_thread(this),
+  asok_hook(NULL)
+{
 
   hb = cct->get_heartbeat_map()->add_worker("MDS");
 
@@ -683,6 +686,9 @@ int MDS::init(MDSMap::DaemonState wanted_state)
   // schedule tick
   reset_tick();
 
+  // Start handler for finished_queue
+  progress_thread.create();
+
   create_logger();
   set_up_admin_socket();
   g_conf->add_observer(this);
@@ -714,6 +720,10 @@ void MDS::tick()
   if (beacon.is_laggy()) {
     dout(5) << "tick bailing out since we seem laggy" << dendl;
     return;
+  } else {
+    // Wake up thread in case we use to be laggy and have waiting_for_nolaggy
+    // messages to progress.
+    progress_thread.signal();
   }
 
   // make sure mds log flushes, trims periodically
@@ -1756,6 +1766,8 @@ void MDS::suicide()
 
   op_tracker.on_shutdown();
 
+  progress_thread.shutdown();
+
   // shut down messenger
   messenger->shutdown();
 
@@ -1876,13 +1888,6 @@ bool MDS::handle_core_message(Message *m)
     ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_MDS);
     handle_mds_map(static_cast<MMDSMap*>(m));
     break;
-  case MSG_MDS_BEACON:
-    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
-    // no-op, Beacon handles this on our behalf but we listen for the
-    // message to get the side effect of handling finished_queue and
-    // waiting_for_nolaggy at end of MDS dispatch.
-    m->put();
-    break;
 
     // misc
   case MSG_MON_COMMAND:
@@ -2002,33 +2007,16 @@ bool MDS::is_stale_message(Message *m)
   return false;
 }
 
-/* If this function returns true, it has put the message. If it returns false,
- * it has not put the message. */
-bool MDS::_dispatch(Message *m)
+/**
+ * Advance finished_queue and waiting_for_nolaggy.
+ *
+ * Usually drain both queues, but may not drain waiting_for_nolaggy
+ * if beacon is currently laggy.
+ */
+void MDS::_advance_queues()
 {
-  if (is_stale_message(m)) {
-    m->put();
-    return true;
-  }
+  assert(mds_lock.is_locked_by_me());
 
-  // core
-  if (!handle_core_message(m)) {
-    if (beacon.is_laggy()) {
-      dout(10) << " laggy, deferring " << *m << dendl;
-      waiting_for_nolaggy.push_back(m);
-    } else {
-      if (!handle_deferrable_message(m)) {
-       dout(0) << "unrecognized message " << *m << dendl;
-       m->put();
-       return false;
-      }
-    }
-  }
-
-  if (dispatch_depth > 1)
-    return true;
-
-  // finish any triggered contexts
   while (!finished_queue.empty()) {
     dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
     dout(10) << finished_queue << dendl;
@@ -2048,10 +2036,9 @@ bool MDS::_dispatch(Message *m)
   }
 
   while (!waiting_for_nolaggy.empty()) {
-
     // stop if we're laggy now!
     if (beacon.is_laggy())
-      return true;
+      break;
 
     Message *old = waiting_for_nolaggy.front();
     waiting_for_nolaggy.pop_front();
@@ -2065,6 +2052,42 @@ bool MDS::_dispatch(Message *m)
 
     heartbeat_reset();
   }
+}
+
+/* If this function returns true, it has put the message. If it returns false,
+ * it has not put the message. */
+bool MDS::_dispatch(Message *m)
+{
+  if (is_stale_message(m)) {
+    m->put();
+    return true;
+  }
+
+  // core
+  if (!handle_core_message(m)) {
+    if (beacon.is_laggy()) {
+      dout(10) << " laggy, deferring " << *m << dendl;
+      waiting_for_nolaggy.push_back(m);
+    } else {
+      if (!handle_deferrable_message(m)) {
+       dout(0) << "unrecognized message " << *m << dendl;
+       m->put();
+       return false;
+      }
+    }
+  }
+
+  if (dispatch_depth > 1)
+    return true;
+
+  // finish any triggered contexts
+  _advance_queues();
+
+  if (beacon.is_laggy()) {
+    // We've gone laggy during dispatch, don't do any
+    // more housekeeping
+    return true;
+  }
 
   // done with all client replayed requests?
   if (is_clientreplay() &&
@@ -2358,3 +2381,33 @@ void MDS::heartbeat_reset()
   cct->get_heartbeat_map()->reset_timeout(hb, g_conf->mds_beacon_grace, 0);
 }
 
+
+void *MDS::ProgressThread::entry()
+{
+  Mutex::Locker l(mds->mds_lock);
+  while (true) {
+    while (!stopping && (mds->finished_queue.empty() && mds->waiting_for_nolaggy.empty())) {
+      cond.Wait(mds->mds_lock);
+    }
+
+    if (stopping) {
+      break;
+    }
+
+    mds->_advance_queues();
+  }
+
+  return NULL;
+}
+
+
+void MDS::ProgressThread::shutdown()
+{
+  assert(mds->mds_lock.is_locked_by_me());
+
+  stopping = true;
+  cond.Signal();
+  mds->mds_lock.Unlock();
+  join();
+  mds->mds_lock.Lock();
+}
index 6a8bd795bce31ccdb01695a9b0031841339d5658..b77f60a29fd5f3776e5b983ba5a7e3f6a8c9b93a 100644 (file)
@@ -263,13 +263,18 @@ class MDS : public Dispatcher, public md_config_obs_t {
     
 
   // -- waiters --
+private:
   list<MDSInternalContextBase*> finished_queue;
+  void _advance_queues();
+public:
 
   void queue_waiter(MDSInternalContextBase *c) {
     finished_queue.push_back(c);
+    progress_thread.signal();
   }
   void queue_waiters(list<MDSInternalContextBase*>& ls) {
     finished_queue.splice( finished_queue.end(), ls );
+    progress_thread.signal();
   }
   bool queue_one_replay() {
     if (replay_queue.empty())
@@ -319,6 +324,19 @@ class MDS : public Dispatcher, public md_config_obs_t {
   bool ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con);
 
+private:
+  class ProgressThread : public Thread {
+    MDS *mds;
+    bool stopping;
+    Cond cond;
+  public:
+    ProgressThread(MDS *mds_) : mds(mds_), stopping(false) {}
+    void * entry(); 
+    void shutdown();
+    void signal() {cond.Signal();}
+  } progress_thread;
+  void _progress_thread();
+
  public:
   MDS(const std::string &n, Messenger *m, MonClient *mc);
   ~MDS();