From 2d2aa00ed356228042c7765d4c2335e77ca1f46f Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 25 Jun 2013 12:01:53 -0700 Subject: [PATCH] mon/PGMonitor: store PGMap directly in store, bypassing PaxosService stash_full Instead of encoding incrementals and periodically dumping the whole encoded PGMap, instead store everything in a range of keys, and update them between versions using transactions. The per-version values are now breadcrumbs indicating which keys were dirtied so they can be refreshed via update_from_paxos(). This has several benefits: - we avoid every encoding the entire PGMap - we avoid dumping that blob into leveldb keys - we limit the amount of data living in forward-moving keys, which leveldb has a hard time compacting away - pgmap data instead lives over a fixed range of keys, which leveldb excels at - we only keep the latest copy of the PGMap (which is all we care about) Bump the internal monitor protocol version. Signed-off-by: Sage Weil --- src/mon/Monitor.h | 2 +- src/mon/PGMap.cc | 87 ++++++++++- src/mon/PGMap.h | 43 +++++ src/mon/PGMonitor.cc | 345 ++++++++++++++++++++++++++++++++--------- src/mon/PGMonitor.h | 20 ++- src/mon/PaxosService.h | 1 + 6 files changed, 422 insertions(+), 76 deletions(-) diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index a9420ddefcaed..a5f72042b2b63 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -54,7 +54,7 @@ #include -#define CEPH_MON_PROTOCOL 11 /* cluster internal */ +#define CEPH_MON_PROTOCOL 12 /* cluster internal */ enum { diff --git a/src/mon/PGMap.cc b/src/mon/PGMap.cc index e1f3e77e0a2e3..8be36c857bd7f 100644 --- a/src/mon/PGMap.cc +++ b/src/mon/PGMap.cc @@ -5,9 +5,10 @@ #define dout_subsys ceph_subsys_mon #include "common/debug.h" - +#include "include/stringify.h" #include "common/Formatter.h" #include "include/ceph_features.h" +#include "mon/MonitorDBStore.h" // -- @@ -176,6 +177,7 @@ void PGMap::apply_incremental(CephContext *cct, const Incremental& inc) } if (ratios_changed) redo_full_sets(); + for (map::const_iterator p = inc.pg_stat_updates.begin(); p != inc.pg_stat_updates.end(); ++p) { @@ -308,6 +310,53 @@ void PGMap::calc_stats() redo_full_sets(); } +void PGMap::update_pg(pg_t pgid, bufferlist& bl) +{ + bufferlist::iterator p = bl.begin(); + hash_map::iterator s = pg_stat.find(pgid); + if (s != pg_stat.end()) + stat_pg_sub(pgid, s->second); + pg_stat_t& r = pg_stat[pgid]; + ::decode(r, p); + stat_pg_add(pgid, r); +} + +void PGMap::remove_pg(pg_t pgid) +{ + hash_map::iterator s = pg_stat.find(pgid); + if (s != pg_stat.end()) { + stat_pg_sub(pgid, s->second); + pg_stat.erase(s); + } +} + +void PGMap::update_osd(int osd, bufferlist& bl) +{ + bufferlist::iterator p = bl.begin(); + hash_map::iterator o = osd_stat.find(osd); + if (o != osd_stat.end()) + stat_osd_sub(o->second); + osd_stat_t& r = osd_stat[osd]; + ::decode(r, p); + stat_osd_add(r); + + // adjust [near]full status + register_nearfull_status(osd, r); +} + +void PGMap::remove_osd(int osd) +{ + hash_map::iterator o = osd_stat.find(osd); + if (o != osd_stat.end()) { + stat_osd_sub(o->second); + osd_stat.erase(o); + + // remove these old osds from full/nearfull set(s), too + nearfull_osds.erase(osd); + full_osds.erase(osd); + } +} + void PGMap::stat_pg_add(const pg_t &pgid, const pg_stat_t &s) { num_pg++; @@ -426,6 +475,21 @@ void PGMap::decode(bufferlist::iterator &bl) calc_stats(); } +void PGMap::dirty_all(Incremental& inc) +{ + inc.osdmap_epoch = last_osdmap_epoch; + inc.pg_scan = last_pg_scan; + inc.full_ratio = full_ratio; + inc.nearfull_ratio = nearfull_ratio; + + for (hash_map::const_iterator p = pg_stat.begin(); p != pg_stat.end(); ++p) { + inc.pg_stat_updates[p->first] = p->second; + } + for (hash_map::const_iterator p = osd_stat.begin(); p != osd_stat.end(); ++p) { + inc.osd_stat_updates[p->first] = p->second; + } +} + void PGMap::dump(Formatter *f) const { dump_basic(f); @@ -675,6 +739,27 @@ void PGMap::recovery_summary(ostream& out) const } } +void PGMap::update_delta(CephContext *cct, utime_t inc_stamp, pool_stat_t& pg_sum_old) +{ + utime_t delta_t; + delta_t = inc_stamp; + delta_t -= stamp; + stamp = inc_stamp; + + // calculate a delta, and average over the last 2 deltas. + pool_stat_t d = pg_sum; + d.stats.sub(pg_sum_old.stats); + pg_sum_deltas.push_back(make_pair(d, delta_t)); + stamp_delta += delta_t; + + pg_sum_delta.stats.add(d.stats); + if (pg_sum_deltas.size() > (std::list< pair >::size_type)MAX(1, cct ? cct->_conf->mon_stat_smooth_intervals : 1)) { + pg_sum_delta.stats.sub(pg_sum_deltas.front().first.stats); + stamp_delta -= pg_sum_deltas.front().second; + pg_sum_deltas.pop_front(); + } +} + void PGMap::clear_delta() { pg_sum_delta = pool_stat_t(); diff --git a/src/mon/PGMap.h b/src/mon/PGMap.h index 7ab481f887ea3..d8cf1885a82d9 100644 --- a/src/mon/PGMap.h +++ b/src/mon/PGMap.h @@ -26,6 +26,8 @@ #include "common/config.h" #include +#include "MonitorDBStore.h" + namespace ceph { class Formatter; } class PGMap { @@ -78,6 +80,7 @@ public: pool_stat_t pg_sum_delta; utime_t stamp_delta; + void update_delta(CephContext *cct, utime_t inc_stamp, pool_stat_t& pg_sum_old); void clear_delta(); set creating_pgs; // lru: front = new additions, back = recently pinged @@ -98,6 +101,44 @@ public: num_osd(0) {} + void set_full_ratios(float full, float nearfull) { + if (full_ratio == full && nearfull_ratio == nearfull) + return; + full_ratio = full; + nearfull_ratio = nearfull; + redo_full_sets(); + } + + version_t get_version() const { + return version; + } + void set_version(version_t v) { + version = v; + } + epoch_t get_last_osdmap_epoch() const { + return last_osdmap_epoch; + } + void set_last_osdmap_epoch(epoch_t e) { + last_osdmap_epoch = e; + } + epoch_t get_last_pg_scan() const { + return last_pg_scan; + } + void set_last_pg_scan(epoch_t e) { + last_pg_scan = e; + } + utime_t get_stamp() const { + return stamp; + } + void set_stamp(utime_t s) { + stamp = s; + } + + void update_pg(pg_t pgid, bufferlist& bl); + void remove_pg(pg_t pgid); + void update_osd(int osd, bufferlist& bl); + void remove_osd(int osd); + void apply_incremental(CephContext *cct, const Incremental& inc); void redo_full_sets(); void register_nearfull_status(int osd, const osd_stat_t& s); @@ -110,6 +151,8 @@ public: void encode(bufferlist &bl, uint64_t features=-1) const; void decode(bufferlist::iterator &bl); + void dirty_all(Incremental& inc); + void dump(Formatter *f) const; void dump_basic(Formatter *f) const; void dump_pg_stats(Formatter *f) const; diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index 5f37729559b92..479b498f934ed 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -147,6 +147,7 @@ void PGMonitor::tick() void PGMonitor::create_initial() { dout(10) << "create_initial -- creating initial map" << dendl; + format_version = 1; } void PGMonitor::update_from_paxos(bool *need_bootstrap) @@ -156,75 +157,96 @@ void PGMonitor::update_from_paxos(bool *need_bootstrap) return; assert(version >= pg_map.version); - /* Obtain latest full pgmap version, if available and whose version is - * greater than the current pgmap's version. - */ - version_t latest_full = get_version_latest_full(); - if ((latest_full > 0) && (latest_full > pg_map.version)) { - bufferlist latest_bl; - int err = get_version_full(latest_full, latest_bl); - assert(err == 0); - dout(7) << __func__ << " loading latest full pgmap v" - << latest_full << dendl; - try { - PGMap tmp_pg_map; - bufferlist::iterator p = latest_bl.begin(); - tmp_pg_map.decode(p); - pg_map = tmp_pg_map; - } catch (const std::exception& e) { - dout(0) << __func__ << ": error parsing update: " - << e.what() << dendl; - assert(0 == "update_from_paxos: error parsing update"); - return; + if (format_version == 0) { + // old format + + /* Obtain latest full pgmap version, if available and whose version is + * greater than the current pgmap's version. + */ + version_t latest_full = get_version_latest_full(); + if ((latest_full > 0) && (latest_full > pg_map.version)) { + bufferlist latest_bl; + int err = get_version_full(latest_full, latest_bl); + assert(err == 0); + dout(7) << __func__ << " loading latest full pgmap v" + << latest_full << dendl; + try { + PGMap tmp_pg_map; + bufferlist::iterator p = latest_bl.begin(); + tmp_pg_map.decode(p); + pg_map = tmp_pg_map; + } catch (const std::exception& e) { + dout(0) << __func__ << ": error parsing update: " + << e.what() << dendl; + assert(0 == "update_from_paxos: error parsing update"); + return; + } } - } - // walk through incrementals - while (version > pg_map.version) { - bufferlist bl; - int err = get_version(pg_map.version+1, bl); - assert(err == 0); - assert(bl.length()); + // walk through incrementals + while (version > pg_map.version) { + bufferlist bl; + int err = get_version(pg_map.version+1, bl); + assert(err == 0); + assert(bl.length()); + + dout(7) << "update_from_paxos applying incremental " << pg_map.version+1 << dendl; + PGMap::Incremental inc; + try { + bufferlist::iterator p = bl.begin(); + inc.decode(p); + } + catch (const std::exception &e) { + dout(0) << "update_from_paxos: error parsing " + << "incremental update: " << e.what() << dendl; + assert(0 == "update_from_paxos: error parsing incremental update"); + return; + } - dout(7) << "update_from_paxos applying incremental " << pg_map.version+1 << dendl; - PGMap::Incremental inc; - try { - bufferlist::iterator p = bl.begin(); - inc.decode(p); + pg_map.apply_incremental(g_ceph_context, inc); + + dout(10) << pg_map << dendl; + + if (inc.pg_scan) + last_sent_pg_create.clear(); // reset pg_create throttle timer } - catch (const std::exception &e) { - dout(0) << "update_from_paxos: error parsing " - << "incremental update: " << e.what() << dendl; - assert(0 == "update_from_paxos: error parsing incremental update"); - return; + + } else if (format_version == 1) { + // pg/osd keys in leveldb + + // read meta + epoch_t last_pg_scan = pg_map.last_pg_scan; + + while (version > pg_map.version) { + // load full state? + if (pg_map.version == 0) { + dout(10) << __func__ << " v0, read_full" << dendl; + read_pgmap_full(); + goto out; + } + + // incremental state? + dout(10) << __func__ << " read_incremental" << dendl; + bufferlist bl; + int r = get_version(pg_map.version + 1, bl); + if (r == -ENOENT) { + dout(10) << __func__ << " failed to read_incremental, read_full" << dendl; + read_pgmap_full(); + goto out; + } + assert(r == 0); + apply_pgmap_delta(bl); } - pg_map.apply_incremental(g_ceph_context, inc); - - dout(10) << pg_map << dendl; + read_pgmap_meta(); - if (inc.pg_scan) + out: + if (last_pg_scan != pg_map.last_pg_scan) last_sent_pg_create.clear(); // reset pg_create throttle timer } assert(version == pg_map.version); - /* If we dump the summaries onto the k/v store, they hardly would be useful - * without a tool created with reading them in mind. - * Comment this out until we decide what is the best course of action. - * - // dump pgmap summaries? (useful for debugging) - if (0) { - stringstream ds; - pg_map.dump(ds); - bufferlist d; - d.append(ds); - mon->store->put_bl_sn(d, "pgmap_dump", version); - } - */ - - update_trim(); - if (mon->osdmon()->osdmap.get_epoch()) { map_pg_creates(); send_pg_creates(); @@ -233,6 +255,22 @@ void PGMonitor::update_from_paxos(bool *need_bootstrap) update_logger(); } +void PGMonitor::upgrade_format() +{ + unsigned current = 1; + assert(format_version <= current); + if (format_version == current) + return; + + dout(1) << __func__ << " to " << current << dendl; + + // upgrade by dirtying it all + pg_map.dirty_all(pending_inc); + + format_version = current; + propose_pending(); +} + void PGMonitor::init() { if (mon->osdmon()->osdmap.get_epoch()) { @@ -274,6 +312,123 @@ void PGMonitor::create_pending() dout(10) << "create_pending v " << pending_inc.version << dendl; } +void PGMonitor::read_pgmap_meta() +{ + dout(10) << __func__ << dendl; + + string prefix = "pgmap_meta"; + + version_t version = mon->store->get(prefix, "version"); + epoch_t last_osdmap_epoch = mon->store->get(prefix, "last_osdmap_epoch"); + epoch_t last_pg_scan = mon->store->get(prefix, "last_pg_scan"); + pg_map.set_version(version); + pg_map.set_last_osdmap_epoch(last_osdmap_epoch); + pg_map.set_last_pg_scan(last_pg_scan); + + float full_ratio, nearfull_ratio; + { + bufferlist bl; + mon->store->get(prefix, "full_ratio", bl); + bufferlist::iterator p = bl.begin(); + ::decode(full_ratio, p); + } + { + bufferlist bl; + mon->store->get(prefix, "nearfull_ratio", bl); + bufferlist::iterator p = bl.begin(); + ::decode(nearfull_ratio, p); + } + pg_map.set_full_ratios(full_ratio, nearfull_ratio); + { + bufferlist bl; + mon->store->get(prefix, "stamp", bl); + bufferlist::iterator p = bl.begin(); + utime_t stamp; + ::decode(stamp, p); + pg_map.set_stamp(stamp); + } +} + +void PGMonitor::read_pgmap_full() +{ + read_pgmap_meta(); + + string prefix = "pgmap_pg"; + for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) { + string key = i->key(); + pg_t pgid; + if (!pgid.parse(key.c_str())) { + dout(0) << "unable to parse key " << key << dendl; + continue; + } + bufferlist bl = i->value(); + pg_map.update_pg(pgid, bl); + dout(20) << " got " << pgid << dendl; + } + + prefix = "pgmap_osd"; + for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) { + string key = i->key(); + int osd; + osd = atoi(key.c_str()); + bufferlist bl = i->value(); + pg_map.update_osd(osd, bl); + dout(20) << " got osd." << osd << dendl; + } +} + +void PGMonitor::apply_pgmap_delta(bufferlist& bl) +{ + version_t v = pg_map.version + 1; + + utime_t inc_stamp; + bufferlist dirty_pgs, dirty_osds; + { + bufferlist::iterator p = bl.begin(); + ::decode(inc_stamp, p); + ::decode(dirty_pgs, p); + ::decode(dirty_osds, p); + } + + pool_stat_t pg_sum_old = pg_map.pg_sum; + + // pgs + bufferlist::iterator p = dirty_pgs.begin(); + while (!p.end()) { + pg_t pgid; + ::decode(pgid, p); + dout(20) << " refreshing pg " << pgid << dendl; + bufferlist bl; + int r = mon->store->get("pgmap_pg", stringify(pgid), bl); + if (r >= 0) { + pg_map.update_pg(pgid, bl); + } else { + pg_map.remove_pg(pgid); + } + } + + // osds + p = dirty_osds.begin(); + while (!p.end()) { + int32_t osd; + ::decode(osd, p); + dout(20) << " refreshing osd." << osd << dendl; + bufferlist bl; + int r = mon->store->get("pgmap_osd", stringify(osd), bl); + if (r >= 0) { + pg_map.update_osd(osd, bl); + } else { + pg_map.remove_osd(osd); + } + } + + pg_map.update_delta(g_ceph_context, inc_stamp, pg_sum_old); + + // ok, we're now on the new version + pg_map.version = v; +} + + void PGMonitor::encode_pending(MonitorDBStore::Transaction *t) { version_t version = pending_inc.version; @@ -281,23 +436,72 @@ void PGMonitor::encode_pending(MonitorDBStore::Transaction *t) assert(get_last_committed() + 1 == version); pending_inc.stamp = ceph_clock_now(g_ceph_context); - bufferlist bl; - pending_inc.encode(bl, mon->get_quorum_features()); + uint64_t features = mon->get_quorum_features(); - put_version(t, version, bl); - put_last_committed(t, version); -} + string prefix = "pgmap_meta"; -void PGMonitor::encode_full(MonitorDBStore::Transaction *t) -{ - dout(10) << __func__ << " pgmap v " << pg_map.version << dendl; - assert(get_last_committed() == pg_map.version); + t->put(prefix, "version", pending_inc.version); + { + bufferlist bl; + ::encode(pending_inc.stamp, bl); + t->put(prefix, "stamp", bl); + } - bufferlist full_bl; - pg_map.encode(full_bl, mon->get_quorum_features()); + if (pending_inc.osdmap_epoch) + t->put(prefix, "last_osdmap_epoch", pending_inc.osdmap_epoch); + if (pending_inc.pg_scan) + t->put(prefix, "last_pg_scan", pending_inc.pg_scan); + if (pending_inc.full_ratio > 0) { + bufferlist bl; + ::encode(pending_inc.full_ratio, bl); + t->put(prefix, "full_ratio", bl); + } + if (pending_inc.nearfull_ratio > 0) { + bufferlist bl; + ::encode(pending_inc.nearfull_ratio, bl); + t->put(prefix, "nearfull_ratio", bl); + } - put_version_full(t, pg_map.version, full_bl); - put_version_latest_full(t, pg_map.version); + bufferlist incbl; + ::encode(pending_inc.stamp, incbl); + { + bufferlist dirty; + string prefix = "pgmap_pg"; + for (map::const_iterator p = pending_inc.pg_stat_updates.begin(); + p != pending_inc.pg_stat_updates.end(); + ++p) { + ::encode(p->first, dirty); + bufferlist bl; + ::encode(p->second, bl, features); + t->put(prefix, stringify(p->first), bl); + } + for (set::const_iterator p = pending_inc.pg_remove.begin(); p != pending_inc.pg_remove.end(); ++p) { + ::encode(*p, dirty); + t->erase(prefix, stringify(*p)); + } + ::encode(dirty, incbl); + } + { + bufferlist dirty; + string prefix = "pgmap_osd"; + for (map::const_iterator p = pending_inc.osd_stat_updates.begin(); + p != pending_inc.osd_stat_updates.end(); + ++p) { + ::encode(p->first, dirty); + bufferlist bl; + ::encode(p->second, bl, features); + t->put(prefix, stringify(p->first), bl); + } + for (set::const_iterator p = pending_inc.osd_stat_rm.begin(); p != pending_inc.osd_stat_rm.end(); ++p) { + ::encode(*p, dirty); + t->erase(prefix, stringify(*p)); + } + ::encode(dirty, incbl); + } + + put_version(t, version, incbl); + + put_last_committed(t, version); } void PGMonitor::update_trim() @@ -308,7 +512,6 @@ void PGMonitor::update_trim() set_trim_to(version - max); } - bool PGMonitor::preprocess_query(PaxosServiceMessage *m) { dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; diff --git a/src/mon/PGMonitor.h b/src/mon/PGMonitor.h index b18b76b1288a3..f8cf279a3b1f1 100644 --- a/src/mon/PGMonitor.h +++ b/src/mon/PGMonitor.h @@ -52,15 +52,19 @@ private: void create_initial(); void update_from_paxos(bool *need_bootstrap); + void upgrade_format(); void init(); void handle_osd_timeouts(); void create_pending(); // prepare a new pending // propose pending update to peers void update_trim(); - void encode_pending(MonitorDBStore::Transaction *t); - virtual void encode_full(MonitorDBStore::Transaction *t); void update_logger(); + void encode_pending(MonitorDBStore::Transaction *t); + void read_pgmap_meta(); + void read_pgmap_full(); + void apply_pgmap_delta(bufferlist& bl); + bool preprocess_query(PaxosServiceMessage *m); // true if processed. bool prepare_update(PaxosServiceMessage *m); @@ -137,7 +141,9 @@ private: public: PGMonitor(Monitor *mn, Paxos *p, const string& service_name) - : PaxosService(mn, p, service_name), need_check_down_pgs(false) { } + : PaxosService(mn, p, service_name), + need_check_down_pgs(false) + { } ~PGMonitor() { } virtual void on_restart(); @@ -147,6 +153,14 @@ public: * haven't lost any PGs from new pools. */ virtual void on_active(); + bool should_stash_full() { + return false; // never + } + virtual void encode_full(MonitorDBStore::Transaction *t) { + assert(0 == "unimplemented encode_full"); + } + + void tick(); // check state, take actions void check_osd_map(epoch_t epoch); diff --git a/src/mon/PaxosService.h b/src/mon/PaxosService.h index 289e25c8ecff8..d92824f6dd657 100644 --- a/src/mon/PaxosService.h +++ b/src/mon/PaxosService.h @@ -746,6 +746,7 @@ public: * @param t Transaction on which the full version shall be encoded. */ virtual void encode_full(MonitorDBStore::Transaction *t) = 0; + /** * @} */ -- 2.39.5