]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: share global version with proposal, collect, commits, slurp, etc.
authorSage Weil <sage@inktank.com>
Thu, 6 Sep 2012 20:39:34 +0000 (13:39 -0700)
committerSage Weil <sage@inktank.com>
Thu, 13 Sep 2012 00:33:00 +0000 (17:33 -0700)
Share the global version through all of the paxos messages that share
data, and when slurping.  This ensures that committed values get the gv's
associated with them, and that proposed values, if they are later committed
by a new leader, will retain their original gv, and that new gv's will only
be allocated after that -- once paxos settles and things go writeable.

Signed-off-by: Sage Weil <sage@inktank.com>
src/mon/Monitor.cc
src/mon/Paxos.cc

index adbd8708af9d9c164b95d98bb95cb5a7ca5a04ae..f5f6ca1123eded6e3ce124bc56a6519a837f7ffc 100644 (file)
@@ -897,6 +897,7 @@ MMonProbe *Monitor::fill_probe_data(MMonProbe *m, Paxos *pax)
   int len = 0;
   for (; v <= pax->get_version(); v++) {
     len += store->get_bl_sn(r->paxos_values[m->machine_name][v], m->machine_name.c_str(), v);
+    r->gv[m->machine_name][v] = store->get_global_version(m->machine_name.c_str(), v);
     for (list<string>::iterator p = pax->extra_state_dirs.begin();
          p != pax->extra_state_dirs.end();
          ++p) {
@@ -955,7 +956,7 @@ void Monitor::handle_probe_data(MMonProbe *m)
     for (map<string, map<version_t, bufferlist> >::iterator p = m->paxos_values.begin();
         p != m->paxos_values.end();
         ++p) {
-      store->put_bl_sn_map(p->first.c_str(), p->second.begin(), p->second.end());
+      store->put_bl_sn_map(p->first.c_str(), p->second.begin(), p->second.end(), &m->gv[p->first]);
     }
 
     pax->last_committed = m->paxos_values.begin()->second.rbegin()->first;
index f0f4cd0cc16c141c2fd8c907e3b2dcbbf9c2b89b..df636d8f71be6a0e1eefbc4460710ac23bd5157f 100644 (file)
@@ -151,6 +151,7 @@ void Paxos::handle_collect(MMonPaxos *collect)
     dout(10) << " sharing our accepted but uncommitted value for " 
             << last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
     last->values[last_committed+1] = bl;
+    last->gv[last_committed+1] = mon->store->get_global_version(machine_name, last_committed+1);
     last->uncommitted_pn = accepted_pn;
   }
 
@@ -184,6 +185,7 @@ void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed,
   for ( ; v <= last_committed; v++) {
     if (mon->store->exists_bl_sn(machine_name, v)) {
       mon->store->get_bl_sn(m->values[v], machine_name, v);
+      m->gv[v] = mon->store->get_global_version(machine_name, v);
       dout(10) << " sharing " << v << " (" 
               << m->values[v].length() << " bytes)" << dendl;
    }
@@ -243,7 +245,7 @@ void Paxos::store_state(MMonPaxos *m)
     dout(10) << "store_state [" << start->first << ".." 
             << last_committed << "]" << dendl;
 
-    mon->store->put_bl_sn_map(machine_name, start, end);
+    mon->store->put_bl_sn_map(machine_name, start, end, &m->gv);
     mon->store->put_int(last_committed, machine_name, "last_committed");
     mon->store->put_int(first_committed, machine_name, "first_committed");
   }
@@ -383,6 +385,8 @@ void Paxos::begin(bufferlist& v, version_t gv)
   accepted.insert(mon->rank);
   new_value = v;
   mon->store->put_bl_sn(new_value, machine_name, last_committed+1);
+  if (gv > 0)
+    mon->store->put_global_version(machine_name, last_committed+1, gv);
 
   if (mon->get_quorum().size() == 1) {
     // we're alone, take it easy
@@ -409,6 +413,7 @@ void Paxos::begin(bufferlist& v, version_t gv)
     MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN,
                                     machine_id, ceph_clock_now(g_ceph_context));
     begin->values[last_committed+1] = new_value;
+    begin->gv[last_committed+1] = gv;
     begin->last_committed = last_committed;
     begin->pn = accepted_pn;
     
@@ -440,8 +445,10 @@ void Paxos::handle_begin(MMonPaxos *begin)
 
   // yes.
   version_t v = last_committed+1;
-  dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl;
+  dout(10) << "accepting value for " << v << " pn " << accepted_pn << " gv " << begin->gv[v] << dendl;
   mon->store->put_bl_sn(begin->values[v], machine_name, v);
+  if (begin->gv.count(v) && begin->gv[v] > 0)
+    mon->store->put_global_version(machine_name, v, begin->gv[v]);
   
   // reply
   MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
@@ -546,9 +553,10 @@ void Paxos::commit()
     MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
                                      machine_id, ceph_clock_now(g_ceph_context));
     commit->values[last_committed] = new_value;
+    commit->gv[last_committed] = mon->store->get_global_version(machine_name, last_committed);
     commit->pn = accepted_pn;
     commit->last_committed = last_committed;
-    
+
     mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
   }