]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: ceph-mon: convert an old monitor store to the new format
authorJoao Eduardo Luis <joao.luis@inktank.com>
Wed, 20 Feb 2013 18:46:34 +0000 (18:46 +0000)
committerJoao Eduardo Luis <joao.luis@inktank.com>
Thu, 21 Feb 2013 18:02:23 +0000 (18:02 +0000)
With the single-paxos patches we shifted from an approach with multiple
paxos instances (one for each paxos service) keeping their own versions
to a single paxos instance for all the paxos services, thus ending up
with a single global version for paxos.

With the release of v0.52, the monitor started tracking these global
versions, keeping them for the single purpose of making it possible to
convert the store to a single-paxos format.

This patch now introduces a mechanism to convert a GV-enabled store to
the single-paxos format store when the monitor is upgraded.

As we require the global versions to be present, we first check if the
store has the GV feature set: if not we will not proceed, but we will
start the conversion otherwise.

In the end of the conversion, the monitor data directory will have a
brand new 'store.db' directory, where the key/value store lies,
alongside with the old store.  This makes it possible to revert to a
previous monitor version if things go sideways, without jeopardizing the
data in the store.

The conversion is done as during a rolling upgrade, without any
intervention by the user.  Fire up the new monitor version on an old
store, and the monitor itself will convert the store, trim any lingering
versions that might not be required, and proceed to start as expected.

Signed-off-by: Joao Eduardo Luis <joao.luis@inktank.com>
src/Makefile.am
src/ceph_mon.cc
src/include/CompatSet.h
src/include/ceph_features.h
src/mon/Monitor.cc
src/mon/Monitor.h
src/mon/Paxos.cc
src/mon/Paxos.h
src/mon/PaxosService.cc
src/mon/PaxosService.h

index 9b9d7ed004596a12ab3fc0c228619c39256509e0..45efa85544a7854ebfd7a0195cb7b2da5204b37d 100644 (file)
@@ -1823,6 +1823,7 @@ noinst_HEADERS = \
         mon/MonClient.h\
         mon/MonMap.h\
         mon/Monitor.h\
+       mon/MonitorStore.h\
         mon/MonitorDBStore.h\
         mon/OSDMonitor.h\
         mon/PGMap.h\
index 51503e49ba6d89f1dc0dfbdf90d6d0c70f173db5..c4ebf6b17fd0a04ca76672b8ace28515f04797ad 100644 (file)
@@ -216,6 +216,12 @@ int main(int argc, const char **argv)
     return 0;
   }
 
+  {
+    Monitor::StoreConverter converter(g_conf->mon_data);
+    if (converter.needs_conversion())
+      assert(!converter.convert());
+  }
+
   MonitorDBStore store(g_conf->mon_data);
   assert(!store.open(std::cerr));
 
