#undef dout_prefix
#define dout_prefix *_dout << "mgr.server " << __func__ << " "
-
+namespace {
+  /// Returns true when @p lhs and @p rhs hold the same number of entries and
+  /// every entry matches, position by position, on both key and mapped value.
+  /// The comparison walks both containers in iteration order, so it is meant
+  /// for ordered maps of the same type.
+  template <typename Map>
+  bool map_compare(Map const &lhs, Map const &rhs) {
+    // The size check comes first so std::equal never reads past rhs.end().
+    return lhs.size() == rhs.size()
+      && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
+                    // take the entries by const reference: passing them by
+                    // value copies both the key and the mapped value on
+                    // every single comparison
+                    [] (const auto &a, const auto &b) {
+                      return a.first == b.first && a.second == b.second;
+                    });
+  }
+}
DaemonServer::DaemonServer(MonClient *monc_,
Finisher &finisher_,
std::lock_guard l(daemon->lock);
daemon->perf_counters.clear();
+ daemon->service_daemon = m->service_daemon;
if (m->service_daemon) {
daemon->service_status = m->daemon_status;
utime_t now = ceph_clock_now();
auto d = pending_service_map.get_daemon(m->service_name,
m->daemon_name);
- if (d->gid != (uint64_t)m->get_source().num()) {
+ if (!d->gid || d->gid != (uint64_t)m->get_source().num()) {
dout(10) << "registering " << key << " in pending_service_map" << dendl;
d->gid = m->get_source().num();
d->addr = m->get_source_addr();
return true;
}
- // Look up the DaemonState
- DaemonStatePtr daemon;
- if (daemon_state.exists(key)) {
- dout(20) << "updating existing DaemonState for " << key << dendl;
- daemon = daemon_state.get(key);
- } else {
- // we don't know the hostname at this stage, reject MMgrReport here.
- dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
- << dendl;
- // issue metadata request in background
- if (!daemon_state.is_updating(key) &&
- (key.first == "osd" || key.first == "mds" || key.first == "mon")) {
+ {
+ // Hold `lock` through an RAII guard rather than bare lock()/unlock() calls,
+ // so that every exit from this scope releases it -- including the early
+ // `return false` taken below when the connection has no session.
+ std::unique_lock locker(lock);
+
+ DaemonStatePtr daemon;
+ // Look up the DaemonState
+ if (daemon_state.exists(key)) {
+ dout(20) << "updating existing DaemonState for " << key << dendl;
+ daemon = daemon_state.get(key);
+ } else {
+ // drop the lock while the metadata request is issued
+ locker.unlock();
- std::ostringstream oss;
- auto c = new MetadataUpdate(daemon_state, key);
- if (key.first == "osd") {
- oss << "{\"prefix\": \"osd metadata\", \"id\": "
- << key.second<< "}";
+ // we don't know the hostname at this stage, reject MMgrReport here.
+ dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
+ << dendl;
+ // issue metadata request in background
+ if (!daemon_state.is_updating(key) &&
+ (key.first == "osd" || key.first == "mds" || key.first == "mon")) {
- } else if (key.first == "mds") {
- c->set_default("addr", stringify(m->get_source_addr()));
- oss << "{\"prefix\": \"mds metadata\", \"who\": \""
- << key.second << "\"}";
+ std::ostringstream oss;
+ auto c = new MetadataUpdate(daemon_state, key);
+ if (key.first == "osd") {
+ oss << "{\"prefix\": \"osd metadata\", \"id\": "
+ << key.second<< "}";
+
+ } else if (key.first == "mds") {
+ c->set_default("addr", stringify(m->get_source_addr()));
+ oss << "{\"prefix\": \"mds metadata\", \"who\": \""
+ << key.second << "\"}";
- } else if (key.first == "mon") {
- oss << "{\"prefix\": \"mon metadata\", \"id\": \""
- << key.second << "\"}";
- } else {
- ceph_abort();
+ } else if (key.first == "mon") {
+ oss << "{\"prefix\": \"mon metadata\", \"id\": \""
+ << key.second << "\"}";
+ } else {
+ ceph_abort();
+ }
+
+ monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
}
- monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
- }
-
- {
- std::lock_guard l(lock);
+ // re-take the lock before touching osd_cons / daemon_connections
+ locker.lock();
+
// kill session
auto priv = m->get_connection()->get_priv();
auto session = static_cast<MgrSession*>(priv.get());
if (!session) {
- return false;
+ return false;
}
m->get_connection()->mark_down();
dout(10) << "unregistering osd." << session->osd_id
- << " session " << session << " con " << m->get_connection() << dendl;
+ << " session " << session << " con " << m->get_connection() << dendl;
if (osd_cons.find(session->osd_id) != osd_cons.end()) {
- osd_cons[session->osd_id].erase(m->get_connection());
- }
+ osd_cons[session->osd_id].erase(m->get_connection());
+ }
auto iter = daemon_connections.find(m->get_connection());
if (iter != daemon_connections.end()) {
- daemon_connections.erase(iter);
+ daemon_connections.erase(iter);
}
- }
-
- return false;
- }
-
- // Update the DaemonState
- ceph_assert(daemon != nullptr);
- {
- std::lock_guard l(daemon->lock);
- auto &daemon_counters = daemon->perf_counters;
- daemon_counters.update(m);
- auto p = m->config_bl.cbegin();
- if (p != m->config_bl.end()) {
- decode(daemon->config, p);
- decode(daemon->ignored_mon_config, p);
- dout(20) << " got config " << daemon->config
- << " ignored " << daemon->ignored_mon_config << dendl;
+ // `locker` releases the lock as part of returning
+ return false;
}
- if (daemon->service_daemon) {
- utime_t now = ceph_clock_now();
- if (m->daemon_status) {
- daemon->service_status = *m->daemon_status;
- daemon->service_status_stamp = now;
+ // Update the DaemonState
+ ceph_assert(daemon != nullptr);
+ {
+ std::lock_guard l(daemon->lock);
+ auto &daemon_counters = daemon->perf_counters;
+ daemon_counters.update(m);
+
+ auto p = m->config_bl.cbegin();
+ if (p != m->config_bl.end()) {
+ decode(daemon->config, p);
+ decode(daemon->ignored_mon_config, p);
+ dout(20) << " got config " << daemon->config
+ << " ignored " << daemon->ignored_mon_config << dendl;
+ }
+
+ if (daemon->service_daemon) {
+ utime_t now = ceph_clock_now();
+ if (m->daemon_status) {
+ daemon->service_status_stamp = now;
+ daemon->service_status = *m->daemon_status;
+ }
+ if (m->task_status && !map_compare(daemon->task_status, *m->task_status)) {
+ auto d = pending_service_map.get_daemon(m->service_name, m->daemon_name);
+ if (d->gid) {
+ daemon->task_status = *m->task_status;
+ d->task_status = *m->task_status;
+ pending_service_map_dirty = pending_service_map.epoch;
+ }
+ }
+ daemon->last_service_beacon = now;
+ } else if (m->daemon_status) {
+ derr << "got status from non-daemon " << key << dendl;
+ }
+ if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
+ // only OSD and MON send health_checks to me now
+ daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
+ dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
+ << dendl;
}
- daemon->last_service_beacon = now;
- } else if (m->daemon_status) {
- derr << "got status from non-daemon " << key << dendl;
- }
- if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
- // only OSD and MON send health_checks to me now
- daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
- dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
- << dendl;
}
+ // `locker` leaves scope here and releases `lock`
+ }
// if there are any schema updates, notify the python modules
+// Serialize this daemon record. struct_v 2 appends task_status after the
+// struct_v 1 fields; the compat version stays at 1, so the leading fields
+// must keep the exact layout they had in struct_v 1.
void ServiceMap::Daemon::encode(bufferlist& bl, uint64_t features) const
{
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
- encode(gid, bl);
+ // gid was a plain uint64_t in struct_v 1. Keep writing it as a bare
+ // uint64_t (0 == not set) instead of handing the optional to encode(),
+ // so the field layout seen by struct_v 1 decoders does not change.
+ const uint64_t raw_gid = gid ? *gid : 0;
+ encode(raw_gid, bl);
encode(addr, bl, features);
encode(start_epoch, bl);
encode(start_stamp, bl);
encode(metadata, bl);
+ encode(task_status, bl);
ENCODE_FINISH(bl);
}
+// Deserialize a daemon record written by encode(). task_status is only
+// present from struct_v 2 on and is left untouched for older encodings.
void ServiceMap::Daemon::decode(bufferlist::const_iterator& p)
{
- DECODE_START(1, p);
+ DECODE_START(2, p);
- decode(gid, p);
+ // mirror encode(): gid travels as a bare uint64_t, with 0 meaning "unset"
+ uint64_t raw_gid = 0;
+ decode(raw_gid, p);
+ if (raw_gid) {
+ gid = raw_gid;
+ } else {
+ gid = boost::none;
+ }
decode(addr, p);
decode(start_epoch, p);
decode(start_stamp, p);
decode(metadata, p);
+ if (struct_v >= 2) {
+ decode(task_status, p);
+ }
DECODE_FINISH(p);
}
{
+ // Emits this record's fields through the formatter `f`: start_epoch,
+ // start_stamp, gid and addr as scalars, then the metadata and task_status
+ // maps as nested object sections.
f->dump_unsigned("start_epoch", start_epoch);
f->dump_stream("start_stamp") << start_stamp;
- f->dump_unsigned("gid", gid);
+ // gid is an optional that starts out empty; dereferencing it unchecked
+ // is invalid while it is unset, so report 0 in that case.
+ f->dump_unsigned("gid", gid ? *gid : 0);
f->dump_string("addr", addr.get_legacy_str());
f->open_object_section("metadata");
for (auto& p : metadata) {
f->dump_string(p.first.c_str(), p.second);
}
f->close_section();
+ f->open_object_section("task_status");
+ for (auto& p : task_status) {
+ f->dump_string(p.first.c_str(), p.second);
+ }
+ f->close_section();
}
void ServiceMap::Daemon::generate_test_instances(std::list<Daemon*>& ls)
ls.push_back(new Daemon);
ls.back()->gid = 222;
ls.back()->metadata["this"] = "that";
+ ls.back()->task_status["task1"] = "running";
}
// Service
#include "include/buffer.h"
#include "msg/msg_types.h"
+#include <boost/optional.hpp>
+
namespace ceph {
class Formatter;
}
struct ServiceMap {
struct Daemon {
- uint64_t gid = 0;
+ boost::optional<uint64_t> gid;
entity_addr_t addr;
epoch_t start_epoch = 0; ///< epoch first registered
utime_t start_stamp; ///< timestamp daemon started/registered
std::map<std::string,std::string> metadata; ///< static metadata
+ std::map<std::string,std::string> task_status; ///< running task status
void encode(bufferlist& bl, uint64_t features) const;
void decode(bufferlist::const_iterator& p);
return ss.str();
}
- void count_metadata(const string& field,
+  /// Summarize the task status reported by this service's daemons, grouped
+  /// by task name.
+  ///
+  /// @param task_prefix label joined with "." to each daemon's key to name
+  ///        that daemon in the output (e.g. "service1" -> "service1.0")
+  /// @return for every task name, one line with the name followed by one
+  ///         line per daemon reporting it and that daemon's status; an
+  ///         empty string when no daemon carries any task status
+  std::string get_task_summary(const std::string_view task_prefix) const {
+    // construct a map similar to:
+    //     {"service1 status" -> {"service1.0" -> "running"}}
+    //     {"service2 status" -> {"service2.0" -> "idle"},
+    //                           {"service2.1" -> "running"}}
+    std::map<std::string, std::map<std::string, std::string>> by_task;
+    for (const auto &[daemon_key, daemon] : daemons) {
+      std::stringstream d;
+      d << task_prefix << "." << daemon_key;
+      // format the daemon label once rather than once per task entry
+      const std::string daemon_label = d.str();
+      for (const auto &[task, status] : daemon.task_status) {
+        by_task[task][daemon_label] = status;
+      }
+    }
+
+    std::stringstream ss;
+    for (const auto &[task, per_daemon] : by_task) {
+      ss << "\n " << task << ":";
+      // bind by reference so each label/status pair is not copied
+      for (const auto &[daemon_label, status] : per_daemon) {
+        ss << "\n " << daemon_label << ": " << status;
+      }
+    }
+
+    return ss.str();
+  }
+
+ void count_metadata(const std::string& field,
std::map<std::string,int> *out) const {
for (auto& p : daemons) {
auto q = p.second.metadata.find(field);