git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
paxos: recover using stashed latest when state histories don't overlap
author Sage Weil <sage@newdream.net>
Fri, 21 May 2010 23:17:34 +0000 (16:17 -0700)
committer Sage Weil <sage@newdream.net>
Fri, 21 May 2010 23:17:34 +0000 (16:17 -0700)
If we don't have incremental states to catch up, jump to the latest.

src/mon/Paxos.cc
src/mon/Paxos.h

index 119aa482e40cc2ceec68dd9d185a92915e346e66..91e96ca99a955695c178e91ee091292c9bb7027d 100644 (file)
@@ -91,6 +91,7 @@ void Paxos::collect(version_t oldpn)
     
     MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, machine_id);
     collect->last_committed = last_committed;
+    collect->first_committed = first_committed;
     collect->pn = accepted_pn;
     mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
   }
@@ -114,6 +115,7 @@ void Paxos::handle_collect(MMonPaxos *collect)
   // reply
   MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST, machine_id);
   last->last_committed = last_committed;
+  last->first_committed = first_committed;
   
   // do we have an accepted but uncommitted value?
   //  (it'll be at last_committed+1)
@@ -144,37 +146,42 @@ void Paxos::handle_collect(MMonPaxos *collect)
   last->pn_from = accepted_pn_from;
 
   // and share whatever data we have
-  if (collect->last_committed < last_committed) {
-    bufferlist bl;
-    version_t l = get_latest(bl);
-    assert(l <= last_committed);
+  if (collect->last_committed < last_committed)
+    share_state(last, collect->first_committed, collect->last_committed);
 
-    version_t v = collect->last_committed;
+  // send reply
+  mon->messenger->send_message(last, collect->get_source_inst());
+  collect->put();
+}
 
-    // start with a stashed full copy?
-    /* hmm.
-    if (l > v + 10) {
-      last->latest_value.claim(bl);
-      last->latest_version = l;
-      v = l;
-    }
-    */
+void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed, version_t peer_last_committed)
+{
+  assert(peer_last_committed < last_committed);
 
-    // include (remaining) incrementals
-    for (v++;
-        v <= last_committed;
-        v++) {
-      if (mon->store->exists_bl_sn(machine_name, v)) {
-       mon->store->get_bl_sn(last->values[v], machine_name, v);
-       dout(10) << " sharing " << v << " (" 
-                << last->values[v].length() << " bytes)" << dendl;
-      }
+  dout(10) << "share_state peer has fc " << peer_first_committed << " lc " << peer_last_committed << dendl;
+  version_t v = peer_last_committed;
+    
+  // start with a stashed full copy?
+  if (peer_last_committed < first_committed) {
+    bufferlist bl;
+    version_t l = get_latest(bl);
+    assert(l <= last_committed);
+    dout(10) << "share_state starting with latest " << l << " (" << bl.length() << " bytes)" << dendl;
+    m->latest_value.claim(bl);
+    m->latest_version = l;
+    v = l;
+  }
+
+  // include (remaining) incrementals
+  for (v++;
+       v <= last_committed;
+       v++) {
+    if (mon->store->exists_bl_sn(machine_name, v)) {
+      mon->store->get_bl_sn(m->values[v], machine_name, v);
+      dout(10) << " sharing " << v << " (" 
+              << m->values[v].length() << " bytes)" << dendl;
     }
   }
-
-  // send reply
-  mon->messenger->send_message(last, collect->get_source_inst());
-  collect->put();
 }
 
 
@@ -194,13 +201,7 @@ void Paxos::handle_last(MMonPaxos *last)
     // share committed values
     dout(10) << "sending commit to " << last->get_source() << dendl;
     MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id);
-    for (version_t v = last->last_committed+1;
-        v <= last_committed;
-        v++) {
-      mon->store->get_bl_sn(commit->values[v], machine_name, v);
-      dout(10) << " sharing " << v << " (" 
-              << commit->values[v].length() << " bytes)" << dendl;
-    }
+    share_state(commit, last->first_committed, last->last_committed);
     commit->last_committed = last_committed;
     mon->messenger->send_message(commit, last->get_source_inst());
   }
@@ -494,6 +495,22 @@ void Paxos::handle_commit(MMonPaxos *commit)
 
   // commit locally.
   bool big_sync = commit->values.size() > 2;
+
+  // stash?
+  if (commit->latest_version) {
+    dout(10) << "got stash version " << commit->latest_version << ", zapping old states" << dendl;
+    stash_latest(commit->latest_version, commit->latest_value);
+
+    while (first_committed <= last_committed) {
+      dout(10) << "trim " << first_committed << dendl;
+      mon->store->erase_sn(machine_name, first_committed);
+      first_committed++;
+    }
+    last_committed = commit->latest_version;
+    first_committed = last_committed;
+    mon->store->put_int(first_committed, machine_name, "first_committed");
+  }
+
   for (map<version_t,bufferlist>::iterator p = commit->values.begin();
        p != commit->values.end();
        ++p) {
@@ -533,8 +550,7 @@ void Paxos::extend_lease()
     MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id);
     lease->last_committed = last_committed;
     lease->lease_expire = lease_expire;
-    if (mon->is_full_quorum())
-      lease->first_committed = first_committed;
+    lease->first_committed = first_committed;
     mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
   }
 
index 3ca31c5f30dff568bb8350a560e435d908920aa3..9c999ac1cefc8be94648833f48d687845a7a6c67 100644 (file)
@@ -248,6 +248,8 @@ public:
   void leader_init();
   void peon_init();
 
+  void share_state(MMonPaxos *m, version_t first_committed, version_t last_committed);
+
   // -- service interface --
   void wait_for_active(Context *c) {
     waiting_for_active.push_back(c);