OPTION(mgr_mon_messages, OPT_U64, 128) // messages from mons
OPTION(mgr_connect_retry_interval, OPT_DOUBLE, 1.0)
+OPTION(mgr_service_beacon_grace, OPT_DOUBLE, 60.0) // grace subtracted from "now" when pruning service daemons whose last report is too old (presumably seconds -- confirm)
OPTION(mon_mgr_digest_period, OPT_INT, 5) // How frequently to send digests
OPTION(mon_mgr_beacon_grace, OPT_INT, 30) // How long to wait to failover
class MMgrOpen : public Message
{
- static const int HEAD_VERSION = 1;
+ static const int HEAD_VERSION = 2;
static const int COMPAT_VERSION = 1;
public:
std::string daemon_name;
+ std::string service_name; // optional; otherwise infer from entity type
+
+ bool service_daemon = false;
+ std::map<std::string,std::string> daemon_metadata;
+ std::map<std::string,std::string> daemon_status;
void decode_payload() override // Unpack the wire payload into this message's fields.
{
bufferlist::iterator p = payload.begin();
::decode(daemon_name, p);
+ if (header.version >= 2) { // the service fields are only read when the header version is 2 or newer
+ ::decode(service_name, p);
+ ::decode(service_daemon, p);
+ if (service_daemon) { // metadata/status are only on the wire when service_daemon is set (mirrors encode_payload)
+ ::decode(daemon_metadata, p);
+ ::decode(daemon_status, p);
+ }
+ }
}
void encode_payload(uint64_t features) override { // Serialize the fields into payload; `features` is not consulted in this body.
::encode(daemon_name, payload);
+ ::encode(service_name, payload); // always written, even when empty
+ ::encode(service_daemon, payload);
+ if (service_daemon) { // metadata/status are only written for service daemons; decode_payload applies the same condition
+ ::encode(daemon_metadata, payload);
+ ::encode(daemon_status, payload);
+ }
}
const char *get_type_name() const override { return "mgropen"; } // Short type tag; print() uses it as the message prefix.
void print(ostream& out) const override { // Writes "mgropen(<service>.<daemon_name>[ daemon])" to out.
- out << get_type_name() << "(" << daemon_name << ")";
+ out << get_type_name() << "(";
+ if (service_name.length()) { // prefer the explicitly declared service name
+ out << service_name;
+ } else { // otherwise fall back to the entity type name of the message source
+ out << ceph_entity_type_name(get_source().type());
+ }
+ out << "." << daemon_name;
+ if (service_daemon) { // flag opens that register a service daemon
+ out << " daemon";
+ }
+ out << ")";
}
MMgrOpen()
#ifndef CEPH_MMGRREPORT_H_
#define CEPH_MMGRREPORT_H_
+#include <boost/optional.hpp>
+
#include "msg/Message.h"
#include "common/perf_counters.h"
class MMgrReport : public Message
{
- static const int HEAD_VERSION = 2;
+ static const int HEAD_VERSION = 4;
static const int COMPAT_VERSION = 1;
public:
bufferlist packed;
std::string daemon_name;
+ std::string service_name; // optional; otherwise infer from entity type
+
+ // for service registration
+ boost::optional<std::map<std::string,std::string>> daemon_status;
void decode_payload() override // Unpack the wire payload; version-gated fields are read only when the header version is new enough.
{
::decode(packed, p);
if (header.version >= 2)
::decode(undeclare_types, p);
+ if (header.version >= 3) { // v3+: service name and the optional status map
+ ::decode(service_name, p);
+ ::decode(daemon_status, p);
+ } // NOTE(review): HEAD_VERSION is declared as 4 in this class but the newest gate visible here is v3 -- confirm no v4-only field is missing
}
void encode_payload(uint64_t features) override { // Serialize the fields into payload; `features` is not consulted in the lines visible here.
::encode(declare_types, payload);
::encode(packed, payload);
::encode(undeclare_types, payload);
+ ::encode(service_name, payload); // always written, even when empty
+ ::encode(daemon_status, payload); // boost::optional: may be unset when the sender attached no status
}
const char *get_type_name() const override { return "mgrreport"; } // Short type tag; print() uses it as the message prefix.
void print(ostream& out) const override { // Writes "mgrreport(<service>.<daemon_name> +N-M packed B[ status=K])" to out.
- out << get_type_name() << "(+" << declare_types.size() << "-" << undeclare_types.size()
- << " packed " << packed.length() << ")";
+ out << get_type_name() << "(";
+ if (service_name.length()) { // prefer the explicitly declared service name
+ out << service_name;
+ } else { // otherwise fall back to the entity type name of the message source
+ out << ceph_entity_type_name(get_source().type());
+ }
+ out << "." << daemon_name
+ << " +" << declare_types.size()
+ << "-" << undeclare_types.size()
+ << " packed " << packed.length();
+ if (daemon_status) { // only printed when a status map is attached; shows its entry count
+ out << " status=" << daemon_status->size();
+ }
+ out << ")";
}
MMgrReport()
mgr_map = new_mgrmap;
}
+// Replace the cached ServiceMap with a copy of new_service_map while
+// holding the ClusterState lock.
+void ClusterState::set_service_map(ServiceMap const &new_service_map)
+{
+  Mutex::Locker locker(lock);
+  servicemap = new_service_map;
+}
+
void ClusterState::load_digest(MMgrDigest *m)
{
health_json = std::move(m->health_json);
#include "osdc/Objecter.h"
#include "mon/MonClient.h"
#include "mon/PGMap.h"
+#include "mgr/ServiceMap.h"
class MMgrDigest;
class MMonMgrReport;
MonClient *monc;
Objecter *objecter;
FSMap fsmap;
+ ServiceMap servicemap;
mutable Mutex lock;
MgrMap mgr_map;
void set_objecter(Objecter *objecter_);
void set_fsmap(FSMap const &new_fsmap);
void set_mgr_map(MgrMap const &new_mgrmap);
+ void set_service_map(ServiceMap const &new_service_map);
void notify_osdmap(const OSDMap &osd_map);
return fsmap.get_epoch() > 0;
}
+  // Invoke cb(servicemap, args...) with the lock held for the duration of
+  // the call.  Whatever cb returns is discarded.
+  template<typename Callback, typename...Args>
+  void with_servicemap(Callback&& cb, Args&&...args) const
+  {
+    Mutex::Locker locker(lock);
+    std::forward<Callback>(cb)(servicemap, std::forward<Args>(args)...);
+  }
+
template<typename Callback, typename...Args>
void with_fsmap(Callback&& cb, Args&&...args) const
{
// Handle an MMgrOpen from a daemon or service client.
// Builds the DaemonKey (declared service name, else the peer's entity type
// name, plus daemon_name), replies with an MMgrConfigure, clears the perf
// counters of an already-known daemon, and -- for opens flagged
// service_daemon -- records the daemon's metadata/status and registers it in
// pending_service_map.  Drops the message reference with m->put() and
// always returns true.
bool DaemonServer::handle_open(MMgrOpen *m)
{
- uint32_t type = m->get_connection()->get_peer_type();
- DaemonKey key(ceph_entity_type_name(type), m->daemon_name);
+ DaemonKey key;
+ if (!m->service_name.empty()) { // an explicitly declared service name wins
+ key.first = m->service_name;
+ } else { // otherwise key by the connection's peer entity type
+ key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
+ }
+ key.second = m->daemon_name;
- dout(4) << "from " << m->get_connection() << " name "
- << ceph_entity_type_name(type) << "." << m->daemon_name << dendl;
+ dout(4) << "from " << m->get_connection() << " " << key << dendl;
auto configure = new MMgrConfigure();
- if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) {
- // We don't want clients to send us stats
- configure->stats_period = 0;
- } else {
- configure->stats_period = g_conf->mgr_stats_period;
- }
+ configure->stats_period = g_conf->mgr_stats_period; // every peer now gets the configured period; handle_report() still rejects clients that declare no service name
m->get_connection()->send_message(configure);
if (daemon_state.exists(key)) {
daemon_state.get(key)->perf_counters.clear();
}
+ if (m->service_daemon) { // sender is registering itself as a service daemon
+ DaemonStatePtr daemon;
+ if (daemon_state.exists(key)) {
+ daemon = daemon_state.get(key);
+ } else {
+ dout(4) << "constructing new DaemonState for " << key << dendl;
+ daemon = std::make_shared<DaemonState>(daemon_state.types);
+ // FIXME: crap, we don't know the hostname at this stage.
+ daemon->key = key;
+ daemon_state.insert(daemon);
+ }
+ daemon->service_daemon = true; // handle_report() only accepts status/beacons when this flag is set
+ daemon->metadata = m->daemon_metadata;
+ daemon->service_status = m->daemon_status;
+
+ utime_t now = ceph_clock_now();
+ auto d = pending_service_map.get_daemon(m->service_name, // NOTE(review): keyed by m->service_name, whereas the DaemonState key above falls back to the entity type when service_name is empty -- confirm service_daemon opens always carry a service name
+ m->daemon_name);
+ if (d->gid != (uint64_t)m->get_source().num()) { // entry's gid differs from the sender's: (re)register it -- presumably a new or restarted instance
+ dout(10) << "registering " << key << " in pending_service_map" << dendl;
+ d->gid = m->get_source().num();
+ d->addr = m->get_source_addr();
+ d->start_epoch = pending_service_map.epoch;
+ d->start_stamp = now;
+ d->metadata = m->daemon_metadata;
+ pending_service_map_dirty = pending_service_map.epoch; // mark the current epoch dirty so send_report() encodes the map
+ }
+ }
+
m->put();
return true;
}
// Handle an MMgrReport (perf counter report, optionally carrying a service
// daemon status update).
// Builds the DaemonKey the same way handle_open() does, rejects reports from
// client peers that declare no service name, finds or creates the
// DaemonState, passes the report to its perf counters, and refreshes the
// service beacon/status for daemons flagged service_daemon.  Drops the
// message reference with m->put() and always returns true.
bool DaemonServer::handle_report(MMgrReport *m)
{
- uint32_t type = m->get_connection()->get_peer_type();
- DaemonKey key(ceph_entity_type_name(type), m->daemon_name);
+ DaemonKey key;
+ if (!m->service_name.empty()) { // an explicitly declared service name wins
+ key.first = m->service_name;
+ } else { // otherwise key by the connection's peer entity type
+ key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
+ }
+ key.second = m->daemon_name;
- dout(4) << "from " << m->get_connection() << " name "
- << ceph_entity_type_name(type) << "." << m->daemon_name << dendl;
+ dout(4) << "from " << m->get_connection() << " " << key << dendl;
- if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) {
- // Clients should not be sending us stats
- dout(4) << "rejecting report from client " << m->daemon_name << dendl;
+ if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
+ m->service_name.empty()) {
+ // Clients should not be sending us stats unless they are declaring
+ // themselves to be a daemon for some service.
+ dout(4) << "rejecting report from non-daemon client " << m->daemon_name
+ << dendl;
m->put();
return true;
}
DaemonStatePtr daemon;
if (daemon_state.exists(key)) {
- dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
+ dout(20) << "updating existing DaemonState for " << key << dendl;
daemon = daemon_state.get(key);
} else {
- dout(4) << "constructing new DaemonState for " << m->daemon_name << dendl;
+ dout(4) << "constructing new DaemonState for " << key << dendl;
daemon = std::make_shared<DaemonState>(daemon_state.types);
// FIXME: crap, we don't know the hostname at this stage.
daemon->key = key;
daemon_state.insert(daemon);
// FIXME: we should request metadata at this stage
}
-
assert(daemon != nullptr);
auto &daemon_counters = daemon->perf_counters;
daemon_counters.update(m);
-
+
+ if (daemon->service_daemon) { // flag is set by handle_open() / got_service_map()
+ utime_t now = ceph_clock_now();
+ if (m->daemon_status) { // status is optional: only overwrite when this report carries one
+ daemon->service_status = *m->daemon_status;
+ daemon->service_status_stamp = now;
+ }
+ daemon->last_service_beacon = now; // every report counts as a beacon; _prune_pending_service_map() compares this against the grace cutoff
+ } else if (m->daemon_status) { // status from a daemon never flagged service_daemon: logged and otherwise ignored
+ derr << "got status from non-daemon " << key << dendl;
+ }
+
m->put();
return true;
}
}
}
+// Drop daemons from pending_service_map whose last beacon is older than
+// mgr_service_beacon_grace, then drop services left with no daemons.
+// Every removal marks the current pending epoch dirty.
+void DaemonServer::_prune_pending_service_map()
+{
+  utime_t cutoff = ceph_clock_now();
+  cutoff -= g_conf->mgr_service_beacon_grace;
+
+  auto& services = pending_service_map.services;
+  for (auto svc = services.begin(); svc != services.end(); ) {
+    auto& daemons = svc->second.daemons;
+    for (auto entry = daemons.begin(); entry != daemons.end(); ) {
+      DaemonKey key(svc->first, entry->first);
+
+      // No DaemonState for this map entry: complain and leave it alone.
+      if (!daemon_state.exists(key)) {
+        derr << "missing key " << key << dendl;
+        ++entry;
+        continue;
+      }
+
+      auto state = daemon_state.get(key);
+
+      // Never heard a beacon: we must have just restarted, so assume the
+      // daemon is alive as of now.
+      if (state->last_service_beacon == utime_t()) {
+        state->last_service_beacon = ceph_clock_now();
+        ++entry;
+        continue;
+      }
+
+      // Still within the grace period: keep it.
+      if (!(state->last_service_beacon < cutoff)) {
+        ++entry;
+        continue;
+      }
+
+      dout(10) << "pruning stale " << svc->first << "." << entry->first
+               << " last_beacon " << state->last_service_beacon << dendl;
+      entry = daemons.erase(entry);
+      pending_service_map_dirty = pending_service_map.epoch;
+    }
+
+    if (!daemons.empty()) {
+      ++svc;
+    } else {
+      svc = services.erase(svc);
+      pending_service_map_dirty = pending_service_map.epoch;
+    }
+  }
+}
+
void DaemonServer::send_report() // Build an MMonMgrReport and send it to the mon; only part of the body is visible in this view.
{
auto m = new MMonMgrReport();
cluster_state.with_pgmap([&](const PGMap& pg_map) {
cluster_state.update_delta_stats();
+ if (pending_service_map.epoch) { // epoch stays 0 until got_service_map() has run, so nothing is sent before the first map arrives
+ _prune_pending_service_map();
+ if (pending_service_map_dirty >= pending_service_map.epoch) { // something changed since the pending epoch was opened
+ pending_service_map.modified = ceph_clock_now();
+ ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
+ dout(10) << "sending service_map e" << pending_service_map.epoch
+ << dendl;
+ pending_service_map.epoch++; // later changes are recorded against the next epoch
+ }
+ }
+
// FIXME: reporting health detail here might be a bad idea?
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
// FIXME: no easy way to get mon features here. this will do for
// so, or the state is updated.
monc->send_mon_message(m);
}
+
+// Called after ClusterState has stored a new ServiceMap.  Seeds
+// pending_service_map from it on first receipt, moves the pending epoch to
+// one past the received epoch, and reconciles daemon_state with the daemons
+// listed in the pending map.
+void DaemonServer::got_service_map()
+{
+  Mutex::Locker locker(lock);
+
+  cluster_state.with_servicemap([&](const ServiceMap& committed) {
+      if (pending_service_map.epoch != 0) {
+        // we were already active and therefore must have persisted it,
+        // which means ours is the same or newer.
+        dout(10) << "got updated map e" << committed.epoch << dendl;
+      } else {
+        // we just started up
+        dout(10) << "got initial map e" << committed.epoch << dendl;
+        pending_service_map = committed;
+      }
+      pending_service_map.epoch = committed.epoch + 1;
+    });
+
+  // cull missing daemons, populate new ones
+  for (auto& svc : pending_service_map.services) {
+    const auto& svc_name = svc.first;
+    std::set<std::string> live_names;
+    for (auto& entry : svc.second.daemons) {
+      live_names.insert(entry.first);
+      DaemonKey key(svc_name, entry.first);
+      if (daemon_state.exists(key)) {
+        continue;
+      }
+      // Listed in the map but unknown to us: create a placeholder state.
+      auto daemon = std::make_shared<DaemonState>(daemon_state.types);
+      daemon->key = key;
+      daemon_state.insert(daemon);
+      daemon->metadata = entry.second.metadata;
+      daemon->service_daemon = true;
+      dout(10) << "added missing " << key << dendl;
+    }
+    daemon_state.cull(svc_name, live_names);
+  }
+}
#include "auth/AuthAuthorizeHandler.h"
+#include "ServiceMap.h"
#include "MgrSession.h"
#include "DaemonState.h"
/// connections for osds
ceph::unordered_map<int,set<ConnectionRef>> osd_cons;
+ ServiceMap pending_service_map; // uncommitted
+ epoch_t pending_service_map_dirty = 0;
+
Mutex lock;
static void _generate_command_map(map<string,cmd_vartype>& cmdmap,
bool _reply(MCommand* m,
int ret, const std::string& s, const bufferlist& payload);
+ void _prune_pending_service_map();
+
public:
int init(uint64_t gid, entity_addr_t client_addr);
void shutdown();
bool handle_report(MMgrReport *m);
bool handle_command(MCommand *m);
void send_report();
+ void got_service_map();
};
#endif
// The metadata (hostname, version, etc) sent from the daemon
std::map<std::string, std::string> metadata;
+ // Ephemeral state
+ bool service_daemon = false;
+ utime_t service_status_stamp;
+ std::map<std::string, std::string> service_status;
+ utime_t last_service_beacon;
+
// The perf counters received in MMgrReport messages
DaemonPerfCounters perf_counters;
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
#include "messages/MLog.h"
+#include "messages/MServiceMap.h"
#include "Mgr.h"
monc->sub_want("log-info", 0, 0);
monc->sub_want("mgrdigest", 0, 0);
monc->sub_want("fsmap", 0, 0);
+ monc->sub_want("servicemap", 0, 0);
dout(4) << "waiting for OSDMap..." << dendl;
// Subscribe to OSDMap update to pass on to ClusterState
m->put();
}
+// Store the ServiceMap carried by m in cluster_state, then let the
+// DaemonServer react to it.  Does not release m; the caller does that.
+void Mgr::handle_service_map(MServiceMap *m)
+{
+  const auto& incoming = m->service_map;
+  dout(10) << "e" << incoming.epoch << dendl;
+  cluster_state.set_service_map(incoming);
+  server.got_service_map();
+}
+
bool Mgr::ms_dispatch(Message *m)
{
dout(4) << *m << dendl;
objecter->maybe_request_map();
m->put();
break;
+ case MSG_SERVICE_MAP:
+ handle_service_map((MServiceMap*)m);
+ py_modules.notify_all("service_map", "");
+ m->put();
+ break;
case MSG_LOG:
handle_log(static_cast<MLog *>(m));
break;
class MCommand;
class MMgrDigest;
class MLog;
+class MServiceMap;
class Objecter;
class Client;
-
class MgrPyModule;
class Mgr {
void handle_fs_map(MFSMap* m);
void handle_osd_map();
void handle_log(MLog *m);
+ void handle_service_map(MServiceMap *m);
bool got_mgr_map(const MgrMap& m);
session.reset(new MgrSessionState());
session->con = msgr->get_connection(inst);
+ if (service_daemon) {
+ daemon_dirty_status = true;
+ }
+
// Don't send an open if we're just a client (i.e. doing
// command-sending, not stats etc)
- if (g_conf && !g_conf->name.is_client()) {
- auto open = new MMgrOpen();
- open->daemon_name = g_conf->name.get_id();
- session->con->send_message(open);
+ if ((g_conf && !g_conf->name.is_client()) ||
+ service_daemon) {
+ _send_open();
}
// resend any pending commands
}
}
+// Send an MMgrOpen on the current session, if there is one.
+// A registered service uses its own service/daemon names; otherwise the
+// daemon name is the id from g_conf->name.  Service daemons also attach
+// their metadata.
+void MgrClient::_send_open()
+{
+  if (!session || !session->con) {
+    return;
+  }
+
+  auto open = new MMgrOpen();
+  const bool has_service_name = !service_name.empty();
+  if (has_service_name) {
+    open->service_name = service_name;
+  }
+  open->daemon_name = has_service_name ? daemon_name : g_conf->name.get_id();
+
+  if (service_daemon) {
+    open->service_daemon = true;
+    open->daemon_metadata = daemon_metadata;
+  }
+  session->con->send_message(open);
+}
+
bool MgrClient::handle_mgr_map(MMgrMap *m)
{
assert(lock.is_locked_by_me());
ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl;
- report->daemon_name = g_conf->name.get_id();
+ if (daemon_name.size()) {
+ report->daemon_name = daemon_name;
+ } else {
+ report->daemon_name = g_conf->name.get_id();
+ }
+ report->service_name = service_name;
+
+ if (daemon_dirty_status) {
+ report->daemon_status = daemon_status;
+ daemon_dirty_status = false;
+ }
session->con->send_message(report);
return true;
}
+/**
+ * Register this client as a daemon of a named service.
+ *
+ * @param service  service name; must not be one of the built-in ceph
+ *                 entity types (osd, mds, client, mon, mgr)
+ * @param name     this daemon's name within the service
+ * @param metadata static key/value metadata sent with the open message
+ * @return 0 on success, -EINVAL if service is a reserved entity type,
+ *         -EEXIST if this client has already registered
+ */
+int MgrClient::service_daemon_register(
+  const std::string& service,
+  const std::string& name,
+  const std::map<std::string,std::string>& metadata)
+{
+  Mutex::Locker l(lock);
+  // The reserved words are entity *types*, so it is the service name that
+  // must be checked; the daemon name (e.g. an id) is unconstrained.
+  if (service == "osd" ||
+      service == "mds" ||
+      service == "client" ||
+      service == "mon" ||
+      service == "mgr") {
+    // normal ceph entity types are not allowed!
+    return -EINVAL;
+  }
+  if (service_daemon) {
+    return -EEXIST;
+  }
+  ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
+  service_daemon = true;
+  service_name = service;
+  daemon_name = name;
+  daemon_metadata = metadata;
+  // make sure the next report carries our status
+  daemon_dirty_status = true;
+
+  // late register?  A client does not send an open when its session is
+  // first established, so send one now if a session already exists.
+  if (g_conf->name.is_client() && session && session->con) {
+    _send_open();
+  }
+
+  return 0;
+}
+
+// Remember a new status map and flag it dirty so that the next report
+// carries it.  Always returns 0.
+int MgrClient::service_daemon_update_status(
+  const std::map<std::string,std::string>& status)
+{
+  Mutex::Locker locker(lock);
+  ldout(cct,10) << status << dendl;
+  daemon_status = status;
+  daemon_dirty_status = true;
+  return 0;
+}
// our reports (hook for use by OSD)
std::function<MPGStats*()> pgstats_cb;
+ // for service registration and beacon
+ bool service_daemon = false;
+ bool daemon_dirty_status = false;
+ std::string service_name, daemon_name;
+ std::map<std::string,std::string> daemon_metadata;
+ std::map<std::string,std::string> daemon_status;
+
void reconnect();
+ void _send_open();
public:
MgrClient(CephContext *cct_, Messenger *msgr_);
int start_command(const vector<string>& cmd, const bufferlist& inbl,
bufferlist *outbl, string *outs,
Context *onfinish);
+
+ int service_daemon_register(
+ const std::string& service,
+ const std::string& name,
+ const std::map<std::string,std::string>& metadata);
+ int service_daemon_update_status(
+ const std::map<std::string,std::string>& status);
};
#endif