#define dout_subsys ceph_subsys_mon
#include "common/debug.h"
-
+#include "include/stringify.h"
#include "common/Formatter.h"
#include "include/ceph_features.h"
+#include "mon/MonitorDBStore.h"
// --
}
if (ratios_changed)
redo_full_sets();
+
for (map<pg_t,pg_stat_t>::const_iterator p = inc.pg_stat_updates.begin();
p != inc.pg_stat_updates.end();
++p) {
redo_full_sets();
}
+void PGMap::update_pg(pg_t pgid, bufferlist& bl)  // set pgid's entry in pg_stat from the pg_stat_t encoded in bl
+{
+ bufferlist::iterator p = bl.begin();
+ hash_map<pg_t,pg_stat_t>::iterator s = pg_stat.find(pgid);
+ if (s != pg_stat.end())
+ stat_pg_sub(pgid, s->second);  // known pg: pass its old stats to stat_pg_sub before they are overwritten
+ pg_stat_t& r = pg_stat[pgid];  // reuses the existing entry, or inserts a default one for a new pg
+ ::decode(r, p);  // decode straight into the map entry
+ stat_pg_add(pgid, r);  // hand the freshly decoded stats to stat_pg_add
+}
+
+void PGMap::remove_pg(pg_t pgid)  // drop pgid from pg_stat; does nothing if pgid is not present
+{
+ hash_map<pg_t,pg_stat_t>::iterator s = pg_stat.find(pgid);
+ if (s != pg_stat.end()) {
+ stat_pg_sub(pgid, s->second);  // must run before erase(), while s->second is still valid
+ pg_stat.erase(s);
+ }
+}
+
+void PGMap::update_osd(int osd, bufferlist& bl)  // set osd's entry in osd_stat from the osd_stat_t encoded in bl
+{
+ bufferlist::iterator p = bl.begin();
+ hash_map<int32_t,osd_stat_t>::iterator o = osd_stat.find(osd);
+ if (o != osd_stat.end())
+ stat_osd_sub(o->second);  // known osd: pass its old stats to stat_osd_sub before they are overwritten
+ osd_stat_t& r = osd_stat[osd];  // reuses the existing entry, or inserts a default one for a new osd
+ ::decode(r, p);  // decode straight into the map entry
+ stat_osd_add(r);  // hand the freshly decoded stats to stat_osd_add
+
+ // adjust [near]full status
+ register_nearfull_status(osd, r);  // called with the freshly decoded stats
+}
+
+void PGMap::remove_osd(int osd)  // drop osd from osd_stat and from both fullness sets; does nothing if osd has no stats entry
+{
+ hash_map<int32_t,osd_stat_t>::iterator o = osd_stat.find(osd);
+ if (o != osd_stat.end()) {
+ stat_osd_sub(o->second);  // must run before erase(), while o->second is still valid
+ osd_stat.erase(o);
+
+ // remove these old osds from full/nearfull set(s), too
+ nearfull_osds.erase(osd);  // only reached when a stats entry existed
+ full_osds.erase(osd);
+ }
+}
+
void PGMap::stat_pg_add(const pg_t &pgid, const pg_stat_t &s)
{
num_pg++;
calc_stats();
}
+void PGMap::dirty_all(Incremental& inc)  // copy this map's epoch/scan/ratio fields and every pg and osd stat into inc
+{
+ inc.osdmap_epoch = last_osdmap_epoch;
+ inc.pg_scan = last_pg_scan;
+ inc.full_ratio = full_ratio;
+ inc.nearfull_ratio = nearfull_ratio;
+
+ for (hash_map<pg_t,pg_stat_t>::const_iterator p = pg_stat.begin(); p != pg_stat.end(); ++p) {  // every known pg becomes an update in inc
+ inc.pg_stat_updates[p->first] = p->second;
+ }
+ for (hash_map<int32_t, osd_stat_t>::const_iterator p = osd_stat.begin(); p != osd_stat.end(); ++p) {  // likewise for every known osd
+ inc.osd_stat_updates[p->first] = p->second;
+ }
+}  // note: inc.version and inc.stamp are not set here
+
void PGMap::dump(Formatter *f) const
{
dump_basic(f);
}
}
+void PGMap::update_delta(CephContext *cct, utime_t inc_stamp, pool_stat_t& pg_sum_old)  // fold (pg_sum - pg_sum_old) into the windowed pg_sum_delta and advance stamp to inc_stamp
+{
+ utime_t delta_t;
+ delta_t = inc_stamp;
+ delta_t -= stamp;  // time elapsed since the previously recorded stamp
+ stamp = inc_stamp;
+
+ // calculate a delta, and accumulate it over the last mon_stat_smooth_intervals deltas (at least 1).
+ pool_stat_t d = pg_sum;
+ d.stats.sub(pg_sum_old.stats);  // d.stats = current totals minus the caller's earlier snapshot
+ pg_sum_deltas.push_back(make_pair(d, delta_t));  // remember each sample with its time span so it can be retired later
+ stamp_delta += delta_t;
+
+ pg_sum_delta.stats.add(d.stats);
+ if (pg_sum_deltas.size() > (std::list< pair<pool_stat_t, utime_t> >::size_type)MAX(1, cct ? cct->_conf->mon_stat_smooth_intervals : 1)) {  // window over its limit (limit is 1 when cct is NULL): retire the oldest sample; an 'if', so at most one per call
+ pg_sum_delta.stats.sub(pg_sum_deltas.front().first.stats);
+ stamp_delta -= pg_sum_deltas.front().second;
+ pg_sum_deltas.pop_front();
+ }
+}
+
void PGMap::clear_delta()
{
pg_sum_delta = pool_stat_t();
void PGMonitor::create_initial()
{
dout(10) << "create_initial -- creating initial map" << dendl;
+ format_version = 1;
}
void PGMonitor::update_from_paxos(bool *need_bootstrap)
return;
assert(version >= pg_map.version);
- /* Obtain latest full pgmap version, if available and whose version is
- * greater than the current pgmap's version.
- */
- version_t latest_full = get_version_latest_full();
- if ((latest_full > 0) && (latest_full > pg_map.version)) {
- bufferlist latest_bl;
- int err = get_version_full(latest_full, latest_bl);
- assert(err == 0);
- dout(7) << __func__ << " loading latest full pgmap v"
- << latest_full << dendl;
- try {
- PGMap tmp_pg_map;
- bufferlist::iterator p = latest_bl.begin();
- tmp_pg_map.decode(p);
- pg_map = tmp_pg_map;
- } catch (const std::exception& e) {
- dout(0) << __func__ << ": error parsing update: "
- << e.what() << dendl;
- assert(0 == "update_from_paxos: error parsing update");
- return;
+ if (format_version == 0) {
+ // old format
+
+ /* Obtain latest full pgmap version, if available and whose version is
+ * greater than the current pgmap's version.
+ */
+ version_t latest_full = get_version_latest_full();
+ if ((latest_full > 0) && (latest_full > pg_map.version)) {
+ bufferlist latest_bl;
+ int err = get_version_full(latest_full, latest_bl);
+ assert(err == 0);
+ dout(7) << __func__ << " loading latest full pgmap v"
+ << latest_full << dendl;
+ try {
+ PGMap tmp_pg_map;
+ bufferlist::iterator p = latest_bl.begin();
+ tmp_pg_map.decode(p);
+ pg_map = tmp_pg_map;
+ } catch (const std::exception& e) {
+ dout(0) << __func__ << ": error parsing update: "
+ << e.what() << dendl;
+ assert(0 == "update_from_paxos: error parsing update");
+ return;
+ }
}
- }
- // walk through incrementals
- while (version > pg_map.version) {
- bufferlist bl;
- int err = get_version(pg_map.version+1, bl);
- assert(err == 0);
- assert(bl.length());
+ // walk through incrementals
+ while (version > pg_map.version) {
+ bufferlist bl;
+ int err = get_version(pg_map.version+1, bl);
+ assert(err == 0);
+ assert(bl.length());
+
+ dout(7) << "update_from_paxos applying incremental " << pg_map.version+1 << dendl;
+ PGMap::Incremental inc;
+ try {
+ bufferlist::iterator p = bl.begin();
+ inc.decode(p);
+ }
+ catch (const std::exception &e) {
+ dout(0) << "update_from_paxos: error parsing "
+ << "incremental update: " << e.what() << dendl;
+ assert(0 == "update_from_paxos: error parsing incremental update");
+ return;
+ }
- dout(7) << "update_from_paxos applying incremental " << pg_map.version+1 << dendl;
- PGMap::Incremental inc;
- try {
- bufferlist::iterator p = bl.begin();
- inc.decode(p);
+ pg_map.apply_incremental(g_ceph_context, inc);
+
+ dout(10) << pg_map << dendl;
+
+ if (inc.pg_scan)
+ last_sent_pg_create.clear(); // reset pg_create throttle timer
}
- catch (const std::exception &e) {
- dout(0) << "update_from_paxos: error parsing "
- << "incremental update: " << e.what() << dendl;
- assert(0 == "update_from_paxos: error parsing incremental update");
- return;
+
+ } else if (format_version == 1) {
+ // pg/osd keys in leveldb
+
+ // read meta
+ epoch_t last_pg_scan = pg_map.last_pg_scan;
+
+ while (version > pg_map.version) {
+ // load full state?
+ if (pg_map.version == 0) {
+ dout(10) << __func__ << " v0, read_full" << dendl;
+ read_pgmap_full();
+ goto out;
+ }
+
+ // incremental state?
+ dout(10) << __func__ << " read_incremental" << dendl;
+ bufferlist bl;
+ int r = get_version(pg_map.version + 1, bl);
+ if (r == -ENOENT) {
+ dout(10) << __func__ << " failed to read_incremental, read_full" << dendl;
+ read_pgmap_full();
+ goto out;
+ }
+ assert(r == 0);
+ apply_pgmap_delta(bl);
}
- pg_map.apply_incremental(g_ceph_context, inc);
-
- dout(10) << pg_map << dendl;
+ read_pgmap_meta();
- if (inc.pg_scan)
+ out:
+ if (last_pg_scan != pg_map.last_pg_scan)
last_sent_pg_create.clear(); // reset pg_create throttle timer
}
assert(version == pg_map.version);
- /* If we dump the summaries onto the k/v store, they hardly would be useful
- * without a tool created with reading them in mind.
- * Comment this out until we decide what is the best course of action.
- *
- // dump pgmap summaries? (useful for debugging)
- if (0) {
- stringstream ds;
- pg_map.dump(ds);
- bufferlist d;
- d.append(ds);
- mon->store->put_bl_sn(d, "pgmap_dump", version);
- }
- */
-
- update_trim();
-
if (mon->osdmon()->osdmap.get_epoch()) {
map_pg_creates();
send_pg_creates();
update_logger();
}
+void PGMonitor::upgrade_format()  // move the pgmap to format 1 by queuing the whole map in pending_inc and proposing it; returns early if already at format 1
+{
+ unsigned current = 1;  // the format version this function upgrades to
+ assert(format_version <= current);  // a format_version above 'current' trips this assert
+ if (format_version == current)
+ return;
+
+ dout(1) << __func__ << " to " << current << dendl;
+
+ // upgrade by dirtying it all
+ pg_map.dirty_all(pending_inc);  // copies every pg/osd stat and the meta fields into pending_inc
+
+ format_version = current;
+ propose_pending();
+}
+
void PGMonitor::init()
{
if (mon->osdmon()->osdmap.get_epoch()) {
dout(10) << "create_pending v " << pending_inc.version << dendl;
}
+void PGMonitor::read_pgmap_meta()  // load version, epochs, full ratios and stamp from the "pgmap_meta" store prefix into pg_map
+{
+ dout(10) << __func__ << dendl;
+
+ string prefix = "pgmap_meta";
+
+ version_t version = mon->store->get(prefix, "version");
+ epoch_t last_osdmap_epoch = mon->store->get(prefix, "last_osdmap_epoch");
+ epoch_t last_pg_scan = mon->store->get(prefix, "last_pg_scan");
+ pg_map.set_version(version);
+ pg_map.set_last_osdmap_epoch(last_osdmap_epoch);
+ pg_map.set_last_pg_scan(last_pg_scan);
+
+ float full_ratio, nearfull_ratio;  // left uninitialized; both are filled by the decodes below
+ {
+ bufferlist bl;
+ mon->store->get(prefix, "full_ratio", bl);  // NOTE(review): return value ignored; presumably ::decode throws on an empty bl if the key is absent - confirm
+ bufferlist::iterator p = bl.begin();
+ ::decode(full_ratio, p);
+ }
+ {
+ bufferlist bl;
+ mon->store->get(prefix, "nearfull_ratio", bl);  // NOTE(review): same unchecked read as full_ratio
+ bufferlist::iterator p = bl.begin();
+ ::decode(nearfull_ratio, p);
+ }
+ pg_map.set_full_ratios(full_ratio, nearfull_ratio);
+ {
+ bufferlist bl;
+ mon->store->get(prefix, "stamp", bl);  // NOTE(review): return value ignored here as well
+ bufferlist::iterator p = bl.begin();
+ utime_t stamp;
+ ::decode(stamp, p);
+ pg_map.set_stamp(stamp);
+ }
+}
+
+void PGMonitor::read_pgmap_full()  // load pg_map from the store: meta first, then every key under "pgmap_pg" and "pgmap_osd"
+{
+ read_pgmap_meta();
+
+ string prefix = "pgmap_pg";
+ for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) {
+ string key = i->key();  // the key is parsed as the textual pg id
+ pg_t pgid;
+ if (!pgid.parse(key.c_str())) {
+ dout(0) << "unable to parse key " << key << dendl;
+ continue;  // log and skip keys that do not parse as a pg id
+ }
+ bufferlist bl = i->value();
+ pg_map.update_pg(pgid, bl);
+ dout(20) << " got " << pgid << dendl;
+ }
+
+ prefix = "pgmap_osd";
+ for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) {
+ string key = i->key();  // the key is parsed as the decimal osd id
+ int osd;
+ osd = atoi(key.c_str());  // NOTE(review): atoi yields 0 for a non-numeric key, so such a key would be loaded as osd.0
+ bufferlist bl = i->value();
+ pg_map.update_osd(osd, bl);
+ dout(20) << " got osd." << osd << dendl;
+ }
+}
+
+void PGMonitor::apply_pgmap_delta(bufferlist& bl)  // apply one incremental: re-read every pg and osd named in bl from the store, update the deltas, then advance pg_map.version by one
+{
+ version_t v = pg_map.version + 1;  // the version pg_map will be at once this delta is applied
+
+ utime_t inc_stamp;
+ bufferlist dirty_pgs, dirty_osds;
+ {
+ bufferlist::iterator p = bl.begin();
+ ::decode(inc_stamp, p);  // layout: stamp, then the dirty-pg id list, then the dirty-osd id list
+ ::decode(dirty_pgs, p);
+ ::decode(dirty_osds, p);
+ }
+
+ pool_stat_t pg_sum_old = pg_map.pg_sum;  // snapshot taken before any update, passed to update_delta below
+
+ // pgs
+ bufferlist::iterator p = dirty_pgs.begin();
+ while (!p.end()) {
+ pg_t pgid;
+ ::decode(pgid, p);
+ dout(20) << " refreshing pg " << pgid << dendl;
+ bufferlist bl;  // shadows the bl parameter; receives this pg's stored value
+ int r = mon->store->get("pgmap_pg", stringify(pgid), bl);
+ if (r >= 0) {
+ pg_map.update_pg(pgid, bl);
+ } else {
+ pg_map.remove_pg(pgid);  // any negative return from get() is treated as "pg was removed"
+ }
+ }
+
+ // osds
+ p = dirty_osds.begin();
+ while (!p.end()) {
+ int32_t osd;
+ ::decode(osd, p);
+ dout(20) << " refreshing osd." << osd << dendl;
+ bufferlist bl;  // shadows the bl parameter; receives this osd's stored value
+ int r = mon->store->get("pgmap_osd", stringify(osd), bl);
+ if (r >= 0) {
+ pg_map.update_osd(osd, bl);
+ } else {
+ pg_map.remove_osd(osd);  // any negative return from get() is treated as "osd was removed"
+ }
+ }
+
+ pg_map.update_delta(g_ceph_context, inc_stamp, pg_sum_old);
+
+ // ok, we're now on the new version
+ pg_map.version = v;
+}
+
+
void PGMonitor::encode_pending(MonitorDBStore::Transaction *t)  // write pending_inc into t: "pgmap_meta" keys, one key per changed pg/osd, and a versioned blob listing what changed
{
version_t version = pending_inc.version;
assert(get_last_committed() + 1 == version);
pending_inc.stamp = ceph_clock_now(g_ceph_context);
- bufferlist bl;
- pending_inc.encode(bl, mon->get_quorum_features());
+ uint64_t features = mon->get_quorum_features();  // passed to ::encode for the pg and osd stats below
- put_version(t, version, bl);
- put_last_committed(t, version);
-}
+ string prefix = "pgmap_meta";
-void PGMonitor::encode_full(MonitorDBStore::Transaction *t)
-{
- dout(10) << __func__ << " pgmap v " << pg_map.version << dendl;
- assert(get_last_committed() == pg_map.version);
+ t->put(prefix, "version", pending_inc.version);
+ {
+ bufferlist bl;
+ ::encode(pending_inc.stamp, bl);
+ t->put(prefix, "stamp", bl);
+ }
- bufferlist full_bl;
- pg_map.encode(full_bl, mon->get_quorum_features());
+ if (pending_inc.osdmap_epoch)  // a zero value is not written, so the stored key is left untouched
+ t->put(prefix, "last_osdmap_epoch", pending_inc.osdmap_epoch);
+ if (pending_inc.pg_scan)  // same rule: only written when non-zero
+ t->put(prefix, "last_pg_scan", pending_inc.pg_scan);
+ if (pending_inc.full_ratio > 0) {  // ratios are only written when positive
+ bufferlist bl;
+ ::encode(pending_inc.full_ratio, bl);
+ t->put(prefix, "full_ratio", bl);
+ }
+ if (pending_inc.nearfull_ratio > 0) {
+ bufferlist bl;
+ ::encode(pending_inc.nearfull_ratio, bl);
+ t->put(prefix, "nearfull_ratio", bl);
+ }
- put_version_full(t, pg_map.version, full_bl);
- put_version_latest_full(t, pg_map.version);
+ bufferlist incbl;  // the versioned blob: stamp, then the dirty-pg id list, then the dirty-osd id list
+ ::encode(pending_inc.stamp, incbl);
+ {
+ bufferlist dirty;
+ string prefix = "pgmap_pg";  // shadows the outer prefix inside this scope
+ for (map<pg_t,pg_stat_t>::const_iterator p = pending_inc.pg_stat_updates.begin();
+ p != pending_inc.pg_stat_updates.end();
+ ++p) {
+ ::encode(p->first, dirty);  // record the pg id as dirty
+ bufferlist bl;
+ ::encode(p->second, bl, features);
+ t->put(prefix, stringify(p->first), bl);  // the stats themselves go under their own key
+ }
+ for (set<pg_t>::const_iterator p = pending_inc.pg_remove.begin(); p != pending_inc.pg_remove.end(); ++p) {
+ ::encode(*p, dirty);  // removed pgs are listed as dirty too
+ t->erase(prefix, stringify(*p));
+ }
+ ::encode(dirty, incbl);
+ }
+ {
+ bufferlist dirty;
+ string prefix = "pgmap_osd";  // shadows the outer prefix inside this scope
+ for (map<int32_t,osd_stat_t>::const_iterator p = pending_inc.osd_stat_updates.begin();
+ p != pending_inc.osd_stat_updates.end();
+ ++p) {
+ ::encode(p->first, dirty);  // record the osd id as dirty
+ bufferlist bl;
+ ::encode(p->second, bl, features);
+ t->put(prefix, stringify(p->first), bl);  // the stats themselves go under their own key
+ }
+ for (set<int32_t>::const_iterator p = pending_inc.osd_stat_rm.begin(); p != pending_inc.osd_stat_rm.end(); ++p) {
+ ::encode(*p, dirty);  // removed osds are listed as dirty too
+ t->erase(prefix, stringify(*p));
+ }
+ ::encode(dirty, incbl);
+ }
+
+ put_version(t, version, incbl);  // only the change list is stored under the version number
+
+ put_last_committed(t, version);
}
void PGMonitor::update_trim()
set_trim_to(version - max);
}
-
bool PGMonitor::preprocess_query(PaxosServiceMessage *m)
{
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;