]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr, mon: allow normal ceph services to register with manager
authorVenky Shankar <vshankar@redhat.com>
Tue, 2 Jul 2019 08:11:35 +0000 (04:11 -0400)
committerVenky Shankar <vshankar@redhat.com>
Thu, 26 Mar 2020 02:44:12 +0000 (22:44 -0400)
Additionally, introduce `task status` field in manager report
messages to forward status of executing tasks in daemons (e.g.,
status of executing scrubs in ceph metadata servers).

`task status` makes its way upto service map which is then used
to display the relevant information in ceph status.

Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit 5c25a018643b10aa78db8270cae1476f71d8f4f4)

 Conflicts:
src/messages/MMgrReport.h
src/mgr/DaemonServer.cc
src/mgr/ServiceMap.h

src/messages/MMgrReport.h
src/mgr/DaemonServer.cc
src/mgr/DaemonState.h
src/mgr/MgrClient.cc
src/mgr/MgrClient.h
src/mgr/ServiceMap.cc
src/mgr/ServiceMap.h
src/mon/Monitor.cc

index dcfbf658546ab4bc095598b0171da1f472aa9605..84c96e7f200f3e711eef74f95b5d34a9faa9cb1a 100644 (file)
@@ -75,8 +75,7 @@ class MMgrReport : public MessageInstance<MMgrReport> {
 public:
   friend factory;
 private:
-
-  static constexpr int HEAD_VERSION = 7;
+  static constexpr int HEAD_VERSION = 8;
   static constexpr int COMPAT_VERSION = 1;
 
 public:
@@ -101,6 +100,7 @@ public:
 
   // for service registration
   boost::optional<std::map<std::string,std::string>> daemon_status;
+  boost::optional<std::map<std::string,std::string>> task_status;
 
   std::vector<DaemonHealthMetric> daemon_health_metrics;
 
@@ -130,6 +130,9 @@ public:
     if (header.version >= 7) {
       decode(osd_perf_metric_reports, p);
     }
+    if (header.version >= 8) {
+      decode(task_status, p);
+    }
   }
 
   void encode_payload(uint64_t features) override {
@@ -143,6 +146,7 @@ public:
     encode(daemon_health_metrics, payload);
     encode(config_bl, payload);
     encode(osd_perf_metric_reports, payload);
+    encode(task_status, payload);
   }
 
   std::string_view get_type_name() const override { return "mgrreport"; }
@@ -163,6 +167,9 @@ public:
     if (!daemon_health_metrics.empty()) {
       out << " daemon_metrics=" << daemon_health_metrics.size();
     }
+    if (task_status) {
+      out << " task_status=" << task_status->size();
+    }
     out << ")";
   }
 
index e0da9d03bd544a379c6211f8aa2ca7a111c4e7be..61a0950391be6e23ab8e8747fcb4d1ec639c5746 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
 
