From e91b16aa41b38f1828618f5d02c9fd19405cec05 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 12 Sep 2012 10:59:34 -0700 Subject: [PATCH] mon: fix paxos completions If we are already recovered, trigger completions immediately. Otherwise, wait, and when we do recover trigger them all. This fixes a number of things, most notably making active events only trigger when everyone is recovered. Signed-off-by: Sage Weil --- src/mon/Monitor.cc | 18 +++++++++++++----- src/mon/Monitor.h | 2 +- src/mon/Paxos.cc | 46 +++++++++++++++++++++++++++------------------- 3 files changed, 41 insertions(+), 25 deletions(-) diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index f5f6ca1123ede..e4fccb6747495 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -215,11 +215,19 @@ Monitor::~Monitor() delete mon_caps; } -void Monitor::recovered_machine(int id) +void Monitor::recovered_leader(int id) { + if (paxos_recovered.count(id)) + return; paxos_recovered.insert(id); if (paxos_recovered.size() == paxos.size()) { dout(10) << "all paxos instances recovered, going writeable" << dendl; + for (vector::iterator p = paxos.begin(); p != paxos.end(); p++) + finish_contexts(g_ceph_context, (*p)->waiting_for_active); + for (vector::iterator p = paxos.begin(); p != paxos.end(); p++) + finish_contexts(g_ceph_context, (*p)->waiting_for_commit); + for (vector::iterator p = paxos.begin(); p != paxos.end(); p++) + finish_contexts(g_ceph_context, (*p)->waiting_for_readable); for (vector::iterator p = paxos.begin(); p != paxos.end(); p++) finish_contexts(g_ceph_context, (*p)->waiting_for_writeable); } @@ -598,6 +606,9 @@ void Monitor::reset() quorum.clear(); outside_quorum.clear(); + paxos_recovered.clear(); + global_version = 0; + for (vector::iterator p = paxos.begin(); p != paxos.end(); p++) (*p)->restart(); for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); p++) @@ -1027,9 +1038,6 @@ void Monitor::win_election(epoch_t epoch, set& active, unsigned features) << " features are " << quorum_features << dendl; - paxos_recovered.clear(); - global_version = 0; - clog.info() << "mon." << name << "@" << rank << " won leader election with quorum " << quorum << "\n"; @@ -1051,7 +1059,7 @@ void Monitor::lose_election(epoch_t epoch, set &q, int l) quorum_features = 0; dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader << " quorum is " << quorum << dendl; - + for (vector::iterator p = paxos.begin(); p != paxos.end(); p++) (*p)->peon_init(); for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); p++) diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 5058194ecfc3a..bccc5c09d1c1a 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -192,7 +192,7 @@ private: version_t global_version; public: - void recovered_machine(int id); + void recovered_leader(int id); version_t get_global_paxos_version(); bool is_all_paxos_recovered() { return paxos_recovered.size() == paxos.size(); diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index df636d8f71be6..2cdde8b6b353f 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -337,11 +337,13 @@ void Paxos::handle_last(MMonPaxos *last) extend_lease(); // wake people up - finish_contexts(g_ceph_context, waiting_for_active); - finish_contexts(g_ceph_context, waiting_for_readable); - //finish_contexts(g_ceph_context, waiting_for_writeable); - - mon->recovered_machine(machine_id); + if (mon->is_all_paxos_recovered()) { + finish_contexts(g_ceph_context, waiting_for_active); + finish_contexts(g_ceph_context, waiting_for_readable); + finish_contexts(g_ceph_context, waiting_for_writeable); + } else { + mon->recovered_leader(machine_id); + } } } } else { @@ -393,12 +395,14 @@ void Paxos::begin(bufferlist& v, version_t gv) commit(); state = STATE_ACTIVE; - finish_contexts(g_ceph_context, waiting_for_active); - finish_contexts(g_ceph_context, waiting_for_commit); - finish_contexts(g_ceph_context, waiting_for_readable); - //finish_contexts(g_ceph_context, waiting_for_writeable); - - mon->recovered_machine(machine_id); + if (mon->is_all_paxos_recovered()) { + finish_contexts(g_ceph_context, waiting_for_active); + finish_contexts(g_ceph_context, waiting_for_commit); + finish_contexts(g_ceph_context, waiting_for_readable); + finish_contexts(g_ceph_context, waiting_for_writeable); + } else { + mon->recovered_leader(machine_id); + } return; } @@ -506,13 +510,15 @@ void Paxos::handle_accept(MMonPaxos *accept) extend_lease(); // wake people up - finish_contexts(g_ceph_context, waiting_for_active); - finish_contexts(g_ceph_context, waiting_for_commit); - finish_contexts(g_ceph_context, waiting_for_readable); - //finish_contexts(g_ceph_context, waiting_for_writeable); - - mon->recovered_machine(machine_id); + if (mon->is_all_paxos_recovered()) { + finish_contexts(g_ceph_context, waiting_for_active); + finish_contexts(g_ceph_context, waiting_for_commit); + finish_contexts(g_ceph_context, waiting_for_readable); + finish_contexts(g_ceph_context, waiting_for_writeable); + } else { + mon->recovered_leader(machine_id); } + } accept->put(); } @@ -580,7 +586,9 @@ void Paxos::handle_commit(MMonPaxos *commit) commit->put(); - finish_contexts(g_ceph_context, waiting_for_commit); + if (mon->is_all_paxos_recovered()) + finish_contexts(g_ceph_context, waiting_for_commit); + // otherwise, this'll go when they all recover. } void Paxos::extend_lease() @@ -838,7 +846,7 @@ void Paxos::leader_init() if (mon->get_quorum().size() == 1) { state = STATE_ACTIVE; - mon->recovered_machine(machine_id); + mon->recovered_leader(machine_id); return; } -- 2.39.5