@@ -368,7 +374,8 @@ int main(int argc, const char **argv)
   messenger->set_policy(entity_name_t::TYPE_MON,
                         Messenger::Policy::lossless_peer_reuse(supported,
                                                               CEPH_FEATURE_UID |
-                                                              CEPH_FEATURE_PGID64));
+                                                              CEPH_FEATURE_PGID64 |
+                                                              CEPH_FEATURE_MON_SINGLE_PAXOS));
   messenger->set_policy(entity_name_t::TYPE_OSD,
                         Messenger::Policy::stateless_server(supported,
                                                             CEPH_FEATURE_PGID64 |
index 797c376898798471df713d50fa02af0954aa42de..ac532fcef81660fe87d5424b972b544b3f0612e0 100644 (file)
@@ -52,7 +52,10 @@ struct CompatSet {
        names.erase(f);
        mask &= ~(1<<f);
       }
-    }      
+    }
+    void remove(Feature f) {
+      remove(f.id);
+    }
 
     void encode(bufferlist& bl) const {
       /* See below, mask always has the lowest bit set in memory, but
index 0561bc4afae851a5c264e19da65df357f87d5b43..74603678cc36aa5c7f81fbc1d3f201808e031a72 100644 (file)
@@ -32,6 +32,7 @@
 #define CEPH_FEATURE_CRUSH_TUNABLES2 (1<<25)
 #define CEPH_FEATURE_CREATEPOOLID   (1<<26)
 #define CEPH_FEATURE_REPLY_CREATE_INODE   (1<<27)
+#define CEPH_FEATURE_MON_SINGLE_PAXOS (1<<28)
 
 /*
  * Features supported.  Should be everything above.
@@ -64,7 +65,8 @@
         CEPH_FEATURE_RECOVERY_RESERVATION | \
         CEPH_FEATURE_CRUSH_TUNABLES2 |      \
         CEPH_FEATURE_CREATEPOOLID |         \
-        CEPH_FEATURE_REPLY_CREATE_INODE)
+        CEPH_FEATURE_REPLY_CREATE_INODE |   \
+        CEPH_FEATURE_MON_SINGLE_PAXOS)
 
 #define CEPH_FEATURES_SUPPORTED_DEFAULT  CEPH_FEATURES_ALL
 
index 5afbb9bf7543dcbbc84984c3d179148f949712cc..2c841b4d834c094a70e7d03ab66af60d5312d792 100644 (file)
@@ -23,6 +23,7 @@
 
 #include "osd/OSDMap.h"
 
+#include "MonitorStore.h"
 #include "MonitorDBStore.h"
 
 #include "msg/Messenger.h"
@@ -252,7 +253,7 @@ CompatSet Monitor::get_supported_features()
   CompatSet::FeatureSet ceph_mon_feature_ro_compat;
   CompatSet::FeatureSet ceph_mon_feature_incompat;
   ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
-  ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_GV);
+  ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
   return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
                   ceph_mon_feature_incompat);
 }
@@ -3991,3 +3992,308 @@ bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
   }
   return true;
 };
+
+#undef dout_prefix
+#define dout_prefix *_dout
+
+void Monitor::StoreConverter::_convert_finish_features(
+    MonitorDBStore::Transaction &t)
+{
+  dout(20) << __func__ << dendl;
+
+  assert(db->exists(MONITOR_NAME, COMPAT_SET_LOC));
+  bufferlist features_bl;
+  db->get(MONITOR_NAME, COMPAT_SET_LOC, features_bl);
+  assert(features_bl.length());
+
+  CompatSet features;
+  bufferlist::iterator p = features_bl.begin();
+  features.decode(p);
+
+  assert(features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_GV));
+  features.incompat.remove(CEPH_MON_FEATURE_INCOMPAT_GV);
+  assert(!features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_GV));
+
+  features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
+  assert(features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS));
+
+  features_bl.clear();
+  features.encode(features_bl);
+
+  dout(20) << __func__ << " new features " << features << dendl;
+  t.put(MONITOR_NAME, COMPAT_SET_LOC, features_bl);
+}
+
+
+bool Monitor::StoreConverter::_check_gv_store()
+{
+  dout(20) << __func__ << dendl;
+  if (!store->exists_bl_ss(COMPAT_SET_LOC, 0))
+    return false;
+
+  bufferlist features_bl;
+  store->get_bl_ss_safe(features_bl, COMPAT_SET_LOC, 0);
+  if (!features_bl.length()) {
+    dout(20) << __func__ << " on-disk features length is zero" << dendl;
+    return false;
+  }
+  CompatSet features;
+  bufferlist::iterator p = features_bl.begin();
+  features.decode(p);
+  return (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_GV));
+}
+
+bool Monitor::StoreConverter::needs_conversion()
+{
+  bool ret = false;
+
+  dout(10) << __func__ << dendl;
+  _init();
+  if (db->open(std::cerr) < 0) {
+    dout(1) << "unable to open monitor store at " << g_conf->mon_data << dendl;
+    dout(1) << "check for old monitor store format" << dendl;
+    assert(!store->mount());
+    bufferlist magicbl;
+    if (store->exists_bl_ss("magic", 0)) {
+      if (_check_gv_store()) {
+       dout(1) << "found old GV monitor store format "
+               << "-- should convert!" << dendl;
+       ret = true;
+      } else {
+       dout(0) << "Existing monitor store has not been converted "
+               << "to 0.52 (bobtail) format" << dendl;
+       assert(0 == "Existing store has not been converted to 0.52 format");
+      }
+    }
+    assert(!store->umount());
+  }
+  _deinit();
+  return ret;
+}
+
+int Monitor::StoreConverter::convert()
+{
+  _init();
+  assert(!db->create_and_open(std::cerr));
+  assert(!store->mount());
+  if (db->exists("mon_convert", "on_going")) {
+    dout(0) << __func__ << " found a mon store in mid-convertion; abort!"
+      << dendl;
+    return -EEXIST;
+  }
+
+  _mark_convert_start();
+  _convert_monitor();
+  _convert_machines();
+  _convert_paxos();
+  _mark_convert_finish();
+
+  store->umount();
+  _deinit();
+
+  dout(0) << __func__ << " finished conversion" << dendl;
+
+  return 0;
+}
+
+void Monitor::StoreConverter::_convert_monitor()
+{
+  dout(10) << __func__ << dendl;
+
+  assert(store->exists_bl_ss("magic"));
+  assert(store->exists_bl_ss("keyring"));
+  assert(store->exists_bl_ss("feature_set"));
+  assert(store->exists_bl_ss("election_epoch"));
+
+  MonitorDBStore::Transaction tx;
+
+  if (store->exists_bl_ss("joined")) {
+    version_t joined = store->get_int("joined");
+    tx.put(MONITOR_NAME, "joined", joined);
+  }
+
+  vector<string> keys;
+  keys.push_back("magic");
+  keys.push_back("feature_set");
+  keys.push_back("cluster_uuid");
+
+  vector<string>::iterator it;
+  for (it = keys.begin(); it != keys.end(); ++it) {
+    if (!store->exists_bl_ss((*it).c_str()))
+      continue;
+
+    bufferlist bl;
+    int r = store->get_bl_ss(bl, (*it).c_str(), 0);
+    assert(r > 0);
+    tx.put(MONITOR_NAME, *it, bl);
+  }
+  version_t election_epoch = store->get_int("election_epoch");
+  tx.put(MONITOR_NAME, "election_epoch", election_epoch);
+
+  assert(!tx.empty());
+  db->apply_transaction(tx);
+  dout(10) << __func__ << " finished" << dendl;
+}
+
+void Monitor::StoreConverter::_convert_machines(string machine)
+{
+  dout(10) << __func__ << " " << machine << dendl;
+
+  version_t first_committed =
+    store->get_int(machine.c_str(), "first_committed");
+  version_t last_committed =
+    store->get_int(machine.c_str(), "last_committed");
+
+  version_t accepted_pn = store->get_int(machine.c_str(), "accepted_pn");
+  version_t last_pn = store->get_int(machine.c_str(), "last_pn");
+
+  if (accepted_pn > highest_accepted_pn)
+    highest_accepted_pn = accepted_pn;
+  if (last_pn > highest_last_pn)
+    highest_last_pn = last_pn;
+
+  string machine_gv(machine);
+  machine_gv.append("_gv");
+  bool has_gv = true;
+
+  if (!store->exists_bl_ss(machine_gv.c_str())) {
+    dout(1) << __func__ << " " << machine
+      << " no gv dir '" << machine_gv << "'" << dendl;
+    has_gv = false;
+  }
+
+  for (version_t ver = first_committed; ver <= last_committed; ver++) {
+    if (!store->exists_bl_sn(machine.c_str(), ver)) {
+      dout(20) << __func__ << " " << machine
+              << " ver " << ver << " dne" << dendl;
+      continue;
+    }
+
+    bufferlist bl;
+    int r = store->get_bl_sn(bl, machine.c_str(), ver);
+    assert(r >= 0);
+    dout(20) << __func__ << " " << machine
+            << " ver " << ver << " bl " << bl.length() << dendl;
+
+    MonitorDBStore::Transaction tx;
+    tx.put(machine, ver, bl);
+    tx.put(machine, "last_committed", ver);
+
+    if (has_gv && store->exists_bl_sn(machine_gv.c_str(), ver)) {
+      stringstream s;
+      s << ver;
+      string ver_str = s.str();
+
+      version_t gv = store->get_int(machine_gv.c_str(), ver_str.c_str());
+      dout(20) << __func__ << " " << machine
+              << " ver " << ver << " -> " << gv << dendl;
+
+      if (gvs.count(gv) == 0) {
+       gvs.insert(gv);
+      } else {
+       dout(0) << __func__ << " " << machine
+               << " gv " << gv << " already exists"
+               << dendl;
+       assert(0 == "Duplicate GV -- something is wrong!");
+      }
+
+      bufferlist tx_bl;
+      tx.encode(tx_bl);
+      tx.put("paxos", gv, tx_bl);
+    }
+    db->apply_transaction(tx);
+  }
+
+  version_t lc = db->get(machine, "last_committed");
+  assert(lc == last_committed);
+
+  MonitorDBStore::Transaction tx;
+  tx.put(machine, "first_committed", first_committed);
+  tx.put(machine, "last_committed", last_committed);
+  tx.put(machine, "conversion_first", first_committed);
+
+  if (store->exists_bl_ss(machine.c_str(), "latest")) {
+    bufferlist latest_bl_raw;
+    int r = store->get_bl_ss(latest_bl_raw, machine.c_str(), "latest");
+    assert(r >= 0);
+    if (!latest_bl_raw.length()) {
+      dout(20) << __func__ << " machine " << machine
+              << " skip latest with size 0" << dendl;
+      goto out;
+    }
+
+    tx.put(machine, "latest", latest_bl_raw);
+
+    bufferlist::iterator lbl_it = latest_bl_raw.begin();
+    bufferlist latest_bl;
+    version_t latest_ver;
+    ::decode(latest_ver, lbl_it);
+    ::decode(latest_bl, lbl_it);
+
+    dout(20) << __func__ << " machine " << machine
+            << " latest ver " << latest_ver << dendl;
+
+    tx.put(machine, "full_latest", latest_ver);
+    stringstream os;
+    os << "full_" << latest_ver;
+    tx.put(machine, os.str(), latest_bl);
+  }
+out:
+  db->apply_transaction(tx);
+  dout(10) << __func__ << " machine " << machine << " finished" << dendl;
+}
+
+void Monitor::StoreConverter::_convert_paxos()
+{
+  dout(10) << __func__ << dendl;
+  assert(gvs.size() > 0);
+
+  set<version_t>::reverse_iterator rit = gvs.rbegin();
+  version_t highest_gv = *rit;
+  version_t last_gv = highest_gv;
+
+  int n = 0;
+  int max_versions = (g_conf->paxos_max_join_drift*2);
+  for (; (rit != gvs.rend()) && (n < max_versions); ++rit, ++n) {
+    version_t gv = *rit;
+
+    if (last_gv == gv)
+      continue;
+    if ((last_gv - gv) > 1) {
+      // we are done; we found a gap and we are only interested in keeping
+      // contiguous paxos versions.
+      break;
+    }
+    last_gv = gv;
+  }
+
+  // erase all paxos versions between [first, last_gv[, with first being the
+  // first gv in the map.
+  MonitorDBStore::Transaction tx;
+  set<version_t>::iterator it = gvs.begin();
+  dout(1) << __func__ << " first gv " << (*it)
+         << " last gv " << last_gv << dendl;
+  for (; it != gvs.end() && (*it < last_gv); ++it) {
+    tx.erase("paxos", *it);
+  }
+  tx.put("paxos", "first_committed", last_gv);
+  tx.put("paxos", "last_committed", highest_gv);
+  tx.put("paxos", "accepted_pn", highest_accepted_pn);
+  tx.put("paxos", "last_pn", highest_last_pn);
+  tx.put("paxos", "conversion_first", last_gv);
+  db->apply_transaction(tx);
+
+  dout(10) << __func__ << " finished" << dendl;
+}
+
+void Monitor::StoreConverter::_convert_machines()
+{
+  dout(10) << __func__ << dendl;
+  set<string> machine_names = _get_machines_names();
+  set<string>::iterator it = machine_names.begin();
+
+  for (; it != machine_names.end(); ++it) {
+    _convert_machines(*it);
+  }
+  dout(10) << __func__ << " finished" << dendl;
+}
index efed1b65d056b02275dde1988cce3e46c3d9abb5..652036a70b20b9e98539dc49e74fae4ff4c2492d 100644 (file)
@@ -45,6 +45,7 @@
 #include "perfglue/heap_profiler.h"
 
 #include "messages/MMonCommand.h"
+#include "mon/MonitorStore.h"
 #include "mon/MonitorDBStore.h"
 
 #include <memory>
@@ -1400,10 +1401,79 @@ private:
   // don't allow copying
   Monitor(const Monitor& rhs);
   Monitor& operator=(const Monitor &rhs);
+
+public:
+  class StoreConverter {
+    const string path;
+    boost::scoped_ptr<MonitorDBStore> db;
+    boost::scoped_ptr<MonitorStore> store;
+
+    set<version_t> gvs;
+    version_t highest_last_pn;
+    version_t highest_accepted_pn;
+
+   public:
+    StoreConverter(const string &path)
+      : path(path), db(NULL), store(NULL),
+       highest_last_pn(0), highest_accepted_pn(0)
+    { }
+
+    bool needs_conversion();
+    int convert();
+
+   private:
+
+    bool _check_gv_store();
+
+    void _init() {
+      MonitorDBStore *db_ptr = new MonitorDBStore(path);
+      db.reset(db_ptr);
+
+      MonitorStore *store_ptr = new MonitorStore(path);
+      store.reset(store_ptr);
+    }
+
+    void _deinit() {
+      db.reset(NULL);
+      store.reset(NULL);
+    }
+
+    set<string> _get_machines_names() {
+      set<string> names;
+      names.insert("auth");
+      names.insert("logm");
+      names.insert("mdsmap");
+      names.insert("monmap");
+      names.insert("osdmap");
+      names.insert("pgmap");
+
+      return names;
+    }
+
+    void _mark_convert_start() {
+      MonitorDBStore::Transaction tx;
+      tx.put("mon_convert", "on_going", 1);
+      db->apply_transaction(tx);
+    }
+
+    void _convert_finish_features(MonitorDBStore::Transaction &t);
+    void _mark_convert_finish() {
+      MonitorDBStore::Transaction tx;
+      tx.erase("mon_convert", "on_going");
+      _convert_finish_features(tx);
+      db->apply_transaction(tx);
+    }
+
+    void _convert_monitor();
+    void _convert_machines(string machine);
+    void _convert_machines();
+    void _convert_paxos();
+  };
 };
 
 #define CEPH_MON_FEATURE_INCOMPAT_BASE CompatSet::Feature (1, "initial feature set (~v.18)")
 #define CEPH_MON_FEATURE_INCOMPAT_GV CompatSet::Feature (2, "global version sequencing (v0.52)")
+#define CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS CompatSet::Feature (3, "single paxos with k/v store (v0.\?)")
 
 long parse_pos_long(const char *s, ostream *pss = NULL);
 
index 802c7a8e0f38a65c847b8e69ab5e3b6b6bfc7c4b..6b5fc91179cad510adc53e7d61c6d1c604abd7cd 100644 (file)
@@ -324,6 +324,18 @@ void Paxos::store_state(MMonPaxos *m)
     first_committed = get_store()->get(get_name(), "first_committed");
     last_committed = get_store()->get(get_name(), "last_committed");
   }
+
+  if (get_store()->exists(get_name(), "conversion_first")) {
+    MonitorDBStore::Transaction scrub_tx;
+    version_t cf = get_store()->get(get_name(), "conversion_first");
+    dout(10) << __func__ << " scrub paxos from " << cf
+            << " to " << first_committed << dendl;
+    if (cf < first_committed)
+      trim_to(&scrub_tx, cf, first_committed);
+    scrub_tx.erase(get_name(), "conversion_first");
+    get_store()->apply_transaction(scrub_tx);
+  }
+
 }
 
 // leader
@@ -923,6 +935,17 @@ void Paxos::lease_renew_timeout()
 /*
  * trim old states
  */
+void Paxos::trim_to(MonitorDBStore::Transaction *t,
+                   version_t from, version_t to) {
+  dout(10) << __func__ << " from " << from << " to " << to << dendl;
+  assert(from < to);
+
+  while (from < to) {
+    dout(10) << "trim " << from << dendl;
+    t->erase(get_name(), from);
+    from++;
+  }
+}
 
 void Paxos::trim_to(MonitorDBStore::Transaction *t, version_t first)
 {
@@ -931,13 +954,8 @@ void Paxos::trim_to(MonitorDBStore::Transaction *t, version_t first)
 
   if (first_committed >= first)
     return;
-
-  while (first_committed < first) {
-    dout(10) << "trim " << first_committed << dendl;
-    t->erase(get_name(), first_committed);
-    first_committed++;
-  }
-  t->put(get_name(), "first_committed", first_committed);
+  trim_to(t, first_committed, first);
+  t->put(get_name(), "first_committed", first);
 }
 
 void Paxos::trim_to(version_t first)
index 61da2c3fa1939317dcb524cd7e54d31d53be4b3b..ccabb438028994b9ffeead83470a34909d2d1868 100644 (file)
@@ -1139,6 +1139,15 @@ public:
    * @param first The version we are trimming to
    */
   void trim_to(MonitorDBStore::Transaction *t, version_t first);
+  /**
+   * Auxiliary function to erase states in the interval [from, to[ from stable
+   * storage.
+   *
+   * @param t A transaction
+   * @param from Bottom limit of the interval of versions to erase
+   * @param to Upper limit, not including, of the interval of versions to erase
+   */
+  void trim_to(MonitorDBStore::Transaction *t, version_t from, version_t to);
   /**
    * Trim the Paxos state as much as we can.
    */
index c8230529428c73f0e2aa17ece1dfae01ffa2781a..8495870e9a04d7cc141827dc293a8ee153958b4d 100644 (file)
@@ -84,6 +84,26 @@ bool PaxosService::dispatch(PaxosServiceMessage *m)
   return true;
 }
 
