From 2f4658f6d5289c17a41131829d226aeeee49b151 Mon Sep 17 00:00:00 2001 From: sageweil Date: Thu, 14 Jun 2007 01:06:23 +0000 Subject: [PATCH] * more paxos work git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1418 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/messages/MMonPaxos.h | 18 +++++++++------ trunk/ceph/mon/Elector.cc | 16 ++++++++++---- trunk/ceph/mon/Elector.h | 16 +++++++++----- trunk/ceph/mon/Monitor.cc | 34 +++++++++++++++++++--------- trunk/ceph/mon/Monitor.h | 12 +++++----- trunk/ceph/mon/Paxos.cc | 39 +++++++++++---------------------- trunk/ceph/mon/Paxos.h | 11 ++++++++++ trunk/ceph/msg/FakeMessenger.cc | 10 ++++----- 8 files changed, 91 insertions(+), 65 deletions(-) diff --git a/trunk/ceph/messages/MMonPaxos.h b/trunk/ceph/messages/MMonPaxos.h index a5236e8472270..f766c9254cb4a 100644 --- a/trunk/ceph/messages/MMonPaxos.h +++ b/trunk/ceph/messages/MMonPaxos.h @@ -39,10 +39,10 @@ class MMonPaxos : public Message { } } - // which state machine? - int op; - int machine_id; - + epoch_t epoch; // monitor epoch + int op; // paxos op + int machine_id; // which state machine? 
+ version_t last_committed; // i've committed to version_t pn_from; // i promise to accept after version_t pn; // with with proposal @@ -52,9 +52,11 @@ class MMonPaxos : public Message { map values; MMonPaxos() : Message(MSG_MON_PAXOS) {} - MMonPaxos(int o, int mid) : Message(MSG_MON_PAXOS), - op(o), machine_id(mid), - last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { } + MMonPaxos(epoch_t e, int o, int mid) : + Message(MSG_MON_PAXOS), + epoch(e), + op(o), machine_id(mid), + last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { } virtual char *get_type_name() { return "paxos"; } @@ -66,6 +68,7 @@ class MMonPaxos : public Message { } void encode_payload() { + ::_encode(epoch, payload); ::_encode(op, payload); ::_encode(machine_id, payload); ::_encode(last_committed, payload); @@ -77,6 +80,7 @@ class MMonPaxos : public Message { } void decode_payload() { int off = 0; + ::_decode(epoch, payload, off); ::_decode(op, payload, off); ::_decode(machine_id, payload, off); ::_decode(last_committed, payload, off); diff --git a/trunk/ceph/mon/Elector.cc b/trunk/ceph/mon/Elector.cc index 345a38b171e16..cdfce72bb0681 100644 --- a/trunk/ceph/mon/Elector.cc +++ b/trunk/ceph/mon/Elector.cc @@ -33,6 +33,12 @@ void Elector::init() dout(1) << "init, last seen epoch " << epoch << endl; } +void Elector::shutdown() +{ + if (expire_event) + mon->timer.cancel_event(expire_event); +} + void Elector::bump_epoch(epoch_t e) { dout(10) << "bump_epoch " << epoch << " to " << e << endl; @@ -94,15 +100,17 @@ void Elector::reset_timer(double plus) // set the timer cancel_timer(); expire_event = new C_ElectionExpire(this); - g_timer.add_event_after(g_conf.mon_lease + plus, - expire_event); + mon->timer.add_event_after(g_conf.mon_lease + plus, + expire_event); } void Elector::cancel_timer() { - if (expire_event) - g_timer.cancel_event(expire_event); + if (expire_event) { + mon->timer.cancel_event(expire_event); + expire_event = 0; + } } void Elector::expire() diff --git 
a/trunk/ceph/mon/Elector.h b/trunk/ceph/mon/Elector.h index 5a0edf70d1b8c..9bfd7cb644fc7 100644 --- a/trunk/ceph/mon/Elector.h +++ b/trunk/ceph/mon/Elector.h @@ -69,19 +69,23 @@ class Elector { void handle_propose(class MMonElection *m); void handle_ack(class MMonElection *m); void handle_victory(class MMonElection *m); - public: - Elector(Monitor *m, int w) : mon(m), whoami(w) { - // initialize all those values! - // ... - } + Elector(Monitor *m, int w) : mon(m), whoami(w), + expire_event(0), + epoch(0), + electing_me(false), + leader_acked(-1) { } void init(); + void shutdown(); + + void dispatch(Message *m); + void call_election() { start(); } - void dispatch(Message *m); + }; diff --git a/trunk/ceph/mon/Monitor.cc b/trunk/ceph/mon/Monitor.cc index 8b4f73f41b366..1d55b6c7d8496 100644 --- a/trunk/ceph/mon/Monitor.cc +++ b/trunk/ceph/mon/Monitor.cc @@ -90,6 +90,8 @@ void Monitor::shutdown() { dout(1) << "shutdown" << endl; + elector.shutdown(); + // cancel all events cancel_tick(); timer.cancel_all(); @@ -255,16 +257,28 @@ void Monitor::dispatch(Message *m) // paxos case MSG_MON_PAXOS: - // send it to the right paxos instance - switch (((MMonPaxos*)m)->machine_id) { - case PAXOS_TEST: - test_paxos.dispatch(m); - break; - case PAXOS_OSDMAP: - //... - - default: - assert(0); + { + MMonPaxos *pm = (MMonPaxos*)m; + + // sanitize + if (pm->epoch > mon_epoch) + assert(0); //call_election(); // wtf + if (pm->epoch != mon_epoch) { + delete pm; + break; + } + + // send it to the right paxos instance + switch (pm->machine_id) { + case PAXOS_TEST: + test_paxos.dispatch(m); + break; + case PAXOS_OSDMAP: + //... 
+ + default: + assert(0); + } } break; diff --git a/trunk/ceph/mon/Monitor.h b/trunk/ceph/mon/Monitor.h index 0f001e302a92e..e7e77bca305a4 100644 --- a/trunk/ceph/mon/Monitor.h +++ b/trunk/ceph/mon/Monitor.h @@ -80,16 +80,14 @@ private: utime_t last_called_election; // [starting] last time i called an election public: - // initiate election - void call_election(); - - // end election (called by Elector) - void win_election(epoch_t epoch, set& q); - void lose_election(epoch_t epoch, int l); - + epoch_t get_epoch() { return mon_epoch; } int get_leader() { return leader; } const set& get_quorum() { return quorum; } + void call_election(); // initiate election + void win_election(epoch_t epoch, set& q); // end election (called by Elector) + void lose_election(epoch_t epoch, int l); // end election (called by Elector) + // -- paxos -- Paxos test_paxos; diff --git a/trunk/ceph/mon/Paxos.cc b/trunk/ceph/mon/Paxos.cc index 8f200717c1a40..a0875e072bba3 100644 --- a/trunk/ceph/mon/Paxos.cc +++ b/trunk/ceph/mon/Paxos.cc @@ -51,7 +51,7 @@ void Paxos::collect(version_t oldpn) ++p) { if (*p == whoami) continue; - MMonPaxos *collect = new MMonPaxos(MMonPaxos::OP_COLLECT, machine_id); + MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, machine_id); collect->last_committed = last_committed; collect->pn = accepted_pn; mon->messenger->send_message(collect, mon->monmap->get_inst(*p)); @@ -74,7 +74,7 @@ void Paxos::handle_collect(MMonPaxos *collect) state = STATE_RECOVERING; // reply - MMonPaxos *last = new MMonPaxos(MMonPaxos::OP_LAST, machine_id); + MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST, machine_id); last->last_committed = last_committed; // do we have an accepted but uncommitted value? 
@@ -135,7 +135,7 @@ void Paxos::handle_last(MMonPaxos *last) if (last->last_committed < last_committed) { // share committed values dout(10) << "sending commit to " << last->get_source() << endl; - MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id); + MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id); for (version_t v = last->last_committed; v <= last_committed; v++) { @@ -232,7 +232,7 @@ void Paxos::begin(bufferlist& v) if (*p == whoami) continue; dout(10) << " sending begin to mon" << *p << endl; - MMonPaxos *begin = new MMonPaxos(MMonPaxos::OP_BEGIN, machine_id); + MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN, machine_id); begin->values[last_committed+1] = new_value; begin->last_committed = last_committed; begin->pn = accepted_pn; @@ -264,7 +264,7 @@ void Paxos::handle_begin(MMonPaxos *begin) mon->store->put_bl_sn(begin->values[v], machine_name, v); // reply - MMonPaxos *accept = new MMonPaxos(MMonPaxos::OP_ACCEPT, machine_id); + MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT, machine_id); accept->pn = accepted_pn; accept->last_committed = last_committed; mon->messenger->send_message(accept, begin->get_source_inst()); @@ -325,7 +325,7 @@ void Paxos::commit() if (*p == whoami) continue; dout(10) << " sending commit to mon" << *p << endl; - MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id); + MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id); commit->values[last_committed] = new_value; commit->pn = accepted_pn; @@ -376,7 +376,7 @@ void Paxos::extend_lease() p != mon->get_quorum().end(); ++p) { if (*p == whoami) continue; - MMonPaxos *lease = new MMonPaxos(MMonPaxos::OP_LEASE, machine_id); + MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id); lease->last_committed = last_committed; lease->lease_timeout = lease_timeout; mon->messenger->send_message(lease, 
mon->monmap->get_inst(*p)); @@ -444,7 +444,7 @@ void Paxos::leader_init() { state = STATE_RECOVERING; lease_timeout = utime_t(); - dout(10) << "leader_init -- i am the leader, starting paxos recovery" << endl; + dout(10) << "leader_init -- starting paxos recovery" << endl; collect(0); } @@ -468,20 +468,13 @@ void Paxos::dispatch(Message *m) delete m; return; } - - // from the proper leader? - if (mon->is_peon()) { - if (m->get_source().num() != mon->get_leader()) { - dout(5) << "dropping from non-leader " << m->get_source() << " " << *m << endl; - delete m; - return; - } - } - - assert(mon->is_peon() || mon->is_leader()); + // check sanity + assert(mon->is_leader() || + (mon->is_peon() && m->get_source().num() == mon->get_leader())); + switch (m->get_type()) { - + case MSG_MON_PAXOS: { MMonPaxos *pm = (MMonPaxos*)m; @@ -492,27 +485,21 @@ void Paxos::dispatch(Message *m) case MMonPaxos::OP_COLLECT: handle_collect(pm); break; - case MMonPaxos::OP_LAST: handle_last(pm); break; - case MMonPaxos::OP_BEGIN: handle_begin(pm); break; - case MMonPaxos::OP_ACCEPT: handle_accept(pm); break; - case MMonPaxos::OP_COMMIT: handle_commit(pm); break; - case MMonPaxos::OP_LEASE: handle_lease(pm); break; - default: assert(0); } diff --git a/trunk/ceph/mon/Paxos.h b/trunk/ceph/mon/Paxos.h index 63a7361b09b93..43c1967ca7054 100644 --- a/trunk/ceph/mon/Paxos.h +++ b/trunk/ceph/mon/Paxos.h @@ -35,6 +35,17 @@ e 12v */ + +/* + * NOTE: This library is based on the Paxos algorithm, but varies in a few key ways: + * 1- Only a single new value is generated at a time, simplifying the recovery logic. + * 2- Nodes track "committed" values, and share them generously (and trustingly) + * 3- A 'leasing' mechanism is built-in, allowing nodes to determine when it is safe to + * "read" their copy of the last committed value. + * + * This provides a simple replication substrate that services can be built on top of. 
+ */ + #ifndef __MON_PAXOS_H #define __MON_PAXOS_H diff --git a/trunk/ceph/msg/FakeMessenger.cc b/trunk/ceph/msg/FakeMessenger.cc index b7f02bdb40624..19f8f4320ed68 100644 --- a/trunk/ceph/msg/FakeMessenger.cc +++ b/trunk/ceph/msg/FakeMessenger.cc @@ -71,17 +71,17 @@ void *fakemessenger_thread(void *ptr) { lock.Lock(); while (1) { + if (fm_shutdown) break; + fakemessenger_do_loop_2(); + + if (directory.empty()) break; + dout(20) << "thread waiting" << endl; if (fm_shutdown) break; awake = false; cond.Wait(lock); awake = true; dout(20) << "thread woke up" << endl; - if (fm_shutdown) break; - - fakemessenger_do_loop_2(); - - if (directory.empty()) break; } lock.Unlock(); -- 2.39.5