]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: allow/track service registrations
authorSage Weil <sage@redhat.com>
Mon, 26 Jun 2017 17:22:17 +0000 (13:22 -0400)
committerSage Weil <sage@redhat.com>
Sun, 9 Jul 2017 02:30:28 +0000 (22:30 -0400)
Signed-off-by: Sage Weil <sage@redhat.com>
12 files changed:
src/common/config_opts.h
src/messages/MMgrOpen.h
src/messages/MMgrReport.h
src/mgr/ClusterState.cc
src/mgr/ClusterState.h
src/mgr/DaemonServer.cc
src/mgr/DaemonServer.h
src/mgr/DaemonState.h
src/mgr/Mgr.cc
src/mgr/Mgr.h
src/mgr/MgrClient.cc
src/mgr/MgrClient.h

index b1f88440d0fbf9055ba62b58a38860185d9df175..8d95edd09d069066fdacf24e4f93f846ddafc743 100644 (file)
@@ -1749,6 +1749,7 @@ OPTION(mgr_mon_bytes, OPT_U64, 128*1048576)   // bytes from mons
 OPTION(mgr_mon_messages, OPT_U64, 128)        // messages from mons
 
 OPTION(mgr_connect_retry_interval, OPT_DOUBLE, 1.0)
+OPTION(mgr_service_beacon_grace, OPT_DOUBLE, 60.0)
 
 OPTION(mon_mgr_digest_period, OPT_INT, 5)  // How frequently to send digests
 OPTION(mon_mgr_beacon_grace, OPT_INT, 30)  // How long to wait to failover
