From: John Spray Date: Mon, 28 Aug 2017 11:29:36 +0000 (-0400) Subject: mgr/DaemonServer: handle MMgrReports in parallel X-Git-Tag: v13.0.1~835^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=64af9d3da0fceff9ad0ff668f60d272c46912f34;p=ceph-ci.git mgr/DaemonServer: handle MMgrReports in parallel The DaemonStateIndex locking is sufficient to make all the report processing safe: holding DaemonServer::lock through all ms_dispatch was unnecessarily serializing dispatch. Signed-off-by: John Spray --- diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc index 5ff7bbb9a89..77a5cedab8f 100644 --- a/src/mgr/DaemonServer.cc +++ b/src/mgr/DaemonServer.cc @@ -253,8 +253,9 @@ bool DaemonServer::ms_handle_refused(Connection *con) bool DaemonServer::ms_dispatch(Message *m) { - Mutex::Locker l(lock); - + // Note that we do *not* take ::lock here, in order to avoid + // serializing all message handling. It's up to each handler + // to take whatever locks it needs. switch (m->get_type()) { case MSG_PGSTATS: cluster_state.ingest_pgstats(static_cast(m)); @@ -275,6 +276,8 @@ bool DaemonServer::ms_dispatch(Message *m) void DaemonServer::maybe_ready(int32_t osd_id) { + Mutex::Locker l(lock); + if (!pgmap_ready && reported_osds.find(osd_id) == reported_osds.end()) { dout(4) << "initial report from osd " << osd_id << dendl; reported_osds.insert(osd_id); @@ -314,6 +317,8 @@ void DaemonServer::shutdown() bool DaemonServer::handle_open(MMgrOpen *m) { + Mutex::Locker l(lock); + DaemonKey key; if (!m->service_name.empty()) { key.first = m->service_name; @@ -400,6 +405,7 @@ bool DaemonServer::handle_report(MMgrReport *m) return true; } + // Look up the DaemonState DaemonStatePtr daemon; if (daemon_state.exists(key)) { dout(20) << "updating existing DaemonState for " << key << dendl; @@ -414,12 +420,26 @@ bool DaemonServer::handle_report(MMgrReport *m) // daemons without sessions, and ensuring that session open // always contains metadata. } + + // Update the DaemonState assert(daemon != nullptr); - auto &daemon_counters = daemon->perf_counters; { Mutex::Locker l(daemon->lock); + auto &daemon_counters = daemon->perf_counters; daemon_counters.update(m); + + if (daemon->service_daemon) { + utime_t now = ceph_clock_now(); + if (m->daemon_status) { + daemon->service_status = *m->daemon_status; + daemon->service_status_stamp = now; + } + daemon->last_service_beacon = now; + } else if (m->daemon_status) { + derr << "got status from non-daemon " << key << dendl; + } } + // if there are any schema updates, notify the python modules if (!m->declare_types.empty() || !m->undeclare_types.empty()) { ostringstream oss; @@ -427,17 +447,6 @@ bool DaemonServer::handle_report(MMgrReport *m) py_modules.notify_all("perf_schema_update", oss.str()); } - if (daemon->service_daemon) { - utime_t now = ceph_clock_now(); - if (m->daemon_status) { - daemon->service_status = *m->daemon_status; - daemon->service_status_stamp = now; - } - daemon->last_service_beacon = now; - } else if (m->daemon_status) { - derr << "got status from non-daemon " << key << dendl; - } - m->put(); return true; } @@ -512,6 +521,7 @@ bool DaemonServer::_allowed_command( bool DaemonServer::handle_command(MCommand *m) { + Mutex::Locker l(lock); int r = 0; std::stringstream ss; std::string prefix;