#include "osd/OSDMap.h"
+#include "MonitorStore.h"
#include "MonitorDBStore.h"
#include "msg/Messenger.h"
CompatSet::FeatureSet ceph_mon_feature_ro_compat;
CompatSet::FeatureSet ceph_mon_feature_incompat;
ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
- ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_GV);
+ ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
ceph_mon_feature_incompat);
}
}
return true;
};
+
+#undef dout_prefix
+#define dout_prefix *_dout
+
+/**
+ * Swap the on-disk compat set from the GV feature to SINGLE_PAXOS.
+ *
+ * Reads the CompatSet stored under (MONITOR_NAME, COMPAT_SET_LOC) in the
+ * new store, removes CEPH_MON_FEATURE_INCOMPAT_GV, adds
+ * CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS and puts the re-encoded set on
+ * @p t. The transaction is not applied here; that is left to the caller.
+ *
+ * Asserts that the compat set exists, is non-empty and carries the GV
+ * feature before the swap.
+ *
+ * @param t transaction that receives the updated compat set
+ */
+void Monitor::StoreConverter::_convert_finish_features(
+    MonitorDBStore::Transaction &t)
+{
+  dout(20) << __func__ << dendl;
+
+  // Load what the conversion copied over from the old store.
+  assert(db->exists(MONITOR_NAME, COMPAT_SET_LOC));
+  bufferlist stored_bl;
+  db->get(MONITOR_NAME, COMPAT_SET_LOC, stored_bl);
+  assert(stored_bl.length());
+
+  CompatSet compat;
+  bufferlist::iterator stored_it = stored_bl.begin();
+  compat.decode(stored_it);
+
+  // Out with GV ...
+  assert(compat.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_GV));
+  compat.incompat.remove(CEPH_MON_FEATURE_INCOMPAT_GV);
+  assert(!compat.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_GV));
+
+  // ... in with SINGLE_PAXOS.
+  compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
+  assert(compat.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS));
+
+  // Encode into a fresh buffer rather than recycling the one we decoded from.
+  bufferlist updated_bl;
+  compat.encode(updated_bl);
+
+  dout(20) << __func__ << " new features " << compat << dendl;
+  t.put(MONITOR_NAME, COMPAT_SET_LOC, updated_bl);
+}
+
+
+/**
+ * Tell whether the old-format store advertises the GV incompat feature.
+ *
+ * @return true only if the compat set exists in the old store, is
+ *         non-empty and contains CEPH_MON_FEATURE_INCOMPAT_GV;
+ *         false otherwise.
+ */
+bool Monitor::StoreConverter::_check_gv_store()
+{
+  dout(20) << __func__ << dendl;
+
+  // No compat set at all: cannot be a GV store.
+  if (!store->exists_bl_ss(COMPAT_SET_LOC, 0)) {
+    return false;
+  }
+
+  bufferlist raw_features;
+  store->get_bl_ss_safe(raw_features, COMPAT_SET_LOC, 0);
+  if (raw_features.length() == 0) {
+    dout(20) << __func__ << " on-disk features length is zero" << dendl;
+    return false;
+  }
+
+  CompatSet on_disk;
+  bufferlist::iterator raw_it = raw_features.begin();
+  on_disk.decode(raw_it);
+
+  const bool is_gv = on_disk.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_GV);
+  return is_gv;
+}
+
+/**
+ * Decide whether the monitor data directory holds an old-format store
+ * that has to be converted.
+ *
+ * If the new store opens, no conversion is needed. Otherwise the old
+ * store is mounted and inspected: an old store with a "magic" entry must
+ * carry the GV feature (see _check_gv_store()), or the function asserts
+ * because that store was never brought to the 0.52 format.
+ *
+ * @return true if an old GV-format store was found, false otherwise.
+ */
+bool Monitor::StoreConverter::needs_conversion()
+{
+  bool ret = false;
+
+  dout(10) << __func__ << dendl;
+  _init();
+  if (db->open(std::cerr) < 0) {
+    dout(1) << "unable to open monitor store at " << g_conf->mon_data << dendl;
+    dout(1) << "check for old monitor store format" << dendl;
+
+    // Keep the call outside assert() so the mount still happens when
+    // assertions are compiled out.
+    const int mount_r = store->mount();
+    assert(mount_r == 0);
+
+    if (store->exists_bl_ss("magic", 0)) {
+      if (_check_gv_store()) {
+        dout(1) << "found old GV monitor store format "
+                << "-- should convert!" << dendl;
+        ret = true;
+      } else {
+        dout(0) << "Existing monitor store has not been converted "
+                << "to 0.52 (bobtail) format" << dendl;
+        assert(0 == "Existing store has not been converted to 0.52 format");
+      }
+    }
+
+    // Same reasoning as for mount(): never hide the side effect in assert().
+    const int umount_r = store->umount();
+    assert(umount_r == 0);
+  }
+  _deinit();
+  return ret;
+}
+
+/**
+ * Convert the old-format monitor store into the new key/value store.
+ *
+ * Creates and opens the new store, mounts the old one, and then runs the
+ * conversion steps in order: mark start, monitor keys, per-machine
+ * versions, paxos versions, mark finish. The "mon_convert"/"on_going"
+ * marker guards against resuming on top of a half-converted store.
+ *
+ * @return 0 on success; -EEXIST if a previous conversion was left
+ *         unfinished (nothing is converted in that case).
+ */
+int Monitor::StoreConverter::convert()
+{
+  _init();
+
+  // Keep calls with side effects out of assert() so they still run when
+  // assertions are compiled out.
+  const int open_r = db->create_and_open(std::cerr);
+  assert(open_r == 0);
+  const int mount_r = store->mount();
+  assert(mount_r == 0);
+
+  if (db->exists("mon_convert", "on_going")) {
+    dout(0) << __func__ << " found a mon store in mid-convertion; abort!"
+            << dendl;
+    // Release what was acquired above; bailing out must not leave the old
+    // store mounted or the store handles alive.
+    store->umount();
+    _deinit();
+    return -EEXIST;
+  }
+
+  _mark_convert_start();
+  _convert_monitor();
+  _convert_machines();
+  _convert_paxos();
+  _mark_convert_finish();
+
+  store->umount();
+  _deinit();
+
+  dout(0) << __func__ << " finished conversion" << dendl;
+
+  return 0;
+}
+
+/**
+ * Copy the monitor-wide keys from the old store into the new one.
+ *
+ * "joined" (if present), "magic", "feature_set", "cluster_uuid" (each if
+ * present) and "election_epoch" are put under MONITOR_NAME in a single
+ * transaction. Asserts that "magic", "keyring", "feature_set" and
+ * "election_epoch" exist in the old store before copying anything.
+ */
+void Monitor::StoreConverter::_convert_monitor()
+{
+  dout(10) << __func__ << dendl;
+
+  // Entries every old store is required to have.
+  assert(store->exists_bl_ss("magic"));
+  assert(store->exists_bl_ss("keyring"));
+  assert(store->exists_bl_ss("feature_set"));
+  assert(store->exists_bl_ss("election_epoch"));
+
+  MonitorDBStore::Transaction tx;
+
+  if (store->exists_bl_ss("joined")) {
+    version_t joined = store->get_int("joined");
+    tx.put(MONITOR_NAME, "joined", joined);
+  }
+
+  // Opaque blobs that are copied over verbatim, in this order.
+  static const char *blob_keys[] = { "magic", "feature_set", "cluster_uuid" };
+  const unsigned num_blob_keys = sizeof(blob_keys) / sizeof(blob_keys[0]);
+
+  for (unsigned i = 0; i < num_blob_keys; ++i) {
+    const char *name = blob_keys[i];
+    if (store->exists_bl_ss(name)) {
+      bufferlist blob;
+      int r = store->get_bl_ss(blob, name, 0);
+      assert(r > 0);
+      tx.put(MONITOR_NAME, string(name), blob);
+    }
+  }
+
+  version_t election_epoch = store->get_int("election_epoch");
+  tx.put(MONITOR_NAME, "election_epoch", election_epoch);
+
+  assert(!tx.empty());
+  db->apply_transaction(tx);
+  dout(10) << __func__ << " finished" << dendl;
+}
+
+/**
+ * Convert a single paxos machine from the old store to the new one.
+ *
+ * For every version in [first_committed, last_committed] that exists in
+ * the old store, the version blob is written under @p machine together
+ * with a running "last_committed". When the old store also has a
+ * "<machine>_gv" entry for that version, the encoded per-version
+ * transaction is additionally stored under ("paxos", gv) and the gv is
+ * remembered in @c gvs. A gv seen twice triggers an assert.
+ *
+ * Afterwards the machine's "first_committed", "last_committed" and
+ * "conversion_first" are written, and a non-empty "latest" blob is copied
+ * and also unpacked into "full_latest" / "full_<version>".
+ *
+ * Also raises highest_accepted_pn / highest_last_pn to this machine's
+ * values when they are larger.
+ *
+ * @param machine name of the machine (directory) to convert
+ */
+void Monitor::StoreConverter::_convert_machines(string machine)
+{
+  dout(10) << __func__ << " " << machine << dendl;
+
+  // `machine` is never modified below, so this pointer stays valid.
+  const char *machine_dir = machine.c_str();
+
+  version_t first_committed = store->get_int(machine_dir, "first_committed");
+  version_t last_committed = store->get_int(machine_dir, "last_committed");
+
+  version_t accepted_pn = store->get_int(machine_dir, "accepted_pn");
+  version_t last_pn = store->get_int(machine_dir, "last_pn");
+
+  // Track the maxima across all machines for the paxos conversion.
+  if (highest_accepted_pn < accepted_pn)
+    highest_accepted_pn = accepted_pn;
+  if (highest_last_pn < last_pn)
+    highest_last_pn = last_pn;
+
+  const string machine_gv = machine + "_gv";
+  const bool has_gv = store->exists_bl_ss(machine_gv.c_str());
+  if (!has_gv) {
+    dout(1) << __func__ << " " << machine
+            << " no gv dir '" << machine_gv << "'" << dendl;
+  }
+
+  for (version_t v = first_committed; v <= last_committed; ++v) {
+    if (!store->exists_bl_sn(machine_dir, v)) {
+      dout(20) << __func__ << " " << machine
+               << " ver " << v << " dne" << dendl;
+      continue;
+    }
+
+    bufferlist ver_bl;
+    int r = store->get_bl_sn(ver_bl, machine_dir, v);
+    assert(r >= 0);
+    dout(20) << __func__ << " " << machine
+             << " ver " << v << " bl " << ver_bl.length() << dendl;
+
+    MonitorDBStore::Transaction ver_tx;
+    ver_tx.put(machine, v, ver_bl);
+    ver_tx.put(machine, "last_committed", v);
+
+    if (has_gv && store->exists_bl_sn(machine_gv.c_str(), v)) {
+      stringstream ver_ss;
+      ver_ss << v;
+
+      version_t gv = store->get_int(machine_gv.c_str(), ver_ss.str().c_str());
+      dout(20) << __func__ << " " << machine
+               << " ver " << v << " -> " << gv << dendl;
+
+      // insert() reports whether the gv was new; a repeat is fatal.
+      if (!gvs.insert(gv).second) {
+        dout(0) << __func__ << " " << machine
+                << " gv " << gv << " already exists"
+                << dendl;
+        assert(0 == "Duplicate GV -- something is wrong!");
+      }
+
+      // The paxos entry holds the encoded form of the transaction as it
+      // stands right now, i.e. before the paxos put itself is added.
+      bufferlist encoded_tx;
+      ver_tx.encode(encoded_tx);
+      ver_tx.put("paxos", gv, encoded_tx);
+    }
+    db->apply_transaction(ver_tx);
+  }
+
+  // NOTE(review): "last_committed" in the new store is only advanced for
+  // versions that exist in the old store; if the newest versions are
+  // missing this assert looks like it would fire -- confirm that is the
+  // intended behaviour.
+  version_t lc = db->get(machine, "last_committed");
+  assert(lc == last_committed);
+
+  MonitorDBStore::Transaction tx;
+  tx.put(machine, "first_committed", first_committed);
+  tx.put(machine, "last_committed", last_committed);
+  tx.put(machine, "conversion_first", first_committed);
+
+  if (store->exists_bl_ss(machine_dir, "latest")) {
+    bufferlist latest_bl_raw;
+    int r = store->get_bl_ss(latest_bl_raw, machine_dir, "latest");
+    assert(r >= 0);
+
+    if (latest_bl_raw.length() == 0) {
+      dout(20) << __func__ << " machine " << machine
+               << " skip latest with size 0" << dendl;
+    } else {
+      tx.put(machine, "latest", latest_bl_raw);
+
+      // "latest" is a (version, full blob) pair; unpack it.
+      bufferlist::iterator raw_it = latest_bl_raw.begin();
+      bufferlist latest_bl;
+      version_t latest_ver;
+      ::decode(latest_ver, raw_it);
+      ::decode(latest_bl, raw_it);
+
+      dout(20) << __func__ << " machine " << machine
+               << " latest ver " << latest_ver << dendl;
+
+      tx.put(machine, "full_latest", latest_ver);
+      stringstream full_key;
+      full_key << "full_" << latest_ver;
+      tx.put(machine, full_key.str(), latest_bl);
+    }
+  }
+
+  db->apply_transaction(tx);
+  dout(10) << __func__ << " machine " << machine << " finished" << dendl;
+}
+
+/**
+ * Finalise the "paxos" prefix after all machines have been converted.
+ *
+ * Walks the collected global versions from the highest downwards, looking
+ * at no more than paxos_max_join_drift * 2 entries and stopping at the
+ * first gap, to find the lowest gv of the contiguous tail. Every paxos
+ * version below that point is erased, and the paxos bookkeeping keys
+ * (first/last committed, accepted_pn, last_pn, conversion_first) are
+ * written in the same transaction.
+ *
+ * Asserts that at least one gv was collected.
+ */
+void Monitor::StoreConverter::_convert_paxos()
+{
+  dout(10) << __func__ << dendl;
+  assert(gvs.size() > 0);
+
+  set<version_t>::reverse_iterator rit = gvs.rbegin();
+  const version_t highest_gv = *rit;
+  version_t last_gv = highest_gv;
+
+  const int max_versions = (g_conf->paxos_max_join_drift*2);
+  int examined = 0;
+  while ((rit != gvs.rend()) && (examined < max_versions)) {
+    const version_t gv = *rit;
+
+    if (gv != last_gv) {
+      if ((last_gv - gv) > 1) {
+        // a gap: only the contiguous run of paxos versions is kept, so
+        // there is nothing more to look at.
+        break;
+      }
+      last_gv = gv;
+    }
+
+    ++rit;
+    ++examined;
+  }
+
+  // erase every paxos version in [first, last_gv), where first is the
+  // lowest gv in the set.
+  MonitorDBStore::Transaction tx;
+  set<version_t>::iterator it = gvs.begin();
+  dout(1) << __func__ << " first gv " << (*it)
+          << " last gv " << last_gv << dendl;
+
+  // The set is ordered, so everything before lower_bound(last_gv) is
+  // strictly below last_gv.
+  const set<version_t>::iterator keep_from = gvs.lower_bound(last_gv);
+  while (it != keep_from) {
+    tx.erase("paxos", *it);
+    ++it;
+  }
+
+  tx.put("paxos", "first_committed", last_gv);
+  tx.put("paxos", "last_committed", highest_gv);
+  tx.put("paxos", "accepted_pn", highest_accepted_pn);
+  tx.put("paxos", "last_pn", highest_last_pn);
+  tx.put("paxos", "conversion_first", last_gv);
+  db->apply_transaction(tx);
+
+  dout(10) << __func__ << " finished" << dendl;
+}
+
+/**
+ * Convert every machine returned by _get_machines_names(), one after the
+ * other, in the set's iteration order.
+ */
+void Monitor::StoreConverter::_convert_machines()
+{
+  dout(10) << __func__ << dendl;
+
+  const set<string> machines = _get_machines_names();
+  for (set<string>::const_iterator m = machines.begin();
+       m != machines.end();
+       ++m) {
+    _convert_machines(*m);
+  }
+
+  dout(10) << __func__ << " finished" << dendl;
+}
#include "perfglue/heap_profiler.h"
#include "messages/MMonCommand.h"
+#include "mon/MonitorStore.h"
#include "mon/MonitorDBStore.h"
#include <memory>
// don't allow copying
Monitor(const Monitor& rhs);
Monitor& operator=(const Monitor &rhs);
+
+public:
+ /**
+  * Converts an old-format monitor store found at a given path into the
+  * new MonitorDBStore.
+  *
+  * needs_conversion() reports whether a conversion is called for and
+  * convert() performs it. Both create the store handles on entry and drop
+  * them again before returning.
+  */
+ class StoreConverter {
+   const string path;                      ///< location of both stores
+   boost::scoped_ptr<MonitorDBStore> db;   ///< new store (conversion target)
+   boost::scoped_ptr<MonitorStore> store;  ///< old store (conversion source)
+
+   set<version_t> gvs;              ///< global versions seen while converting
+   version_t highest_last_pn;       ///< largest last_pn over all machines
+   version_t highest_accepted_pn;   ///< largest accepted_pn over all machines
+
+ public:
+   StoreConverter(const string &path)
+     : path(path),
+       db(),
+       store(),
+       highest_last_pn(0),
+       highest_accepted_pn(0)
+   { }
+
+   bool needs_conversion();
+   int convert();
+
+ private:
+   // Steps implemented out of line.
+   bool _check_gv_store();
+   void _convert_finish_features(MonitorDBStore::Transaction &t);
+   void _convert_monitor();
+   void _convert_machines(string machine);
+   void _convert_machines();
+   void _convert_paxos();
+
+   /// Create fresh handles for the new and the old store at `path`.
+   void _init() {
+     db.reset(new MonitorDBStore(path));
+     store.reset(new MonitorStore(path));
+   }
+
+   /// Destroy both store handles.
+   void _deinit() {
+     db.reset();
+     store.reset();
+   }
+
+   /// Names of the paxos machines that get converted.
+   set<string> _get_machines_names() {
+     static const char *machines[] = {
+       "auth", "logm", "mdsmap", "monmap", "osdmap", "pgmap"
+     };
+     const unsigned count = sizeof(machines) / sizeof(machines[0]);
+     return set<string>(machines, machines + count);
+   }
+
+   /// Persist the marker that says a conversion is under way.
+   void _mark_convert_start() {
+     MonitorDBStore::Transaction marker;
+     marker.put("mon_convert", "on_going", 1);
+     db->apply_transaction(marker);
+   }
+
+   /// Drop the in-progress marker and update the feature set in one
+   /// transaction.
+   void _mark_convert_finish() {
+     MonitorDBStore::Transaction done;
+     done.erase("mon_convert", "on_going");
+     _convert_finish_features(done);
+     db->apply_transaction(done);
+   }
+ };
};
#define CEPH_MON_FEATURE_INCOMPAT_BASE CompatSet::Feature (1, "initial feature set (~v.18)")
#define CEPH_MON_FEATURE_INCOMPAT_GV CompatSet::Feature (2, "global version sequencing (v0.52)")
+#define CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS CompatSet::Feature (3, "single paxos with k/v store (v0.\?)") // incompat feature id 3; "\?" is just an escaped '?' in the description
long parse_pos_long(const char *s, ostream *pss = NULL);
return true;
}
+/**
+ * Clean up after a store conversion, if one left its mark.
+ *
+ * Does nothing unless this service has a "conversion_first" key. When it
+ * does, versions in [conversion_first, first_committed) are trimmed (only
+ * if that range is non-empty) and the "conversion_first" key itself is
+ * erased, all in one transaction.
+ */
+void PaxosService::scrub()
+{
+  dout(10) << __func__ << dendl;
+
+  const bool was_converted =
+    mon->store->exists(get_service_name(), "conversion_first");
+  if (!was_converted) {
+    return;
+  }
+
+  version_t conv_first =
+    mon->store->get(get_service_name(), "conversion_first");
+  version_t first_committed = get_first_committed();
+
+  dout(10) << __func__ << " conversion_first " << conv_first
+           << " first committed " << first_committed << dendl;
+
+  MonitorDBStore::Transaction cleanup;
+  if (conv_first < first_committed) {
+    // leftovers from before the current first committed version
+    trim(&cleanup, conv_first, first_committed);
+  }
+  cleanup.erase(get_service_name(), "conversion_first");
+  mon->store->apply_transaction(cleanup);
+}
+
bool PaxosService::should_propose(double& delay)
{
// simple default policy: quick startup, then some damping.
// pull latest from paxos
update_from_paxos();
+ scrub();
+
// create pending state?
if (mon->is_leader() && is_active()) {
dout(7) << "_active creating new pending" << dendl;
finish_contexts(g_ceph_context, waiting_for_finished_proposal);
}
+/**
+ * Queue the erasure of versions [from, to) of this service on @p t.
+ *
+ * For each version the plain version key is erased, and so is the
+ * matching "full" key whenever the store currently has one.
+ * Asserts that @p from differs from @p to.
+ *
+ * @param t    transaction the erase operations are added to
+ * @param from first version to erase
+ * @param to   one past the last version to erase
+ */
+void PaxosService::trim(MonitorDBStore::Transaction *t,
+                        version_t from, version_t to)
+{
+  dout(10) << __func__ << " from " << from << " to " << to << dendl;
+  assert(from != to);
+
+  for (version_t v = from; v < to; ++v) {
+    dout(20) << __func__ << " " << v << dendl;
+    t->erase(get_service_name(), v);
+
+    // Full versions are optional; only erase the ones that are there.
+    const string full_key = mon->store->combine_strings("full", v);
+    if (!mon->store->exists(get_service_name(), full_key)) {
+      continue;
+    }
+    dout(20) << __func__ << " " << full_key << dendl;
+    t->erase(get_service_name(), full_key);
+  }
+}
+
void PaxosService::encode_trim(MonitorDBStore::Transaction *t) // queue a trim of this service up to get_trim_to() on t
{
version_t first_committed = get_first_committed();
version_t latest_full = get_version("full", "latest"); // only used for the log line below
version_t trim_to = get_trim_to();
- string latest_key = mon->store->combine_strings("full", latest_full);
- bool has_full = mon->store->exists(get_service_name(), latest_key);
-
dout(10) << __func__ << " " << trim_to << " (was " << first_committed << ")"
<< ", latest full " << latest_full << dendl;
if (first_committed >= trim_to) // nothing below trim_to is left to erase
return;
- while ((first_committed < trim_to)) {
- dout(20) << __func__ << first_committed << dendl;
- t->erase(get_service_name(), first_committed);
-
- if (has_full) {
- latest_key = mon->store->combine_strings("full", first_committed);
- t->erase(get_service_name(), latest_key);
- }
-
- first_committed++;
- }
- put_first_committed(t, first_committed);
+ trim(t, first_committed, trim_to); // erases versions [first_committed, trim_to) plus any existing "full" keys for them
+ put_first_committed(t, trim_to); // presumably records trim_to as the new first committed version -- helper not visible here
}