}
}
- // which state machine?
- int op;
- int machine_id;
-
+ epoch_t epoch; // monitor epoch
+ int op; // paxos op
+ int machine_id; // which state machine?
+
version_t last_committed; // i've committed to
version_t pn_from; // i promise to accept after
version_t pn; // with with proposal
map<version_t,bufferlist> values;
MMonPaxos() : Message(MSG_MON_PAXOS) {}
- MMonPaxos(int o, int mid) : Message(MSG_MON_PAXOS),
- op(o), machine_id(mid),
- last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { }
+ MMonPaxos(epoch_t e, int o, int mid) :
+ Message(MSG_MON_PAXOS),
+ epoch(e),
+ op(o), machine_id(mid),
+ last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { }
virtual char *get_type_name() { return "paxos"; }
}
void encode_payload() {
+ ::_encode(epoch, payload);
::_encode(op, payload);
::_encode(machine_id, payload);
::_encode(last_committed, payload);
}
void decode_payload() {
int off = 0;
+ ::_decode(epoch, payload, off);
::_decode(op, payload, off);
::_decode(machine_id, payload, off);
::_decode(last_committed, payload, off);
dout(1) << "init, last seen epoch " << epoch << endl;
}
+void Elector::shutdown()
+{
+ if (expire_event)
+ mon->timer.cancel_event(expire_event);
+}
+
void Elector::bump_epoch(epoch_t e)
{
dout(10) << "bump_epoch " << epoch << " to " << e << endl;
// set the timer
cancel_timer();
expire_event = new C_ElectionExpire(this);
- g_timer.add_event_after(g_conf.mon_lease + plus,
- expire_event);
+ mon->timer.add_event_after(g_conf.mon_lease + plus,
+ expire_event);
}
void Elector::cancel_timer()
{
- if (expire_event)
- g_timer.cancel_event(expire_event);
+ if (expire_event) {
+ mon->timer.cancel_event(expire_event);
+ expire_event = 0;
+ }
}
void Elector::expire()
void handle_propose(class MMonElection *m);
void handle_ack(class MMonElection *m);
void handle_victory(class MMonElection *m);
-
public:
- Elector(Monitor *m, int w) : mon(m), whoami(w) {
- // initialize all those values!
- // ...
- }
+ Elector(Monitor *m, int w) : mon(m), whoami(w),
+ expire_event(0),
+ epoch(0),
+ electing_me(false),
+ leader_acked(-1) { }
void init();
+ void shutdown();
+
+ void dispatch(Message *m);
+
void call_election() {
start();
}
- void dispatch(Message *m);
+
};
{
dout(1) << "shutdown" << endl;
+ elector.shutdown();
+
// cancel all events
cancel_tick();
timer.cancel_all();
// paxos
case MSG_MON_PAXOS:
- // send it to the right paxos instance
- switch (((MMonPaxos*)m)->machine_id) {
- case PAXOS_TEST:
- test_paxos.dispatch(m);
- break;
- case PAXOS_OSDMAP:
- //...
-
- default:
- assert(0);
+ {
+ MMonPaxos *pm = (MMonPaxos*)m;
+
+ // sanitize
+ if (pm->epoch > mon_epoch)
+ assert(0); //call_election(); // wtf
+ if (pm->epoch != mon_epoch) {
+ delete pm;
+ break;
+ }
+
+ // send it to the right paxos instance
+ switch (pm->machine_id) {
+ case PAXOS_TEST:
+ test_paxos.dispatch(m);
+ break;
+ case PAXOS_OSDMAP:
+ //...
+
+ default:
+ assert(0);
+ }
}
break;
utime_t last_called_election; // [starting] last time i called an election
public:
- // initiate election
- void call_election();
-
- // end election (called by Elector)
- void win_election(epoch_t epoch, set<int>& q);
- void lose_election(epoch_t epoch, int l);
-
+ epoch_t get_epoch() { return mon_epoch; }
int get_leader() { return leader; }
const set<int>& get_quorum() { return quorum; }
+ void call_election(); // initiate election
+ void win_election(epoch_t epoch, set<int>& q); // end election (called by Elector)
+ void lose_election(epoch_t epoch, int l); // end election (called by Elector)
+
// -- paxos --
Paxos test_paxos;
++p) {
if (*p == whoami) continue;
- MMonPaxos *collect = new MMonPaxos(MMonPaxos::OP_COLLECT, machine_id);
+ MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, machine_id);
collect->last_committed = last_committed;
collect->pn = accepted_pn;
mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
state = STATE_RECOVERING;
// reply
- MMonPaxos *last = new MMonPaxos(MMonPaxos::OP_LAST, machine_id);
+ MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST, machine_id);
last->last_committed = last_committed;
// do we have an accepted but uncommitted value?
if (last->last_committed < last_committed) {
// share committed values
dout(10) << "sending commit to " << last->get_source() << endl;
- MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
+ MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id);
for (version_t v = last->last_committed;
v <= last_committed;
v++) {
if (*p == whoami) continue;
dout(10) << " sending begin to mon" << *p << endl;
- MMonPaxos *begin = new MMonPaxos(MMonPaxos::OP_BEGIN, machine_id);
+ MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN, machine_id);
begin->values[last_committed+1] = new_value;
begin->last_committed = last_committed;
begin->pn = accepted_pn;
mon->store->put_bl_sn(begin->values[v], machine_name, v);
// reply
- MMonPaxos *accept = new MMonPaxos(MMonPaxos::OP_ACCEPT, machine_id);
+ MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT, machine_id);
accept->pn = accepted_pn;
accept->last_committed = last_committed;
mon->messenger->send_message(accept, begin->get_source_inst());
if (*p == whoami) continue;
dout(10) << " sending commit to mon" << *p << endl;
- MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
+ MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id);
commit->values[last_committed] = new_value;
commit->pn = accepted_pn;
p != mon->get_quorum().end();
++p) {
if (*p == whoami) continue;
- MMonPaxos *lease = new MMonPaxos(MMonPaxos::OP_LEASE, machine_id);
+ MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id);
lease->last_committed = last_committed;
lease->lease_timeout = lease_timeout;
mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
{
state = STATE_RECOVERING;
lease_timeout = utime_t();
- dout(10) << "leader_init -- i am the leader, starting paxos recovery" << endl;
+ dout(10) << "leader_init -- starting paxos recovery" << endl;
collect(0);
}
delete m;
return;
}
-
- // from the proper leader?
- if (mon->is_peon()) {
- if (m->get_source().num() != mon->get_leader()) {
- dout(5) << "dropping from non-leader " << m->get_source() << " " << *m << endl;
- delete m;
- return;
- }
- }
-
- assert(mon->is_peon() || mon->is_leader());
+ // check sanity
+ assert(mon->is_leader() ||
+ (mon->is_peon() && m->get_source().num() == mon->get_leader()));
+
switch (m->get_type()) {
-
+
case MSG_MON_PAXOS:
{
MMonPaxos *pm = (MMonPaxos*)m;
case MMonPaxos::OP_COLLECT:
handle_collect(pm);
break;
-
case MMonPaxos::OP_LAST:
handle_last(pm);
break;
-
case MMonPaxos::OP_BEGIN:
handle_begin(pm);
break;
-
case MMonPaxos::OP_ACCEPT:
handle_accept(pm);
break;
-
case MMonPaxos::OP_COMMIT:
handle_commit(pm);
break;
-
case MMonPaxos::OP_LEASE:
handle_lease(pm);
break;
-
default:
assert(0);
}
*/
+
+/*
+ * NOTE: This libary is based on the Paxos algorithm, but varies in a few key ways:
+ * 1- Only a single new value is generated at a time, simplifying the recovery logic.
+ * 2- Nodes track "committed" values, and share them generously (and trustingly)
+ * 3- A 'leasing' mechism is built-in, allowing nodes to determine when it is safe to
+ * "read" their copy of the last committed value.
+ *
+ * This provides a simple replication substrate that services can be built on top of.
+ */
+
#ifndef __MON_PAXOS_H
#define __MON_PAXOS_H
{
lock.Lock();
while (1) {
+ if (fm_shutdown) break;
+ fakemessenger_do_loop_2();
+
+ if (directory.empty()) break;
+
dout(20) << "thread waiting" << endl;
if (fm_shutdown) break;
awake = false;
cond.Wait(lock);
awake = true;
dout(20) << "thread woke up" << endl;
- if (fm_shutdown) break;
-
- fakemessenger_do_loop_2();
-
- if (directory.empty()) break;
}
lock.Unlock();