From: John Spray Date: Sun, 31 Jul 2016 17:07:20 +0000 (+0100) Subject: mgr: handle PGStats with a PGMap X-Git-Tag: v11.0.1~60^2~31 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fa147e3a591a4d74a7a8692b17add420ffc9bf98;p=ceph.git mgr: handle PGStats with a PGMap No longer need the mon to send us the pg_summary json hack. Signed-off-by: John Spray --- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 92a19e6295d7..5428d264ca7a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -517,6 +517,7 @@ add_subdirectory(libradosstriper) if (WITH_MGR) set(mgr_srcs ceph_mgr.cc + mon/PGMap.cc mgr/DaemonState.cc mgr/DaemonServer.cc mgr/ClusterState.cc diff --git a/src/ceph_mgr.cc b/src/ceph_mgr.cc index c3d032c13268..9365fb83fd17 100644 --- a/src/ceph_mgr.cc +++ b/src/ceph_mgr.cc @@ -14,6 +14,7 @@ * */ +#include #include "include/types.h" #include "common/config.h" diff --git a/src/mgr/ClusterState.cc b/src/mgr/ClusterState.cc index 1e16810799f8..e5d29a707b30 100644 --- a/src/mgr/ClusterState.cc +++ b/src/mgr/ClusterState.cc @@ -12,9 +12,13 @@ */ #include "messages/MMgrDigest.h" +#include "messages/MPGStats.h" #include "mgr/ClusterState.h" +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "mgr " << __func__ << " " ClusterState::ClusterState(MonClient *monc_, Objecter *objecter_) : monc(monc_), objecter(objecter_), lock("ClusterState") @@ -36,8 +40,298 @@ void ClusterState::set_fsmap(FSMap const &new_fsmap) void ClusterState::load_digest(MMgrDigest *m) { - pg_summary_json = std::move(m->pg_summary_json); health_json = std::move(m->health_json); mon_status_json = std::move(m->mon_status_json); } +void ClusterState::ingest_pgstats(MPGStats *stats) +{ + Mutex::Locker l(lock); + PGMap::Incremental pending_inc; + pending_inc.version = pg_map.version + 1; // to make apply_incremental happy + + const int from = stats->get_orig_source().num(); + bool is_in = false; + objecter->with_osdmap([&is_in, from](const OSDMap &osd_map){ + is_in = osd_map.is_in(from); + }); + + if (is_in) { + pending_inc.update_stat(from, stats->epoch, stats->osd_stat); + } else { + pending_inc.update_stat(from, stats->epoch, osd_stat_t()); + } + + for (auto p : stats->pg_stat) { + pg_t pgid = p.first; + const auto &pg_stats = p.second; + + // In case we're hearing about a PG that according to last + // OSDMap update should not exist + if (pg_map.pg_stat.count(pgid) == 0) { + dout(15) << " got " << pgid << " reported at " << pg_stats.reported_epoch << ":" + << pg_stats.reported_seq + << " state " << pg_state_string(pg_stats.state) + << " but DNE in pg_map; pool was probably deleted." + << dendl; + continue; + } + + // In case we already heard about more recent stats from this PG + // from another OSD + if (pg_map.pg_stat.count(pgid) && + pg_map.pg_stat[pgid].get_version_pair() > pg_stats.get_version_pair()) { + dout(15) << " had " << pgid << " from " << pg_map.pg_stat[pgid].reported_epoch << ":" + << pg_map.pg_stat[pgid].reported_seq << dendl; + continue; + } + + pending_inc.pg_stat_updates[pgid] = pg_stats; + } + + pg_map.apply_incremental(g_ceph_context, pending_inc); +} + +void ClusterState::notify_osdmap(const OSDMap &osd_map) +{ + Mutex::Locker l(lock); + + PGMap::Incremental pending_inc; + pending_inc.version = pg_map.version + 1; // to make apply_incremental happy + + _update_creating_pgs(osd_map, &pending_inc); + _register_new_pgs(osd_map, &pending_inc); + + pg_map.apply_incremental(g_ceph_context, pending_inc); + + // TODO: Reinstate check_down_pgs logic? +} + +void ClusterState::_register_new_pgs( + const OSDMap &osd_map, + PGMap::Incremental *pending_inc) +{ + // iterate over crush mapspace + epoch_t epoch = osd_map.get_epoch(); + dout(10) << "checking pg pools for osdmap epoch " << epoch + << ", last_pg_scan " << pg_map.last_pg_scan << dendl; + + int created = 0; + for (const auto & p : osd_map.pools) { + int64_t poolid = p.first; + const pg_pool_t &pool = p.second; + + int ruleno = osd_map.crush->find_rule(pool.get_crush_ruleset(), + pool.get_type(), pool.get_size()); + if (ruleno < 0 || !osd_map.crush->rule_exists(ruleno)) + continue; + + if (pool.get_last_change() <= pg_map.last_pg_scan || + pool.get_last_change() <= pending_inc->pg_scan) { + dout(10) << " no change in pool " << poolid << " " << pool << dendl; + continue; + } + + dout(10) << "scanning pool " << poolid + << " " << pool << dendl; + + // first pgs in this pool + bool new_pool = pg_map.pg_pool_sum.count(poolid) == 0; + + for (ps_t ps = 0; ps < pool.get_pg_num(); ps++) { + pg_t pgid(ps, poolid, -1); + if (pg_map.pg_stat.count(pgid)) { + dout(20) << "register_new_pgs have " << pgid << dendl; + continue; + } + created++; + _register_pg(osd_map, pgid, pool.get_last_change(), new_pool, + pending_inc); + } + } + + int removed = 0; + for (const auto &p : pg_map.creating_pgs) { + if (p.preferred() >= 0) { + dout(20) << " removing creating_pg " << p + << " because it is localized and obsolete" << dendl; + pending_inc->pg_remove.insert(p); + removed++; + } + if (!osd_map.have_pg_pool(p.pool())) { + dout(20) << " removing creating_pg " << p + << " because containing pool deleted" << dendl; + pending_inc->pg_remove.insert(p); + ++removed; + } + } + + // deleted pools? + for (const auto & p : pg_map.pg_stat) { + if (!osd_map.have_pg_pool(p.first.pool())) { + dout(20) << " removing pg_stat " << p.first << " because " + << "containing pool deleted" << dendl; + pending_inc->pg_remove.insert(p.first); + ++removed; + } + if (p.first.preferred() >= 0) { + dout(20) << " removing localized pg " << p.first << dendl; + pending_inc->pg_remove.insert(p.first); + ++removed; + } + } + + // we don't want to redo this work if we can avoid it. + pending_inc->pg_scan = epoch; + + dout(10) << "register_new_pgs registered " << created << " new pgs, removed " + << removed << " uncreated pgs" << dendl; +} + +void ClusterState::_register_pg( + const OSDMap &osd_map, + pg_t pgid, epoch_t epoch, + bool new_pool, + PGMap::Incremental *pending_inc) +{ + pg_t parent; + int split_bits = 0; + bool parent_found = false; + if (!new_pool) { + parent = pgid; + while (1) { + // remove most significant bit + int msb = cbits(parent.ps()); + if (!msb) + break; + parent.set_ps(parent.ps() & ~(1<<(msb-1))); + split_bits++; + dout(30) << " is " << pgid << " parent " << parent << " ?" << dendl; + if (pg_map.pg_stat.count(parent) && + pg_map.pg_stat[parent].state != PG_STATE_CREATING) { + dout(10) << " parent is " << parent << dendl; + parent_found = true; + break; + } + } + } + + pg_stat_t &stats = pending_inc->pg_stat_updates[pgid]; + stats.state = PG_STATE_CREATING; + stats.created = epoch; + stats.parent = parent; + stats.parent_split_bits = split_bits; + stats.mapping_epoch = epoch; + + if (parent_found) { + pg_stat_t &ps = pg_map.pg_stat[parent]; + stats.last_fresh = ps.last_fresh; + stats.last_active = ps.last_active; + stats.last_change = ps.last_change; + stats.last_peered = ps.last_peered; + stats.last_clean = ps.last_clean; + stats.last_unstale = ps.last_unstale; + stats.last_undegraded = ps.last_undegraded; + stats.last_fullsized = ps.last_fullsized; + stats.last_scrub_stamp = ps.last_scrub_stamp; + stats.last_deep_scrub_stamp = ps.last_deep_scrub_stamp; + stats.last_clean_scrub_stamp = ps.last_clean_scrub_stamp; + } else { + utime_t now = ceph_clock_now(g_ceph_context); + stats.last_fresh = now; + stats.last_active = now; + stats.last_change = now; + stats.last_peered = now; + stats.last_clean = now; + stats.last_unstale = now; + stats.last_undegraded = now; + stats.last_fullsized = now; + stats.last_scrub_stamp = now; + stats.last_deep_scrub_stamp = now; + stats.last_clean_scrub_stamp = now; + } + + osd_map.pg_to_up_acting_osds( + pgid, + &stats.up, + &stats.up_primary, + &stats.acting, + &stats.acting_primary); + + if (split_bits == 0) { + dout(10) << " will create " << pgid + << " primary " << stats.acting_primary + << " acting " << stats.acting + << dendl; + } else { + dout(10) << " will create " << pgid + << " primary " << stats.acting_primary + << " acting " << stats.acting + << " parent " << parent + << " by " << split_bits << " bits" + << dendl; + } +} + +// This was PGMonitor::map_pg_creates +void ClusterState::_update_creating_pgs( + const OSDMap &osd_map, + PGMap::Incremental *pending_inc) +{ + assert(pending_inc != nullptr); + + dout(10) << "to " << pg_map.creating_pgs.size() + << " pgs, osdmap epoch " << osd_map.get_epoch() + << dendl; + + for (set::const_iterator p = pg_map.creating_pgs.begin(); + p != pg_map.creating_pgs.end(); + ++p) { + pg_t pgid = *p; + pg_t on = pgid; + ceph::unordered_map::const_iterator q = + pg_map.pg_stat.find(pgid); + assert(q != pg_map.pg_stat.end()); + const pg_stat_t *s = &q->second; + + if (s->parent_split_bits) + on = s->parent; + + vector up, acting; + int up_primary, acting_primary; + osd_map.pg_to_up_acting_osds( + on, + &up, + &up_primary, + &acting, + &acting_primary); + + if (up != s->up || + up_primary != s->up_primary || + acting != s->acting || + acting_primary != s->acting_primary) { + pg_stat_t *ns = &pending_inc->pg_stat_updates[pgid]; + dout(20) << pgid << " " + << " acting_primary: " << s->acting_primary + << " -> " << acting_primary + << " acting: " << s->acting << " -> " << acting + << " up_primary: " << s->up_primary << " -> " << up_primary + << " up: " << s->up << " -> " << up + << dendl; + + // only initialize if it wasn't already a pending update + if (ns->reported_epoch == 0) + *ns = *s; + + // note epoch if the target of the create message changed + if (acting_primary != ns->acting_primary) + ns->mapping_epoch = osd_map.get_epoch(); + + ns->up = up; + ns->up_primary = up_primary; + ns->acting = acting; + ns->acting_primary = acting_primary; + } + } +} + diff --git a/src/mgr/ClusterState.h b/src/mgr/ClusterState.h index 44f69074daec..bd9ee4d06dd4 100644 --- a/src/mgr/ClusterState.h +++ b/src/mgr/ClusterState.h @@ -19,8 +19,10 @@ #include "osdc/Objecter.h" #include "mon/MonClient.h" +#include "mon/PGMap.h" class MMgrDigest; +class MPGStats; /** @@ -35,15 +37,30 @@ protected: FSMap fsmap; Mutex lock; - bufferlist pg_summary_json; + PGMap pg_map; + bufferlist health_json; bufferlist mon_status_json; + void _update_creating_pgs( + const OSDMap &osd_map, + PGMap::Incremental *pending_inc); + + void _register_pg( + const OSDMap &osd_map, + pg_t pgid, epoch_t epoch, + bool new_pool, + PGMap::Incremental *pending_inc); + + void _register_new_pgs( + const OSDMap &osd_map, + PGMap::Incremental *pending_inc); + public: void load_digest(MMgrDigest *m); + void ingest_pgstats(MPGStats *stats); - const bufferlist &get_pg_summary() const {return pg_summary_json;} const bufferlist &get_health() const {return health_json;} const bufferlist &get_mon_status() const {return mon_status_json;} @@ -52,12 +69,22 @@ public: void set_objecter(Objecter *objecter_); void set_fsmap(FSMap const &new_fsmap); + void notify_osdmap(const OSDMap &osd_map); + template void with_fsmap(Callback&& cb, Args&&...args) { - Mutex::Locker l(lock); - std::forward(cb)(const_cast(fsmap), - std::forward(args)...); + Mutex::Locker l(lock); + std::forward(cb)(const_cast(fsmap), + std::forward(args)...); + } + + template + void with_pgmap(Callback&& cb, Args&&...args) + { + Mutex::Locker l(lock); + std::forward(cb)(const_cast(pg_map), + std::forward(args)...); } template diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc index a27955123778..cddbf0a69fb4 100644 --- a/src/mgr/DaemonServer.cc +++ b/src/mgr/DaemonServer.cc @@ -17,6 +17,7 @@ #include "messages/MMgrConfigure.h" #include "messages/MCommand.h" #include "messages/MCommandReply.h" +#include "messages/MPGStats.h" #define dout_subsys ceph_subsys_mgr #undef dout_prefix @@ -24,9 +25,11 @@ DaemonServer::DaemonServer(MonClient *monc_, DaemonStateIndex &daemon_state_, + ClusterState &cluster_state_, PyModules &py_modules_) : Dispatcher(g_ceph_context), msgr(nullptr), monc(monc_), daemon_state(daemon_state_), + cluster_state(cluster_state_), py_modules(py_modules_), auth_registry(g_ceph_context, g_conf->auth_supported.empty() ? @@ -120,6 +123,10 @@ bool DaemonServer::ms_dispatch(Message *m) Mutex::Locker l(lock); switch(m->get_type()) { + case MSG_PGSTATS: + cluster_state.ingest_pgstats(static_cast(m)); + m->put(); + return true; case MSG_MGR_REPORT: return handle_report(static_cast(m)); case MSG_MGR_OPEN: diff --git a/src/mgr/DaemonServer.h b/src/mgr/DaemonServer.h index f37e7f015b62..2540e38b1ec3 100644 --- a/src/mgr/DaemonServer.h +++ b/src/mgr/DaemonServer.h @@ -43,6 +43,7 @@ protected: Messenger *msgr; MonClient *monc; DaemonStateIndex &daemon_state; + ClusterState &cluster_state; PyModules &py_modules; AuthAuthorizeHandlerRegistry auth_registry; @@ -58,6 +59,7 @@ public: DaemonServer(MonClient *monc_, DaemonStateIndex &daemon_state_, + ClusterState &cluster_state_, PyModules &py_modules_); ~DaemonServer(); diff --git a/src/mgr/Mgr.cc b/src/mgr/Mgr.cc index 5328270d9545..2a1cd246e51b 100644 --- a/src/mgr/Mgr.cc +++ b/src/mgr/Mgr.cc @@ -11,6 +11,8 @@ * Foundation. See file COPYING. */ +#include + #include "osdc/Objecter.h" #include "common/errno.h" #include "mon/MonClient.h" @@ -44,7 +46,7 @@ Mgr::Mgr(MonClient *monc_, Messenger *clientm_, Objecter *objecter_) : waiting_for_fs_map(NULL), py_modules(daemon_state, cluster_state, *monc, finisher), cluster_state(monc, nullptr), - server(monc, daemon_state, py_modules), + server(monc, daemon_state, cluster_state, py_modules), initialized(false), initializing(false) { @@ -165,6 +167,11 @@ void Mgr::init() objecter->wait_for_osd_map(); lock.Lock(); + // Populate PGs in ClusterState + objecter->with_osdmap([this](const OSDMap &osd_map) { + cluster_state.notify_osdmap(osd_map); + }); + monc->sub_want("mgrdigest", 0, 0); // Prepare to receive FSMap and request it @@ -394,6 +401,8 @@ void Mgr::handle_osd_map() assert(r == 0); // start_mon_command defined to not fail } } + + cluster_state.notify_osdmap(osd_map); }); // TODO: same culling for MonMap and FSMap diff --git a/src/mgr/MgrStandby.cc b/src/mgr/MgrStandby.cc index 0cb0cd01c1cf..7264caab5eaf 100644 --- a/src/mgr/MgrStandby.cc +++ b/src/mgr/MgrStandby.cc @@ -11,6 +11,8 @@ * Foundation. See file COPYING. */ +#include + #include "common/errno.h" #include "mon/MonClient.h" #include "include/stringify.h" diff --git a/src/mgr/PyModules.cc b/src/mgr/PyModules.cc index 0a7c0564fc36..e7147e736bb9 100644 --- a/src/mgr/PyModules.cc +++ b/src/mgr/PyModules.cc @@ -11,11 +11,13 @@ * Foundation. See file COPYING. */ +// Include this first to get python headers earlier +#include "PyState.h" #include #include "common/errno.h" +#include "include/stringify.h" -#include "PyState.h" #include "PyFormatter.h" #include "osd/OSDMap.h" @@ -171,12 +173,56 @@ PyObject *PyModules::get_python(const std::string &what) f.close_section(); } return f.get(); - } else if (what == "pg_summary" || what == "health" || what == "mon_status") { + } else if (what == "pg_summary") { + PyFormatter f; + cluster_state.with_pgmap( + [&f](const PGMap &pg_map) { + // f.open_object_section("outer"); + std::map > osds; + std::map > pools; + std::map all; + for (const auto &i : pg_map.pg_stat) { + const auto pool = i.first.m_pool; + const std::string state = pg_state_string(i.second.state); + // Insert to per-pool map + pools[stringify(pool)][state]++; + for (const auto &osd_id : i.second.acting) { + osds[stringify(osd_id)][state]++; + } + all[state]++; + } + f.open_object_section("by_osd"); + for (const auto &i : osds) { + f.open_object_section(i.first.c_str()); + for (const auto &j : i.second) { + f.dump_int(j.first.c_str(), j.second); + } + f.close_section(); + } + f.close_section(); + f.open_object_section("by_pool"); + for (const auto &i : pools) { + f.open_object_section(i.first.c_str()); + for (const auto &j : i.second) { + f.dump_int(j.first.c_str(), j.second); + } + f.close_section(); + } + f.close_section(); + f.open_object_section("all"); + for (const auto &i : all) { + f.dump_int(i.first.c_str(), i.second); + } + f.close_section(); + // f.close_section(); + } + ); + return f.get(); + + } else if (what == "health" || what == "mon_status") { PyFormatter f; bufferlist json; - if (what == "pg_summary") { - json = cluster_state.get_pg_summary(); - } else if (what == "health") { + if (what == "health") { json = cluster_state.get_health(); } else if (what == "mon_status") { json = cluster_state.get_mon_status();