index 13c67586b99f6b21e213fddd628aa1323f193f13..5db75e3f19586bb2be337ef89ec26f34d4945427 100644 (file)
 
 class MMgrOpen : public Message
 {
-  static const int HEAD_VERSION = 1;
+  static const int HEAD_VERSION = 2;
   static const int COMPAT_VERSION = 1;
 
 public:
 
   std::string daemon_name;
+  std::string service_name;  // optional; otherwise infer from entity type
+
+  bool service_daemon = false;
+  std::map<std::string,std::string> daemon_metadata;
+  std::map<std::string,std::string> daemon_status;
 
   void decode_payload() override
   {
     bufferlist::iterator p = payload.begin();
     ::decode(daemon_name, p);
+    if (header.version >= 2) {
+      ::decode(service_name, p);
+      ::decode(service_daemon, p);
+      if (service_daemon) {
+       ::decode(daemon_metadata, p);
+       ::decode(daemon_status, p);
+      }
+    }
   }
 
   void encode_payload(uint64_t features) override {
     ::encode(daemon_name, payload);
+    ::encode(service_name, payload);
+    ::encode(service_daemon, payload);
+    if (service_daemon) {
+      ::encode(daemon_metadata, payload);
+      ::encode(daemon_status, payload);
+    }
   }
 
   const char *get_type_name() const override { return "mgropen"; }
   void print(ostream& out) const override {
-    out << get_type_name() << "(" << daemon_name << ")"; 
+    out << get_type_name() << "(";
+    if (service_name.length()) {
+      out << service_name;
+    } else {
+      out << ceph_entity_type_name(get_source().type());
+    }
+    out << "." << daemon_name;
+    if (service_daemon) {
+      out << " daemon";
+    }
+    out << ")";
   }
 
   MMgrOpen()
index 52090b256b3594ac377bf55c3400e4780fec6966..9b033ec23c23645bf7bf474635a99fa6e93da662 100644 (file)
@@ -15,6 +15,8 @@
 #ifndef CEPH_MMGRREPORT_H_
 #define CEPH_MMGRREPORT_H_
 
+#include <boost/optional.hpp>
+
 #include "msg/Message.h"
 
 #include "common/perf_counters.h"
@@ -55,7 +57,7 @@ WRITE_CLASS_ENCODER(PerfCounterType)
 
 class MMgrReport : public Message
 {
-  static const int HEAD_VERSION = 2;
+  static const int HEAD_VERSION = 4;
   static const int COMPAT_VERSION = 1;
 
 public:
@@ -76,6 +78,10 @@ public:
   bufferlist packed;
 
   std::string daemon_name;
+  std::string service_name;  // optional; otherwise infer from entity type
+
+  // for service registration
+  boost::optional<std::map<std::string,std::string>> daemon_status;
 
   void decode_payload() override
   {
@@ -85,6 +91,10 @@ public:
     ::decode(packed, p);
     if (header.version >= 2)
       ::decode(undeclare_types, p);
+    if (header.version >= 3) {
+      ::decode(service_name, p);
+      ::decode(daemon_status, p);
+    }
   }
 
   void encode_payload(uint64_t features) override {
@@ -92,12 +102,26 @@ public:
     ::encode(declare_types, payload);
     ::encode(packed, payload);
     ::encode(undeclare_types, payload);
+    ::encode(service_name, payload);
+    ::encode(daemon_status, payload);
   }
 
   const char *get_type_name() const override { return "mgrreport"; }
   void print(ostream& out) const override {
-    out << get_type_name() << "(+" << declare_types.size() << "-" << undeclare_types.size()
-        << " packed " << packed.length() << ")";
+    out << get_type_name() << "(";
+    if (service_name.length()) {
+      out << service_name;
+    } else {
+      out << ceph_entity_type_name(get_source().type());
+    }
+    out << "." << daemon_name
+       << " +" << declare_types.size()
+       << "-" << undeclare_types.size()
+        << " packed " << packed.length();
+    if (daemon_status) {
+      out << " status=" << daemon_status->size();
+    }
+    out << ")";
   }
 
   MMgrReport()
index 44ba19d0b74843f75a8e0da7a87a2a2889157208..6dee866c7d81166de063836047294c3d91094de3 100644 (file)
@@ -53,6 +53,12 @@ void ClusterState::set_mgr_map(MgrMap const &new_mgrmap)
   mgr_map = new_mgrmap;
 }
 
+void ClusterState::set_service_map(ServiceMap const &new_service_map)
+{
+  Mutex::Locker l(lock);
+  servicemap = new_service_map;
+}
+
 void ClusterState::load_digest(MMgrDigest *m)
 {
   health_json = std::move(m->health_json);
index 7b4f3149a62bc71e7537b407bf98febff359a2b9..9513c763bd0ad2790a5b01e2ba810fc281f3032b 100644 (file)
@@ -21,6 +21,7 @@
 #include "osdc/Objecter.h"
 #include "mon/MonClient.h"
 #include "mon/PGMap.h"
+#include "mgr/ServiceMap.h"
 
 class MMgrDigest;
 class MMonMgrReport;
@@ -37,6 +38,7 @@ protected:
   MonClient *monc;
   Objecter *objecter;
   FSMap fsmap;
+  ServiceMap servicemap;
   mutable Mutex lock;
 
   MgrMap mgr_map;
@@ -65,6 +67,7 @@ public:
   void set_objecter(Objecter *objecter_);
   void set_fsmap(FSMap const &new_fsmap);
   void set_mgr_map(MgrMap const &new_mgrmap);
+  void set_service_map(ServiceMap const &new_service_map);
 
   void notify_osdmap(const OSDMap &osd_map);
 
@@ -73,6 +76,13 @@ public:
     return fsmap.get_epoch() > 0;
   }
 
+  template<typename Callback, typename...Args>
+  void with_servicemap(Callback&& cb, Args&&...args) const
+  {
+    Mutex::Locker l(lock);
+    std::forward<Callback>(cb)(servicemap, std::forward<Args>(args)...);
+  }
+
   template<typename Callback, typename...Args>
   void with_fsmap(Callback&& cb, Args&&...args) const
   {
index b2e40b41659ee9af395830d0c4da6f079f0f92e3..0362e84addd8c8445d926ef03b6e71074bdddaf0 100644 (file)
@@ -261,19 +261,18 @@ void DaemonServer::shutdown()
 
 bool DaemonServer::handle_open(MMgrOpen *m)
 {
-  uint32_t type = m->get_connection()->get_peer_type();
-  DaemonKey key(ceph_entity_type_name(type), m->daemon_name);
+  DaemonKey key;
+  if (!m->service_name.empty()) {
+    key.first = m->service_name;
+  } else {
+    key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
+  }
+  key.second = m->daemon_name;
 
-  dout(4) << "from " << m->get_connection() << " name "
-          << ceph_entity_type_name(type) << "." << m->daemon_name << dendl;
+  dout(4) << "from " << m->get_connection() << "  " << key << dendl;
 
   auto configure = new MMgrConfigure();
-  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) {
-    // We don't want clients to send us stats
-    configure->stats_period = 0;
-  } else {
-    configure->stats_period = g_conf->mgr_stats_period;
-  }
+  configure->stats_period = g_conf->mgr_stats_period;
   m->get_connection()->send_message(configure);
 
   if (daemon_state.exists(key)) {
@@ -281,42 +280,88 @@ bool DaemonServer::handle_open(MMgrOpen *m)
     daemon_state.get(key)->perf_counters.clear();
   }
 
+  if (m->service_daemon) {
+    DaemonStatePtr daemon;
+    if (daemon_state.exists(key)) {
+      daemon = daemon_state.get(key);
+    } else {
+      dout(4) << "constructing new DaemonState for " << key << dendl;
+      daemon = std::make_shared<DaemonState>(daemon_state.types);
+      // FIXME: crap, we don't know the hostname at this stage.
+      daemon->key = key;
+      daemon_state.insert(daemon);
+    }
+    daemon->service_daemon = true;
+    daemon->metadata = m->daemon_metadata;
+    daemon->service_status = m->daemon_status;
+
+    utime_t now = ceph_clock_now();
+    auto d = pending_service_map.get_daemon(m->service_name,
+                                           m->daemon_name);
+    if (d->gid != (uint64_t)m->get_source().num()) {
+      dout(10) << "registering " << key << " in pending_service_map" << dendl;
+      d->gid = m->get_source().num();
+      d->addr = m->get_source_addr();
+      d->start_epoch = pending_service_map.epoch;
+      d->start_stamp = now;
+      d->metadata = m->daemon_metadata;
+      pending_service_map_dirty = pending_service_map.epoch;
+    }
+  }
+
   m->put();
   return true;
 }
 
 bool DaemonServer::handle_report(MMgrReport *m)
 {
-  uint32_t type = m->get_connection()->get_peer_type();
-  DaemonKey key(ceph_entity_type_name(type), m->daemon_name);
+  DaemonKey key;
+  if (!m->service_name.empty()) {
+    key.first = m->service_name;
+  } else {
+    key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
+  }
+  key.second = m->daemon_name;
 
-  dout(4) << "from " << m->get_connection() << " name "
-          << ceph_entity_type_name(type) << "." << m->daemon_name << dendl;
+  dout(4) << "from " << m->get_connection() << " " << key << dendl;
 
-  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) {
-    // Clients should not be sending us stats
-    dout(4) << "rejecting report from client " << m->daemon_name << dendl;
+  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
+      m->service_name.empty()) {
+    // Clients should not be sending us stats unless they are declaring
+    // themselves to be a daemon for some service.
+    dout(4) << "rejecting report from non-daemon client " << m->daemon_name
+           << dendl;
     m->put();
     return true;
   }
 
   DaemonStatePtr daemon;
   if (daemon_state.exists(key)) {
-    dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
+    dout(20) << "updating existing DaemonState for " << key << dendl;
     daemon = daemon_state.get(key);
   } else {
-    dout(4) << "constructing new DaemonState for " << m->daemon_name << dendl;
+    dout(4) << "constructing new DaemonState for " << key << dendl;
     daemon = std::make_shared<DaemonState>(daemon_state.types);
     // FIXME: crap, we don't know the hostname at this stage.
     daemon->key = key;
     daemon_state.insert(daemon);
     // FIXME: we should request metadata at this stage
   }
-
   assert(daemon != nullptr);
   auto &daemon_counters = daemon->perf_counters;
   daemon_counters.update(m);
-  
+
+  if (daemon->service_daemon) {
+    utime_t now = ceph_clock_now();
+    if (m->daemon_status) {
+      daemon->service_status = *m->daemon_status;
+      daemon->service_status_stamp = now;
+    }
+    daemon->last_service_beacon = now;
+  } else if (m->daemon_status) {
+    derr << "got status from non-daemon " << key << dendl;
+  }
+
   m->put();
   return true;
 }
