delete mon_caps;
}
+void Monitor::recovered_machine(int id)
+{
+ paxos_recovered.insert(id);
+ if (paxos_recovered.size() == paxos.size()) {
+ dout(10) << "all paxos instances recovered, going writeable" << dendl;
+ for (vector<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); p++)
+ finish_contexts(g_ceph_context, (*p)->waiting_for_writeable);
+ }
+}
+
enum {
l_mon_first = 456000,
l_mon_last,
<< " features are " << quorum_features
<< dendl;
+ paxos_recovered.clear();
+
clog.info() << "mon." << name << "@" << rank
<< " won leader election with quorum " << quorum << "\n";
list<Context*> waitfor_quorum;
list<Context*> maybe_wait_for_quorum;
+ // multi-paxos global version sequencing kludge-o-rama
+ set<int> paxos_recovered; ///< num paxos machines fully recovered during this election epoch
+public:
+ void recovered_machine(int id);
+ bool is_all_paxos_recovered() {
+ return paxos_recovered.size() == paxos.size();
+ }
+
+private:
Context *probe_timeout_event; // for probing and slurping states
struct C_ProbeTimeout : public Context {
// wake people up
finish_contexts(g_ceph_context, waiting_for_active);
finish_contexts(g_ceph_context, waiting_for_readable);
- finish_contexts(g_ceph_context, waiting_for_writeable);
+ //finish_contexts(g_ceph_context, waiting_for_writeable);
+
+ mon->recovered_machine(machine_id);
}
}
} else {
// we're alone, take it easy
commit();
state = STATE_ACTIVE;
+
finish_contexts(g_ceph_context, waiting_for_active);
finish_contexts(g_ceph_context, waiting_for_commit);
finish_contexts(g_ceph_context, waiting_for_readable);
- finish_contexts(g_ceph_context, waiting_for_writeable);
+ //finish_contexts(g_ceph_context, waiting_for_writeable);
+
+ mon->recovered_machine(machine_id);
+
return;
}
// yay!
state = STATE_ACTIVE;
extend_lease();
-
+
// wake people up
finish_contexts(g_ceph_context, waiting_for_active);
finish_contexts(g_ceph_context, waiting_for_commit);
finish_contexts(g_ceph_context, waiting_for_readable);
- finish_contexts(g_ceph_context, waiting_for_writeable);
- }
+ //finish_contexts(g_ceph_context, waiting_for_writeable);
+
+ mon->recovered_machine(machine_id);
+ }
accept->put();
}
new_value.clear();
if (mon->get_quorum().size() == 1) {
- state = STATE_ACTIVE;
+ state = STATE_ACTIVE;
+ mon->recovered_machine(machine_id);
return;
- }
+ }
+
state = STATE_RECOVERING;
lease_expire = utime_t();
dout(10) << "leader_init -- starting paxos recovery" << dendl;
bool Paxos::is_writeable()
{
+ // do not allow new paxos writes until all paxos machines have
+ // recovered. this ensures that the global versions we choose at
+ // proposal time are sanely ordered.
+ if (!mon->is_all_paxos_recovered())
+ return false;
+
if (mon->get_quorum().size() == 1) return true;
return
mon->is_leader() &&