]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/DaemonServer: handle MMgrReports in parallel
authorJohn Spray <john.spray@redhat.com>
Mon, 28 Aug 2017 11:29:36 +0000 (07:29 -0400)
committerJohn Spray <john.spray@redhat.com>
Wed, 1 Nov 2017 23:03:23 +0000 (23:03 +0000)
The DaemonStateIndex locking is sufficient to make all
the report processing safe: holding DaemonServer::lock
through all ms_dispatch was unnecessarily serializing
dispatch.

Signed-off-by: John Spray <john.spray@redhat.com>
(cherry picked from commit 64af9d3da0fceff9ad0ff668f60d272c46912f34)

src/mgr/DaemonServer.cc

index 3602881ee2525b7ffd85827548d4bf2854161cc7..562c5a21db300ad9882f1fdf7798251273aeae85 100644 (file)
@@ -252,8 +252,9 @@ bool DaemonServer::ms_handle_refused(Connection *con)
 
 bool DaemonServer::ms_dispatch(Message *m)
 {
-  Mutex::Locker l(lock);
-
+  // Note that we do *not* take ::lock here, in order to avoid
+  // serializing all message handling.  It's up to each handler
+  // to take whatever locks it needs.
   switch (m->get_type()) {
     case MSG_PGSTATS:
       cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
@@ -274,6 +275,8 @@ bool DaemonServer::ms_dispatch(Message *m)
 
 void DaemonServer::maybe_ready(int32_t osd_id)
 {
+  Mutex::Locker l(lock);
+
   if (!pgmap_ready && reported_osds.find(osd_id) == reported_osds.end()) {
     dout(4) << "initial report from osd " << osd_id << dendl;
     reported_osds.insert(osd_id);
@@ -313,6 +316,8 @@ void DaemonServer::shutdown()
 
 bool DaemonServer::handle_open(MMgrOpen *m)
 {
+  Mutex::Locker l(lock);
+
   DaemonKey key;
   if (!m->service_name.empty()) {
     key.first = m->service_name;
@@ -399,6 +404,7 @@ bool DaemonServer::handle_report(MMgrReport *m)
     return true;
   }
 
+  // Look up the DaemonState
   DaemonStatePtr daemon;
   if (daemon_state.exists(key)) {
     dout(20) << "updating existing DaemonState for " << key << dendl;
@@ -413,12 +419,26 @@ bool DaemonServer::handle_report(MMgrReport *m)
     // daemons without sessions, and ensuring that session open
     // always contains metadata.
   }
+
+  // Update the DaemonState
   assert(daemon != nullptr);
-  auto &daemon_counters = daemon->perf_counters;
   {
     Mutex::Locker l(daemon->lock);
+    auto &daemon_counters = daemon->perf_counters;
     daemon_counters.update(m);
+
+    if (daemon->service_daemon) {
+      utime_t now = ceph_clock_now();
+      if (m->daemon_status) {
+        daemon->service_status = *m->daemon_status;
+        daemon->service_status_stamp = now;
+      }
+      daemon->last_service_beacon = now;
+    } else if (m->daemon_status) {
+      derr << "got status from non-daemon " << key << dendl;
+    }
   }
+
   // if there are any schema updates, notify the python modules
   if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
     ostringstream oss;
@@ -426,17 +446,6 @@ bool DaemonServer::handle_report(MMgrReport *m)
     py_modules.notify_all("perf_schema_update", oss.str());
   }
 
-  if (daemon->service_daemon) {
-    utime_t now = ceph_clock_now();
-    if (m->daemon_status) {
-      daemon->service_status = *m->daemon_status;
-      daemon->service_status_stamp = now;
-    }
-    daemon->last_service_beacon = now;
-  } else if (m->daemon_status) {
-    derr << "got status from non-daemon " << key << dendl;
-  }
-
   m->put();
   return true;
 }
@@ -511,6 +520,7 @@ bool DaemonServer::_allowed_command(
 
 bool DaemonServer::handle_command(MCommand *m)
 {
+  Mutex::Locker l(lock);
   int r = 0;
   std::stringstream ss;
   std::string prefix;