bool DaemonServer::ms_dispatch(Message *m)
{
- Mutex::Locker l(lock);
-
+ // Note that we do *not* take ::lock here, in order to avoid
+ // serializing all message handling. It's up to each handler
+ // to take whatever locks it needs.
switch (m->get_type()) {
case MSG_PGSTATS:
cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
void DaemonServer::maybe_ready(int32_t osd_id)
{
+ Mutex::Locker l(lock);
+
if (!pgmap_ready && reported_osds.find(osd_id) == reported_osds.end()) {
dout(4) << "initial report from osd " << osd_id << dendl;
reported_osds.insert(osd_id);
bool DaemonServer::handle_open(MMgrOpen *m)
{
+ Mutex::Locker l(lock);
+
DaemonKey key;
if (!m->service_name.empty()) {
key.first = m->service_name;
return true;
}
+ // Look up the DaemonState
DaemonStatePtr daemon;
if (daemon_state.exists(key)) {
dout(20) << "updating existing DaemonState for " << key << dendl;
// daemons without sessions, and ensuring that session open
// always contains metadata.
}
+
+ // Update the DaemonState
assert(daemon != nullptr);
- auto &daemon_counters = daemon->perf_counters;
{
Mutex::Locker l(daemon->lock);
+ auto &daemon_counters = daemon->perf_counters;
daemon_counters.update(m);
+
+ if (daemon->service_daemon) {
+ utime_t now = ceph_clock_now();
+ if (m->daemon_status) {
+ daemon->service_status = *m->daemon_status;
+ daemon->service_status_stamp = now;
+ }
+ daemon->last_service_beacon = now;
+ } else if (m->daemon_status) {
+ derr << "got status from non-daemon " << key << dendl;
+ }
}
+
// if there are any schema updates, notify the python modules
if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
ostringstream oss;
py_modules.notify_all("perf_schema_update", oss.str());
}
- if (daemon->service_daemon) {
- utime_t now = ceph_clock_now();
- if (m->daemon_status) {
- daemon->service_status = *m->daemon_status;
- daemon->service_status_stamp = now;
- }
- daemon->last_service_beacon = now;
- } else if (m->daemon_status) {
- derr << "got status from non-daemon " << key << dendl;
- }
-
m->put();
return true;
}
bool DaemonServer::handle_command(MCommand *m)
{
+ Mutex::Locker l(lock);
int r = 0;
std::stringstream ss;
std::string prefix;