+void PaxosService::scrub()
+{
+  dout(10) << __func__ << dendl;
+  if (!mon->store->exists(get_service_name(), "conversion_first"))
+    return;
+
+  version_t cf = mon->store->get(get_service_name(), "conversion_first");
+  version_t fc = get_first_committed();
+
+  dout(10) << __func__ << " conversion_first " << cf
+          << " first committed " << fc << dendl;
+
+  MonitorDBStore::Transaction t;
+  if (cf < fc) {
+    trim(&t, cf, fc);
+  }
+  t.erase(get_service_name(), "conversion_first");
+  mon->store->apply_transaction(t);
+}
+
 bool PaxosService::should_propose(double& delay)
 {
   // simple default policy: quick startup, then some damping.
@@ -213,6 +233,8 @@ void PaxosService::_active()
   // pull latest from paxos
   update_from_paxos();
 
+  scrub();
+
   // create pending state?
   if (mon->is_leader() && is_active()) {
     dout(7) << "_active creating new pending" << dendl;
@@ -282,32 +304,36 @@ void PaxosService::wakeup_proposing_waiters()
   finish_contexts(g_ceph_context, waiting_for_finished_proposal);
 }
 
+void PaxosService::trim(MonitorDBStore::Transaction *t,
+                       version_t from, version_t to)
+{
+  dout(10) << __func__ << " from " << from << " to " << to << dendl;
+  assert(from != to);
+  for (; from < to; from++) {
+    dout(20) << __func__ << " " << from << dendl;
+    t->erase(get_service_name(), from);
+
+    string full_key = mon->store->combine_strings("full", from);
+    if (mon->store->exists(get_service_name(), full_key)) {
+      dout(20) << __func__ << " " << full_key << dendl;
+      t->erase(get_service_name(), full_key);
+    }
+  }
+}
+
 void PaxosService::encode_trim(MonitorDBStore::Transaction *t)
 {
   version_t first_committed = get_first_committed();
   version_t latest_full = get_version("full", "latest");
   version_t trim_to = get_trim_to();
 
-  string latest_key = mon->store->combine_strings("full", latest_full);
-  bool has_full = mon->store->exists(get_service_name(), latest_key);
-
   dout(10) << __func__ << " " << trim_to << " (was " << first_committed << ")"
           << ", latest full " << latest_full << dendl;
 
   if (first_committed >= trim_to)
     return;
 
-  while ((first_committed < trim_to)) {
-    dout(20) << __func__ << first_committed << dendl;
-    t->erase(get_service_name(), first_committed);
-
-    if (has_full) {
-      latest_key = mon->store->combine_strings("full", first_committed);
-      t->erase(get_service_name(), latest_key);
-    }
-
-    first_committed++;
-  }
-  put_first_committed(t, first_committed);
+  trim(t, first_committed, trim_to);
+  put_first_committed(t, trim_to);
 }
 
index 3231af859dddf0686811e71f3816456d67b32411..cfe764160edaa4079f84c335fbd073e954f9f9b5 100644 (file)
@@ -239,6 +239,11 @@ private:
    *      active
    */
   void _active();
+  /**
+   * Scrub our versions after we convert the store from the old layout to
+   * the new k/v store.
+   */
+  void scrub();
 
 public:
   /**
@@ -564,6 +569,15 @@ public:
    * @defgroup PaxosService_h_Trim
    * @{
    */
+  /**
+   * Auxiliary function to trim our state from version @from to version @to,
+   * not including; i.e., the interval [from, to[
+   *
+   * @param t The transaction to which we will add the trim operations.
+   * @param from the lower limit of the interval to be trimmed
+   * @param to the upper limit of the interval to be trimmed (not including)
+   */
+  void trim(MonitorDBStore::Transaction *t, version_t from, version_t to);
   /**
    * Trim our log. This implies getting rid of versions on the k/v store.
    * Services implementing us don't have to implement this function if they