@@ -842,12 +887,62 @@ bool DaemonServer::handle_command(MCommand *m)
   }
 }
 
+void DaemonServer::_prune_pending_service_map()
+{
+  utime_t cutoff = ceph_clock_now();
+  cutoff -= g_conf->mgr_service_beacon_grace;
+  auto p = pending_service_map.services.begin();
+  while (p != pending_service_map.services.end()) {
+    auto q = p->second.daemons.begin();
+    while (q != p->second.daemons.end()) {
+      DaemonKey key(p->first, q->first);
+      if (!daemon_state.exists(key)) {
+       derr << "missing key " << key << dendl;
+       ++q;
+       continue;
+      }
+      auto daemon = daemon_state.get(key);
+      if (daemon->last_service_beacon == utime_t()) {
+       // we must have just restarted; assume they are alive now.
+       daemon->last_service_beacon = ceph_clock_now();
+       ++q;
+       continue;
+      }
+      if (daemon->last_service_beacon < cutoff) {
+       dout(10) << "pruning stale " << p->first << "." << q->first
+                << " last_beacon " << daemon->last_service_beacon << dendl;
+       q = p->second.daemons.erase(q);
+       pending_service_map_dirty = pending_service_map.epoch;
+      } else {
+       ++q;
+      }
+    }
+    if (p->second.daemons.empty()) {
+      p = pending_service_map.services.erase(p);
+      pending_service_map_dirty = pending_service_map.epoch;
+    } else {
+      ++p;
+    }
+  }
+}
+
 void DaemonServer::send_report()
 {
   auto m = new MMonMgrReport();
   cluster_state.with_pgmap([&](const PGMap& pg_map) {
       cluster_state.update_delta_stats();
 
+      if (pending_service_map.epoch) {
+       _prune_pending_service_map();
+       if (pending_service_map_dirty >= pending_service_map.epoch) {
+         pending_service_map.modified = ceph_clock_now();
+         ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
+         dout(10) << "sending service_map e" << pending_service_map.epoch
+                  << dendl;
+         pending_service_map.epoch++;
+       }
+      }
+
       // FIXME: reporting health detail here might be a bad idea?
       cluster_state.with_osdmap([&](const OSDMap& osdmap) {
          // FIXME: no easy way to get mon features here.  this will do for
@@ -864,3 +959,39 @@ void DaemonServer::send_report()
   //       so, or the state is updated.
   monc->send_mon_message(m);
 }
+
+void DaemonServer::got_service_map()
+{
+  Mutex::Locker l(lock);
+
+  cluster_state.with_servicemap([&](const ServiceMap& service_map) {
+      if (pending_service_map.epoch == 0) {
+       // we just started up
+       dout(10) << "got initial map e" << service_map.epoch << dendl;
+       pending_service_map = service_map;
+      } else {
+       // we we already active and therefore must have persisted it,
+       // which means ours is the same or newer.
+       dout(10) << "got updated map e" << service_map.epoch << dendl;
+      }
+      pending_service_map.epoch = service_map.epoch + 1;
+    });
+
+  // cull missing daemons, populate new ones
+  for (auto& p : pending_service_map.services) {
+    std::set<std::string> names;
+    for (auto& q : p.second.daemons) {
+      names.insert(q.first);
+      DaemonKey key(p.first, q.first);
+      if (!daemon_state.exists(key)) {
+       auto daemon = std::make_shared<DaemonState>(daemon_state.types);
+       daemon->key = key;
+       daemon_state.insert(daemon);
+       daemon->metadata = q.second.metadata;
+       daemon->service_daemon = true;
+       dout(10) << "added missing " << key << dendl;
+      }
+    }
+    daemon_state.cull(p.first, names);
+  }
+}
index a1ed292e1a0c1a15cf5b7a52641c70da22fd58db..06ee68b8adc80cdca81fb3de10f90a2686944c3f 100644 (file)
@@ -27,6 +27,7 @@
 
 #include "auth/AuthAuthorizeHandler.h"
 
+#include "ServiceMap.h"
 #include "MgrSession.h"
 #include "DaemonState.h"
 
@@ -66,6 +67,9 @@ protected:
   /// connections for osds
   ceph::unordered_map<int,set<ConnectionRef>> osd_cons;
 
+  ServiceMap pending_service_map;  // uncommitted
+  epoch_t pending_service_map_dirty = 0;
+
   Mutex lock;
 
   static void _generate_command_map(map<string,cmd_vartype>& cmdmap,
@@ -83,6 +87,8 @@ private:
   bool _reply(MCommand* m,
              int ret, const std::string& s, const bufferlist& payload);
 
+  void _prune_pending_service_map();
+
 public:
   int init(uint64_t gid, entity_addr_t client_addr);
   void shutdown();
@@ -116,6 +122,7 @@ public:
   bool handle_report(MMgrReport *m);
   bool handle_command(MCommand *m);
   void send_report();
+  void got_service_map();
 };
 
 #endif
index bad83bb8fec7787c594a69884778172e2f8b1887..e75b968425e977c0532686b500846eb592ea7cab 100644 (file)
@@ -102,6 +102,12 @@ class DaemonState
   // The metadata (hostname, version, etc) sent from the daemon
   std::map<std::string, std::string> metadata;
 
+  // Ephemeral state
+  bool service_daemon = false;
+  utime_t service_status_stamp;
+  std::map<std::string, std::string> service_status;
+  utime_t last_service_beacon;
+
   // The perf counters received in MMgrReport messages
   DaemonPerfCounters perf_counters;
 
index e5ed5eedaa9e3d0480d27937ffe722ceb1e8ce09..c74324009533c23c2b6a8090577b4a127b54bbad 100644 (file)
@@ -30,6 +30,7 @@
 #include "messages/MCommand.h"
 #include "messages/MCommandReply.h"
 #include "messages/MLog.h"
+#include "messages/MServiceMap.h"
 
 #include "Mgr.h"
 
@@ -184,6 +185,7 @@ void Mgr::init()
   monc->sub_want("log-info", 0, 0);
   monc->sub_want("mgrdigest", 0, 0);
   monc->sub_want("fsmap", 0, 0);
+  monc->sub_want("servicemap", 0, 0);
 
   dout(4) << "waiting for OSDMap..." << dendl;
   // Subscribe to OSDMap update to pass on to ClusterState
@@ -457,6 +459,13 @@ void Mgr::handle_log(MLog *m)
   m->put();
 }
 
+void Mgr::handle_service_map(MServiceMap *m)
+{
+  dout(10) << "e" << m->service_map.epoch << dendl;
+  cluster_state.set_service_map(m->service_map);
+  server.got_service_map();
+}
+
 bool Mgr::ms_dispatch(Message *m)
 {
   dout(4) << *m << dendl;
@@ -485,6 +494,11 @@ bool Mgr::ms_dispatch(Message *m)
       objecter->maybe_request_map();
       m->put();
       break;
+    case MSG_SERVICE_MAP:
+      handle_service_map((MServiceMap*)m);
+      py_modules.notify_all("service_map", "");
+      m->put();
+      break;
     case MSG_LOG:
       handle_log(static_cast<MLog *>(m));
       break;
index 3638f5b16884f55e7860076b1de0a1713bc195ee..90472cfec640f537f9da68d487daa8784488c38c 100644 (file)
 class MCommand;
 class MMgrDigest;
 class MLog;
+class MServiceMap;
 class Objecter;
 class Client;
 
-
 class MgrPyModule;
 
 class Mgr {
@@ -84,6 +84,7 @@ public:
   void handle_fs_map(MFSMap* m);
   void handle_osd_map();
   void handle_log(MLog *m);
+  void handle_service_map(MServiceMap *m);
 
   bool got_mgr_map(const MgrMap& m);
 
index c7774959d22e0c3fd5ffef0ac205988bc08ec46a..624fc0f4f60504c67bd567d4daecbe1d1749edfa 100644 (file)
@@ -140,12 +140,15 @@ void MgrClient::reconnect()
   session.reset(new MgrSessionState());
   session->con = msgr->get_connection(inst);
 
+  if (service_daemon) {
+    daemon_dirty_status = true;
+  }
+
   // Don't send an open if we're just a client (i.e. doing
   // command-sending, not stats etc)
-  if (g_conf && !g_conf->name.is_client()) {
-    auto open = new MMgrOpen();
-    open->daemon_name = g_conf->name.get_id();
-    session->con->send_message(open);
+  if ((g_conf && !g_conf->name.is_client()) ||
+      service_daemon) {
+    _send_open();
   }
 
   // resend any pending commands
@@ -157,6 +160,24 @@ void MgrClient::reconnect()
   }
 }
 
+void MgrClient::_send_open()
+{
+  if (session && session->con) {
+    auto open = new MMgrOpen();
+    if (!service_name.empty()) {
+      open->service_name = service_name;
+      open->daemon_name = daemon_name;
+    } else {
+      open->daemon_name = g_conf->name.get_id();
+    }
+    if (service_daemon) {
+      open->service_daemon = service_daemon;
+      open->daemon_metadata = daemon_metadata;
+    }
+    session->con->send_message(open);
+  }
+}
+
 bool MgrClient::handle_mgr_map(MMgrMap *m)
 {
   assert(lock.is_locked_by_me());
@@ -250,7 +271,17 @@ void MgrClient::send_report()
 
   ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl;
 
-  report->daemon_name = g_conf->name.get_id();
+  if (daemon_name.size()) {
+    report->daemon_name = daemon_name;
+  } else {
+    report->daemon_name = g_conf->name.get_id();
+  }
+  report->service_name = service_name;
+
+  if (daemon_dirty_status) {
+    report->daemon_status = daemon_status;
+    daemon_dirty_status = false;
+  }
 
   session->con->send_message(report);
 
@@ -354,3 +385,44 @@ bool MgrClient::handle_command_reply(MCommandReply *m)
   return true;
 }
 
+int MgrClient::service_daemon_register(
+  const std::string& service,
+  const std::string& name,
+  const std::map<std::string,std::string>& metadata)
+{
+  Mutex::Locker l(lock);
+  if (name == "osd" ||
+      name == "mds" ||
+      name == "client" ||
+      name == "mon" ||
+      name == "mgr") {
+    // normal ceph entity types are not allowed!
+    return -EINVAL;
+  }
+  if (service_daemon) {
+    return -EEXIST;
+  }
+  ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
+  service_daemon = true;
+  service_name = service;
+  daemon_name = name;
+  daemon_metadata = metadata;
+  daemon_dirty_status = true;
+
+  // late register?
+  if (g_conf->name.is_client() && session && session->con) {
+    _send_open();
+  }
+
+  return 0;
+}
+
+int MgrClient::service_daemon_update_status(
+  const std::map<std::string,std::string>& status)
+{
+  Mutex::Locker l(lock);
+  ldout(cct,10) << status << dendl;
+  daemon_status = status;
+  daemon_dirty_status = true;
+  return 0;
+}
index 87df28650a2d7cc753d5351568ada304c35dae94..09fe831b3948364988c6c15a8d678eded1e2cf7e 100644 (file)
@@ -72,7 +72,15 @@ protected:
   // our reports (hook for use by OSD)
   std::function<MPGStats*()> pgstats_cb;
 
+  // for service registration and beacon
+  bool service_daemon = false;
+  bool daemon_dirty_status = false;
+  std::string service_name, daemon_name;
+  std::map<std::string,std::string> daemon_metadata;
+  std::map<std::string,std::string> daemon_status;
+
   void reconnect();
+  void _send_open();
 
 public:
   MgrClient(CephContext *cct_, Messenger *msgr_);
@@ -103,6 +111,13 @@ public:
   int start_command(const vector<string>& cmd, const bufferlist& inbl,
                    bufferlist *outbl, string *outs,
                    Context *onfinish);
+
+  int service_daemon_register(
+    const std::string& service,
+    const std::string& name,
+    const std::map<std::string,std::string>& metadata);
+  int service_daemon_update_status(
+    const std::map<std::string,std::string>& status);
 };
 
 #endif