From: Sage Weil Date: Mon, 26 Jun 2017 17:22:17 +0000 (-0400) Subject: mgr: allow/track service registrations X-Git-Tag: v12.1.1~98^2~8 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=97cfc3cb694eb2723cae22d71057323773746cee;p=ceph.git mgr: allow/track service registrations Signed-off-by: Sage Weil --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index b1f88440d0fbf..8d95edd09d069 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1749,6 +1749,7 @@ OPTION(mgr_mon_bytes, OPT_U64, 128*1048576) // bytes from mons OPTION(mgr_mon_messages, OPT_U64, 128) // messages from mons OPTION(mgr_connect_retry_interval, OPT_DOUBLE, 1.0) +OPTION(mgr_service_beacon_grace, OPT_DOUBLE, 60.0) OPTION(mon_mgr_digest_period, OPT_INT, 5) // How frequently to send digests OPTION(mon_mgr_beacon_grace, OPT_INT, 30) // How long to wait to failover diff --git a/src/messages/MMgrOpen.h b/src/messages/MMgrOpen.h index 13c67586b99f6..5db75e3f19586 100644 --- a/src/messages/MMgrOpen.h +++ b/src/messages/MMgrOpen.h @@ -19,26 +19,55 @@ class MMgrOpen : public Message { - static const int HEAD_VERSION = 1; + static const int HEAD_VERSION = 2; static const int COMPAT_VERSION = 1; public: std::string daemon_name; + std::string service_name; // optional; otherwise infer from entity type + + bool service_daemon = false; + std::map daemon_metadata; + std::map daemon_status; void decode_payload() override { bufferlist::iterator p = payload.begin(); ::decode(daemon_name, p); + if (header.version >= 2) { + ::decode(service_name, p); + ::decode(service_daemon, p); + if (service_daemon) { + ::decode(daemon_metadata, p); + ::decode(daemon_status, p); + } + } } void encode_payload(uint64_t features) override { ::encode(daemon_name, payload); + ::encode(service_name, payload); + ::encode(service_daemon, payload); + if (service_daemon) { + ::encode(daemon_metadata, payload); + ::encode(daemon_status, payload); + } } const char *get_type_name() const override { return "mgropen"; } void print(ostream& out) const override { - out << get_type_name() << "(" << daemon_name << ")"; + out << get_type_name() << "("; + if (service_name.length()) { + out << service_name; + } else { + out << ceph_entity_type_name(get_source().type()); + } + out << "." << daemon_name; + if (service_daemon) { + out << " daemon"; + } + out << ")"; } MMgrOpen() diff --git a/src/messages/MMgrReport.h b/src/messages/MMgrReport.h index 52090b256b359..9b033ec23c236 100644 --- a/src/messages/MMgrReport.h +++ b/src/messages/MMgrReport.h @@ -15,6 +15,8 @@ #ifndef CEPH_MMGRREPORT_H_ #define CEPH_MMGRREPORT_H_ +#include + #include "msg/Message.h" #include "common/perf_counters.h" @@ -55,7 +57,7 @@ WRITE_CLASS_ENCODER(PerfCounterType) class MMgrReport : public Message { - static const int HEAD_VERSION = 2; + static const int HEAD_VERSION = 4; static const int COMPAT_VERSION = 1; public: @@ -76,6 +78,10 @@ public: bufferlist packed; std::string daemon_name; + std::string service_name; // optional; otherwise infer from entity type + + // for service registration + boost::optional> daemon_status; void decode_payload() override { @@ -85,6 +91,10 @@ public: ::decode(packed, p); if (header.version >= 2) ::decode(undeclare_types, p); + if (header.version >= 3) { + ::decode(service_name, p); + ::decode(daemon_status, p); + } } void encode_payload(uint64_t features) override { @@ -92,12 +102,26 @@ public: ::encode(declare_types, payload); ::encode(packed, payload); ::encode(undeclare_types, payload); + ::encode(service_name, payload); + ::encode(daemon_status, payload); } const char *get_type_name() const override { return "mgrreport"; } void print(ostream& out) const override { - out << get_type_name() << "(+" << declare_types.size() << "-" << undeclare_types.size() - << " packed " << packed.length() << ")"; + out << get_type_name() << "("; + if (service_name.length()) { + out << service_name; + } else { + out << ceph_entity_type_name(get_source().type()); + } + out << "." << daemon_name + << " +" << declare_types.size() + << "-" << undeclare_types.size() + << " packed " << packed.length(); + if (daemon_status) { + out << " status=" << daemon_status->size(); + } + out << ")"; } MMgrReport() diff --git a/src/mgr/ClusterState.cc b/src/mgr/ClusterState.cc index 44ba19d0b7484..6dee866c7d811 100644 --- a/src/mgr/ClusterState.cc +++ b/src/mgr/ClusterState.cc @@ -53,6 +53,12 @@ void ClusterState::set_mgr_map(MgrMap const &new_mgrmap) mgr_map = new_mgrmap; } +void ClusterState::set_service_map(ServiceMap const &new_service_map) +{ + Mutex::Locker l(lock); + servicemap = new_service_map; +} + void ClusterState::load_digest(MMgrDigest *m) { health_json = std::move(m->health_json); diff --git a/src/mgr/ClusterState.h b/src/mgr/ClusterState.h index 7b4f3149a62bc..9513c763bd0ad 100644 --- a/src/mgr/ClusterState.h +++ b/src/mgr/ClusterState.h @@ -21,6 +21,7 @@ #include "osdc/Objecter.h" #include "mon/MonClient.h" #include "mon/PGMap.h" +#include "mgr/ServiceMap.h" class MMgrDigest; class MMonMgrReport; @@ -37,6 +38,7 @@ protected: MonClient *monc; Objecter *objecter; FSMap fsmap; + ServiceMap servicemap; mutable Mutex lock; MgrMap mgr_map; @@ -65,6 +67,7 @@ public: void set_objecter(Objecter *objecter_); void set_fsmap(FSMap const &new_fsmap); void set_mgr_map(MgrMap const &new_mgrmap); + void set_service_map(ServiceMap const &new_service_map); void notify_osdmap(const OSDMap &osd_map); @@ -73,6 +76,13 @@ public: return fsmap.get_epoch() > 0; } + template + void with_servicemap(Callback&& cb, Args&&...args) const + { + Mutex::Locker l(lock); + std::forward(cb)(servicemap, std::forward(args)...); + } + template void with_fsmap(Callback&& cb, Args&&...args) const { diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc index b2e40b41659ee..0362e84addd8c 100644 --- a/src/mgr/DaemonServer.cc +++ b/src/mgr/DaemonServer.cc @@ -261,19 +261,18 @@ void DaemonServer::shutdown() bool DaemonServer::handle_open(MMgrOpen *m) { - uint32_t type = m->get_connection()->get_peer_type(); - DaemonKey key(ceph_entity_type_name(type), m->daemon_name); + DaemonKey key; + if (!m->service_name.empty()) { + key.first = m->service_name; + } else { + key.first = ceph_entity_type_name(m->get_connection()->get_peer_type()); + } + key.second = m->daemon_name; - dout(4) << "from " << m->get_connection() << " name " - << ceph_entity_type_name(type) << "." << m->daemon_name << dendl; + dout(4) << "from " << m->get_connection() << " " << key << dendl; auto configure = new MMgrConfigure(); - if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) { - // We don't want clients to send us stats - configure->stats_period = 0; - } else { - configure->stats_period = g_conf->mgr_stats_period; - } + configure->stats_period = g_conf->mgr_stats_period; m->get_connection()->send_message(configure); if (daemon_state.exists(key)) { @@ -281,42 +280,88 @@ bool DaemonServer::handle_open(MMgrOpen *m) daemon_state.get(key)->perf_counters.clear(); } + if (m->service_daemon) { + DaemonStatePtr daemon; + if (daemon_state.exists(key)) { + daemon = daemon_state.get(key); + } else { + dout(4) << "constructing new DaemonState for " << key << dendl; + daemon = std::make_shared(daemon_state.types); + // FIXME: crap, we don't know the hostname at this stage. + daemon->key = key; + daemon_state.insert(daemon); + } + daemon->service_daemon = true; + daemon->metadata = m->daemon_metadata; + daemon->service_status = m->daemon_status; + + utime_t now = ceph_clock_now(); + auto d = pending_service_map.get_daemon(m->service_name, + m->daemon_name); + if (d->gid != (uint64_t)m->get_source().num()) { + dout(10) << "registering " << key << " in pending_service_map" << dendl; + d->gid = m->get_source().num(); + d->addr = m->get_source_addr(); + d->start_epoch = pending_service_map.epoch; + d->start_stamp = now; + d->metadata = m->daemon_metadata; + pending_service_map_dirty = pending_service_map.epoch; + } + } + m->put(); return true; } bool DaemonServer::handle_report(MMgrReport *m) { - uint32_t type = m->get_connection()->get_peer_type(); - DaemonKey key(ceph_entity_type_name(type), m->daemon_name); + DaemonKey key; + if (!m->service_name.empty()) { + key.first = m->service_name; + } else { + key.first = ceph_entity_type_name(m->get_connection()->get_peer_type()); + } + key.second = m->daemon_name; - dout(4) << "from " << m->get_connection() << " name " - << ceph_entity_type_name(type) << "." << m->daemon_name << dendl; + dout(4) << "from " << m->get_connection() << " " << key << dendl; - if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) { - // Clients should not be sending us stats - dout(4) << "rejecting report from client " << m->daemon_name << dendl; + if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT && + m->service_name.empty()) { + // Clients should not be sending us stats unless they are declaring + // themselves to be a daemon for some service. + dout(4) << "rejecting report from non-daemon client " << m->daemon_name + << dendl; m->put(); return true; } DaemonStatePtr daemon; if (daemon_state.exists(key)) { - dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl; + dout(20) << "updating existing DaemonState for " << key << dendl; daemon = daemon_state.get(key); } else { - dout(4) << "constructing new DaemonState for " << m->daemon_name << dendl; + dout(4) << "constructing new DaemonState for " << key << dendl; daemon = std::make_shared(daemon_state.types); // FIXME: crap, we don't know the hostname at this stage. daemon->key = key; daemon_state.insert(daemon); // FIXME: we should request metadata at this stage } - assert(daemon != nullptr); auto &daemon_counters = daemon->perf_counters; daemon_counters.update(m); - + + if (daemon->service_daemon) { + utime_t now = ceph_clock_now(); + if (m->daemon_status) { + daemon->service_status = *m->daemon_status; + daemon->service_status_stamp = now; + } + daemon->last_service_beacon = now; + } else if (m->daemon_status) { + derr << "got status from non-daemon " << key << dendl; + } + m->put(); return true; } @@ -842,12 +887,62 @@ bool DaemonServer::handle_command(MCommand *m) } } +void DaemonServer::_prune_pending_service_map() +{ + utime_t cutoff = ceph_clock_now(); + cutoff -= g_conf->mgr_service_beacon_grace; + auto p = pending_service_map.services.begin(); + while (p != pending_service_map.services.end()) { + auto q = p->second.daemons.begin(); + while (q != p->second.daemons.end()) { + DaemonKey key(p->first, q->first); + if (!daemon_state.exists(key)) { + derr << "missing key " << key << dendl; + ++q; + continue; + } + auto daemon = daemon_state.get(key); + if (daemon->last_service_beacon == utime_t()) { + // we must have just restarted; assume they are alive now. + daemon->last_service_beacon = ceph_clock_now(); + ++q; + continue; + } + if (daemon->last_service_beacon < cutoff) { + dout(10) << "pruning stale " << p->first << "." << q->first + << " last_beacon " << daemon->last_service_beacon << dendl; + q = p->second.daemons.erase(q); + pending_service_map_dirty = pending_service_map.epoch; + } else { + ++q; + } + } + if (p->second.daemons.empty()) { + p = pending_service_map.services.erase(p); + pending_service_map_dirty = pending_service_map.epoch; + } else { + ++p; + } + } +} + void DaemonServer::send_report() { auto m = new MMonMgrReport(); cluster_state.with_pgmap([&](const PGMap& pg_map) { cluster_state.update_delta_stats(); + if (pending_service_map.epoch) { + _prune_pending_service_map(); + if (pending_service_map_dirty >= pending_service_map.epoch) { + pending_service_map.modified = ceph_clock_now(); + ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL); + dout(10) << "sending service_map e" << pending_service_map.epoch + << dendl; + pending_service_map.epoch++; + } + } + // FIXME: reporting health detail here might be a bad idea? cluster_state.with_osdmap([&](const OSDMap& osdmap) { // FIXME: no easy way to get mon features here. this will do for @@ -864,3 +959,39 @@ void DaemonServer::send_report() // so, or the state is updated. monc->send_mon_message(m); } + +void DaemonServer::got_service_map() +{ + Mutex::Locker l(lock); + + cluster_state.with_servicemap([&](const ServiceMap& service_map) { + if (pending_service_map.epoch == 0) { + // we just started up + dout(10) << "got initial map e" << service_map.epoch << dendl; + pending_service_map = service_map; + } else { + // we we already active and therefore must have persisted it, + // which means ours is the same or newer. + dout(10) << "got updated map e" << service_map.epoch << dendl; + } + pending_service_map.epoch = service_map.epoch + 1; + }); + + // cull missing daemons, populate new ones + for (auto& p : pending_service_map.services) { + std::set names; + for (auto& q : p.second.daemons) { + names.insert(q.first); + DaemonKey key(p.first, q.first); + if (!daemon_state.exists(key)) { + auto daemon = std::make_shared(daemon_state.types); + daemon->key = key; + daemon_state.insert(daemon); + daemon->metadata = q.second.metadata; + daemon->service_daemon = true; + dout(10) << "added missing " << key << dendl; + } + } + daemon_state.cull(p.first, names); + } +} diff --git a/src/mgr/DaemonServer.h b/src/mgr/DaemonServer.h index a1ed292e1a0c1..06ee68b8adc80 100644 --- a/src/mgr/DaemonServer.h +++ b/src/mgr/DaemonServer.h @@ -27,6 +27,7 @@ #include "auth/AuthAuthorizeHandler.h" +#include "ServiceMap.h" #include "MgrSession.h" #include "DaemonState.h" @@ -66,6 +67,9 @@ protected: /// connections for osds ceph::unordered_map> osd_cons; + ServiceMap pending_service_map; // uncommitted + epoch_t pending_service_map_dirty = 0; + Mutex lock; static void _generate_command_map(map& cmdmap, @@ -83,6 +87,8 @@ private: bool _reply(MCommand* m, int ret, const std::string& s, const bufferlist& payload); + void _prune_pending_service_map(); + public: int init(uint64_t gid, entity_addr_t client_addr); void shutdown(); @@ -116,6 +122,7 @@ public: bool handle_report(MMgrReport *m); bool handle_command(MCommand *m); void send_report(); + void got_service_map(); }; #endif diff --git a/src/mgr/DaemonState.h b/src/mgr/DaemonState.h index bad83bb8fec77..e75b968425e97 100644 --- a/src/mgr/DaemonState.h +++ b/src/mgr/DaemonState.h @@ -102,6 +102,12 @@ class DaemonState // The metadata (hostname, version, etc) sent from the daemon std::map metadata; + // Ephemeral state + bool service_daemon = false; + utime_t service_status_stamp; + std::map service_status; + utime_t last_service_beacon; + // The perf counters received in MMgrReport messages DaemonPerfCounters perf_counters; diff --git a/src/mgr/Mgr.cc b/src/mgr/Mgr.cc index e5ed5eedaa9e3..c74324009533c 100644 --- a/src/mgr/Mgr.cc +++ b/src/mgr/Mgr.cc @@ -30,6 +30,7 @@ #include "messages/MCommand.h" #include "messages/MCommandReply.h" #include "messages/MLog.h" +#include "messages/MServiceMap.h" #include "Mgr.h" @@ -184,6 +185,7 @@ void Mgr::init() monc->sub_want("log-info", 0, 0); monc->sub_want("mgrdigest", 0, 0); monc->sub_want("fsmap", 0, 0); + monc->sub_want("servicemap", 0, 0); dout(4) << "waiting for OSDMap..." << dendl; // Subscribe to OSDMap update to pass on to ClusterState @@ -457,6 +459,13 @@ void Mgr::handle_log(MLog *m) m->put(); } +void Mgr::handle_service_map(MServiceMap *m) +{ + dout(10) << "e" << m->service_map.epoch << dendl; + cluster_state.set_service_map(m->service_map); + server.got_service_map(); +} + bool Mgr::ms_dispatch(Message *m) { dout(4) << *m << dendl; @@ -485,6 +494,11 @@ bool Mgr::ms_dispatch(Message *m) objecter->maybe_request_map(); m->put(); break; + case MSG_SERVICE_MAP: + handle_service_map((MServiceMap*)m); + py_modules.notify_all("service_map", ""); + m->put(); + break; case MSG_LOG: handle_log(static_cast(m)); break; diff --git a/src/mgr/Mgr.h b/src/mgr/Mgr.h index 3638f5b16884f..90472cfec640f 100644 --- a/src/mgr/Mgr.h +++ b/src/mgr/Mgr.h @@ -39,10 +39,10 @@ class MCommand; class MMgrDigest; class MLog; +class MServiceMap; class Objecter; class Client; - class MgrPyModule; class Mgr { @@ -84,6 +84,7 @@ public: void handle_fs_map(MFSMap* m); void handle_osd_map(); void handle_log(MLog *m); + void handle_service_map(MServiceMap *m); bool got_mgr_map(const MgrMap& m); diff --git a/src/mgr/MgrClient.cc b/src/mgr/MgrClient.cc index c7774959d22e0..624fc0f4f6050 100644 --- a/src/mgr/MgrClient.cc +++ b/src/mgr/MgrClient.cc @@ -140,12 +140,15 @@ void MgrClient::reconnect() session.reset(new MgrSessionState()); session->con = msgr->get_connection(inst); + if (service_daemon) { + daemon_dirty_status = true; + } + // Don't send an open if we're just a client (i.e. doing // command-sending, not stats etc) - if (g_conf && !g_conf->name.is_client()) { - auto open = new MMgrOpen(); - open->daemon_name = g_conf->name.get_id(); - session->con->send_message(open); + if ((g_conf && !g_conf->name.is_client()) || + service_daemon) { + _send_open(); } // resend any pending commands @@ -157,6 +160,24 @@ void MgrClient::reconnect() } } +void MgrClient::_send_open() +{ + if (session && session->con) { + auto open = new MMgrOpen(); + if (!service_name.empty()) { + open->service_name = service_name; + open->daemon_name = daemon_name; + } else { + open->daemon_name = g_conf->name.get_id(); + } + if (service_daemon) { + open->service_daemon = service_daemon; + open->daemon_metadata = daemon_metadata; + } + session->con->send_message(open); + } +} + bool MgrClient::handle_mgr_map(MMgrMap *m) { assert(lock.is_locked_by_me()); @@ -250,7 +271,17 @@ void MgrClient::send_report() ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl; - report->daemon_name = g_conf->name.get_id(); + if (daemon_name.size()) { + report->daemon_name = daemon_name; + } else { + report->daemon_name = g_conf->name.get_id(); + } + report->service_name = service_name; + + if (daemon_dirty_status) { + report->daemon_status = daemon_status; + daemon_dirty_status = false; + } session->con->send_message(report); @@ -354,3 +385,44 @@ bool MgrClient::handle_command_reply(MCommandReply *m) return true; } +int MgrClient::service_daemon_register( + const std::string& service, + const std::string& name, + const std::map& metadata) +{ + Mutex::Locker l(lock); + if (name == "osd" || + name == "mds" || + name == "client" || + name == "mon" || + name == "mgr") { + // normal ceph entity types are not allowed! + return -EINVAL; + } + if (service_daemon) { + return -EEXIST; + } + ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl; + service_daemon = true; + service_name = service; + daemon_name = name; + daemon_metadata = metadata; + daemon_dirty_status = true; + + // late register? + if (g_conf->name.is_client() && session && session->con) { + _send_open(); + } + + return 0; +} + +int MgrClient::service_daemon_update_status( + const std::map& status) +{ + Mutex::Locker l(lock); + ldout(cct,10) << status << dendl; + daemon_status = status; + daemon_dirty_status = true; + return 0; +} diff --git a/src/mgr/MgrClient.h b/src/mgr/MgrClient.h index 87df28650a2d7..09fe831b39483 100644 --- a/src/mgr/MgrClient.h +++ b/src/mgr/MgrClient.h @@ -72,7 +72,15 @@ protected: // our reports (hook for use by OSD) std::function pgstats_cb; + // for service registration and beacon + bool service_daemon = false; + bool daemon_dirty_status = false; + std::string service_name, daemon_name; + std::map daemon_metadata; + std::map daemon_status; + void reconnect(); + void _send_open(); public: MgrClient(CephContext *cct_, Messenger *msgr_); @@ -103,6 +111,13 @@ public: int start_command(const vector& cmd, const bufferlist& inbl, bufferlist *outbl, string *outs, Context *onfinish); + + int service_daemon_register( + const std::string& service, + const std::string& name, + const std::map& metadata); + int service_daemon_update_status( + const std::map& status); }; #endif