mgr: handle PGStats with a PGMap
author John Spray <john.spray@redhat.com>
Sun, 31 Jul 2016 17:07:20 +0000 (18:07 +0100)
committer John Spray <john.spray@redhat.com>
Thu, 29 Sep 2016 16:27:02 +0000 (17:27 +0100)
Build and maintain a PGMap inside the mgr from the MPGStats messages
that OSDs send us, so we no longer need the mon to send us the
pg_summary json hack.

Signed-off-by: John Spray <john.spray@redhat.com>
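
Illustrative sketch (not part of this commit): with ClusterState now
carrying its own PGMap, callers such as DaemonServer or PyModules read
it through the new with_pgmap accessor added below. Assuming a
ClusterState reference named cluster_state and the usual mgr dout
macros in scope:

    cluster_state.with_pgmap(
        [](const PGMap &pg_map) {
          // Runs under ClusterState::lock; pg_map is the map built
          // locally from incoming MPGStats messages.
          dout(10) << "tracking " << pg_map.pg_stat.size() << " pgs, "
                   << pg_map.creating_pgs.size() << " creating" << dendl;
        });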
src/CMakeLists.txt
src/ceph_mgr.cc
src/mgr/ClusterState.cc
src/mgr/ClusterState.h
src/mgr/DaemonServer.cc
src/mgr/DaemonServer.h
src/mgr/Mgr.cc
src/mgr/MgrStandby.cc
src/mgr/PyModules.cc

index 92a19e6295d7f9fd1847a1442a3ade5ad4f73a3e..5428d264ca7a64df2073450a3679f2c1246cefb0 100644 (file)
@@ -517,6 +517,7 @@ add_subdirectory(libradosstriper)
 if (WITH_MGR)
   set(mgr_srcs
       ceph_mgr.cc
+      mon/PGMap.cc
       mgr/DaemonState.cc
       mgr/DaemonServer.cc
       mgr/ClusterState.cc
index c3d032c132685c0f040d82fe22475cbbd6d99edb..9365fb83fd176610e7192c662dd3092af2047334 100644 (file)
@@ -14,6 +14,7 @@
  *
  */
 
+#include <Python.h>
 
 #include "include/types.h"
 #include "common/config.h"
index 1e16810799f8b6a55c217d98dfe9f02059382e22..e5d29a707b30e437dc99de78b4616787f3d9b429 100644 (file)
  */
 
 #include "messages/MMgrDigest.h"
+#include "messages/MPGStats.h"
 
 #include "mgr/ClusterState.h"
 
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
 
 ClusterState::ClusterState(MonClient *monc_, Objecter *objecter_)
   : monc(monc_), objecter(objecter_), lock("ClusterState")
@@ -36,8 +40,298 @@ void ClusterState::set_fsmap(FSMap const &new_fsmap)
 
 void ClusterState::load_digest(MMgrDigest *m)
 {
-  pg_summary_json = std::move(m->pg_summary_json);
   health_json = std::move(m->health_json);
   mon_status_json = std::move(m->mon_status_json);
 }
 
+void ClusterState::ingest_pgstats(MPGStats *stats)
+{
+  Mutex::Locker l(lock);
+  PGMap::Incremental pending_inc;
+  pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
+
+  const int from = stats->get_orig_source().num();
+  bool is_in = false;
+  objecter->with_osdmap([&is_in, from](const OSDMap &osd_map){
+      is_in = osd_map.is_in(from);
+  });
+
+  if (is_in) {
+    pending_inc.update_stat(from, stats->epoch, stats->osd_stat);
+  } else {
+    pending_inc.update_stat(from, stats->epoch, osd_stat_t());
+  }
+
+  for (auto p : stats->pg_stat) {
+    pg_t pgid = p.first;
+    const auto &pg_stats = p.second;
+
+    // In case we're hearing about a PG that according to last
+    // OSDMap update should not exist
+    if (pg_map.pg_stat.count(pgid) == 0) {
+      dout(15) << " got " << pgid << " reported at " << pg_stats.reported_epoch << ":"
+               << pg_stats.reported_seq
+               << " state " << pg_state_string(pg_stats.state)
+               << " but DNE in pg_map; pool was probably deleted."
+               << dendl;
+      continue;
+    }
+
+    // In case we already heard about more recent stats from this PG
+    // from another OSD
+    if (pg_map.pg_stat.count(pgid) &&
+        pg_map.pg_stat[pgid].get_version_pair() > pg_stats.get_version_pair()) {
+      dout(15) << " had " << pgid << " from " << pg_map.pg_stat[pgid].reported_epoch << ":"
+               << pg_map.pg_stat[pgid].reported_seq << dendl;
+      continue;
+    }
+
+    pending_inc.pg_stat_updates[pgid] = pg_stats;
+  }
+
+  pg_map.apply_incremental(g_ceph_context, pending_inc);
+}
+
+void ClusterState::notify_osdmap(const OSDMap &osd_map)
+{
+  Mutex::Locker l(lock);
+
+  PGMap::Incremental pending_inc;
+  pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
+
+  _update_creating_pgs(osd_map, &pending_inc);
+  _register_new_pgs(osd_map, &pending_inc);
+
+  pg_map.apply_incremental(g_ceph_context, pending_inc);
+
+  // TODO: Reinstate check_down_pgs logic?
+}
+
+void ClusterState::_register_new_pgs(
+    const OSDMap &osd_map,
+    PGMap::Incremental *pending_inc)
+{
+  // iterate over crush mapspace
+  epoch_t epoch = osd_map.get_epoch();
+  dout(10) << "checking pg pools for osdmap epoch " << epoch
+           << ", last_pg_scan " << pg_map.last_pg_scan << dendl;
+
+  int created = 0;
+  for (const auto & p : osd_map.pools) {
+    int64_t poolid = p.first;
+    const pg_pool_t &pool = p.second;
+
+    int ruleno = osd_map.crush->find_rule(pool.get_crush_ruleset(),
+                                          pool.get_type(), pool.get_size());
+    if (ruleno < 0 || !osd_map.crush->rule_exists(ruleno))
+      continue;
+
+    if (pool.get_last_change() <= pg_map.last_pg_scan ||
+        pool.get_last_change() <= pending_inc->pg_scan) {
+      dout(10) << " no change in pool " << poolid << " " << pool << dendl;
+      continue;
+    }
+
+    dout(10) << "scanning pool " << poolid
+             << " " << pool << dendl;
+
+    // first pgs in this pool
+    bool new_pool = pg_map.pg_pool_sum.count(poolid) == 0;
+
+    for (ps_t ps = 0; ps < pool.get_pg_num(); ps++) {
+      pg_t pgid(ps, poolid, -1);
+      if (pg_map.pg_stat.count(pgid)) {
+       dout(20) << "register_new_pgs  have " << pgid << dendl;
+       continue;
+      }
+      created++;
+      _register_pg(osd_map, pgid, pool.get_last_change(), new_pool,
+          pending_inc);
+    }
+  }
+
+  int removed = 0;
+  for (const auto &p : pg_map.creating_pgs) {
+    if (p.preferred() >= 0) {
+      dout(20) << " removing creating_pg " << p
+               << " because it is localized and obsolete" << dendl;
+      pending_inc->pg_remove.insert(p);
+      removed++;
+    }
+    if (!osd_map.have_pg_pool(p.pool())) {
+      dout(20) << " removing creating_pg " << p
+               << " because containing pool deleted" << dendl;
+      pending_inc->pg_remove.insert(p);
+      ++removed;
+    }
+  }
+
+  // deleted pools?
+  for (const auto & p : pg_map.pg_stat) {
+    if (!osd_map.have_pg_pool(p.first.pool())) {
+      dout(20) << " removing pg_stat " << p.first << " because "
+               << "containing pool deleted" << dendl;
+      pending_inc->pg_remove.insert(p.first);
+      ++removed;
+    }
+    if (p.first.preferred() >= 0) {
+      dout(20) << " removing localized pg " << p.first << dendl;
+      pending_inc->pg_remove.insert(p.first);
+      ++removed;
+    }
+  }
+
+  // we don't want to redo this work if we can avoid it.
+  pending_inc->pg_scan = epoch;
+
+  dout(10) << "register_new_pgs registered " << created << " new pgs, removed "
+           << removed << " uncreated pgs" << dendl;
+}
+
+void ClusterState::_register_pg(
+    const OSDMap &osd_map,
+    pg_t pgid, epoch_t epoch,
+    bool new_pool,
+    PGMap::Incremental *pending_inc)
+{
+  pg_t parent;
+  int split_bits = 0;
+  bool parent_found = false;
+  if (!new_pool) {
+    parent = pgid;
+    while (1) {
+      // remove most significant bit
+      int msb = cbits(parent.ps());
+      if (!msb)
+       break;
+      parent.set_ps(parent.ps() & ~(1<<(msb-1)));
+      split_bits++;
+      dout(30) << " is " << pgid << " parent " << parent << " ?" << dendl;
+      if (pg_map.pg_stat.count(parent) &&
+          pg_map.pg_stat[parent].state != PG_STATE_CREATING) {
+       dout(10) << "  parent is " << parent << dendl;
+       parent_found = true;
+       break;
+      }
+    }
+  }
+
+  pg_stat_t &stats = pending_inc->pg_stat_updates[pgid];
+  stats.state = PG_STATE_CREATING;
+  stats.created = epoch;
+  stats.parent = parent;
+  stats.parent_split_bits = split_bits;
+  stats.mapping_epoch = epoch;
+
+  if (parent_found) {
+    pg_stat_t &ps = pg_map.pg_stat[parent];
+    stats.last_fresh = ps.last_fresh;
+    stats.last_active = ps.last_active;
+    stats.last_change = ps.last_change;
+    stats.last_peered = ps.last_peered;
+    stats.last_clean = ps.last_clean;
+    stats.last_unstale = ps.last_unstale;
+    stats.last_undegraded = ps.last_undegraded;
+    stats.last_fullsized = ps.last_fullsized;
+    stats.last_scrub_stamp = ps.last_scrub_stamp;
+    stats.last_deep_scrub_stamp = ps.last_deep_scrub_stamp;
+    stats.last_clean_scrub_stamp = ps.last_clean_scrub_stamp;
+  } else {
+    utime_t now = ceph_clock_now(g_ceph_context);
+    stats.last_fresh = now;
+    stats.last_active = now;
+    stats.last_change = now;
+    stats.last_peered = now;
+    stats.last_clean = now;
+    stats.last_unstale = now;
+    stats.last_undegraded = now;
+    stats.last_fullsized = now;
+    stats.last_scrub_stamp = now;
+    stats.last_deep_scrub_stamp = now;
+    stats.last_clean_scrub_stamp = now;
+  }
+
+  osd_map.pg_to_up_acting_osds(
+    pgid,
+    &stats.up,
+    &stats.up_primary,
+    &stats.acting,
+    &stats.acting_primary);
+
+  if (split_bits == 0) {
+    dout(10) << " will create " << pgid
+             << " primary " << stats.acting_primary
+             << " acting " << stats.acting
+             << dendl;
+  } else {
+    dout(10) << " will create " << pgid
+             << " primary " << stats.acting_primary
+             << " acting " << stats.acting
+             << " parent " << parent
+             << " by " << split_bits << " bits"
+             << dendl;
+  }
+}
+
+// This was PGMonitor::map_pg_creates
+void ClusterState::_update_creating_pgs(
+    const OSDMap &osd_map,
+    PGMap::Incremental *pending_inc)
+{
+  assert(pending_inc != nullptr);
+
+  dout(10) << "to " << pg_map.creating_pgs.size()
+           << " pgs, osdmap epoch " << osd_map.get_epoch()
+           << dendl;
+
+  for (set<pg_t>::const_iterator p = pg_map.creating_pgs.begin();
+       p != pg_map.creating_pgs.end();
+       ++p) {
+    pg_t pgid = *p;
+    pg_t on = pgid;
+    ceph::unordered_map<pg_t,pg_stat_t>::const_iterator q =
+      pg_map.pg_stat.find(pgid);
+    assert(q != pg_map.pg_stat.end());
+    const pg_stat_t *s = &q->second;
+
+    if (s->parent_split_bits)
+      on = s->parent;
+
+    vector<int> up, acting;
+    int up_primary, acting_primary;
+    osd_map.pg_to_up_acting_osds(
+      on,
+      &up,
+      &up_primary,
+      &acting,
+      &acting_primary);
+
+    if (up != s->up ||
+        up_primary != s->up_primary ||
+        acting !=  s->acting ||
+        acting_primary != s->acting_primary) {
+      pg_stat_t *ns = &pending_inc->pg_stat_updates[pgid];
+      dout(20) << pgid << " "
+               << " acting_primary: " << s->acting_primary
+               << " -> " << acting_primary
+               << " acting: " << s->acting << " -> " << acting
+               << " up_primary: " << s->up_primary << " -> " << up_primary
+               << " up: " << s->up << " -> " << up
+               << dendl;
+
+      // only initialize if it wasn't already a pending update
+      if (ns->reported_epoch == 0)
+        *ns = *s;
+
+      // note epoch if the target of the create message changed
+      if (acting_primary != ns->acting_primary)
+        ns->mapping_epoch = osd_map.get_epoch();
+
+      ns->up = up;
+      ns->up_primary = up_primary;
+      ns->acting = acting;
+      ns->acting_primary = acting_primary;
+    }
+  }
+}
+
index 44f69074daec0c18c5bea5d637af5926cedc9199..bd9ee4d06dd4b2f1c6f6a3b2718cb86580f5bd4e 100644 (file)
 
 #include "osdc/Objecter.h"
 #include "mon/MonClient.h"
+#include "mon/PGMap.h"
 
 class MMgrDigest;
+class MPGStats;
 
 
 /**
@@ -35,15 +37,30 @@ protected:
   FSMap fsmap;
   Mutex lock;
 
-  bufferlist pg_summary_json;
+  PGMap pg_map;
+
   bufferlist health_json;
   bufferlist mon_status_json;
 
+  void _update_creating_pgs(
+      const OSDMap &osd_map,
+      PGMap::Incremental *pending_inc);
+
+  void _register_pg(
+      const OSDMap &osd_map,
+      pg_t pgid, epoch_t epoch,
+      bool new_pool,
+      PGMap::Incremental *pending_inc);
+
+  void _register_new_pgs(
+      const OSDMap &osd_map,
+      PGMap::Incremental *pending_inc);
+
 public:
 
   void load_digest(MMgrDigest *m);
+  void ingest_pgstats(MPGStats *stats);
 
-  const bufferlist &get_pg_summary() const {return pg_summary_json;}
   const bufferlist &get_health() const {return health_json;}
   const bufferlist &get_mon_status() const {return mon_status_json;}
 
@@ -52,12 +69,22 @@ public:
   void set_objecter(Objecter *objecter_);
   void set_fsmap(FSMap const &new_fsmap);
 
+  void notify_osdmap(const OSDMap &osd_map);
+
   template<typename Callback, typename...Args>
   void with_fsmap(Callback&& cb, Args&&...args)
   {
-  Mutex::Locker l(lock);
-  std::forward<Callback>(cb)(const_cast<const FSMap&>(fsmap),
-      std::forward<Args>(args)...);
+    Mutex::Locker l(lock);
+    std::forward<Callback>(cb)(const_cast<const FSMap&>(fsmap),
+        std::forward<Args>(args)...);
+  }
+
+  template<typename Callback, typename...Args>
+  void with_pgmap(Callback&& cb, Args&&...args)
+  {
+    Mutex::Locker l(lock);
+    std::forward<Callback>(cb)(const_cast<const PGMap&>(pg_map),
+        std::forward<Args>(args)...);
   }
 
   template<typename Callback, typename...Args>
index a27955123778b036a3a36ebae7e2c4910e0b5485..cddbf0a69fb40e7b9731b5bc21b29eb0f94957f5 100644 (file)
@@ -17,6 +17,7 @@
 #include "messages/MMgrConfigure.h"
 #include "messages/MCommand.h"
 #include "messages/MCommandReply.h"
+#include "messages/MPGStats.h"
 
 #define dout_subsys ceph_subsys_mgr
 #undef dout_prefix
 
 DaemonServer::DaemonServer(MonClient *monc_,
   DaemonStateIndex &daemon_state_,
+  ClusterState &cluster_state_,
   PyModules &py_modules_)
     : Dispatcher(g_ceph_context), msgr(nullptr), monc(monc_),
       daemon_state(daemon_state_),
+      cluster_state(cluster_state_),
       py_modules(py_modules_),
       auth_registry(g_ceph_context,
                     g_conf->auth_supported.empty() ?
@@ -120,6 +123,10 @@ bool DaemonServer::ms_dispatch(Message *m)
   Mutex::Locker l(lock);
 
   switch(m->get_type()) {
+    case MSG_PGSTATS:
+      cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
+      m->put();
+      return true;
     case MSG_MGR_REPORT:
       return handle_report(static_cast<MMgrReport*>(m));
     case MSG_MGR_OPEN:
index f37e7f015b624b999c431ec043252b8b5df63099..2540e38b1ec3532b1c9dfcd10a9ee40c4f65f0f4 100644 (file)
@@ -43,6 +43,7 @@ protected:
   Messenger *msgr;
   MonClient *monc;
   DaemonStateIndex &daemon_state;
+  ClusterState &cluster_state;
   PyModules &py_modules;
 
   AuthAuthorizeHandlerRegistry auth_registry;
@@ -58,6 +59,7 @@ public:
 
   DaemonServer(MonClient *monc_,
       DaemonStateIndex &daemon_state_,
+      ClusterState &cluster_state_,
       PyModules &py_modules_);
   ~DaemonServer();
 
index 5328270d9545f81c3048372bba06d66c93df3539..2a1cd246e51b2e614a7c085f91b10b87b251c98c 100644 (file)
@@ -11,6 +11,8 @@
  * Foundation.  See file COPYING.
  */
 
+#include <Python.h>
+
 #include "osdc/Objecter.h"
 #include "common/errno.h"
 #include "mon/MonClient.h"
@@ -44,7 +46,7 @@ Mgr::Mgr(MonClient *monc_, Messenger *clientm_, Objecter *objecter_) :
   waiting_for_fs_map(NULL),
   py_modules(daemon_state, cluster_state, *monc, finisher),
   cluster_state(monc, nullptr),
-  server(monc, daemon_state, py_modules),
+  server(monc, daemon_state, cluster_state, py_modules),
   initialized(false),
   initializing(false)
 {
@@ -165,6 +167,11 @@ void Mgr::init()
   objecter->wait_for_osd_map();
   lock.Lock();
 
+  // Populate PGs in ClusterState
+  objecter->with_osdmap([this](const OSDMap &osd_map) {
+    cluster_state.notify_osdmap(osd_map);
+  });
+
   monc->sub_want("mgrdigest", 0, 0);
 
   // Prepare to receive FSMap and request it
@@ -394,6 +401,8 @@ void Mgr::handle_osd_map()
         assert(r == 0);  // start_mon_command defined to not fail
       }
     }
+
+    cluster_state.notify_osdmap(osd_map);
   });
 
   // TODO: same culling for MonMap and FSMap
index 0cb0cd01c1cfff49382fde4400dd2fcf01494328..7264caab5eafadb033796f67c5997ea3deaa1fd1 100644 (file)
@@ -11,6 +11,8 @@
  * Foundation.  See file COPYING.
  */
 
+#include <Python.h>
+
 #include "common/errno.h"
 #include "mon/MonClient.h"
 #include "include/stringify.h"
index 0a7c0564fc3690442d272cd3b1a93938d1ea9453..e7147e736bb90c57298b36f3df4d59f243a84ef1 100644 (file)
  * Foundation.  See file COPYING.
  */
 
+// Include this first to get python headers earlier
+#include "PyState.h"
 
 #include <boost/tokenizer.hpp>
 #include "common/errno.h"
+#include "include/stringify.h"
 
-#include "PyState.h"
 #include "PyFormatter.h"
 
 #include "osd/OSDMap.h"
@@ -171,12 +173,56 @@ PyObject *PyModules::get_python(const std::string &what)
       f.close_section();
     }
     return f.get();
-  } else if (what == "pg_summary" || what == "health" || what == "mon_status") {
+  } else if (what == "pg_summary") {
+    PyFormatter f;
+    cluster_state.with_pgmap(
+        [&f](const PGMap &pg_map) {
+      //    f.open_object_section("outer");
+          std::map<std::string, std::map<std::string, uint32_t> > osds;
+          std::map<std::string, std::map<std::string, uint32_t> > pools;
+          std::map<std::string, uint32_t> all;
+          for (const auto &i : pg_map.pg_stat) {
+            const auto pool = i.first.m_pool;
+            const std::string state = pg_state_string(i.second.state);
+            // Insert to per-pool map
+            pools[stringify(pool)][state]++;
+            for (const auto &osd_id : i.second.acting) {
+              osds[stringify(osd_id)][state]++;
+            }
+            all[state]++;
+          }
+          f.open_object_section("by_osd");
+          for (const auto &i : osds) {
+            f.open_object_section(i.first.c_str());
+            for (const auto &j : i.second) {
+              f.dump_int(j.first.c_str(), j.second);
+            }
+            f.close_section();
+          }
+          f.close_section();
+          f.open_object_section("by_pool");
+          for (const auto &i : pools) {
+            f.open_object_section(i.first.c_str());
+            for (const auto &j : i.second) {
+              f.dump_int(j.first.c_str(), j.second);
+            }
+            f.close_section();
+          }
+          f.close_section();
+          f.open_object_section("all");
+          for (const auto &i : all) {
+            f.dump_int(i.first.c_str(), i.second);
+          }
+          f.close_section();
+      //    f.close_section();
+        }
+    );
+    return f.get();
+
+  } else if (what == "health" || what == "mon_status") {
     PyFormatter f;
     bufferlist json;
-    if (what == "pg_summary") {
-      json = cluster_state.get_pg_summary();
-    } else if (what == "health") {
+    if (what == "health") {
       json = cluster_state.get_health();
     } else if (what == "mon_status") {
       json = cluster_state.get_mon_status();
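
Rough sketch of the resulting structure (inferred from the PyFormatter
calls above, not verified output): a python module requesting
"pg_summary" via PyModules::get_python now receives nested counters of
the form

    { "by_osd":  { "<osd id>": { "<pg state>": <count>, ... }, ... },
      "by_pool": { "<pool id>": { "<pg state>": <count>, ... }, ... },
      "all":     { "<pg state>": <count>, ... } }

built from the locally maintained PGMap rather than the mon-supplied
pg_summary json.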