]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/DaemonServer: handle MMgrReports in parallel
authorJohn Spray <john.spray@redhat.com>
Mon, 28 Aug 2017 11:29:36 +0000 (07:29 -0400)
committerJohn Spray <john.spray@redhat.com>
Mon, 18 Sep 2017 09:44:40 +0000 (05:44 -0400)
The DaemonStateIndex locking is sufficient to make all
the report processing safe: holding DaemonServer::lock
through all ms_dispatch was unnecessarily serializing
dispatch.

Signed-off-by: John Spray <john.spray@redhat.com>
src/mgr/DaemonServer.cc

index 5ff7bbb9a8963462127275ae27f3ad3483a32537..77a5cedab8fbf522803b881a4223d1b073b5fa1e 100644 (file)
@@ -253,8 +253,9 @@ bool DaemonServer::ms_handle_refused(Connection *con)
 
 bool DaemonServer::ms_dispatch(Message *m)
 {
-  Mutex::Locker l(lock);
-
+  // Note that we do *not* take ::lock here, in order to avoid
+  // serializing all message handling.  It's up to each handler
+  // to take whatever locks it needs.
   switch (m->get_type()) {
     case MSG_PGSTATS:
       cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
@@ -275,6 +276,8 @@ bool DaemonServer::ms_dispatch(Message *m)
 
 void DaemonServer::maybe_ready(int32_t osd_id)
 {
+  Mutex::Locker l(lock);
+
   if (!pgmap_ready && reported_osds.find(osd_id) == reported_osds.end()) {
     dout(4) << "initial report from osd " << osd_id << dendl;
     reported_osds.insert(osd_id);
@@ -314,6 +317,8 @@ void DaemonServer::shutdown()
 
 bool DaemonServer::handle_open(MMgrOpen *m)
 {
+  Mutex::Locker l(lock);
+
   DaemonKey key;
   if (!m->service_name.empty()) {
     key.first = m->service_name;
@@ -400,6 +405,7 @@ bool DaemonServer::handle_report(MMgrReport *m)
     return true;
   }
 
+  // Look up the DaemonState
   DaemonStatePtr daemon;
   if (daemon_state.exists(key)) {
     dout(20) << "updating existing DaemonState for " << key << dendl;
@@ -414,12 +420,26 @@ bool DaemonServer::handle_report(MMgrReport *m)
     // daemons without sessions, and ensuring that session open
     // always contains metadata.
   }
+
+  // Update the DaemonState
   assert(daemon != nullptr);
-  auto &daemon_counters = daemon->perf_counters;
   {
     Mutex::Locker l(daemon->lock);
+    auto &daemon_counters = daemon->perf_counters;
     daemon_counters.update(m);
+
+    if (daemon->service_daemon) {
+      utime_t now = ceph_clock_now();
+      if (m->daemon_status) {
+        daemon->service_status = *m->daemon_status;
+        daemon->service_status_stamp = now;
+      }
+      daemon->last_service_beacon = now;
+    } else if (m->daemon_status) {
+      derr << "got status from non-daemon " << key << dendl;
+    }
   }
+
   // if there are any schema updates, notify the python modules
   if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
     ostringstream oss;
@@ -427,17 +447,6 @@ bool DaemonServer::handle_report(MMgrReport *m)
     py_modules.notify_all("perf_schema_update", oss.str());
   }
 
-  if (daemon->service_daemon) {
-    utime_t now = ceph_clock_now();
-    if (m->daemon_status) {
-      daemon->service_status = *m->daemon_status;
-      daemon->service_status_stamp = now;
-    }
-    daemon->last_service_beacon = now;
-  } else if (m->daemon_status) {
-    derr << "got status from non-daemon " << key << dendl;
-  }
-
   m->put();
   return true;
 }
@@ -512,6 +521,7 @@ bool DaemonServer::_allowed_command(
 
 bool DaemonServer::handle_command(MCommand *m)
 {
+  Mutex::Locker l(lock);
   int r = 0;
   std::stringstream ss;
   std::string prefix;