From 946d1541bd2b14d6f8abbaa201c3521dd117f01b Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 28 Aug 2017 07:29:36 -0400 Subject: [PATCH] mgr/DaemonServer: handle MMgrReports in parallel The DaemonStateIndex locking is sufficient to make all the report processing safe: holding DaemonServer::lock through all ms_dispatch was unnecessarily serializing dispatch. Signed-off-by: John Spray (cherry picked from commit 64af9d3da0fceff9ad0ff668f60d272c46912f34) --- src/mgr/DaemonServer.cc | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc index 3602881ee2525..562c5a21db300 100644 --- a/src/mgr/DaemonServer.cc +++ b/src/mgr/DaemonServer.cc @@ -252,8 +252,9 @@ bool DaemonServer::ms_handle_refused(Connection *con) bool DaemonServer::ms_dispatch(Message *m) { - Mutex::Locker l(lock); - + // Note that we do *not* take ::lock here, in order to avoid + // serializing all message handling. It's up to each handler + // to take whatever locks it needs. switch (m->get_type()) { case MSG_PGSTATS: cluster_state.ingest_pgstats(static_cast(m)); @@ -274,6 +275,8 @@ bool DaemonServer::ms_dispatch(Message *m) void DaemonServer::maybe_ready(int32_t osd_id) { + Mutex::Locker l(lock); + if (!pgmap_ready && reported_osds.find(osd_id) == reported_osds.end()) { dout(4) << "initial report from osd " << osd_id << dendl; reported_osds.insert(osd_id); @@ -313,6 +316,8 @@ void DaemonServer::shutdown() bool DaemonServer::handle_open(MMgrOpen *m) { + Mutex::Locker l(lock); + DaemonKey key; if (!m->service_name.empty()) { key.first = m->service_name; @@ -399,6 +404,7 @@ bool DaemonServer::handle_report(MMgrReport *m) return true; } + // Look up the DaemonState DaemonStatePtr daemon; if (daemon_state.exists(key)) { dout(20) << "updating existing DaemonState for " << key << dendl; @@ -413,12 +419,26 @@ bool DaemonServer::handle_report(MMgrReport *m) // daemons without sessions, and ensuring that session open // always contains metadata. } + + // Update the DaemonState assert(daemon != nullptr); - auto &daemon_counters = daemon->perf_counters; { Mutex::Locker l(daemon->lock); + auto &daemon_counters = daemon->perf_counters; daemon_counters.update(m); + + if (daemon->service_daemon) { + utime_t now = ceph_clock_now(); + if (m->daemon_status) { + daemon->service_status = *m->daemon_status; + daemon->service_status_stamp = now; + } + daemon->last_service_beacon = now; + } else if (m->daemon_status) { + derr << "got status from non-daemon " << key << dendl; + } } + // if there are any schema updates, notify the python modules if (!m->declare_types.empty() || !m->undeclare_types.empty()) { ostringstream oss; @@ -426,17 +446,6 @@ bool DaemonServer::handle_report(MMgrReport *m) py_modules.notify_all("perf_schema_update", oss.str()); } - if (daemon->service_daemon) { - utime_t now = ceph_clock_now(); - if (m->daemon_status) { - daemon->service_status = *m->daemon_status; - daemon->service_status_stamp = now; - } - daemon->last_service_beacon = now; - } else if (m->daemon_status) { - derr << "got status from non-daemon " << key << dendl; - } - m->put(); return true; } @@ -511,6 +520,7 @@ bool DaemonServer::_allowed_command( bool DaemonServer::handle_command(MCommand *m) { + Mutex::Locker l(lock); int r = 0; std::stringstream ss; std::string prefix; -- 2.39.5