-
+namespace {
+  template <typename Map>
+  bool map_compare(Map const &lhs, Map const &rhs) {
+    return lhs.size() == rhs.size()
+      && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
+                    [] (auto a, auto b) { return a.first == b.first && a.second == b.second; });
+  }
+}
 
 DaemonServer::DaemonServer(MonClient *monc_,
                            Finisher &finisher_,
@@ -457,13 +464,14 @@ bool DaemonServer::handle_open(MMgrOpen *m)
     std::lock_guard l(daemon->lock);
     daemon->perf_counters.clear();
 
+    daemon->service_daemon = m->service_daemon;
     if (m->service_daemon) {
       daemon->service_status = m->daemon_status;
 
       utime_t now = ceph_clock_now();
       auto d = pending_service_map.get_daemon(m->service_name,
                                              m->daemon_name);
-      if (d->gid != (uint64_t)m->get_source().num()) {
+      if (!d->gid || d->gid != (uint64_t)m->get_source().num()) {
        dout(10) << "registering " << key << " in pending_service_map" << dendl;
        d->gid = m->get_source().num();
        d->addr = m->get_source_addr();
@@ -549,98 +557,114 @@ bool DaemonServer::handle_report(MMgrReport *m)
     return true;
   }
 
-  // Look up the DaemonState
-  DaemonStatePtr daemon;
-  if (daemon_state.exists(key)) {
-    dout(20) << "updating existing DaemonState for " << key << dendl;
-    daemon = daemon_state.get(key);
-  } else {
-    // we don't know the hostname at this stage, reject MMgrReport here.
-    dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
-           << dendl;
 
-    // issue metadata request in background
-    if (!daemon_state.is_updating(key) && 
-       (key.first == "osd" || key.first == "mds" || key.first == "mon")) {
+  {
+    lock.lock();
+
+    DaemonStatePtr daemon;
+    // Look up the DaemonState
+    if (daemon_state.exists(key)) {
+      dout(20) << "updating existing DaemonState for " << key << dendl;
+      daemon = daemon_state.get(key);
+    } else {
+      lock.unlock();
 
-      std::ostringstream oss;
-      auto c = new MetadataUpdate(daemon_state, key);
-      if (key.first == "osd") {
-        oss << "{\"prefix\": \"osd metadata\", \"id\": "
-            << key.second<< "}";
+      // we don't know the hostname at this stage, reject MMgrReport here.
+      dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
+              << dendl;
+      // issue metadata request in background
+      if (!daemon_state.is_updating(key) && 
+          (key.first == "osd" || key.first == "mds" || key.first == "mon")) {
 
-      } else if (key.first == "mds") {
-        c->set_default("addr", stringify(m->get_source_addr()));
-        oss << "{\"prefix\": \"mds metadata\", \"who\": \""
-            << key.second << "\"}";
+        std::ostringstream oss;
+        auto c = new MetadataUpdate(daemon_state, key);
+        if (key.first == "osd") {
+          oss << "{\"prefix\": \"osd metadata\", \"id\": "
+              << key.second<< "}";
+
+        } else if (key.first == "mds") {
+          c->set_default("addr", stringify(m->get_source_addr()));
+          oss << "{\"prefix\": \"mds metadata\", \"who\": \""
+              << key.second << "\"}";
  
-      } else if (key.first == "mon") {
-        oss << "{\"prefix\": \"mon metadata\", \"id\": \""
-            << key.second << "\"}";
-      } else {
-       ceph_abort();
+        } else if (key.first == "mon") {
+          oss << "{\"prefix\": \"mon metadata\", \"id\": \""
+              << key.second << "\"}";
+        } else {
+          ceph_abort();
+        }
+
+        monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
       }
 
-      monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
-    }
-    
-    {
-      std::lock_guard l(lock);
+      lock.lock();
+
       // kill session
       auto priv = m->get_connection()->get_priv();
       auto session = static_cast<MgrSession*>(priv.get());
       if (!session) {
-       return false;
+        return false;
       }
       m->get_connection()->mark_down();
 
       dout(10) << "unregistering osd." << session->osd_id
-              << "  session " << session << " con " << m->get_connection() << dendl;
+               << "  session " << session << " con " << m->get_connection() << dendl;
       
       if (osd_cons.find(session->osd_id) != osd_cons.end()) {
-          osd_cons[session->osd_id].erase(m->get_connection());
-      } 
+        osd_cons[session->osd_id].erase(m->get_connection());
+      }
 
       auto iter = daemon_connections.find(m->get_connection());
       if (iter != daemon_connections.end()) {
-       daemon_connections.erase(iter);
+        daemon_connections.erase(iter);
       }
-    }
-
-    return false;
-  }
-
-  // Update the DaemonState
-  ceph_assert(daemon != nullptr);
-  {
-    std::lock_guard l(daemon->lock);
-    auto &daemon_counters = daemon->perf_counters;
-    daemon_counters.update(m);
 
-    auto p = m->config_bl.cbegin();
-    if (p != m->config_bl.end()) {
-      decode(daemon->config, p);
-      decode(daemon->ignored_mon_config, p);
-      dout(20) << " got config " << daemon->config
-              << " ignored " << daemon->ignored_mon_config << dendl;
+      lock.unlock();
+      return false;
     }
 
-    if (daemon->service_daemon) {
-      utime_t now = ceph_clock_now();
-      if (m->daemon_status) {
-        daemon->service_status = *m->daemon_status;
-        daemon->service_status_stamp = now;
+    // Update the DaemonState
+    ceph_assert(daemon != nullptr);
+    {
+      std::lock_guard l(daemon->lock);
+      auto &daemon_counters = daemon->perf_counters;
+      daemon_counters.update(m);
+
+      auto p = m->config_bl.cbegin();
+      if (p != m->config_bl.end()) {
+        decode(daemon->config, p);
+        decode(daemon->ignored_mon_config, p);
+        dout(20) << " got config " << daemon->config
+                 << " ignored " << daemon->ignored_mon_config << dendl;
+      }
+
+      if (daemon->service_daemon) {
+        utime_t now = ceph_clock_now();
+        if (m->daemon_status) {
+          daemon->service_status_stamp = now;
+          daemon->service_status = *m->daemon_status;
+        }
+        if (m->task_status && !map_compare(daemon->task_status, *m->task_status)) {
+          auto d = pending_service_map.get_daemon(m->service_name, m->daemon_name);
+          if (d->gid) {
+            daemon->task_status = *m->task_status;
+            d->task_status = *m->task_status;
+            pending_service_map_dirty = pending_service_map.epoch;
+          }
+        }
+        daemon->last_service_beacon = now;
+      } else if (m->daemon_status) {
+        derr << "got status from non-daemon " << key << dendl;
+      }
+      if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
+        // only OSD and MON send health_checks to me now
+        daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
+        dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
+                 << dendl;
       }
-      daemon->last_service_beacon = now;
-    } else if (m->daemon_status) {
-      derr << "got status from non-daemon " << key << dendl;
-    }
-    if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
-      // only OSD and MON send health_checks to me now
-      daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
-      dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
-              << dendl;
     }
+
+    lock.unlock();
   }
 
   // if there are any schema updates, notify the python modules
index 0661f61a01ce17ded4e9347f757dab8c5070bc4e..a8c878c73f4ed12a7bab143ea063b85b116e7fdd 100644 (file)
@@ -147,6 +147,7 @@ class DaemonState
   bool service_daemon = false;
   utime_t service_status_stamp;
   std::map<std::string, std::string> service_status;
+  std::map<std::string, std::string> task_status;
   utime_t last_service_beacon;
 
   // running config
index 4737ea428d547f411d6e1daee899f20a5e4bc821..0d768dd216652f9d9ffb4ad3c7b32236824d6ac8 100644 (file)
@@ -345,6 +345,11 @@ void MgrClient::_send_report()
     daemon_dirty_status = false;
   }
 
+  if (task_dirty_status) {
+    report->task_status = task_status;
+    task_dirty_status = false;
+  }
+
   report->daemon_health_metrics = std::move(daemon_health_metrics);
 
   cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl,
@@ -480,14 +485,6 @@ int MgrClient::service_daemon_register(
   const std::map<std::string,std::string>& metadata)
 {
   std::lock_guard l(lock);
-  if (service == "osd" ||
-      service == "mds" ||
-      service == "client" ||
-      service == "mon" ||
-      service == "mgr") {
-    // normal ceph entity types are not allowed!
-    return -EINVAL;
-  }
   if (service_daemon) {
     return -EEXIST;
   }
@@ -516,6 +513,15 @@ int MgrClient::service_daemon_update_status(
   return 0;
 }
 
+int MgrClient::service_daemon_update_task_status(
+  std::map<std::string,std::string> &&status) {
+  std::lock_guard l(lock);
+  ldout(cct,10) << status << dendl;
+  task_status = std::move(status);
+  task_dirty_status = true;
+  return 0;
+}
+
 void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics)
 {
   std::lock_guard l(lock);
index 09764afaf8cce679bda05f67751e6cf0b61ebb3a..35c36e23fd8c72661c0433a2e0b5fc6ffb1548b1 100644 (file)
@@ -87,9 +87,11 @@ protected:
   // for service registration and beacon
   bool service_daemon = false;
   bool daemon_dirty_status = false;
+  bool task_dirty_status = false;
   std::string service_name, daemon_name;
   std::map<std::string,std::string> daemon_metadata;
   std::map<std::string,std::string> daemon_status;
+  std::map<std::string,std::string> task_status;
   std::vector<DaemonHealthMetric> daemon_health_metrics;
 
   void reconnect();
@@ -147,6 +149,8 @@ public:
     const std::map<std::string,std::string>& metadata);
   int service_daemon_update_status(
     std::map<std::string,std::string>&& status);
+  int service_daemon_update_task_status(
+    std::map<std::string,std::string> &&task_status);
   void update_daemon_health(std::vector<DaemonHealthMetric>&& metrics);
 
 private:
index ba7a43b24a8f4250cb022e32e195901d186db6b6..eca0370095585c191b639c1e0e4f83cfc15294e6 100644 (file)
@@ -9,23 +9,27 @@
 
 void ServiceMap::Daemon::encode(bufferlist& bl, uint64_t features) const
 {
-  ENCODE_START(1, 1, bl);
+  ENCODE_START(2, 1, bl);
   encode(gid, bl);
   encode(addr, bl, features);
   encode(start_epoch, bl);
   encode(start_stamp, bl);
   encode(metadata, bl);
+  encode(task_status, bl);
   ENCODE_FINISH(bl);
 }
 
 void ServiceMap::Daemon::decode(bufferlist::const_iterator& p)
 {
-  DECODE_START(1, p);
+  DECODE_START(2, p);
   decode(gid, p);
   decode(addr, p);
   decode(start_epoch, p);
   decode(start_stamp, p);
   decode(metadata, p);
+  if (struct_v >= 2) {
+    decode(task_status, p);
+  }
   DECODE_FINISH(p);
 }
 
@@ -33,13 +37,18 @@ void ServiceMap::Daemon::dump(Formatter *f) const
 {
   f->dump_unsigned("start_epoch", start_epoch);
   f->dump_stream("start_stamp") << start_stamp;
-  f->dump_unsigned("gid", gid);
+  f->dump_unsigned("gid", *gid);
   f->dump_string("addr", addr.get_legacy_str());
   f->open_object_section("metadata");
   for (auto& p : metadata) {
     f->dump_string(p.first.c_str(), p.second);
   }
   f->close_section();
+  f->open_object_section("task_status");
+  for (auto& p : task_status) {
+    f->dump_string(p.first.c_str(), p.second);
+  }
+  f->close_section();
 }
 
 void ServiceMap::Daemon::generate_test_instances(std::list<Daemon*>& ls)
@@ -48,6 +57,7 @@ void ServiceMap::Daemon::generate_test_instances(std::list<Daemon*>& ls)
   ls.push_back(new Daemon);
   ls.back()->gid = 222;
   ls.back()->metadata["this"] = "that";
+  ls.back()->task_status["task1"] = "running";
 }
 
 // Service
index d2a1c9b37c140b5d1a77b9e31a2b25a81d5950d6..3d226dd5454bbcdb9bac086ca82382ed4834e642 100644 (file)
 #include "include/buffer.h"
 #include "msg/msg_types.h"
 
+#include <boost/optional.hpp>
+
 namespace ceph {
   class Formatter;
 }
 
 struct ServiceMap {
   struct Daemon {
-    uint64_t gid = 0;
+    boost::optional<uint64_t> gid;
     entity_addr_t addr;
     epoch_t start_epoch = 0;   ///< epoch first registered
     utime_t start_stamp;       ///< timestamp daemon started/registered
     std::map<std::string,std::string> metadata;  ///< static metadata
+    std::map<std::string,std::string> task_status; ///< running task status
 
     void encode(bufferlist& bl, uint64_t features) const;
     void decode(bufferlist::const_iterator& p);
@@ -64,7 +67,34 @@ struct ServiceMap {
       return ss.str();
     }
 
-    void count_metadata(const string& field,
+    std::string get_task_summary(const std::string_view task_prefix) const {
+      // contruct a map similar to:
+      //     {"service1 status" -> {"service1.0" -> "running"}}
+      //     {"service2 status" -> {"service2.0" -> "idle"},
+      //                           {"service2.1" -> "running"}}
+      std::map<std::string, std::map<std::string, std::string>> by_task;
+      for (const auto &p : daemons) {
+        std::stringstream d;
+        d << task_prefix << "." << p.first;
+        for (const auto &q : p.second.task_status) {
+          auto p1 = by_task.emplace(q.first, std::map<std::string, std::string>{}).first;
+          auto p2 = p1->second.emplace(d.str(), std::string()).first;
+          p2->second = q.second;
+        }
+      }
+
+      std::stringstream ss;
+      for (const auto &p : by_task) {
+        ss << "\n    " << p.first << ":";
+        for (auto q : p.second) {
+          ss << "\n        " << q.first << ": " << q.second;
+        }
+      }
+
+      return ss.str();
+    }
+
+    void count_metadata(const std::string& field,
                        std::map<std::string,int> *out) const {
       for (auto& p : daemons) {
        auto q = p.second.metadata.find(field);
index b90901d6f48edd2ca66dea24573eb16bd83daa95..14d9a9b88fbc3b0fe44737d3cb4f974560963cbf 100644 (file)
@@ -3010,11 +3010,32 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
       osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
       ss << "\n";
       for (auto& p : service_map.services) {
+        const std::string &service = p.first;
+        // filter out normal ceph entity types
+        if (service == "osd" ||
+            service == "client" ||
+            service == "mon" ||
+            service == "mds" ||
+            service == "mgr") {
+          continue;
+        }
        ss << "    " << p.first << ": " << string(maxlen - p.first.size(), ' ')
           << p.second.get_summary() << "\n";
       }
     }
 
+    {
+      auto& service_map = mgrstatmon()->get_service_map();
+      if (!service_map.services.empty()) {
+        ss << "\n \n  task status:\n";
+        {
+          for (auto &p : service_map.services) {
+            ss << p.second.get_task_summary(p.first);
+          }
+        }
+      }
+    }
+
     ss << "\n \n  data:\n";
     mgrstatmon()->print_summary(NULL, &ss);