From: Joao Eduardo Luis Date: Wed, 20 Feb 2013 18:46:34 +0000 (+0000) Subject: mon: ceph-mon: convert an old monitor store to the new format X-Git-Tag: v0.59~150^2~1^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cb85fb7d9a1da5a8f194bd9406c7df49da3c2e33;p=ceph.git mon: ceph-mon: convert an old monitor store to the new format With the single-paxos patches we shifted from an approach with multiple paxos instances (one for each paxos service) keeping their own versions to a single paxos instance for all the paxos services, thus ending up with a single global version for paxos. With the release of v0.52, the monitor started tracking these global versions, keeping them for the single purpose of making it possible to convert the store to a single-paxos format. This patch now introduces a mechanism to convert a GV-enabled store to the single-paxos format store when the monitor is upgraded. As we require the global versions to be present, we first check if the store has the GV feature set: if not we will not proceed, but we will start the conversion otherwise. In the end of the conversion, the monitor data directory will have a brand new 'store.db' directory, where the key/value store lies, alongside with the old store. This makes it possible to revert to a previous monitor version if things go sideways, without jeopardizing the data in the store. The conversion is done as during a rolling upgrade, without any intervention by the user. Fire up the new monitor version on an old store, and the monitor itself will convert the store, trim any lingering versions that might not be required, and proceed to start as expected. Signed-off-by: Joao Eduardo Luis --- diff --git a/src/Makefile.am b/src/Makefile.am index 9b9d7ed00459..45efa85544a7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1823,6 +1823,7 @@ noinst_HEADERS = \ mon/MonClient.h\ mon/MonMap.h\ mon/Monitor.h\ + mon/MonitorStore.h\ mon/MonitorDBStore.h\ mon/OSDMonitor.h\ mon/PGMap.h\ diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index 51503e49ba6d..c4ebf6b17fd0 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -216,6 +216,12 @@ int main(int argc, const char **argv) return 0; } + { + Monitor::StoreConverter converter(g_conf->mon_data); + if (converter.needs_conversion()) + assert(!converter.convert()); + } + MonitorDBStore store(g_conf->mon_data); assert(!store.open(std::cerr)); @@ -368,7 +374,8 @@ int main(int argc, const char **argv) messenger->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossless_peer_reuse(supported, CEPH_FEATURE_UID | - CEPH_FEATURE_PGID64)); + CEPH_FEATURE_PGID64 | + CEPH_FEATURE_MON_SINGLE_PAXOS)); messenger->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::stateless_server(supported, CEPH_FEATURE_PGID64 | diff --git a/src/include/CompatSet.h b/src/include/CompatSet.h index 797c37689879..ac532fcef816 100644 --- a/src/include/CompatSet.h +++ b/src/include/CompatSet.h @@ -52,7 +52,10 @@ struct CompatSet { names.erase(f); mask &= ~(1<exists(MONITOR_NAME, COMPAT_SET_LOC)); + bufferlist features_bl; + db->get(MONITOR_NAME, COMPAT_SET_LOC, features_bl); + assert(features_bl.length()); + + CompatSet features; + bufferlist::iterator p = features_bl.begin(); + features.decode(p); + + assert(features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_GV)); + features.incompat.remove(CEPH_MON_FEATURE_INCOMPAT_GV); + assert(!features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_GV)); + + features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS); + assert(features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS)); + + features_bl.clear(); + features.encode(features_bl); + + dout(20) << __func__ << " new features " << features << dendl; + t.put(MONITOR_NAME, COMPAT_SET_LOC, features_bl); +} + + +bool Monitor::StoreConverter::_check_gv_store() +{ + dout(20) << __func__ << dendl; + if (!store->exists_bl_ss(COMPAT_SET_LOC, 0)) + return false; + + bufferlist features_bl; + store->get_bl_ss_safe(features_bl, COMPAT_SET_LOC, 0); + if (!features_bl.length()) { + dout(20) << __func__ << " on-disk features length is zero" << dendl; + return false; + } + CompatSet features; + bufferlist::iterator p = features_bl.begin(); + features.decode(p); + return (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_GV)); +} + +bool Monitor::StoreConverter::needs_conversion() +{ + bool ret = false; + + dout(10) << __func__ << dendl; + _init(); + if (db->open(std::cerr) < 0) { + dout(1) << "unable to open monitor store at " << g_conf->mon_data << dendl; + dout(1) << "check for old monitor store format" << dendl; + assert(!store->mount()); + bufferlist magicbl; + if (store->exists_bl_ss("magic", 0)) { + if (_check_gv_store()) { + dout(1) << "found old GV monitor store format " + << "-- should convert!" << dendl; + ret = true; + } else { + dout(0) << "Existing monitor store has not been converted " + << "to 0.52 (bobtail) format" << dendl; + assert(0 == "Existing store has not been converted to 0.52 format"); + } + } + assert(!store->umount()); + } + _deinit(); + return ret; +} + +int Monitor::StoreConverter::convert() +{ + _init(); + assert(!db->create_and_open(std::cerr)); + assert(!store->mount()); + if (db->exists("mon_convert", "on_going")) { + dout(0) << __func__ << " found a mon store in mid-convertion; abort!" + << dendl; + return -EEXIST; + } + + _mark_convert_start(); + _convert_monitor(); + _convert_machines(); + _convert_paxos(); + _mark_convert_finish(); + + store->umount(); + _deinit(); + + dout(0) << __func__ << " finished conversion" << dendl; + + return 0; +} + +void Monitor::StoreConverter::_convert_monitor() +{ + dout(10) << __func__ << dendl; + + assert(store->exists_bl_ss("magic")); + assert(store->exists_bl_ss("keyring")); + assert(store->exists_bl_ss("feature_set")); + assert(store->exists_bl_ss("election_epoch")); + + MonitorDBStore::Transaction tx; + + if (store->exists_bl_ss("joined")) { + version_t joined = store->get_int("joined"); + tx.put(MONITOR_NAME, "joined", joined); + } + + vector keys; + keys.push_back("magic"); + keys.push_back("feature_set"); + keys.push_back("cluster_uuid"); + + vector::iterator it; + for (it = keys.begin(); it != keys.end(); ++it) { + if (!store->exists_bl_ss((*it).c_str())) + continue; + + bufferlist bl; + int r = store->get_bl_ss(bl, (*it).c_str(), 0); + assert(r > 0); + tx.put(MONITOR_NAME, *it, bl); + } + version_t election_epoch = store->get_int("election_epoch"); + tx.put(MONITOR_NAME, "election_epoch", election_epoch); + + assert(!tx.empty()); + db->apply_transaction(tx); + dout(10) << __func__ << " finished" << dendl; +} + +void Monitor::StoreConverter::_convert_machines(string machine) +{ + dout(10) << __func__ << " " << machine << dendl; + + version_t first_committed = + store->get_int(machine.c_str(), "first_committed"); + version_t last_committed = + store->get_int(machine.c_str(), "last_committed"); + + version_t accepted_pn = store->get_int(machine.c_str(), "accepted_pn"); + version_t last_pn = store->get_int(machine.c_str(), "last_pn"); + + if (accepted_pn > highest_accepted_pn) + highest_accepted_pn = accepted_pn; + if (last_pn > highest_last_pn) + highest_last_pn = last_pn; + + string machine_gv(machine); + machine_gv.append("_gv"); + bool has_gv = true; + + if (!store->exists_bl_ss(machine_gv.c_str())) { + dout(1) << __func__ << " " << machine + << " no gv dir '" << machine_gv << "'" << dendl; + has_gv = false; + } + + for (version_t ver = first_committed; ver <= last_committed; ver++) { + if (!store->exists_bl_sn(machine.c_str(), ver)) { + dout(20) << __func__ << " " << machine + << " ver " << ver << " dne" << dendl; + continue; + } + + bufferlist bl; + int r = store->get_bl_sn(bl, machine.c_str(), ver); + assert(r >= 0); + dout(20) << __func__ << " " << machine + << " ver " << ver << " bl " << bl.length() << dendl; + + MonitorDBStore::Transaction tx; + tx.put(machine, ver, bl); + tx.put(machine, "last_committed", ver); + + if (has_gv && store->exists_bl_sn(machine_gv.c_str(), ver)) { + stringstream s; + s << ver; + string ver_str = s.str(); + + version_t gv = store->get_int(machine_gv.c_str(), ver_str.c_str()); + dout(20) << __func__ << " " << machine + << " ver " << ver << " -> " << gv << dendl; + + if (gvs.count(gv) == 0) { + gvs.insert(gv); + } else { + dout(0) << __func__ << " " << machine + << " gv " << gv << " already exists" + << dendl; + assert(0 == "Duplicate GV -- something is wrong!"); + } + + bufferlist tx_bl; + tx.encode(tx_bl); + tx.put("paxos", gv, tx_bl); + } + db->apply_transaction(tx); + } + + version_t lc = db->get(machine, "last_committed"); + assert(lc == last_committed); + + MonitorDBStore::Transaction tx; + tx.put(machine, "first_committed", first_committed); + tx.put(machine, "last_committed", last_committed); + tx.put(machine, "conversion_first", first_committed); + + if (store->exists_bl_ss(machine.c_str(), "latest")) { + bufferlist latest_bl_raw; + int r = store->get_bl_ss(latest_bl_raw, machine.c_str(), "latest"); + assert(r >= 0); + if (!latest_bl_raw.length()) { + dout(20) << __func__ << " machine " << machine + << " skip latest with size 0" << dendl; + goto out; + } + + tx.put(machine, "latest", latest_bl_raw); + + bufferlist::iterator lbl_it = latest_bl_raw.begin(); + bufferlist latest_bl; + version_t latest_ver; + ::decode(latest_ver, lbl_it); + ::decode(latest_bl, lbl_it); + + dout(20) << __func__ << " machine " << machine + << " latest ver " << latest_ver << dendl; + + tx.put(machine, "full_latest", latest_ver); + stringstream os; + os << "full_" << latest_ver; + tx.put(machine, os.str(), latest_bl); + } +out: + db->apply_transaction(tx); + dout(10) << __func__ << " machine " << machine << " finished" << dendl; +} + +void Monitor::StoreConverter::_convert_paxos() +{ + dout(10) << __func__ << dendl; + assert(gvs.size() > 0); + + set::reverse_iterator rit = gvs.rbegin(); + version_t highest_gv = *rit; + version_t last_gv = highest_gv; + + int n = 0; + int max_versions = (g_conf->paxos_max_join_drift*2); + for (; (rit != gvs.rend()) && (n < max_versions); ++rit, ++n) { + version_t gv = *rit; + + if (last_gv == gv) + continue; + if ((last_gv - gv) > 1) { + // we are done; we found a gap and we are only interested in keeping + // contiguous paxos versions. + break; + } + last_gv = gv; + } + + // erase all paxos versions between [first, last_gv[, with first being the + // first gv in the map. + MonitorDBStore::Transaction tx; + set::iterator it = gvs.begin(); + dout(1) << __func__ << " first gv " << (*it) + << " last gv " << last_gv << dendl; + for (; it != gvs.end() && (*it < last_gv); ++it) { + tx.erase("paxos", *it); + } + tx.put("paxos", "first_committed", last_gv); + tx.put("paxos", "last_committed", highest_gv); + tx.put("paxos", "accepted_pn", highest_accepted_pn); + tx.put("paxos", "last_pn", highest_last_pn); + tx.put("paxos", "conversion_first", last_gv); + db->apply_transaction(tx); + + dout(10) << __func__ << " finished" << dendl; +} + +void Monitor::StoreConverter::_convert_machines() +{ + dout(10) << __func__ << dendl; + set machine_names = _get_machines_names(); + set::iterator it = machine_names.begin(); + + for (; it != machine_names.end(); ++it) { + _convert_machines(*it); + } + dout(10) << __func__ << " finished" << dendl; +} diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index efed1b65d056..652036a70b20 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -45,6 +45,7 @@ #include "perfglue/heap_profiler.h" #include "messages/MMonCommand.h" +#include "mon/MonitorStore.h" #include "mon/MonitorDBStore.h" #include @@ -1400,10 +1401,79 @@ private: // don't allow copying Monitor(const Monitor& rhs); Monitor& operator=(const Monitor &rhs); + +public: + class StoreConverter { + const string path; + boost::scoped_ptr db; + boost::scoped_ptr store; + + set gvs; + version_t highest_last_pn; + version_t highest_accepted_pn; + + public: + StoreConverter(const string &path) + : path(path), db(NULL), store(NULL), + highest_last_pn(0), highest_accepted_pn(0) + { } + + bool needs_conversion(); + int convert(); + + private: + + bool _check_gv_store(); + + void _init() { + MonitorDBStore *db_ptr = new MonitorDBStore(path); + db.reset(db_ptr); + + MonitorStore *store_ptr = new MonitorStore(path); + store.reset(store_ptr); + } + + void _deinit() { + db.reset(NULL); + store.reset(NULL); + } + + set _get_machines_names() { + set names; + names.insert("auth"); + names.insert("logm"); + names.insert("mdsmap"); + names.insert("monmap"); + names.insert("osdmap"); + names.insert("pgmap"); + + return names; + } + + void _mark_convert_start() { + MonitorDBStore::Transaction tx; + tx.put("mon_convert", "on_going", 1); + db->apply_transaction(tx); + } + + void _convert_finish_features(MonitorDBStore::Transaction &t); + void _mark_convert_finish() { + MonitorDBStore::Transaction tx; + tx.erase("mon_convert", "on_going"); + _convert_finish_features(tx); + db->apply_transaction(tx); + } + + void _convert_monitor(); + void _convert_machines(string machine); + void _convert_machines(); + void _convert_paxos(); + }; }; #define CEPH_MON_FEATURE_INCOMPAT_BASE CompatSet::Feature (1, "initial feature set (~v.18)") #define CEPH_MON_FEATURE_INCOMPAT_GV CompatSet::Feature (2, "global version sequencing (v0.52)") +#define CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS CompatSet::Feature (3, "single paxos with k/v store (v0.\?)") long parse_pos_long(const char *s, ostream *pss = NULL); diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index 802c7a8e0f38..6b5fc91179ca 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -324,6 +324,18 @@ void Paxos::store_state(MMonPaxos *m) first_committed = get_store()->get(get_name(), "first_committed"); last_committed = get_store()->get(get_name(), "last_committed"); } + + if (get_store()->exists(get_name(), "conversion_first")) { + MonitorDBStore::Transaction scrub_tx; + version_t cf = get_store()->get(get_name(), "conversion_first"); + dout(10) << __func__ << " scrub paxos from " << cf + << " to " << first_committed << dendl; + if (cf < first_committed) + trim_to(&scrub_tx, cf, first_committed); + scrub_tx.erase(get_name(), "conversion_first"); + get_store()->apply_transaction(scrub_tx); + } + } // leader @@ -923,6 +935,17 @@ void Paxos::lease_renew_timeout() /* * trim old states */ +void Paxos::trim_to(MonitorDBStore::Transaction *t, + version_t from, version_t to) { + dout(10) << __func__ << " from " << from << " to " << to << dendl; + assert(from < to); + + while (from < to) { + dout(10) << "trim " << from << dendl; + t->erase(get_name(), from); + from++; + } +} void Paxos::trim_to(MonitorDBStore::Transaction *t, version_t first) { @@ -931,13 +954,8 @@ void Paxos::trim_to(MonitorDBStore::Transaction *t, version_t first) if (first_committed >= first) return; - - while (first_committed < first) { - dout(10) << "trim " << first_committed << dendl; - t->erase(get_name(), first_committed); - first_committed++; - } - t->put(get_name(), "first_committed", first_committed); + trim_to(t, first_committed, first); + t->put(get_name(), "first_committed", first); } void Paxos::trim_to(version_t first) diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h index 61da2c3fa193..ccabb4380289 100644 --- a/src/mon/Paxos.h +++ b/src/mon/Paxos.h @@ -1139,6 +1139,15 @@ public: * @param first The version we are trimming to */ void trim_to(MonitorDBStore::Transaction *t, version_t first); + /** + * Auxiliary function to erase states in the interval [from, to[ from stable + * storage. + * + * @param t A transaction + * @param from Bottom limit of the interval of versions to erase + * @param to Upper limit, not including, of the interval of versions to erase + */ + void trim_to(MonitorDBStore::Transaction *t, version_t from, version_t to); /** * Trim the Paxos state as much as we can. */ diff --git a/src/mon/PaxosService.cc b/src/mon/PaxosService.cc index c8230529428c..8495870e9a04 100644 --- a/src/mon/PaxosService.cc +++ b/src/mon/PaxosService.cc @@ -84,6 +84,26 @@ bool PaxosService::dispatch(PaxosServiceMessage *m) return true; } +void PaxosService::scrub() +{ + dout(10) << __func__ << dendl; + if (!mon->store->exists(get_service_name(), "conversion_first")) + return; + + version_t cf = mon->store->get(get_service_name(), "conversion_first"); + version_t fc = get_first_committed(); + + dout(10) << __func__ << " conversion_first " << cf + << " first committed " << fc << dendl; + + MonitorDBStore::Transaction t; + if (cf < fc) { + trim(&t, cf, fc); + } + t.erase(get_service_name(), "conversion_first"); + mon->store->apply_transaction(t); +} + bool PaxosService::should_propose(double& delay) { // simple default policy: quick startup, then some damping. @@ -213,6 +233,8 @@ void PaxosService::_active() // pull latest from paxos update_from_paxos(); + scrub(); + // create pending state? if (mon->is_leader() && is_active()) { dout(7) << "_active creating new pending" << dendl; @@ -282,32 +304,36 @@ void PaxosService::wakeup_proposing_waiters() finish_contexts(g_ceph_context, waiting_for_finished_proposal); } +void PaxosService::trim(MonitorDBStore::Transaction *t, + version_t from, version_t to) +{ + dout(10) << __func__ << " from " << from << " to " << to << dendl; + assert(from != to); + for (; from < to; from++) { + dout(20) << __func__ << " " << from << dendl; + t->erase(get_service_name(), from); + + string full_key = mon->store->combine_strings("full", from); + if (mon->store->exists(get_service_name(), full_key)) { + dout(20) << __func__ << " " << full_key << dendl; + t->erase(get_service_name(), full_key); + } + } +} + void PaxosService::encode_trim(MonitorDBStore::Transaction *t) { version_t first_committed = get_first_committed(); version_t latest_full = get_version("full", "latest"); version_t trim_to = get_trim_to(); - string latest_key = mon->store->combine_strings("full", latest_full); - bool has_full = mon->store->exists(get_service_name(), latest_key); - dout(10) << __func__ << " " << trim_to << " (was " << first_committed << ")" << ", latest full " << latest_full << dendl; if (first_committed >= trim_to) return; - while ((first_committed < trim_to)) { - dout(20) << __func__ << first_committed << dendl; - t->erase(get_service_name(), first_committed); - - if (has_full) { - latest_key = mon->store->combine_strings("full", first_committed); - t->erase(get_service_name(), latest_key); - } - - first_committed++; - } - put_first_committed(t, first_committed); + trim(t, first_committed, trim_to); + put_first_committed(t, trim_to); } diff --git a/src/mon/PaxosService.h b/src/mon/PaxosService.h index 3231af859ddd..cfe764160eda 100644 --- a/src/mon/PaxosService.h +++ b/src/mon/PaxosService.h @@ -239,6 +239,11 @@ private: * active */ void _active(); + /** + * Scrub our versions after we convert the store from the old layout to + * the new k/v store. + */ + void scrub(); public: /** @@ -564,6 +569,15 @@ public: * @defgroup PaxosService_h_Trim * @{ */ + /** + * Auxiliary function to trim our state from version @from to version @to, + * not including; i.e., the interval [from, to[ + * + * @param t The transaction to which we will add the trim operations. + * @param from the lower limit of the interval to be trimmed + * @param to the upper limit of the interval to be trimmed (not including) + */ + void trim(MonitorDBStore::Transaction *t, version_t from, version_t to); /** * Trim our log. This implies getting rid of versions on the k/v store. * Services implementing us don't have to implement this function if they