]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: trim old paxos state values. just pgmap, for now
authorSage Weil <sage@newdream.net>
Sat, 17 May 2008 04:16:36 +0000 (21:16 -0700)
committerSage Weil <sage@newdream.net>
Sat, 17 May 2008 04:16:36 +0000 (21:16 -0700)
src/messages/MMonPaxos.h
src/mon/Monitor.h
src/mon/MonitorStore.cc
src/mon/MonitorStore.h
src/mon/PGMonitor.cc
src/mon/Paxos.cc
src/mon/Paxos.h

index 990f37ef1793948a505150b7c7cbee138f117eec..ee9b78f1fd9aad7206722625540577d00048e634 100644 (file)
@@ -46,6 +46,7 @@ class MMonPaxos : public Message {
   __s32 op;          // paxos op
   __s32 machine_id;  // which state machine?
 
+  version_t first_committed;  // i've committed to
   version_t last_committed;  // i've committed to
   version_t pn_from;         // i promise to accept after
   version_t pn;              // with with proposal
@@ -59,13 +60,15 @@ class MMonPaxos : public Message {
     Message(MSG_MON_PAXOS),
     epoch(e),
     op(o), machine_id(mid),
-    last_committed(0), pn_from(0), pn(0), uncommitted_pn(0) { }
+    first_committed(0), last_committed(0), pn_from(0), pn(0), uncommitted_pn(0) { }
   
   const char *get_type_name() { return "paxos"; }
   
   void print(ostream& out) {
     out << "paxos(" << get_paxos_name(machine_id)
-       << " " << get_opname(op) << " lc " << last_committed
+       << " " << get_opname(op) 
+       << " lc " << last_committed
+       << " fc " << first_committed
        << " pn " << pn << " opn " << uncommitted_pn
        << ")";
   }
@@ -74,6 +77,7 @@ class MMonPaxos : public Message {
     ::encode(epoch, payload);
     ::encode(op, payload);
     ::encode(machine_id, payload);
+    ::encode(first_committed, payload);
     ::encode(last_committed, payload);
     ::encode(pn_from, payload);
     ::encode(pn, payload);
@@ -86,6 +90,7 @@ class MMonPaxos : public Message {
     ::decode(epoch, p);
     ::decode(op, p);
     ::decode(machine_id, p);
+    ::decode(first_committed, p);
     ::decode(last_committed, p);
     ::decode(pn_from, p);   
     ::decode(pn, p);   
index 0d67ae2929ab6b03013afb71630078c7d849d838..7d1c2f63ac1ebb6bc601e86ab80c7e0a558f57bf 100644 (file)
@@ -90,6 +90,9 @@ public:
   epoch_t get_epoch() { return mon_epoch; }
   int get_leader() { return leader; }
   const set<int>& get_quorum() { return quorum; }
+  bool is_full_quorum() {
+    return quorum.size() == monmap->size();
+  }
 
   void call_election();  // initiate election
   void win_election(epoch_t epoch, set<int>& q);  // end election (called by Elector)
index 2f2e3ec8caa03a81e99168004c71e6583111e079..827bd309ae0d169ff62c4e2690a18e595e425b39 100644 (file)
@@ -140,6 +140,18 @@ bool MonitorStore::exists_bl_ss(const char *a, const char *b)
   return r == 0;
 }
 
+int MonitorStore::erase_ss(const char *a, const char *b)
+{
+  char fn[200];
+  if (b) {
+    dout(15) << "erase_ss " << a << "/" << b << dendl;
+    sprintf(fn, "%s/%s/%s", dir.c_str(), a, b);
+  } else {
+    dout(15) << "erase_ss " << a << dendl;
+    sprintf(fn, "%s/%s", dir.c_str(), a);
+  }
+  return ::unlink(fn);
+}
 
 int MonitorStore::get_bl_ss(bufferlist& bl, const char *a, const char *b)
 {
index 0167127998de8e201457bab8ebf87475172e6c2c..e47c9aa16b1a2b57f0d2c550f4e21917ef49363d 100644 (file)
@@ -68,6 +68,17 @@ public:
     return put_bl_ss(bl, a, bs);
   }
 
+  int erase_ss(const char *a, const char *b);
+  int erase_sn(const char *a, version_t b) {
+    char bs[20];
+#ifdef __LP64__
+    sprintf(bs, "%lu", b);
+#else
+    sprintf(bs, "%llu", b);
+#endif
+    return erase_ss(a, bs);
+  }
+
   /*
   version_t get_incarnation() { return get_int("incarnation"); }
   void set_incarnation(version_t i) { set_int(i, "incarnation"); }
index 7791d0ddc0a9806d0096d12010dcbf9a0c054803..5616fa4704ea41e148d5de98567be35654a2ab47 100644 (file)
@@ -143,6 +143,11 @@ bool PGMonitor::update_from_paxos()
   pg_map.encode(bl);
   mon->store->put_bl_ss(bl, "pgmap", "latest");
 
+  if (mon->is_leader() &&
+      mon->is_full_quorum() &&
+      paxosv > 10)
+    paxos->trim_to(paxosv-10);
+
   send_pg_creates();
 
   return true;
index 87e957bdae2fe1dfec9c4b17b95675e7f8be94c4..23eb9fc419d0fddfc5385c9aceafdc621ca5f884 100644 (file)
@@ -30,6 +30,7 @@ void Paxos::init()
   last_pn = mon->store->get_int(machine_name, "last_pn");
   accepted_pn = mon->store->get_int(machine_name, "accepted_pn");
   last_committed = mon->store->get_int(machine_name, "last_committed");
+  first_committed = mon->store->get_int(machine_name, "first_committed");
 
   dout(10) << "init" << dendl;
 }
@@ -458,6 +459,8 @@ void Paxos::extend_lease()
     MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id);
     lease->last_committed = last_committed;
     lease->lease_expire = lease_expire;
+    if (mon->is_full_quorum())
+      lease->first_committed = first_committed;
     mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
   }
 
@@ -487,7 +490,7 @@ void Paxos::handle_lease(MMonPaxos *lease)
     delete lease;
     return;
   }
-  
+
   // extend lease
   if (lease_expire < lease->lease_expire) 
     lease_expire = lease->lease_expire;
@@ -500,6 +503,7 @@ void Paxos::handle_lease(MMonPaxos *lease)
   // ack
   MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK, machine_id);
   ack->last_committed = last_committed;
+  ack->first_committed = first_committed;
   ack->lease_expire = lease_expire;
   mon->messenger->send_message(ack, lease->get_source_inst());
 
@@ -508,6 +512,9 @@ void Paxos::handle_lease(MMonPaxos *lease)
     mon->timer.cancel_event(lease_timeout_event);
   lease_timeout_event = new C_LeaseTimeout(this);
   mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_timeout_event);
+
+  // trim?
+  trim_to(lease->first_committed);
   
   // kick waiters
   finish_contexts(waiting_for_active);
@@ -575,6 +582,26 @@ void Paxos::lease_renew_timeout()
 }
 
 
+
+/*
+ * trim old states
+ */
+
+void Paxos::trim_to(version_t first)
+{
+  dout(10) << "trim_to " << first << " (was " << first_committed << ")" << dendl;
+
+  if (first_committed >= first)
+    return;
+
+  while (first_committed < first) {
+    dout(10) << "trim " << first_committed << dendl;
+    mon->store->erase_sn(machine_name, first_committed);
+    first_committed++;
+  }
+  mon->store->put_int(first_committed, machine_name, "first_committed");
+}
+
 /*
  * return a globally unique, monotonically increasing proposal number
  */
index b2e9ce900207c3c90ea83d1aa2b56527b37a2490..be1bf68634b0a327073b574c12063495df6833e5 100644 (file)
@@ -100,6 +100,8 @@ public:
 
 private:
   // recovery (phase 1)
+  version_t first_committed_any;
+  version_t first_committed;
   version_t last_pn;
   version_t last_committed;
   version_t accepted_pn;
@@ -215,12 +217,13 @@ public:
   void leader_init();
   void peon_init();
 
-
   // -- service interface --
   void wait_for_active(Context *c) {
     assert(!is_active());
     waiting_for_active.push_back(c);
   }
+
+  void trim_to(version_t first);
   
   // read
   version_t get_version() { return last_committed; }