From 4e4c5bcd538f8572e4c963d0e928557ecfbfd1d2 Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 4 Jul 2007 04:19:32 +0000 Subject: [PATCH] merged r1409:1471 from trunk/ceph into branches/sage/pgs git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1472 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/pgs/mon/ClientMonitor.cc | 245 +++++-- branches/sage/pgs/mon/ClientMonitor.h | 154 +++- branches/sage/pgs/mon/Elector.cc | 188 +++-- branches/sage/pgs/mon/Elector.h | 39 +- branches/sage/pgs/mon/MDSMonitor.cc | 539 ++++++++------ branches/sage/pgs/mon/MDSMonitor.h | 98 +-- branches/sage/pgs/mon/MonMap.h | 5 +- branches/sage/pgs/mon/Monitor.cc | 521 +++++++------- branches/sage/pgs/mon/Monitor.h | 94 +-- branches/sage/pgs/mon/MonitorStore.cc | 1 + branches/sage/pgs/mon/OSDMonitor.cc | 930 +++++++++++-------------- branches/sage/pgs/mon/OSDMonitor.h | 113 +-- branches/sage/pgs/mon/Paxos.cc | 602 +++++++++++++--- branches/sage/pgs/mon/Paxos.h | 175 ++++- branches/sage/pgs/mon/PaxosService.cc | 136 ++++ branches/sage/pgs/mon/PaxosService.h | 91 +++ branches/sage/pgs/mon/mon_types.h | 33 + 17 files changed, 2572 insertions(+), 1392 deletions(-) create mode 100644 branches/sage/pgs/mon/PaxosService.cc create mode 100644 branches/sage/pgs/mon/PaxosService.h create mode 100644 branches/sage/pgs/mon/mon_types.h diff --git a/branches/sage/pgs/mon/ClientMonitor.cc b/branches/sage/pgs/mon/ClientMonitor.cc index 1b7e4f0f12ac3..bfdede66b8ca7 100644 --- a/branches/sage/pgs/mon/ClientMonitor.cc +++ b/branches/sage/pgs/mon/ClientMonitor.cc @@ -17,6 +17,7 @@ #include "Monitor.h" #include "MDSMonitor.h" #include "OSDMonitor.h" +#include "MonitorStore.h" #include "messages/MClientMount.h" #include "messages/MClientUnmount.h" @@ -30,105 +31,207 @@ +bool ClientMonitor::update_from_paxos() +{ + assert(paxos->is_active()); + + version_t paxosv = paxos->get_version(); + dout(10) << "update_from_paxos paxosv " << paxosv + << ", my v " << client_map.version << endl; + + if (paxosv == client_map.version) return true; + assert(paxosv >= client_map.version); + + if (client_map.version == 0 && paxosv > 1 && + mon->store->exists_bl_ss("clientmap","latest")) { + // starting up: load latest + dout(7) << "update_from_paxos startup: loading latest full clientmap" << endl; + bufferlist bl; + mon->store->get_bl_ss(bl, "clientmap", "latest"); + int off = 0; + client_map._decode(bl, off); + } + + // walk through incrementals + while (paxosv > client_map.version) { + bufferlist bl; + bool success = paxos->read(client_map.version+1, bl); + if (success) { + dout(7) << "update_from_paxos applying incremental " << client_map.version+1 << endl; + Incremental inc; + int off = 0; + inc._decode(bl, off); + client_map.apply_incremental(inc); + + } else { + dout(7) << "update_from_paxos couldn't read incremental " << client_map.version+1 << endl; + return false; + } + } + + // save latest + bufferlist bl; + client_map._encode(bl); + mon->store->put_bl_ss(bl, "clientmap", "latest"); + + return true; +} -void ClientMonitor::dispatch(Message *m) +void ClientMonitor::create_pending() { - switch (m->get_type()) { + assert(mon->is_leader()); + pending_inc = Incremental(); + pending_inc.version = client_map.version + 1; + pending_inc.next_client = client_map.next_client; + dout(10) << "create_pending v " << pending_inc.version + << ", next is " << pending_inc.next_client + << endl; +} + +void ClientMonitor::create_initial() +{ + dout(1) << "create_initial -- creating initial map" << endl; +} + - case MSG_CLIENT_MOUNT: - handle_client_mount((MClientMount*)m); - break; +void ClientMonitor::encode_pending(bufferlist &bl) +{ + assert(mon->is_leader()); + dout(10) << "encode_pending v " << pending_inc.version + << ", next is " << pending_inc.next_client + << endl; + + assert(paxos->get_version() + 1 == pending_inc.version); + pending_inc._encode(bl); +} + + +// ------- + + +bool ClientMonitor::preprocess_query(Message *m) +{ + dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << endl; + + switch (m->get_type()) { + case MSG_CLIENT_MOUNT: + { + // already mounted? + entity_addr_t addr = m->get_source_addr(); + if (client_map.addr_client.count(addr)) { + int client = client_map.addr_client[addr]; + dout(7) << " client" << client << " already mounted" << endl; + _mounted(client, (MClientMount*)m); + return true; + } + } + return false; + case MSG_CLIENT_UNMOUNT: - handle_client_unmount((MClientUnmount*)m); - break; + { + // already unmounted? + int client = m->get_source().num(); + if (client_map.client_addr.count(client) == 0) { + dout(7) << " client" << client << " not mounted" << endl; + _unmounted((MClientUnmount*)m); + return true; + } + } + return false; - + default: assert(0); - } + delete m; + return true; + } } -void ClientMonitor::handle_client_mount(MClientMount *m) +bool ClientMonitor::prepare_update(Message *m) { - dout(7) << "client_mount from " << m->get_source_inst() << endl; - assert(m->get_source().is_client()); - int from = m->get_source().num(); + dout(10) << "prepare_update " << *m << " from " << m->get_source_inst() << endl; - // choose a client id - if (from < 0 || - (client_map.count(from) && - client_map[from] != m->get_source_addr())) { - from = num_clients++; - dout(10) << "client_mount assigned client" << from << endl; - } - - client_map[from] = m->get_source_addr(); - - // reply with latest mds map - entity_inst_t to = m->get_source_inst(); - to.name = MSG_ADDR_CLIENT(from); - mon->mdsmon->send_latest(to); - mon->osdmon->send_latest(to); - delete m; -} + switch (m->get_type()) { + case MSG_CLIENT_MOUNT: + { + MClientMount *mount = (MClientMount*)m; + entity_addr_t addr = mount->addr; + int client = -1; + if (mount->get_source().is_client()) + client = mount->get_source().num(); + + // choose a client id + if (client < 0 || + (client_map.client_addr.count(client) && + client_map.client_addr[client] != addr)) { + client = pending_inc.next_client; + dout(10) << "mount: assigned client" << client << " to " << addr << endl; + } else { + dout(10) << "mount: client" << client << " requested by " << addr << endl; + } + + pending_inc.add_mount(client, addr); + paxos->wait_for_commit(new C_Mounted(this, client, mount)); + } + return true; -void ClientMonitor::handle_client_unmount(MClientUnmount *m) -{ - dout(7) << "client_unmount from " << m->get_source() - << " at " << m->get_source_inst() << endl; - assert(m->get_source().is_client()); - int from = m->get_source().num(); - - if (client_map.count(from)) { - client_map.erase(from); - - if (client_map.empty() && - g_conf.mds_shutdown_on_last_unmount) { - dout(1) << "last client unmounted" << endl; - mon->do_stop(); + case MSG_CLIENT_UNMOUNT: + { + MClientUnmount *unmount = (MClientUnmount*)m; + assert(unmount->inst.name.is_client()); + int client = unmount->inst.name.num(); + + assert(client_map.client_addr.count(client)); + + pending_inc.add_unmount(client); + paxos->wait_for_commit(new C_Unmounted(this, unmount)); } + return true; + + default: + assert(0); + delete m; + return false; } - // reply with (same) unmount message to ack - mon->messenger->send_message(m, m->get_source_inst()); } +// MOUNT -/* -void ClientMonitor::handle_mds_shutdown(Message *m) -{ - assert(m->get_source().is_mds()); - int from = m->get_source().num(); - mdsmap.mds_inst.erase(from); - mdsmap.all_mds.erase(from); +void ClientMonitor::_mounted(int client, MClientMount *m) +{ + entity_inst_t to; + to.addr = m->addr; + to.name = MSG_ADDR_CLIENT(client); - dout(7) << "mds_shutdown from " << m->get_source() - << ", still have " << mdsmap.all_mds - << endl; - - // tell someone? - // fixme + dout(10) << "_mounted client" << client << " at " << to << endl; + // reply with latest mds, osd maps + mon->mdsmon->send_latest(to); + mon->osdmon->send_latest(0, to); + delete m; } -*/ - -/* -void ClientMonitor::bcast_latest_mds() +void ClientMonitor::_unmounted(MClientUnmount *m) { - dout(10) << "bcast_latest_mds " << mdsmap.get_epoch() << endl; + dout(10) << "_unmounted " << m->inst << endl; - // tell mds - for (set::iterator p = mdsmap.get_mds().begin(); - p != mdsmap.get_mds().end(); - p++) { - if (mdsmap.is_down(*p)) continue; - send_full(MSG_ADDR_MDS(*p), mdsmap.get_inst(*p)); + // reply with (same) unmount message + mon->messenger->send_message(m, m->inst); + + // auto-shutdown? + // (hack for fakesyn/newsyn, mostly) + if (mon->is_leader() && + client_map.version > 1 && + client_map.client_addr.empty() && + g_conf.mon_stop_on_last_unmount) { + dout(1) << "last client unmounted" << endl; + mon->do_stop(); } } -*/ + diff --git a/branches/sage/pgs/mon/ClientMonitor.h b/branches/sage/pgs/mon/ClientMonitor.h index 1ae9401465c94..8321202fc24f1 100644 --- a/branches/sage/pgs/mon/ClientMonitor.h +++ b/branches/sage/pgs/mon/ClientMonitor.h @@ -24,31 +24,153 @@ using namespace std; #include "mds/MDSMap.h" +#include "PaxosService.h" + class Monitor; +class Paxos; +class MClientMount; +class MClientUnmount; + +class ClientMonitor : public PaxosService { +public: + + struct Incremental { + version_t version; + uint32_t next_client; + map mount; + set unmount; + + Incremental() : version(0), next_client() {} + + bool is_empty() { return mount.empty() && unmount.empty(); } + void add_mount(uint32_t client, entity_addr_t addr) { + next_client = MAX(next_client, client+1); + mount[client] = addr; + } + void add_unmount(uint32_t client) { + assert(client < next_client); + if (mount.count(client)) + mount.erase(client); + else + unmount.insert(client); + } + + void _encode(bufferlist &bl) { + ::_encode(version, bl); + ::_encode(next_client, bl); + ::_encode(mount, bl); + ::_encode(unmount, bl); + } + void _decode(bufferlist &bl, int& off) { + ::_decode(version, bl, off); + ::_decode(next_client, bl, off); + ::_decode(mount, bl, off); + ::_decode(unmount, bl, off); + } + }; + + struct Map { + version_t version; + uint32_t next_client; + map client_addr; + hash_map addr_client; + + Map() : version(0), next_client(0) {} -class ClientMonitor : public Dispatcher { - Monitor *mon; - Messenger *messenger; - Mutex &lock; + void reverse() { + addr_client.clear(); + for (map::iterator p = client_addr.begin(); + p != client_addr.end(); + ++p) { + addr_client[p->second] = p->first; + } + } + void apply_incremental(Incremental &inc) { + assert(inc.version == version+1); + version = inc.version; + next_client = inc.next_client; + for (map::iterator p = inc.mount.begin(); + p != inc.mount.end(); + ++p) { + client_addr[p->first] = p->second; + addr_client[p->second] = p->first; + } + + for (set::iterator p = inc.unmount.begin(); + p != inc.unmount.end(); + ++p) { + assert(client_addr.count(*p)); + addr_client.erase(client_addr[*p]); + client_addr.erase(*p); + } + } - private: - int num_clients; - map client_map; + void _encode(bufferlist &bl) { + ::_encode(version, bl); + ::_encode(next_client, bl); + ::_encode(client_addr, bl); + } + void _decode(bufferlist &bl, int& off) { + ::_decode(version, bl, off); + ::_decode(next_client, bl, off); + ::_decode(client_addr, bl, off); + reverse(); + } + }; - void bcast_latest_mds(); + class C_Mounted : public Context { + ClientMonitor *cmon; + int client; + MClientMount *m; + public: + C_Mounted(ClientMonitor *cm, int c, MClientMount *m_) : + cmon(cm), client(c), m(m_) {} + void finish(int r) { + if (r >= 0) + cmon->_mounted(client, m); + else + cmon->dispatch((Message*)m); + } + }; - //void accept_pending(); // accept pending, new map. - //void send_incremental(epoch_t since, msg_addr_t dest); + class C_Unmounted : public Context { + ClientMonitor *cmon; + MClientUnmount *m; + public: + C_Unmounted(ClientMonitor *cm, MClientUnmount *m_) : + cmon(cm), m(m_) {} + void finish(int r) { + if (r >= 0) + cmon->_unmounted(m); + else + cmon->dispatch((Message*)m); + } + }; - void handle_client_mount(class MClientMount *m); - void handle_client_unmount(class MClientUnmount *m); +private: + Map client_map; + + // leader + Incremental pending_inc; + + void create_initial(); + bool update_from_paxos(); + void create_pending(); // prepare a new pending + void encode_pending(bufferlist &bl); // propose pending update to peers + + void _mounted(int c, MClientMount *m); + void _unmounted(MClientUnmount *m); + + bool preprocess_query(Message *m); // true if processed. + bool prepare_update(Message *m); + + public: - ClientMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l), - num_clients(0) { } + ClientMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { } - void dispatch(Message *m); - void tick(); // check state, take actions + //void tick(); // check state, take actions + }; #endif diff --git a/branches/sage/pgs/mon/Elector.cc b/branches/sage/pgs/mon/Elector.cc index 43341f1a4a327..816946d3cbfe3 100644 --- a/branches/sage/pgs/mon/Elector.cc +++ b/branches/sage/pgs/mon/Elector.cc @@ -16,33 +16,58 @@ #include "Monitor.h" #include "common/Timer.h" - -#include "messages/MMonElectionPropose.h" -#include "messages/MMonElectionAck.h" -#include "messages/MMonElectionVictory.h" +#include "MonitorStore.h" +#include "messages/MMonElection.h" #include "config.h" #undef dout -#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector " -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector " +#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector(" << epoch << ") " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector(" << epoch << ") " -void Elector::start() +void Elector::init() { - dout(5) << "start -- can i be leader?" << endl; + epoch = mon->store->get_int("mon_epoch"); + if (!epoch) + epoch = 1; + dout(1) << "init, last seen epoch " << epoch << endl; +} + +void Elector::shutdown() +{ + if (expire_event) + mon->timer.cancel_event(expire_event); +} + +void Elector::bump_epoch(epoch_t e) +{ + dout(10) << "bump_epoch " << epoch << " to " << e << endl; + assert(epoch < e); + epoch = e; + mon->store->put_int(epoch, "mon_epoch"); + // clear up some state + electing_me = false; + acked_me.clear(); leader_acked = -1; +} + +void Elector::start() +{ + dout(5) << "start -- can i be leader?" << endl; + // start by trying to elect me + if (epoch % 2 == 0) + bump_epoch(epoch+1); // odd == election cycle start_stamp = g_clock.now(); - acked_me.clear(); - acked_me.insert(whoami); electing_me = true; + acked_me.insert(whoami); // bcast to everyone else for (int i=0; imonmap->num_mon; ++i) { if (i == whoami) continue; - mon->messenger->send_message(new MMonElectionPropose, + mon->messenger->send_message(new MMonElection(MMonElection::OP_PROPOSE, epoch), mon->monmap->get_inst(i)); } @@ -54,6 +79,7 @@ void Elector::defer(int who) dout(5) << "defer to " << who << endl; if (electing_me) { + // drop out acked_me.clear(); electing_me = false; } @@ -61,7 +87,7 @@ void Elector::defer(int who) // ack them leader_acked = who; ack_stamp = g_clock.now(); - mon->messenger->send_message(new MMonElectionAck, + mon->messenger->send_message(new MMonElection(MMonElection::OP_ACK, epoch), mon->monmap->get_inst(who)); // set a timer @@ -69,29 +95,22 @@ void Elector::defer(int who) } -class C_Mon_ElectionExpire : public Context { - Elector *elector; -public: - C_Mon_ElectionExpire(Elector *e) : elector(e) { } - void finish(int r) { - elector->expire(); - } -}; - void Elector::reset_timer(double plus) { // set the timer cancel_timer(); - expire_event = new C_Mon_ElectionExpire(this); - g_timer.add_event_after(g_conf.mon_lease + plus, - expire_event); + expire_event = new C_ElectionExpire(this); + mon->timer.add_event_after(g_conf.mon_lease + plus, + expire_event); } void Elector::cancel_timer() { - if (expire_event) - g_timer.cancel_event(expire_event); + if (expire_event) { + mon->timer.cancel_event(expire_event); + expire_event = 0; + } } void Elector::expire() @@ -114,29 +133,48 @@ void Elector::victory() { leader_acked = -1; electing_me = false; - + set quorum = acked_me; + cancel_timer(); - + + assert(epoch % 2 == 1); // election + bump_epoch(epoch+1); // is over! + // tell everyone - for (int i=0; imonmap->num_mon; ++i) { - if (i == whoami) continue; - mon->messenger->send_message(new MMonElectionVictory, - mon->monmap->get_inst(i)); + for (set::iterator p = quorum.begin(); + p != quorum.end(); + ++p) { + if (*p == whoami) continue; + mon->messenger->send_message(new MMonElection(MMonElection::OP_VICTORY, epoch), + mon->monmap->get_inst(*p)); } // tell monitor - mon->win_election(acked_me); + mon->win_election(epoch, quorum); } -void Elector::handle_propose(MMonElectionPropose *m) +void Elector::handle_propose(MMonElection *m) { dout(5) << "handle_propose from " << m->get_source() << endl; int from = m->get_source().num(); - if (from > whoami) { - if (leader_acked >= 0 && // we already acked someone - leader_acked < from) { // who would win over them + assert(m->epoch % 2 == 1); // election + if (m->epoch > epoch) { + bump_epoch(m->epoch); + } + else if (m->epoch < epoch && // got an "old" propose, + epoch % 2 == 0 && // in a non-election cycle + mon->quorum.count(from) == 0) { // from someone outside the quorum + // a mon just started up, call a new election so they can rejoin! + dout(5) << " got propose from old epoch, " << m->get_source() << " must have just started" << endl; + start(); + } + + if (whoami < from) { + // i would win over them. + if (leader_acked >= 0) { // we already acked someone + assert(leader_acked < from); // and they still win, of course dout(5) << "no, we already acked " << leader_acked << endl; } else { // wait, i should win! @@ -158,11 +196,21 @@ void Elector::handle_propose(MMonElectionPropose *m) delete m; } -void Elector::handle_ack(MMonElectionAck *m) +void Elector::handle_ack(MMonElection *m) { dout(5) << "handle_ack from " << m->get_source() << endl; int from = m->get_source().num(); + assert(m->epoch % 2 == 1); // election + if (m->epoch > epoch) { + dout(5) << "woah, that's a newer epoch, i must have rebooted. bumping and re-starting!" << endl; + bump_epoch(m->epoch); + start(); + delete m; + return; + } + assert(m->epoch == epoch); + if (electing_me) { // thanks acked_me.insert(from); @@ -175,26 +223,28 @@ void Elector::handle_ack(MMonElectionAck *m) } } else { // ignore, i'm deferring already. + assert(leader_acked >= 0); } delete m; } -void Elector::handle_victory(MMonElectionVictory *m) + +void Elector::handle_victory(MMonElection *m) { dout(5) << "handle_victory from " << m->get_source() << endl; int from = m->get_source().num(); + + assert(from < whoami); + assert(m->epoch % 2 == 0); + assert(m->epoch == epoch + 1); // i should have seen this election if i'm getting the victory. + bump_epoch(m->epoch); - if (from < whoami) { - // ok, fine, they win - mon->lose_election(from); - - // cancel my timer - cancel_timer(); - } else { - // no, that makes no sense, i should win. start over! - start(); - } + // they win + mon->lose_election(epoch, from); + + // cancel my timer + cancel_timer(); } @@ -203,19 +253,37 @@ void Elector::handle_victory(MMonElectionVictory *m) void Elector::dispatch(Message *m) { switch (m->get_type()) { - case MSG_MON_ELECTION_ACK: - handle_ack((MMonElectionAck*)m); - break; - - case MSG_MON_ELECTION_PROPOSE: - handle_propose((MMonElectionPropose*)m); - break; - - case MSG_MON_ELECTION_VICTORY: - handle_victory((MMonElectionVictory*)m); + + case MSG_MON_ELECTION: + { + MMonElection *em = (MMonElection*)m; + + switch (em->op) { + case MMonElection::OP_PROPOSE: + handle_propose(em); + return; + } + + if (em->epoch < epoch) { + dout(5) << "old epoch, dropping" << endl; + delete em; + break; + } + + switch (em->op) { + case MMonElection::OP_ACK: + handle_ack(em); + return; + case MMonElection::OP_VICTORY: + handle_victory(em); + return; + default: + assert(0); + } + } break; - default: + default: assert(0); } } diff --git a/branches/sage/pgs/mon/Elector.h b/branches/sage/pgs/mon/Elector.h index 2a10dddf92419..9bfd7cb644fc7 100644 --- a/branches/sage/pgs/mon/Elector.h +++ b/branches/sage/pgs/mon/Elector.h @@ -39,6 +39,8 @@ class Elector { void reset_timer(double plus=0.0); void cancel_timer(); + epoch_t epoch; // latest epoch we've seen. odd == election, even == stable, + // electing me bool electing_me; utime_t start_stamp; @@ -48,25 +50,42 @@ class Elector { int leader_acked; // who i've acked utime_t ack_stamp; // and when - public: - + void bump_epoch(epoch_t e=0); // i just saw a larger epoch + + class C_ElectionExpire : public Context { + Elector *elector; + public: + C_ElectionExpire(Elector *e) : elector(e) { } + void finish(int r) { + elector->expire(); + } + }; + void start(); // start an electing me void defer(int who); void expire(); // timer goes off void victory(); - void handle_propose(class MMonElectionPropose *m); - void handle_ack(class MMonElectionAck *m); - void handle_victory(class MMonElectionVictory *m); - + void handle_propose(class MMonElection *m); + void handle_ack(class MMonElection *m); + void handle_victory(class MMonElection *m); public: - Elector(Monitor *m, int w) : mon(m), whoami(w) { - // initialize all those values! - // ... - } + Elector(Monitor *m, int w) : mon(m), whoami(w), + expire_event(0), + epoch(0), + electing_me(false), + leader_acked(-1) { } + + void init(); + void shutdown(); void dispatch(Message *m); + + void call_election() { + start(); + } + }; diff --git a/branches/sage/pgs/mon/MDSMonitor.cc b/branches/sage/pgs/mon/MDSMonitor.cc index c9a680d36a244..519f32cae7e5a 100644 --- a/branches/sage/pgs/mon/MDSMonitor.cc +++ b/branches/sage/pgs/mon/MDSMonitor.cc @@ -16,12 +16,17 @@ #include "MDSMonitor.h" #include "Monitor.h" #include "MonitorStore.h" +#include "OSDMonitor.h" #include "messages/MMDSMap.h" #include "messages/MMDSGetMap.h" #include "messages/MMDSBeacon.h" #include "messages/MMonCommand.h" +#include "messages/MMonCommandAck.h" + +#include "messages/MGenericMessage.h" + #include "common/Timer.h" @@ -34,280 +39,367 @@ -/********* MDS map **************/ +// my methods -void MDSMonitor::dispatch(Message *m) +void MDSMonitor::print_map(MDSMap &m) { - switch (m->get_type()) { - - case MSG_MDS_BEACON: - handle_mds_beacon((MMDSBeacon*)m); - break; - - case MSG_MDS_GETMAP: - handle_mds_getmap((MMDSGetMap*)m); - break; - - default: - assert(0); - } -} - - - -void MDSMonitor::election_finished() -{ - if (mon->is_leader()) { - - // FIXME be smarter later. - - if (g_conf.mkfs) { - create_initial(); - save_map(); - } else { - load_map(); - } + dout(7) << "print_map epoch " << m.get_epoch() << " num_mds " << g_conf.num_mds << endl; + entity_inst_t blank; + set all; + m.get_mds_set(all); + for (set::iterator p = all.begin(); + p != all.end(); + ++p) { + dout(7) << " mds" << *p << "." << m.mds_inc[*p] + << " : " << MDSMap::get_state_name(m.get_state(*p)) + << " : " << (m.have_inst(*p) ? m.get_inst(*p) : blank) + << endl; } } -void MDSMonitor::create_initial() -{ - mdsmap.epoch = 0; // until everyone boots - mdsmap.created = g_clock.now(); - - mdsmap.encode(encoded_map); - print_map(); -} +// service methods -void MDSMonitor::load_map() +void MDSMonitor::create_initial() { - int r = mon->store->get_bl_ss(encoded_map, "mdsmap", "current"); - assert(r > 0); - mdsmap.decode(encoded_map); - dout(7) << "load_map epoch " << mdsmap.get_epoch() << endl; + dout(10) << "create_initial" << endl; + pending_mdsmap.created = g_clock.now(); + print_map(pending_mdsmap); } -void MDSMonitor::save_map() +bool MDSMonitor::update_from_paxos() { - dout(7) << "save_map epoch " << mdsmap.get_epoch() << endl; - - int r = mon->store->put_bl_ss(encoded_map, "mdsmap", "current"); - assert(r>=0); + assert(paxos->is_active()); + + version_t paxosv = paxos->get_version(); + dout(10) << "update_from_paxos paxosv " << paxosv + << ", my e " << mdsmap.epoch << endl; + + if (paxosv == mdsmap.epoch) return true; + assert(paxosv >= mdsmap.epoch); + + // read and decode + mdsmap_bl.clear(); + bool success = paxos->read(paxosv, mdsmap_bl); + assert(success); + dout(10) << "update_from_paxos got " << paxosv << endl; + mdsmap.decode(mdsmap_bl); + + // new map + print_map(mdsmap); + + // bcast map to mds, waiters + if (mon->is_leader()) + bcast_latest_mds(); + send_to_waiting(); + + // hackish: did all mds's shut down? + if (mon->is_leader() && + g_conf.mon_stop_with_last_mds && + mdsmap.get_epoch() > 1 && + mdsmap.get_num_up_or_failed_mds() == 0) + mon->messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), + mon->monmap->get_inst(mon->whoami)); + + return true; } -void MDSMonitor::print_map() +void MDSMonitor::create_pending() { - dout(7) << "print_map epoch " << mdsmap.get_epoch() << " num_mds " << g_conf.num_mds << endl; - entity_inst_t blank; - set all; - mdsmap.get_mds_set(all); - for (set::iterator p = all.begin(); - p != all.end(); - ++p) { - dout(7) << " mds" << *p << "." << mdsmap.mds_inc[*p] - << " : " << MDSMap::get_state_name(mdsmap.get_state(*p)) - << " : " << (mdsmap.have_inst(*p) ? mdsmap.get_inst(*p) : blank) - << endl; - } + pending_mdsmap = mdsmap; + pending_mdsmap.epoch++; + dout(10) << "create_pending e" << pending_mdsmap.epoch << endl; } -void MDSMonitor::issue_map() +void MDSMonitor::encode_pending(bufferlist &bl) { - mdsmap.inc_epoch(); - encoded_map.clear(); - mdsmap.encode(encoded_map); - - dout(7) << "issue_map epoch " << mdsmap.get_epoch() << endl; - - save_map(); - print_map(); + dout(10) << "encode_pending e" << pending_mdsmap.epoch << endl; - // bcast map - bcast_latest_mds(); - send_current(); + print_map(pending_mdsmap); + + // apply to paxos + assert(paxos->get_version() + 1 == pending_mdsmap.epoch); + pending_mdsmap.encode(bl); } -void MDSMonitor::handle_command(MMonCommand *m, int& r, string& rs) +bool MDSMonitor::preprocess_query(Message *m) { - stringstream ss; - if (m->cmd.size() > 1) { - if (m->cmd[1] == "stop" && m->cmd.size() > 2) { - int who = atoi(m->cmd[2].c_str()); - if (mdsmap.is_active(who)) { - r = 0; - ss << "telling mds" << who << " to stop"; - getline(ss,rs); + dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << endl; - // hack - mdsmap.mds_state[who] = MDSMap::STATE_STOPPING; - issue_map(); + switch (m->get_type()) { + + case MSG_MDS_BEACON: + return preprocess_beacon((MMDSBeacon*)m); + + case MSG_MDS_GETMAP: + send_full(m->get_source_inst()); + return true; - } else { - ss << "mds" << who << " not active (" << mdsmap.get_state_name(mdsmap.get_state(who)) << ")"; - getline(ss,rs); - } - } - else if (m->cmd[1] == "setnum" && m->cmd.size() > 2) { - g_conf.num_mds = atoi(m->cmd[2].c_str()); - ss << "g_conf.num_mds = " << g_conf.num_mds << endl; - getline(ss,rs); - print_map(); - } + case MSG_MON_COMMAND: + return false; + + default: + assert(0); + delete m; + return true; } } - -void MDSMonitor::handle_mds_beacon(MMDSBeacon *m) +bool MDSMonitor::preprocess_beacon(MMDSBeacon *m) { - dout(12) << "mds_beacon " << *m - << " from " << m->get_source() - << " " << m->get_source_inst() - << endl; - int from = m->get_source().num(); + dout(12) << "preprocess_beacon " << *m + << " from " << m->get_mds_inst() + << endl; + + // fw to leader? + if (!mon->is_leader()) { + dout(10) << "fw to leader" << endl; + mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader())); + return true; + } + + // let's see. + int from = m->get_mds_inst().name.num(); int state = m->get_state(); version_t seq = m->get_seq(); - // initial boot? - bool booted = false; + // can i handle this query without a map update? - // choose an MDS id - if (from >= 0) { - // wants to be (or already is) a specific MDS. - if (mdsmap.is_down(from)) { - dout(10) << "mds_beacon assigning requested mds" << from << endl; - booted = true; - } else if (mdsmap.get_inst(from) != m->get_source_inst()) { - dout(10) << "mds_beacon not assigning requested mds" << from - << ", that mds is up and someone else" << endl; - from = -1; - } - } - if (from < 0) { - // pick a failed mds? - set failed; - mdsmap.get_failed_mds_set(failed); - if (!failed.empty()) { - from = *failed.begin(); - dout(10) << "mds_beacon assigned failed mds" << from << endl; - booted = true; - } - } - if (from < 0) { - // ok, just pick any unused mds id. - for (from=0; ; ++from) { - if (mdsmap.is_dne(from) || - mdsmap.is_out(from)) { - dout(10) << "mds_beacon assigned out|dne mds" << from << endl; - booted = true; - break; - } - } + // boot? + if (state == MDSMap::STATE_BOOT) { + // already booted? + int already = mdsmap.get_addr_rank(m->get_mds_inst().addr); + if (already < 0) + return false; // need to update map + + // already booted. just reply to beacon, as per usual. + from = already; } - - // old beacon? + // reply to beacon if (mdsmap.mds_state_seq[from] > seq) { dout(7) << "mds_beacon " << *m << " has old seq, ignoring" << endl; delete m; - return; + return true; } // reply to beacon? if (state != MDSMap::STATE_OUT) { last_beacon[from] = g_clock.now(); // note time - messenger->send_message(new MMDSBeacon(state, seq), - m->get_source_inst()); + mon->messenger->send_message(new MMDSBeacon(m->get_mds_inst(), state, seq), + m->get_mds_inst()); } + + // is there a state change here? + if (mdsmap.mds_state.count(from) == 0 || + mdsmap.mds_state[from] != state) + return false; // yep, need to update map. + + // we're done. + delete m; + return true; +} - // make sure it's in the map - if (booted) { - mdsmap.mds_inst[from].addr = m->get_source_addr(); - mdsmap.mds_inst[from].name = MSG_ADDR_MDS(from); - mdsmap.mds_inc[from]++; +bool MDSMonitor::prepare_update(Message *m) +{ + dout(10) << "prepare_update " << *m << endl; - // someone (new) joined the cluster - mdsmap.same_inst_since = mdsmap.epoch+1; + switch (m->get_type()) { + + case MSG_MDS_BEACON: + return handle_beacon((MMDSBeacon*)m); + + case MSG_MON_COMMAND: + return handle_command((MMonCommand*)m); - // starting -> creating|starting|replay - if (mdsmap.is_degraded() && - !mdsmap.is_failed(from)) { - dout(10) << "mds_beacon currently degraded, mds" << from << " will be standby" << endl; - state = MDSMap::STATE_STANDBY; - } - /* - else if (from >= g_conf.num_mds) { - dout(10) << "mds_beacon already have " << g_conf.num_mds << " mds's, standby (increase with 'mds setnum xxx')" << endl; - state = MDSMap::STATE_STANDBY; - } - */ - else if (state == MDSMap::STATE_STARTING) { + default: + assert(0); + delete m; + } + + return true; +} + +bool MDSMonitor::should_propose_now() +{ + return true; +} + + +bool MDSMonitor::handle_beacon(MMDSBeacon *m) +{ + // -- this is an update -- + dout(12) << "handle_beacon " << *m + << " from " << m->get_mds_inst() + << endl; + int from = m->get_mds_inst().name.num(); + int state = m->get_state(); + version_t seq = m->get_seq(); + + // boot? + if (state == MDSMap::STATE_BOOT) { + // assign a name. + if (from >= 0) { + // wants to be (or already is) a specific MDS. if (mdsmap.is_failed(from)) { - dout(10) << "mds_beacon will recover mds" << from << endl; + dout(10) << "mds_beacon boot: mds" << from << " was failed, replaying" << endl; state = MDSMap::STATE_REPLAY; - } - else if (mdsmap.is_out(from)) { - dout(10) << "mds_beacon will start mds" << from << endl; + } else if (mdsmap.is_out(from)) { + dout(10) << "mds_beacon boot: mds" << from << " was out, starting" << endl; state = MDSMap::STATE_STARTING; - } - else { - dout(10) << "mds_beacon will create mds" << from << endl; - state = MDSMap::STATE_CREATING; - } + } else if (!mdsmap.have_inst(from) || mdsmap.get_inst(from) != m->get_mds_inst()) { + dout(10) << "mds_beacon boot: mds" << from << " is someone else" << endl; + from = -1; + } + } + if (from < 0) { + from = pending_mdsmap.get_addr_rank(m->get_mds_inst().addr); + if (from >= 0) { + state = pending_mdsmap.mds_state[from]; + dout(10) << "mds_beacon boot: already pending mds" << from + << " " << MDSMap::get_state_name(state) << endl; + delete m; + return false; + } + } + if (from < 0) { + // pick a failed mds? + set failed; + pending_mdsmap.get_failed_mds_set(failed); + if (!failed.empty()) { + from = *failed.begin(); + dout(10) << "mds_beacon boot: assigned failed mds" << from << endl; + state = MDSMap::STATE_REPLAY; + } + } + if (from < 0) { + // ok, just pick any unused mds id. + for (from=0; ; ++from) { + if (pending_mdsmap.is_dne(from)) { + dout(10) << "mds_beacon boot: assigned new mds" << from << endl; + state = MDSMap::STATE_CREATING; + break; + } else if (pending_mdsmap.is_out(from)) { + dout(10) << "mds_beacon boot: assigned out mds" << from << endl; + state = MDSMap::STATE_STARTING; + break; + } + } + } + + assert(state != MDSMap::STATE_BOOT); + + // put it in the map. + pending_mdsmap.mds_inst[from].addr = m->get_mds_inst().addr; + pending_mdsmap.mds_inst[from].name = MSG_ADDR_MDS(from); + pending_mdsmap.mds_inc[from]++; + + // someone (new) has joined the cluster. + pending_mdsmap.same_inst_since = pending_mdsmap.epoch; + + // if degraded, starting -> standby + if (pending_mdsmap.is_degraded() && + state == MDSMap::STATE_STARTING) { + dout(10) << "mds_beacon boot: cluster degraded, mds" << from << " will be standby" << endl; + state = MDSMap::STATE_STANDBY; } } // if creating -> active, go to standby instead - if (state == MDSMap::STATE_ACTIVE && mdsmap.is_creating(from)) { - mdsmap.mds_created.insert(from); + if (state == MDSMap::STATE_ACTIVE && + mdsmap.is_creating(from)) { + pending_mdsmap.mds_created.insert(from); dout(10) << "mds_beacon created mds" << from << endl; if (mdsmap.is_degraded()) { - dout(10) << "mds_beacon current degraded, marking mds" << from << " as standby" << endl; + dout(10) << "mds_beacon cluster degraded, marking mds" << from << " as standby" << endl; state = MDSMap::STATE_STANDBY; } } + // update the map + dout(10) << "mds_beacon mds" << from << " " << MDSMap::get_state_name(mdsmap.mds_state[from]) + << " -> " << MDSMap::get_state_name(state) + << endl; - // did we update the map? - if (mdsmap.mds_state.count(from) == 0 || - mdsmap.mds_state[from] != state) { - // update mds state - dout(10) << "mds_beacon mds" << from << " " << MDSMap::get_state_name(mdsmap.mds_state[from]) - << " -> " << MDSMap::get_state_name(state) - << endl; - // did someone leave the cluster? - if (state == MDSMap::STATE_OUT && mdsmap.mds_state[from] != MDSMap::STATE_OUT) - mdsmap.same_inst_since = mdsmap.epoch+1; - - // change the state - mdsmap.mds_state[from] = state; - if (mdsmap.is_up(from)) - mdsmap.mds_state_seq[from] = seq; - else - mdsmap.mds_state_seq.erase(from); - - issue_map(); - } + // did someone leave the cluster? + if (state == MDSMap::STATE_OUT && + mdsmap.mds_state[from] != MDSMap::STATE_OUT) + pending_mdsmap.same_inst_since = pending_mdsmap.epoch; + + // change the state + pending_mdsmap.mds_state[from] = state; + if (pending_mdsmap.is_up(from)) + pending_mdsmap.mds_state_seq[from] = seq; + else + pending_mdsmap.mds_state_seq.erase(from); + + + dout(7) << "pending map now:" << endl; + print_map(pending_mdsmap); + paxos->wait_for_commit(new C_Updated(this, from, m)); + + return true; +} + + +void MDSMonitor::_updated(int from, MMDSBeacon *m) +{ + if (m->get_state() == MDSMap::STATE_BOOT) { + dout(10) << "_updated (booted) mds" << from << " " << *m << endl; + mon->osdmon->send_latest(0, mdsmap.get_inst(from)); + } else { + dout(10) << "_updated mds" << from << " " << *m << endl; + } delete m; } -void MDSMonitor::handle_mds_getmap(MMDSGetMap *m) + +bool MDSMonitor::handle_command(MMonCommand *m) { - dout(7) << "mds_getmap from " << m->get_source() << " " << m->get_source_inst() << endl; - if (mdsmap.get_epoch() > 0) - send_full(m->get_source_inst()); - else - awaiting_map.push_back( m->get_source_inst() ); + int r = -1; + string rs = "unrecognized command"; + stringstream ss; + + if (m->cmd.size() > 1) { + if (m->cmd[1] == "stop" && m->cmd.size() > 2) { + int who = atoi(m->cmd[2].c_str()); + if (mdsmap.is_active(who)) { + r = 0; + ss << "telling mds" << who << " to stop"; + getline(ss,rs); + + pending_mdsmap.mds_state[who] = MDSMap::STATE_STOPPING; + + } else { + ss << "mds" << who << " not active (" << mdsmap.get_state_name(mdsmap.get_state(who)) << ")"; + getline(ss,rs); + } + } + /* + else if (m->cmd[1] == "setnum" && m->cmd.size() > 2) { + g_conf.num_mds = atoi(m->cmd[2].c_str()); + ss << "g_conf.num_mds = " << g_conf.num_mds << endl; + getline(ss,rs); + print_map(); + } + */ + } + + // reply + mon->messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst()); + delete m; + return r >= 0; } + void MDSMonitor::bcast_latest_mds() { dout(10) << "bcast_latest_mds " << mdsmap.get_epoch() << endl; @@ -324,26 +416,25 @@ void MDSMonitor::bcast_latest_mds() void MDSMonitor::send_full(entity_inst_t dest) { dout(11) << "send_full to " << dest << endl; - messenger->send_message(new MMDSMap(&mdsmap), dest); + mon->messenger->send_message(new MMDSMap(&mdsmap), dest); } -void MDSMonitor::send_current() +void MDSMonitor::send_to_waiting() { - dout(10) << "mds_send_current " << mdsmap.get_epoch() << endl; - for (list::iterator i = awaiting_map.begin(); - i != awaiting_map.end(); + dout(10) << "send_to_waiting " << mdsmap.get_epoch() << endl; + for (list::iterator i = waiting_for_map.begin(); + i != waiting_for_map.end(); i++) send_full(*i); - awaiting_map.clear(); + waiting_for_map.clear(); } void MDSMonitor::send_latest(entity_inst_t dest) { - // FIXME: check if we're locked, etc. - if (mdsmap.get_epoch() > 0) + if (paxos->is_readable()) send_full(dest); else - awaiting_map.push_back(dest); + waiting_for_map.push_back(dest); } @@ -351,6 +442,11 @@ void MDSMonitor::tick() { // make sure mds's are still alive utime_t now = g_clock.now(); + + // ...if i am an active leader + if (!mon->is_leader()) return; + if (!paxos->is_active()) return; + if (now > g_conf.mds_beacon_grace) { utime_t cutoff = now; cutoff -= g_conf.mds_beacon_grace; @@ -403,8 +499,8 @@ void MDSMonitor::tick() << endl; // update map - mdsmap.mds_state[*p] = newstate; - mdsmap.mds_state_seq.erase(*p); + pending_mdsmap.mds_state[*p] = newstate; + pending_mdsmap.mds_state_seq.erase(*p); changed = true; } } else { @@ -413,20 +509,29 @@ void MDSMonitor::tick() } } - if (changed) { - issue_map(); - } + if (changed) + propose_pending(); } } void MDSMonitor::do_stop() { + // hrm... + if (!mon->is_leader() || + !paxos->is_active()) { + dout(-10) << "do_stop can't stop right now, mdsmap not writeable" << endl; + return; + } + + dout(10) << "do_stop stopping active mds nodes" << endl; + + print_map(mdsmap); for (map::iterator p = mdsmap.mds_state.begin(); p != mdsmap.mds_state.end(); ++p) if (mdsmap.is_active(p->first)) - mdsmap.mds_state[p->first] = MDSMap::STATE_STOPPING; + pending_mdsmap.mds_state[p->first] = MDSMap::STATE_STOPPING; - issue_map(); + propose_pending(); } diff --git a/branches/sage/pgs/mon/MDSMonitor.h b/branches/sage/pgs/mon/MDSMonitor.h index 658ba50855b29..082423aec33a0 100644 --- a/branches/sage/pgs/mon/MDSMonitor.h +++ b/branches/sage/pgs/mon/MDSMonitor.h @@ -24,67 +24,71 @@ using namespace std; #include "mds/MDSMap.h" -class Monitor; +#include "PaxosService.h" -class MDSMonitor : public Dispatcher { - Monitor *mon; - Messenger *messenger; - Mutex &lock; +class MMDSBeacon; - // mds maps +class MDSMonitor : public PaxosService { public: - MDSMap mdsmap; - - private: - bufferlist encoded_map; - - //map inc_maps; - //MDSMap::Incremental pending_inc; + // mds maps + MDSMap mdsmap; // current + bufferlist mdsmap_bl; // encoded + + MDSMap pending_mdsmap; // current + pending updates + + // my helpers + void print_map(MDSMap &m); + + class C_Updated : public Context { + MDSMonitor *mm; + int mds; + MMDSBeacon *m; + public: + C_Updated(MDSMonitor *a, int b, MMDSBeacon *c) : + mm(a), mds(b), m(c) {} + void finish(int r) { + if (r >= 0) + mm->_updated(mds, m); // success + else + mm->dispatch((Message*)m); // try again + } + }; + + + // service methods + void create_initial(); + bool update_from_paxos(); + void create_pending(); + void encode_pending(bufferlist &bl); - list awaiting_map; + void _updated(int m, MMDSBeacon *m); + + bool preprocess_query(Message *m); // true if processed. + bool prepare_update(Message *m); + bool should_propose_now(); + + bool preprocess_beacon(class MMDSBeacon *m); + bool handle_beacon(class MMDSBeacon *m); + bool handle_command(class MMonCommand *m); // beacons map last_beacon; - bool is_alive(int mds); +public: + MDSMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { } + // sending the map +private: + list waiting_for_map; - // maps - void create_initial(); - void send_current(); // send current map to waiters. - void send_full(entity_inst_t dest); void bcast_latest_mds(); + void send_full(entity_inst_t dest); + void send_to_waiting(); - void issue_map(); - - void save_map(); - void load_map(); - void print_map(); - - //void accept_pending(); // accept pending, new map. - //void send_incremental(epoch_t since, msg_addr_t dest); - - void handle_mds_state(class MMDSState *m); - void handle_mds_beacon(class MMDSBeacon *m); - //void handle_mds_failure(class MMDSFailure *m); - void handle_mds_getmap(class MMDSGetMap *m); - - - - public: - MDSMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l) { - } - - void dispatch(Message *m); - void tick(); // check state, take actions - - void election_starting(); - void election_finished(); - +public: void send_latest(entity_inst_t dest); - void handle_command(class MMonCommand *m, int& r, string& rs); - + void tick(); // check state, take actions void do_stop(); }; diff --git a/branches/sage/pgs/mon/MonMap.h b/branches/sage/pgs/mon/MonMap.h index cd77bbf3488e6..eb18579cd7e99 100644 --- a/branches/sage/pgs/mon/MonMap.h +++ b/branches/sage/pgs/mon/MonMap.h @@ -24,7 +24,7 @@ class MonMap { public: - epoch_t epoch; // what epoch of the osd cluster descriptor is this + epoch_t epoch; // what epoch/version of the monmap int num_mon; vector mon_inst; @@ -41,7 +41,7 @@ class MonMap { // choice should be stable, unless we explicitly ask for a new one. int pick_mon(bool newmon=false) { if (newmon || (last_mon < 0)) { - last_mon = 0; //last_mon = rand() % num_mon; + last_mon = rand() % num_mon; } return last_mon; } @@ -68,6 +68,7 @@ class MonMap { _decode(mon_inst, blist, off); } + // read from/write to a file int write(char *fn) { // encode bufferlist bl; diff --git a/branches/sage/pgs/mon/Monitor.cc b/branches/sage/pgs/mon/Monitor.cc index 402f7359552bb..5937fb4d0b69f 100644 --- a/branches/sage/pgs/mon/Monitor.cc +++ b/branches/sage/pgs/mon/Monitor.cc @@ -1,218 +1,251 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -// TODO: missing run() method, which creates the two main timers, refreshTimer and readTimer - -#include "Monitor.h" - -#include "osd/OSDMap.h" - -#include "MonitorStore.h" - -#include "msg/Message.h" -#include "msg/Messenger.h" - -#include "messages/MPing.h" -#include "messages/MPingAck.h" -#include "messages/MGenericMessage.h" -#include "messages/MMonCommand.h" -#include "messages/MMonCommandAck.h" - -#include "messages/MMonPaxos.h" - -#include "common/Timer.h" -#include "common/Clock.h" - -#include "OSDMonitor.h" -#include "MDSMonitor.h" -#include "ClientMonitor.h" - -#include "config.h" -#undef dout -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " -#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " - - - -void Monitor::init() -{ - lock.Lock(); - - dout(1) << "init" << endl; - - // store - char s[80]; - sprintf(s, "mondata/mon%d", whoami); - store = new MonitorStore(s); - - if (g_conf.mkfs) - store->mkfs(); - - store->mount(); - - // create - osdmon = new OSDMonitor(this, messenger, lock); - mdsmon = new MDSMonitor(this, messenger, lock); - clientmon = new ClientMonitor(this, messenger, lock); - - // i'm ready! - messenger->set_dispatcher(this); - - // start ticker - reset_tick(); - - // call election? - if (monmap->num_mon > 1) { - assert(monmap->num_mon != 2); - call_election(); - } else { - // we're standalone. - set q; - q.insert(whoami); - win_election(q); - } - - lock.Unlock(); -} - -void Monitor::shutdown() -{ - dout(1) << "shutdown" << endl; - - // cancel all events - cancel_tick(); - timer.cancel_all(); - timer.join(); - - // stop osds. - for (set::iterator it = osdmon->osdmap.get_osds().begin(); - it != osdmon->osdmap.get_osds().end(); - it++) { - if (osdmon->osdmap.is_down(*it)) continue; - dout(10) << "sending shutdown to osd" << *it << endl; - messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), - osdmon->osdmap.get_inst(*it)); - } - osdmon->mark_all_down(); - - // monitors too. - for (int i=0; inum_mon; i++) - if (i != whoami) - messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), - monmap->get_inst(i)); - - // unmount my local storage - if (store) - delete store; - - // clean up - if (monmap) delete monmap; - if (osdmon) delete osdmon; - if (mdsmon) delete mdsmon; - if (clientmon) delete clientmon; - - // die. - messenger->shutdown(); - delete messenger; -} - - -void Monitor::call_election() -{ - if (monmap->num_mon == 1) return; - - dout(10) << "call_election" << endl; - state = STATE_STARTING; - - elector.start(); - - osdmon->election_starting(); - //mdsmon->election_starting(); -} - -void Monitor::win_election(set& active) -{ - state = STATE_LEADER; - leader = whoami; - quorum = active; - dout(10) << "win_election, quorum is " << quorum << endl; - - // init - osdmon->election_finished(); - mdsmon->election_finished(); - - // init paxos - test_paxos.leader_start(); -} - -void Monitor::lose_election(int l) -{ - state = STATE_PEON; - leader = l; - dout(10) << "lose_election, leader is mon" << leader << endl; -} - - -void Monitor::handle_command(MMonCommand *m) -{ - dout(0) << "handle_command " << *m << endl; - - int r = -1; - string rs = "unrecognized command"; - - if (!m->cmd.empty()) { - if (m->cmd[0] == "stop") { - r = 0; - rs = "stopping"; - do_stop(); - } - else if (m->cmd[0] == "mds") { - mdsmon->handle_command(m, r, rs); - } - else if (m->cmd[0] == "osd") { - - } - } - - // reply - messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst()); - delete m; -} - - -void Monitor::do_stop() -{ - dout(0) << "do_stop -- shutting down" << endl; - mdsmon->do_stop(); -} - - -void Monitor::dispatch(Message *m) -{ - lock.Lock(); - { - switch (m->get_type()) { - - // misc - case MSG_PING_ACK: - handle_ping_ack((MPingAck*)m); - break; - - case MSG_SHUTDOWN: - assert(m->get_source().is_osd()); - osdmon->dispatch(m); - break; + // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- + // vim: ts=8 sw=2 smarttab + /* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + // TODO: missing run() method, which creates the two main timers, refreshTimer and readTimer + + #include "Monitor.h" + + #include "osd/OSDMap.h" + + #include "MonitorStore.h" + + #include "msg/Message.h" + #include "msg/Messenger.h" + + #include "messages/MPing.h" + #include "messages/MPingAck.h" + #include "messages/MGenericMessage.h" + #include "messages/MMonCommand.h" + #include "messages/MMonCommandAck.h" + + #include "messages/MMonPaxos.h" + + #include "common/Timer.h" + #include "common/Clock.h" + + #include "OSDMonitor.h" + #include "MDSMonitor.h" + #include "ClientMonitor.h" + + #include "config.h" + #undef dout + #define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " + #define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " + + + + void Monitor::init() + { + lock.Lock(); + + dout(1) << "init" << endl; + + // store + char s[80]; + sprintf(s, "mondata/mon%d", whoami); + store = new MonitorStore(s); + + if (g_conf.mkfs) + store->mkfs(); + + store->mount(); + + // create + osdmon = new OSDMonitor(this, &paxos_osdmap); + mdsmon = new MDSMonitor(this, &paxos_mdsmap); + clientmon = new ClientMonitor(this, &paxos_clientmap); + + // init paxos + paxos_test.init(); + paxos_osdmap.init(); + paxos_mdsmap.init(); + paxos_clientmap.init(); + + // i'm ready! + messenger->set_dispatcher(this); + + // start ticker + reset_tick(); + + // call election? + if (monmap->num_mon > 1) { + assert(monmap->num_mon != 2); + call_election(); + } else { + // we're standalone. + set q; + q.insert(whoami); + win_election(1, q); + } + + lock.Unlock(); + } + + void Monitor::shutdown() + { + dout(1) << "shutdown" << endl; + + elector.shutdown(); + + if (is_leader()) { + // stop osds. + for (set::iterator it = osdmon->osdmap.get_osds().begin(); + it != osdmon->osdmap.get_osds().end(); + it++) { + if (osdmon->osdmap.is_down(*it)) continue; + dout(10) << "sending shutdown to osd" << *it << endl; + messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), + osdmon->osdmap.get_inst(*it)); + } + osdmon->mark_all_down(); + + // monitors too. + for (int i=0; inum_mon; i++) + if (i != whoami) + messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), + monmap->get_inst(i)); + } + + // cancel all events + cancel_tick(); + timer.cancel_all(); + timer.join(); + + // unmount my local storage + if (store) + delete store; + + // clean up + if (osdmon) delete osdmon; + if (mdsmon) delete mdsmon; + if (clientmon) delete clientmon; + + // die. + messenger->shutdown(); + delete messenger; + } + + + void Monitor::call_election() + { + if (monmap->num_mon == 1) return; + + dout(10) << "call_election" << endl; + state = STATE_STARTING; + + // tell paxos + paxos_test.election_starting(); + paxos_mdsmap.election_starting(); + paxos_osdmap.election_starting(); + paxos_clientmap.election_starting(); + + // call a new election + elector.call_election(); + } + + void Monitor::win_election(epoch_t epoch, set& active) + { + state = STATE_LEADER; + leader = whoami; + mon_epoch = epoch; + quorum = active; + dout(10) << "win_election, epoch " << mon_epoch << " quorum is " << quorum << endl; + + // init paxos + paxos_test.leader_init(); + paxos_mdsmap.leader_init(); + paxos_osdmap.leader_init(); + paxos_clientmap.leader_init(); + + // init + osdmon->election_finished(); + mdsmon->election_finished(); + clientmon->election_finished(); + } + + void Monitor::lose_election(epoch_t epoch, int l) + { + state = STATE_PEON; + mon_epoch = epoch; + leader = l; + dout(10) << "lose_election, epoch " << mon_epoch << " leader is mon" << leader << endl; + + // init paxos + paxos_test.peon_init(); + paxos_mdsmap.peon_init(); + paxos_osdmap.peon_init(); + paxos_clientmap.peon_init(); + + // init + osdmon->election_finished(); + mdsmon->election_finished(); + clientmon->election_finished(); + } + + + void Monitor::handle_command(MMonCommand *m) + { + dout(0) << "handle_command " << *m << endl; + + int r = -1; + string rs = "unrecognized command"; + + if (!m->cmd.empty()) { + if (m->cmd[0] == "stop") { + r = 0; + rs = "stopping"; + do_stop(); + } + else if (m->cmd[0] == "mds") { + mdsmon->dispatch(m); + return; + } + else if (m->cmd[0] == "osd") { + + } + } + + // reply + messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst()); + delete m; + } + + + void Monitor::do_stop() + { + dout(0) << "do_stop -- shutting down" << endl; + mdsmon->do_stop(); + } + + + void Monitor::dispatch(Message *m) + { + lock.Lock(); + { + switch (m->get_type()) { + + // misc + case MSG_PING_ACK: + handle_ping_ack((MPingAck*)m); + break; + + case MSG_SHUTDOWN: + if (m->get_source().is_osd()) + osdmon->dispatch(m); + else + handle_shutdown(m); + break; case MSG_MON_COMMAND: handle_command((MMonCommand*)m); @@ -233,12 +266,6 @@ void Monitor::dispatch(Message *m) case MSG_MDS_BEACON: case MSG_MDS_GETMAP: mdsmon->dispatch(m); - - // hackish: did all mds's shut down? - if (g_conf.mon_stop_with_last_mds && - mdsmon->mdsmap.get_num_up_or_failed_mds() == 0) - shutdown(); - break; // clients @@ -250,23 +277,39 @@ void Monitor::dispatch(Message *m) // paxos case MSG_MON_PAXOS: - // send it to the right paxos instance - switch (((MMonPaxos*)m)->machine_id) { - case PAXOS_TEST: - test_paxos.dispatch(m); - break; - case PAXOS_OSDMAP: - //... - - default: - assert(0); + { + MMonPaxos *pm = (MMonPaxos*)m; + + // sanitize + if (pm->epoch > mon_epoch) + call_election(); + if (pm->epoch != mon_epoch) { + delete pm; + break; + } + + // send it to the right paxos instance + switch (pm->machine_id) { + case PAXOS_TEST: + paxos_test.dispatch(m); + break; + case PAXOS_OSDMAP: + paxos_osdmap.dispatch(m); + break; + case PAXOS_MDSMAP: + paxos_mdsmap.dispatch(m); + break; + case PAXOS_CLIENTMAP: + paxos_clientmap.dispatch(m); + break; + default: + assert(0); + } } break; // elector messages - case MSG_MON_ELECTION_PROPOSE: - case MSG_MON_ELECTION_ACK: - case MSG_MON_ELECTION_VICTORY: + case MSG_MON_ELECTION: elector.dispatch(m); break; @@ -282,9 +325,13 @@ void Monitor::dispatch(Message *m) void Monitor::handle_shutdown(Message *m) { - dout(1) << "shutdown from " << m->get_source() << endl; - - shutdown(); + assert(m->get_source().is_mon()); + if (m->get_source().num() == get_leader()) { + dout(1) << "shutdown from leader " << m->get_source() << endl; + shutdown(); + } else { + dout(1) << "ignoring shutdown from non-leader " << m->get_source() << endl; + } delete m; } diff --git a/branches/sage/pgs/mon/Monitor.h b/branches/sage/pgs/mon/Monitor.h index 526f63ab55fae..015e5797ca6df 100644 --- a/branches/sage/pgs/mon/Monitor.h +++ b/branches/sage/pgs/mon/Monitor.h @@ -31,13 +31,9 @@ class OSDMonitor; class MDSMonitor; class ClientMonitor; -#define PAXOS_TEST 0 -#define PAXOS_OSDMAP 1 -#define PAXOS_MDSMAP 2 -#define PAXOS_CLIENTMAP 3 class Monitor : public Dispatcher { -protected: +public: // me int whoami; Messenger *messenger; @@ -52,63 +48,65 @@ protected: void reset_tick(); friend class C_Mon_Tick; - // my local store - //ObjectStore *store; + // -- local storage -- +public: MonitorStore *store; - const static int INO_ELECTOR = 1; - const static int INO_MON_MAP = 2; - const static int INO_OSD_MAP = 10; - const static int INO_OSD_INC_MAP = 11; - const static int INO_MDS_MAP = 20; - - // elector - Elector elector; - friend class Elector; - - epoch_t mon_epoch; // monitor epoch (election instance) - set quorum; // current active set of monitors (if !starting) - - //void call_election(); - - // paxos - Paxos test_paxos; - friend class Paxos; - - - // monitor state + // -- monitor state -- +private: const static int STATE_STARTING = 0; // electing const static int STATE_LEADER = 1; const static int STATE_PEON = 2; int state; - int leader; // current leader (to best of knowledge) - utime_t last_called_election; // [starting] last time i called an election - +public: bool is_starting() { return state == STATE_STARTING; } bool is_leader() { return state == STATE_LEADER; } bool is_peon() { return state == STATE_PEON; } - // my public services + + // -- elector -- +private: + Elector elector; + friend class Elector; + + epoch_t mon_epoch; // monitor epoch (election instance) + int leader; // current leader (to best of knowledge) + set quorum; // current active set of monitors (if !starting) + utime_t last_called_election; // [starting] last time i called an election + +public: + epoch_t get_epoch() { return mon_epoch; } + int get_leader() { return leader; } + const set& get_quorum() { return quorum; } + + void call_election(); // initiate election + void win_election(epoch_t epoch, set& q); // end election (called by Elector) + void lose_election(epoch_t epoch, int l); // end election (called by Elector) + + + // -- paxos -- + Paxos paxos_test; + Paxos paxos_mdsmap; + Paxos paxos_osdmap; + Paxos paxos_clientmap; + friend class Paxos; + + + // -- services -- OSDMonitor *osdmon; MDSMonitor *mdsmon; ClientMonitor *clientmon; - // messages - void handle_shutdown(Message *m); - void handle_ping_ack(class MPingAck *m); - void handle_command(class MMonCommand *m); - friend class OSDMonitor; friend class MDSMonitor; friend class ClientMonitor; - // initiate election - void call_election(); - // end election (called by Elector) - void win_election(set& q); - void lose_election(int l); + // messages + void handle_shutdown(Message *m); + void handle_ping_ack(class MPingAck *m); + void handle_command(class MMonCommand *m); @@ -119,18 +117,22 @@ protected: monmap(mm), timer(lock), tick_timer(0), store(0), + + state(STATE_STARTING), + elector(this, w), mon_epoch(0), + leader(0), - test_paxos(this, w, PAXOS_TEST, "tester"), // tester state machine + paxos_test(this, w, PAXOS_TEST), + paxos_mdsmap(this, w, PAXOS_MDSMAP), + paxos_osdmap(this, w, PAXOS_OSDMAP), + paxos_clientmap(this, w, PAXOS_CLIENTMAP), - state(STATE_STARTING), - leader(0), osdmon(0), mdsmon(0), clientmon(0) { } - void init(); void shutdown(); void dispatch(Message *m); diff --git a/branches/sage/pgs/mon/MonitorStore.cc b/branches/sage/pgs/mon/MonitorStore.cc index cbbfba0892898..d260dfd7604e4 100644 --- a/branches/sage/pgs/mon/MonitorStore.cc +++ b/branches/sage/pgs/mon/MonitorStore.cc @@ -134,6 +134,7 @@ bool MonitorStore::exists_bl_ss(const char *a, const char *b) struct stat st; int r = ::stat(fn, &st); + //dout(15) << "exists_bl stat " << fn << " r=" << r << " errno " << errno << " " << strerror(errno) << endl; return r == 0; } diff --git a/branches/sage/pgs/mon/OSDMonitor.cc b/branches/sage/pgs/mon/OSDMonitor.cc index c29e15a2ad3a2..ae9c3ae621c15 100644 --- a/branches/sage/pgs/mon/OSDMonitor.cc +++ b/branches/sage/pgs/mon/OSDMonitor.cc @@ -36,10 +36,12 @@ #include "config.h" #undef dout -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".osd(" << (state == STATE_INIT ? (const char*)"init":(state == STATE_SYNC ? (const char*)"sync":(state == STATE_LOCK ? (const char*)"lock":(state == STATE_UPDATING ? (const char*)"updating":(const char*)"?\?")))) << ") e" << osdmap.get_epoch() << " " -#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".osd(" << (state == STATE_INIT ? (const char*)"init":(state == STATE_SYNC ? (const char*)"sync":(state == STATE_LOCK ? (const char*)"lock":(state == STATE_UPDATING ? (const char*)"updating":(const char*)"?\?")))) << ") e" << osdmap.get_epoch() << " " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".osd(e" << osdmap.get_epoch() << ") " +#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".osd(e" << osdmap.get_epoch() << ") " +// FAKING + class C_Mon_FakeOSDFailure : public Context { OSDMonitor *mon; int osd; @@ -51,16 +53,30 @@ public: } }; +void OSDMonitor::fake_osd_failure(int osd, bool down) +{ + if (down) { + dout(1) << "fake_osd_failure DOWN osd" << osd << endl; + pending_inc.new_down[osd] = osdmap.osd_inst[osd]; + } else { + dout(1) << "fake_osd_failure OUT osd" << osd << endl; + pending_inc.new_out.push_back(osd); + } + propose_pending(); + + // fixme + //bcast_latest_osd(); + //bcast_latest_mds(); +} void OSDMonitor::fake_osdmap_update() { dout(1) << "fake_osdmap_update" << endl; - accept_pending(); + propose_pending(); // tell a random osd int osd = rand() % g_conf.num_osd; - send_incremental(osdmap.get_epoch()-1, // ick! FIXME - osdmap.get_inst(osd)); + send_latest(0, osdmap.get_inst(osd)); } @@ -76,64 +92,36 @@ void OSDMonitor::fake_reorg() pending_inc.new_out.push_back(r); } - accept_pending(); - - // tell him! - send_incremental(osdmap.get_epoch()-1, osdmap.get_inst(r)); - - // do it again? - /* - if (g_conf.num_osd - d > 4 && - g_conf.num_osd - d > g_conf.num_osd/2) - mon->timer.add_event_after(g_conf.fake_osdmap_expand, - new C_Mon_Faker(this)); - */ + propose_pending(); + send_latest(0, osdmap.get_inst(r)); // after } -/* -void OSDMonitor::init() -{ - // start with blank map - - // load my last state from the store - bufferlist bl; - if (get_map_bl(0, bl)) { // FIXME - // yay! - osdmap.decode(bl); - dout(1) << "init got epoch " << osdmap.get_epoch() << " from store" << endl; - - // set up pending_inc - pending_inc.epoch = osdmap.get_epoch()+1; - } -} -*/ - - - - /************ MAPS ****************/ - void OSDMonitor::create_initial() { - dout(1) << "create_initial generating osdmap from g_conf" << endl; + assert(mon->is_leader()); + assert(paxos->get_version() == 0); + + dout(1) << "create_initial -- creating initial osdmap from g_conf" << endl; // - osdmap.mon_epoch = mon->mon_epoch; - osdmap.ctime = g_clock.now(); + OSDMap newmap; + newmap.mon_epoch = mon->mon_epoch; + newmap.ctime = g_clock.now(); if (g_conf.osd_pg_bits) { - osdmap.set_pg_num(1 << g_conf.osd_pg_bits); + newmap.set_pg_num(1 << g_conf.osd_pg_bits); } else { // 4 bits of pgs per osd. - osdmap.set_pg_num(g_conf.num_osd << 4); + newmap.set_pg_num(g_conf.num_osd << 4); } - // start at epoch 0 until all osds boot - //osdmap.inc_epoch(); // = 1 - //assert(osdmap.get_epoch() == 1); + // start at epoch 1 until all osds boot + newmap.inc_epoch(); // = 1 + assert(newmap.get_epoch() == 1); if (g_conf.num_osd >= 12) { int ndom = g_conf.osd_max_rep; @@ -141,7 +129,7 @@ void OSDMonitor::create_initial() int domid[ndom]; for (int i=0; iadd_item(i, 1.0); //cerr << "osd" << i << " in domain " << dom << endl; i++; @@ -164,42 +153,43 @@ void OSDMonitor::create_initial() //cerr << "dom " << i << " w " << domain[i]->get_weight() << endl; root->add_item(domid[i], domain[i]->get_weight()); } - int nroot = osdmap.crush.add_bucket(root); + int nroot = newmap.crush.add_bucket(root); // rules // replication for (int i=1; i<=ndom; i++) { int r = CRUSH_REP_RULE(i); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 1)); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, 1, 0)); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 1)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, 1, 0)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); } // raid for (int i=g_conf.osd_min_raid_width; i <= g_conf.osd_max_raid_width; i++) { int r = CRUSH_RAID_RULE(i); if (ndom >= i) { - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 1)); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, 1, 0)); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 1)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, 1, 0)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); } else { - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 0)); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 0)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); } } // test //vector out; - //osdmap.pg_to_osds(0x40200000110ULL, out); + //newmap.pg_to_osds(0x40200000110ULL, out); } else { // one bucket Bucket *b = new UniformBucket(1, 0); - int root = osdmap.crush.add_bucket(b); + int root = newmap.crush.add_bucket(b); for (int i=0; iadd_item(i, 1.0); } @@ -207,23 +197,25 @@ void OSDMonitor::create_initial() // replication for (int i=1; i<=g_conf.osd_max_rep; i++) { int r = CRUSH_REP_RULE(i); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root)); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 0)); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 0)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); } // raid for (int i=g_conf.osd_min_raid_width; i <= g_conf.osd_max_raid_width; i++) { int r = CRUSH_RAID_RULE(i); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root)); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 0)); - osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 0)); + newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); } } if (g_conf.mds_local_osd) { // add mds osds, but don't put them in the crush mapping func - for (int i=0; i @@ -241,215 +233,321 @@ void OSDMonitor::create_initial() dout(0) << "will fake osd" << i->first << " OUT after " << i->second << endl; mon->timer.add_event_after(i->second, new C_Mon_FakeOSDFailure(this, i->first, 0)); } -} - -bool OSDMonitor::get_map_bl(epoch_t epoch, bufferlist& bl) -{ - if (!mon->store->exists_bl_sn("osdmap", epoch)) - return false; - int r = mon->store->get_bl_sn(bl, "osdmap", epoch); - assert(r > 0); - return true; + // encode into pending incremental + newmap.encode(pending_inc.fullmap); } -bool OSDMonitor::get_inc_map_bl(epoch_t epoch, bufferlist& bl) +bool OSDMonitor::update_from_paxos() { - if (!mon->store->exists_bl_sn("osdincmap", epoch)) - return false; - int r = mon->store->get_bl_sn(bl, "osdincmap", epoch); - assert(r > 0); - return true; -} + assert(paxos->is_active()); + version_t paxosv = paxos->get_version(); + dout(15) << "update_from_paxos paxos e " << paxosv + << ", my e " << osdmap.epoch << endl; -void OSDMonitor::save_map() -{ - bufferlist bl; - osdmap.encode(bl); + if (paxosv == osdmap.epoch) return true; + assert(paxosv >= osdmap.epoch); - mon->store->put_bl_sn(bl, "osdmap", osdmap.get_epoch()); - mon->store->put_int(osdmap.get_epoch(), "osd_epoch"); + if (osdmap.epoch == 0 && paxosv > 1) { + // startup: just load latest full map + epoch_t lastfull = mon->store->get_int("osdmap_full","last_epoch"); + if (lastfull) { + dout(7) << "update_from_paxos startup: loading latest full map e" << lastfull << endl; + bufferlist bl; + mon->store->get_bl_sn(bl, "osdmap_full", lastfull); + osdmap.decode(bl); + } + } + + // walk through incrementals + while (paxosv > osdmap.epoch) { + bufferlist bl; + bool success = paxos->read(osdmap.epoch+1, bl); + assert(success); + + dout(7) << "update_from_paxos applying incremental " << osdmap.epoch+1 << endl; + OSDMap::Incremental inc; + int off = 0; + inc.decode(bl, off); + osdmap.apply_incremental(inc); + + // write out the full map, too. + bl.clear(); + osdmap.encode(bl); + mon->store->put_bl_sn(bl, "osdmap_full", osdmap.epoch); + } + mon->store->put_int(osdmap.epoch, "osdmap_full","last_epoch"); + + // new map! + bcast_latest_mds(); + + return true; } -void OSDMonitor::save_inc_map(OSDMap::Incremental &inc) + +void OSDMonitor::create_pending() { - bufferlist bl; - osdmap.encode(bl); + pending_inc = OSDMap::Incremental(osdmap.epoch+1); + dout(10) << "create_pending e " << pending_inc.epoch + << endl; +} - bufferlist incbl; - inc.encode(incbl); +void OSDMonitor::encode_pending(bufferlist &bl) +{ + dout(10) << "encode_pending e " << pending_inc.epoch + << endl; + + // finish up pending_inc + pending_inc.ctime = g_clock.now(); + pending_inc.mon_epoch = mon->mon_epoch; + + // tell me about it + for (map::iterator i = pending_inc.new_up.begin(); + i != pending_inc.new_up.end(); + i++) { + dout(0) << " osd" << i->first << " UP " << i->second << endl; + derr(0) << " osd" << i->first << " UP " << i->second << endl; + } + for (map::iterator i = pending_inc.new_down.begin(); + i != pending_inc.new_down.end(); + i++) { + dout(0) << " osd" << i->first << " DOWN " << i->second << endl; + derr(0) << " osd" << i->first << " DOWN " << i->second << endl; + mon->messenger->mark_down(i->second.addr); + } + for (list::iterator i = pending_inc.new_in.begin(); + i != pending_inc.new_in.end(); + i++) { + dout(0) << " osd" << *i << " IN" << endl; + derr(0) << " osd" << *i << " IN" << endl; + } + for (list::iterator i = pending_inc.new_out.begin(); + i != pending_inc.new_out.end(); + i++) { + dout(0) << " osd" << *i << " OUT" << endl; + derr(0) << " osd" << *i << " OUT" << endl; + } - mon->store->put_bl_sn(bl, "osdmap", osdmap.get_epoch()); - mon->store->put_bl_sn(incbl, "osdincmap", osdmap.get_epoch()); - mon->store->put_int(osdmap.get_epoch(), "osd_epoch"); + // encode + assert(paxos->get_version() + 1 == pending_inc.epoch); + pending_inc.encode(bl); } +// ------------- -void OSDMonitor::dispatch(Message *m) +bool OSDMonitor::preprocess_query(Message *m) { + dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << endl; + switch (m->get_type()) { - - // services + // READs case MSG_OSD_GETMAP: handle_osd_getmap((MOSDGetMap*)m); - break; + return true; + + // damp updates case MSG_OSD_FAILURE: - handle_osd_failure((MOSDFailure*)m); - break; + return preprocess_failure((MOSDFailure*)m); case MSG_OSD_BOOT: - handle_osd_boot((MOSDBoot*)m); - break; + return preprocess_boot((MOSDBoot*)m); + /* case MSG_OSD_IN: - handle_osd_in((MOSDIn*)m); - break; + return preprocess_in((MOSDIn*)m); case MSG_OSD_OUT: - handle_osd_out((MOSDOut*)m); - break; - - // replication - case MSG_MON_OSDMAP_INFO: - handle_info((MMonOSDMapInfo*)m); - break; - case MSG_MON_OSDMAP_LEASE: - handle_lease((MMonOSDMapLease*)m); - break; - case MSG_MON_OSDMAP_LEASE_ACK: - handle_lease_ack((MMonOSDMapLeaseAck*)m); - break; - case MSG_MON_OSDMAP_UPDATE_PREPARE: - handle_update_prepare((MMonOSDMapUpdatePrepare*)m); - break; - case MSG_MON_OSDMAP_UPDATE_ACK: - handle_update_ack((MMonOSDMapUpdateAck*)m); - break; - case MSG_MON_OSDMAP_UPDATE_COMMIT: - handle_update_commit((MMonOSDMapUpdateCommit*)m); - break; + return preprocess_out((MOSDOut*)m); + */ default: assert(0); + delete m; + return true; + } +} + +bool OSDMonitor::prepare_update(Message *m) +{ + dout(10) << "prepare_update " << *m << " from " << m->get_source_inst() << endl; + + switch (m->get_type()) { + // damp updates + case MSG_OSD_FAILURE: + return prepare_failure((MOSDFailure*)m); + case MSG_OSD_BOOT: + return prepare_boot((MOSDBoot*)m); + + /* + case MSG_OSD_IN: + return prepare_in((MOSDIn*)m); + case MSG_OSD_OUT: + return prepare_out((MOSDOut*)m); + */ + + default: + assert(0); + delete m; } + + return false; +} + +bool OSDMonitor::should_propose_now() +{ + // don't propose initial map until _all_ osds boot. + //dout(10) << "should_propose_now " << pending_inc.new_up.size() << " vs " << osdmap.get_osds().size() << endl; + if (osdmap.epoch == 1 && + pending_inc.new_up.size() < osdmap.get_osds().size()) + return false; // not all up (yet) + + // FIXME do somethihng smart here. + return true; } -void OSDMonitor::handle_osd_failure(MOSDFailure *m) +// --------------------------- +// READs + +void OSDMonitor::handle_osd_getmap(MOSDGetMap *m) { - dout(1) << "osd failure: " << m->get_failed() << " from " << m->get_source() << endl; + dout(7) << "osd_getmap from " << m->get_source() << " since " << m->get_since() << endl; - // FIXME - // take their word for it - int from = m->get_failed().name.num(); - if (osdmap.is_up(from) && - (osdmap.osd_inst.count(from) == 0 || - osdmap.osd_inst[from] == m->get_failed())) { - pending_inc.new_down[from] = m->get_failed(); - - if (osdmap.is_in(from)) - down_pending_out[from] = g_clock.now(); - - //awaiting_maps[pending_inc.epoch][m->get_source()] = - - accept_pending(); - - send_incremental(m->get_epoch(), m->get_source_inst()); - - send_waiting(); - bcast_latest_mds(); - } + //if (m->get_since()) + send_incremental(m->get_since(), m->get_source_inst()); + //else + //send_full(m->get_source_inst()); delete m; } -void OSDMonitor::fake_osd_failure(int osd, bool down) + +// --------------------------- +// UPDATEs + +// failure -- + +bool OSDMonitor::preprocess_failure(MOSDFailure *m) { - if (down) { - dout(1) << "fake_osd_failure DOWN osd" << osd << endl; - pending_inc.new_down[osd] = osdmap.osd_inst[osd]; - } else { - dout(1) << "fake_osd_failure OUT osd" << osd << endl; - pending_inc.new_out.push_back(osd); + int badboy = m->get_failed().name.num(); + + // weird? + if (!osdmap.have_inst(badboy)) { + dout(5) << "preprocess_failure dne(/dup?): " << m->get_failed() << ", from " << m->get_from() << endl; + send_incremental(m->get_epoch(), m->get_from()); + return true; } - accept_pending(); - bcast_latest_osd(); - bcast_latest_mds(); + if (osdmap.get_inst(badboy) != m->get_failed()) { + dout(5) << "preprocess_failure wrong osd: report " << m->get_failed() << " != map's " << osdmap.get_inst(badboy) + << ", from " << m->get_from() << endl; + send_incremental(m->get_epoch(), m->get_from()); + return true; + } + // already reported? + if (osdmap.is_down(badboy)) { + dout(5) << "preprocess_failure dup: " << m->get_failed() << ", from " << m->get_from() << endl; + send_incremental(m->get_epoch(), m->get_from()); + return true; + } + + dout(10) << "preprocess_failure new: " << m->get_failed() << ", from " << m->get_from() << endl; + return false; } -void OSDMonitor::mark_all_down() +bool OSDMonitor::prepare_failure(MOSDFailure *m) { - dout(7) << "mark_all_down" << endl; + dout(1) << "prepare_failure " << m->get_failed() << " from " << m->get_from() << endl; + + // FIXME + // take their word for it + int badboy = m->get_failed().name.num(); + assert(osdmap.is_up(badboy)); + assert(osdmap.osd_inst[badboy] == m->get_failed()); + + pending_inc.new_down[badboy] = m->get_failed(); + + if (osdmap.is_in(badboy)) + down_pending_out[badboy] = g_clock.now(); - for (set::iterator it = osdmap.get_osds().begin(); - it != osdmap.get_osds().end(); - it++) { - if (osdmap.is_down(*it)) continue; - pending_inc.new_down[*it] = osdmap.get_inst(*it); - } - accept_pending(); + paxos->wait_for_commit(new C_Reported(this, m)); + + return true; } +void OSDMonitor::_reported_failure(MOSDFailure *m) +{ + dout(7) << "_reported_failure on " << m->get_failed() << ", telling " << m->get_from() << endl; + send_latest(m->get_epoch(), m->get_from()); +} +// boot -- -void OSDMonitor::handle_osd_boot(MOSDBoot *m) +bool OSDMonitor::preprocess_boot(MOSDBoot *m) { - dout(7) << "osd_boot from " << m->get_source() << endl; - assert(m->get_source().is_osd()); - int from = m->get_source().num(); + assert(m->inst.name.is_osd()); + int from = m->inst.name.num(); - if (osdmap.get_epoch() == 0) { - // waiting for boot! - osdmap.osd_inst[from] = m->get_source_inst(); - - if (osdmap.osd_inst.size() == osdmap.osds.size()) { - dout(-7) << "osd_boot all osds booted." << endl; - osdmap.inc_epoch(); - - save_map(); - - pending_inc.epoch = osdmap.get_epoch()+1; // 2 - - bcast_latest_osd(); - bcast_latest_mds(); - send_waiting(); - } else { - dout(7) << "osd_boot waiting for " - << (osdmap.osds.size() - osdmap.osd_inst.size()) - << " osds to boot" << endl; - } + // already booted? + if (osdmap.is_up(from) && + osdmap.get_inst(from) == m->inst) { + // yup. + dout(7) << "preprocess_boot dup from " << m->inst << endl; + _booted(m); + return true; + } + + dout(10) << "preprocess_boot from " << m->inst << endl; + return false; +} +bool OSDMonitor::prepare_boot(MOSDBoot *m) +{ + dout(7) << "prepare_boot from " << m->inst << endl; + assert(m->inst.name.is_osd()); + int from = m->inst.name.num(); + + // does this osd exist? + if (!osdmap.exists(from)) { + dout(1) << "boot from non-existent osd" << from << endl; delete m; - return; + return true; } - + // already up? mark down first? if (osdmap.is_up(from)) { + assert(osdmap.get_inst(from) != m->inst); // preproces should have caught it + + // mark previous guy down pending_inc.new_down[from] = osdmap.osd_inst[from]; - accept_pending(); } - // mark up. - down_pending_out.erase(from); - assert(osdmap.is_down(from)); - pending_inc.new_up[from] = m->get_source_inst(); + // mark new guy up. + down_pending_out.erase(from); // if any + pending_inc.new_up[from] = m->inst; // mark in? if (osdmap.out_osds.count(from)) pending_inc.new_in.push_back(from); - accept_pending(); - - // the booting osd will spread word - send_incremental(m->sb.current_epoch, m->get_source_inst()); - delete m; + // wait + paxos->wait_for_commit(new C_Booted(this, m)); - // tell mds - bcast_latest_mds(); + return true; +} + +void OSDMonitor::_booted(MOSDBoot *m) +{ + dout(7) << "_booted " << m->inst << endl; + send_latest(m->sb.current_epoch, m->inst); + delete m; } + +// in -- + +/* void OSDMonitor::handle_osd_in(MOSDIn *m) { dout(7) << "osd_in from " << m->get_source() << endl; @@ -471,74 +569,16 @@ void OSDMonitor::handle_osd_out(MOSDOut *m) send_incremental(m->map_epoch, m->get_source_inst()); } } - -void OSDMonitor::handle_osd_getmap(MOSDGetMap *m) -{ - dout(7) << "osd_getmap from " << m->get_source() << " since " << m->get_since() << endl; - - if (osdmap.get_epoch() == 0) { - awaiting_map[m->get_source()].first = m->get_source_inst(); - awaiting_map[m->get_source()].second = m->get_since(); - } else { - //if (m->get_since()) - send_incremental(m->get_since(), m->get_source_inst()); - //else - //send_full(m->get_source(), m->get_source_inst()); - } - delete m; -} - +*/ -void OSDMonitor::accept_pending() -{ - dout(-10) << "accept_pending " << osdmap.get_epoch() << " -> " << pending_inc.epoch << endl; - // accept pending into a new map! - pending_inc.ctime = g_clock.now(); - pending_inc.mon_epoch = mon->mon_epoch; +// --------------- +// map helpers - // advance! - osdmap.apply_incremental(pending_inc); - - // save it. - save_inc_map( pending_inc ); - - // tell me about it - for (map::iterator i = pending_inc.new_up.begin(); - i != pending_inc.new_up.end(); - i++) { - dout(0) << "osd" << i->first << " UP " << i->second << endl; - derr(0) << "osd" << i->first << " UP " << i->second << endl; - } - for (map::iterator i = pending_inc.new_down.begin(); - i != pending_inc.new_down.end(); - i++) { - dout(0) << "osd" << i->first << " DOWN " << i->second << endl; - derr(0) << "osd" << i->first << " DOWN " << i->second << endl; - messenger->mark_down(i->second.addr); - } - for (list::iterator i = pending_inc.new_in.begin(); - i != pending_inc.new_in.end(); - i++) { - dout(0) << "osd" << *i << " IN" << endl; - derr(0) << "osd" << *i << " IN" << endl; - } - for (list::iterator i = pending_inc.new_out.begin(); - i != pending_inc.new_out.end(); - i++) { - dout(0) << "osd" << *i << " OUT" << endl; - derr(0) << "osd" << *i << " OUT" << endl; - } - - // clear new pending - OSDMap::Incremental next(osdmap.get_epoch() + 1); - pending_inc = next; -} - -void OSDMonitor::send_waiting() +void OSDMonitor::send_to_waiting() { - dout(10) << "send_waiting " << osdmap.get_epoch() << endl; + dout(10) << "send_to_waiting " << osdmap.get_epoch() << endl; for (map >::iterator i = awaiting_map.begin(); i != awaiting_map.end(); @@ -550,26 +590,31 @@ void OSDMonitor::send_waiting() } } - -void OSDMonitor::send_latest(entity_inst_t who) +void OSDMonitor::send_latest(epoch_t since, entity_inst_t who) { - // FIXME this is super naive - if (osdmap.get_epoch() == 0) { - awaiting_map[who.name].first = who; - awaiting_map[who.name].second = 0; + if (paxos->is_readable()) { + dout(5) << "send_latest to " << who << " now" << endl; + if (since) + send_incremental(since, who); + else + send_full(who); } else { - send_full(who); + dout(5) << "send_latest to " << who << " later" << endl; + awaiting_map[who.name].first = who; + awaiting_map[who.name].second = since; } } + void OSDMonitor::send_full(entity_inst_t who) { - messenger->send_message(new MOSDMap(&osdmap), who); + dout(5) << "send_full to " << who << endl; + mon->messenger->send_message(new MOSDMap(&osdmap), who); } void OSDMonitor::send_incremental(epoch_t since, entity_inst_t dest) { - dout(5) << "osd_send_incremental " << since << " -> " << osdmap.get_epoch() + dout(5) << "send_incremental " << since << " -> " << osdmap.get_epoch() << " to " << dest << endl; MOSDMap *m = new MOSDMap; @@ -578,12 +623,12 @@ void OSDMonitor::send_incremental(epoch_t since, entity_inst_t dest) e > since; e--) { bufferlist bl; - if (get_inc_map_bl(e, bl)) { - dout(10) << "osd_send_incremental inc " << e << endl; + if (mon->store->get_bl_sn(bl, "osdmap", e) > 0) { + dout(20) << "send_incremental inc " << e << endl; m->incremental_maps[e] = bl; } - else if (get_map_bl(e, bl)) { - dout(10) << "osd_send_incremental full " << e << endl; + else if (mon->store->get_bl_sn(bl, "osdmap_full", e) > 0) { + dout(20) << "send_incremental full " << e << endl; m->maps[e] = bl; } else { @@ -591,11 +636,10 @@ void OSDMonitor::send_incremental(epoch_t since, entity_inst_t dest) } } - messenger->send_message(m, dest); + mon->messenger->send_message(m, dest); } - void OSDMonitor::bcast_latest_mds() { epoch_t e = osdmap.get_epoch(); @@ -628,6 +672,24 @@ void OSDMonitor::bcast_latest_osd() } } +void OSDMonitor::bcast_full_osd() +{ + epoch_t e = osdmap.get_epoch(); + dout(1) << "bcast_full_osd epoch " << e << endl; + + // tell osds + set osds; + osdmap.get_all_osds(osds); + for (set::iterator it = osds.begin(); + it != osds.end(); + it++) { + if (osdmap.is_down(*it)) continue; + send_full(osdmap.get_inst(*it)); + } +} + + +// TICK void OSDMonitor::tick() @@ -653,281 +715,93 @@ void OSDMonitor::tick() pending_inc.new_out.push_back( *i ); } if (!mark_out.empty()) { - accept_pending(); - - // hrmpf. bcast map for now. FIXME FIXME. - bcast_latest_osd(); + propose_pending(); } } -void OSDMonitor::election_starting() -{ - dout(10) << "election_starting" << endl; -} -void OSDMonitor::election_finished() -{ - dout(10) << "election_finished" << endl; - if (mon->is_leader()) { - if (g_conf.mkfs) { - create_initial(); - save_map(); - } else { - // - epoch_t epoch = mon->store->get_int("osd_epoch"); - dout(10) << " last epoch was " << epoch << endl; - bufferlist bl, blinc; - int r = mon->store->get_bl_sn(bl, "osdmap", epoch); - assert(r>0); - osdmap.decode(bl); - // pending_inc - pending_inc.epoch = epoch+1; - } - } +/* +void OSDMonitor::init() +{ + // start with blank map - /* - state = STATE_INIT; + // load my last state from the store + bufferlist bl; + if (get_map_bl(0, bl)) { // FIXME + // yay! + osdmap.decode(bl); + dout(1) << "init got epoch " << osdmap.get_epoch() << " from store" << endl; - // map? - if (osdmap.get_epoch() == 0 && - mon->is_leader()) { - create_initial(); + // set up pending_inc + pending_inc.epoch = osdmap.get_epoch()+1; } +} +*/ - if (mon->is_leader()) { - // leader. - if (mon->monmap->num_mon == 1) { - // hmm, it's just me! - state = STATE_SYNC; - } - } - else if (mon->is_peon()) { - // peon. send info - //messenger->send_message(new MMonOSDMapInfo(osdmap.epoch, osdmap.mon_epoch), - // mon->monmap->get_inst(mon->leader)); - } - */ -} +void OSDMonitor::mark_all_down() +{ + assert(mon->is_leader()); + dout(7) << "mark_all_down" << endl; -void OSDMonitor::handle_info(MMonOSDMapInfo *m) -{ - dout(10) << "handle_info from " << m->get_source() - << " epoch " << m->get_epoch() << " in mon_epoch " << m->get_mon_epoch() - << endl; - - epoch_t epoch = m->get_epoch(); - - // did they have anything? - if (epoch > 0) { - // make sure it's current. - if (epoch == osdmap.get_epoch()) { - if (osdmap.mon_epoch != m->get_mon_epoch()) { - dout(10) << "handle_info had divergent epoch " << m->get_epoch() - << ", mon_epoch " << m->get_mon_epoch() << " != " << osdmap.mon_epoch << endl; - epoch--; - } - } else { - bufferlist bl; - get_map_bl(epoch, bl); - - OSDMap old; - old.decode(bl); - - if (old.mon_epoch != m->get_mon_epoch()) { - dout(10) << "handle_info had divergent epoch " << m->get_epoch() - << ", mon_epoch " << m->get_mon_epoch() << " != " << old.mon_epoch << endl; - epoch--; - } - } + for (set::iterator it = osdmap.get_osds().begin(); + it != osdmap.get_osds().end(); + it++) { + if (osdmap.is_down(*it)) continue; + pending_inc.new_down[*it] = osdmap.get_inst(*it); } - - // bring up to date - if (epoch < osdmap.get_epoch()) - send_incremental(epoch, m->get_source_inst()); - - delete m; + + propose_pending(); } -void OSDMonitor::issue_leases() -{ - dout(10) << "issue_leases" << endl; - assert(mon->is_leader()); - // set lease endpoint - lease_expire = g_clock.now(); - lease_expire += g_conf.mon_lease; - pending_ack.clear(); - - for (set::iterator i = mon->quorum.begin(); - i != mon->quorum.end(); - i++) { - if (*i == mon->whoami) continue; - messenger->send_message(new MMonOSDMapLease(osdmap.get_epoch(), lease_expire), - mon->monmap->get_inst(*i)); - pending_ack.insert(*i); - } -} -void OSDMonitor::handle_lease(MMonOSDMapLease *m) -{ - if (m->get_epoch() != osdmap.get_epoch() + 1) { - dout(10) << "map_lease from " << m->get_source() - << " on epoch " << m->get_epoch() << ", but i am " << osdmap.get_epoch() << endl; - assert(0); - delete m; - return; - } - - dout(10) << "map_lease from " << m->get_source() << " expires " << lease_expire << endl; - lease_expire = m->get_lease_expire(); - - delete m; -} -void OSDMonitor::handle_lease_ack(MMonOSDMapLeaseAck *m) -{ - // right epoch? - if (m->get_epoch() != osdmap.get_epoch()) { - dout(10) << "map_lease_ack from " << m->get_source() - << " on old epoch " << m->get_epoch() << ", dropping" << endl; - delete m; - return; - } - - // within time limit? - if (g_clock.now() >= lease_expire) { - dout(10) << "map_lease_ack from " << m->get_source() - << ", but lease expired, calling election" << endl; - mon->call_election(); - delete m; - return; - } - - assert(m->get_source().is_mon()); - int from = m->get_source().num(); - assert(pending_ack.count(from)); - pending_ack.erase(from); - if (pending_ack.empty()) { - dout(10) << "map_lease_ack from " << m->get_source() - << ", last one" << endl; - } else { - dout(10) << "map_lease_ack from " << m->get_source() - << ", still waiting on " << pending_ack << endl; - } - - delete m; -} -void OSDMonitor::update_map() -{ - // lock map - state = STATE_UPDATING; - pending_ack.clear(); - - // set lease endpoint - lease_expire += g_conf.mon_lease; - // send prepare - epoch_t epoch = osdmap.get_epoch(); - bufferlist map_bl, inc_map_bl; - if (!get_inc_map_bl(epoch, inc_map_bl)) - get_map_bl(epoch, map_bl); - for (set::iterator i = mon->quorum.begin(); - i != mon->quorum.end(); - i++) { - if (*i == mon->whoami) continue; - messenger->send_message(new MMonOSDMapUpdatePrepare(epoch, - map_bl, inc_map_bl), - mon->monmap->get_inst(*i)); - pending_ack.insert(*i); - } -} -void OSDMonitor::handle_update_prepare(MMonOSDMapUpdatePrepare *m) -{ - dout(10) << "map_update_prepare from " << m->get_source() << " epoch " << m->get_epoch() << endl; - // accept map - assert(m->get_epoch() == osdmap.get_epoch() + 1); - - if (m->inc_map_bl.length()) { - int off = 0; - pending_inc.decode(m->inc_map_bl, off); - accept_pending(); - } else { - osdmap.decode(m->map_bl); - } - - // state - state = STATE_LOCK; - //lease_expire = m->lease_expire; - - // ack - messenger->send_message(new MMonOSDMapUpdateAck(osdmap.get_epoch()), - m->get_source_inst()); - delete m; -} +/* -void OSDMonitor::handle_update_ack(MMonOSDMapUpdateAck *m) + +void OSDMonitor::election_finished() { - /* - // right epoch? - if (m->get_epoch() != osdmap.get_epoch()) { - dout(10) << "map_update_ack from " << m->get_source() - << " on old epoch " << m->get_epoch() << ", dropping" << endl; - delete m; - return; - } + dout(10) << "election_finished" << endl; - // within time limit? - if (g_clock.now() >= lease_expire) { - dout(10) << "map_update_ack from " << m->get_source() - << ", but lease expired, calling election" << endl; - state = STATE_SYNC; - mon->call_election(); - return; - } + if (mon->is_leader()) { + if (g_conf.mkfs) { + create_initial(); + save_map(); + } else { + // + epoch_t epoch = mon->store->get_int("osd_epoch"); + dout(10) << " last epoch was " << epoch << endl; + bufferlist bl, blinc; + int r = mon->store->get_bl_sn(bl, "osdmap_full", epoch); + assert(r>0); + osdmap.decode(bl); - assert(m->get_source().is_mon()); - int from = m->get_source().num(); + // pending_inc + pending_inc.epoch = epoch+1; + } - assert(pending_lease_ack.count(from)); - pending_lease_ack.erase(from); - - if (pending_lease_ack.empty()) { - dout(10) << "map_update_ack from " << m->get_source() - << ", last one" << endl; - state = STATE_SYNC; - - // send lease commit - for (map::iterator i = mon->quorum.begin(); - i != mon->quorum.end(); - i++) { - if (i == mon->whoami) continue; - messenger->send_message(new MMonOSDMapLeaseCommit(osdmap), - MSG_ADDR_MON(*i), mon->monmap->get_inst(*i)); - } - } else { - dout(10) << "map_update_ack from " << m->get_source() - << ", still waiting on " << pending_lease_ack << endl; } -*/ -} -void OSDMonitor::handle_update_commit(MMonOSDMapUpdateCommit *m) -{ } + + + +*/ diff --git a/branches/sage/pgs/mon/OSDMonitor.h b/branches/sage/pgs/mon/OSDMonitor.h index 000a79f4024bc..9927cf805f2bc 100644 --- a/branches/sage/pgs/mon/OSDMonitor.h +++ b/branches/sage/pgs/mon/OSDMonitor.h @@ -25,85 +25,96 @@ using namespace std; #include "osd/OSDMap.h" -class Monitor; +#include "PaxosService.h" -class OSDMonitor : public Dispatcher { - Monitor *mon; - Messenger *messenger; - Mutex &lock; +class Monitor; +class MOSDBoot; - // osd maps +class OSDMonitor : public PaxosService { public: OSDMap osdmap; private: map > awaiting_map; - - void create_initial(); - bool get_map_bl(epoch_t epoch, bufferlist &bl); - bool get_inc_map_bl(epoch_t epoch, bufferlist &bl); - - void save_map(); - void save_inc_map(OSDMap::Incremental &inc); // [leader] OSDMap::Incremental pending_inc; map down_pending_out; // osd down -> out - set pending_ack; + // svc + void create_initial(); + bool update_from_paxos(); + void create_pending(); // prepare a new pending + void encode_pending(bufferlist &bl); - // we are distributed - const static int STATE_INIT = 0; // startup - const static int STATE_SYNC = 1; // sync map copy (readonly) - const static int STATE_LOCK = 2; // [peon] map locked - const static int STATE_UPDATING = 3; // [leader] map locked, waiting for peon ack + void handle_query(Message *m); + bool preprocess_query(Message *m); // true if processed. + bool prepare_update(Message *m); + bool should_propose_now(); - int state; - utime_t lease_expire; // when lease expires + // ... + bool get_map_bl(epoch_t epoch, bufferlist &bl); + bool get_inc_map_bl(epoch_t epoch, bufferlist &bl); - //void init(); - - // maps - void accept_pending(); // accept pending, new map. - void send_waiting(); // send current map to waiters. + void send_to_waiting(); // send current map to waiters. void send_full(entity_inst_t dest); void send_incremental(epoch_t since, entity_inst_t dest); void bcast_latest_mds(); void bcast_latest_osd(); - - void update_map(); - - void handle_osd_boot(class MOSDBoot *m); - void handle_osd_in(class MOSDIn *m); - void handle_osd_out(class MOSDOut *m); - void handle_osd_failure(class MOSDFailure *m); + void bcast_full_osd(); + void handle_osd_getmap(class MOSDGetMap *m); - void handle_info(class MMonOSDMapInfo*); - void handle_lease(class MMonOSDMapLease*); - void handle_lease_ack(class MMonOSDMapLeaseAck*); - void handle_update_prepare(class MMonOSDMapUpdatePrepare*); - void handle_update_ack(class MMonOSDMapUpdateAck*); - void handle_update_commit(class MMonOSDMapUpdateCommit*); + bool preprocess_failure(class MOSDFailure *m); + bool prepare_failure(class MOSDFailure *m); + void _reported_failure(MOSDFailure *m); + + bool preprocess_boot(class MOSDBoot *m); + bool prepare_boot(class MOSDBoot *m); + void _booted(MOSDBoot *m); + + class C_Booted : public Context { + OSDMonitor *cmon; + MOSDBoot *m; + public: + C_Booted(OSDMonitor *cm, MOSDBoot *m_) : + cmon(cm), m(m_) {} + void finish(int r) { + if (r >= 0) + cmon->_booted(m); + else + cmon->dispatch((Message*)m); + } + }; + class C_Reported : public Context { + OSDMonitor *cmon; + MOSDFailure *m; + public: + C_Reported(OSDMonitor *cm, MOSDFailure *m_) : + cmon(cm), m(m_) {} + void finish(int r) { + if (r >= 0) + cmon->_reported_failure(m); + else + cmon->dispatch((Message*)m); + } + }; + + bool preprocess_in(class MOSDIn *m); + bool prepare_in(class MOSDIn *m); + + bool preprocess_out(class MOSDOut *m); + bool prepare_out(class MOSDOut *m); public: - OSDMonitor(Monitor *mn, Messenger *m, Mutex& l) : - mon(mn), messenger(m), lock(l), - state(STATE_SYNC) { - //init(); - } + OSDMonitor(Monitor *mn, Paxos *p) : + PaxosService(mn, p) { } - void dispatch(Message *m); void tick(); // check state, take actions - void election_starting(); // abort whatever. - void election_finished(); // reinitialize whatever. - - void issue_leases(); - void mark_all_down(); - void send_latest(entity_inst_t i); + void send_latest(epoch_t since, entity_inst_t i); void fake_osd_failure(int osd, bool down); void fake_osdmap_update(); diff --git a/branches/sage/pgs/mon/Paxos.cc b/branches/sage/pgs/mon/Paxos.cc index 78d3d58287bbc..ebe965752b026 100644 --- a/branches/sage/pgs/mon/Paxos.cc +++ b/branches/sage/pgs/mon/Paxos.cc @@ -20,44 +20,79 @@ #include "config.h" #undef dout -#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") " -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") " +#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") " +void Paxos::init() +{ + // load paxos variables from stable storage + last_pn = mon->store->get_int(machine_name, "last_pn"); + accepted_pn = mon->store->get_int(machine_name, "accepted_pn"); + last_committed = mon->store->get_int(machine_name, "last_committed"); + + dout(10) << "init" << endl; +} + // --------------------------------- // PHASE 1 -// proposer - +// leader void Paxos::collect(version_t oldpn) { + // we're recoverying, it seems! + state = STATE_RECOVERING; + assert(mon->is_leader()); + // reset the number of lasts received + uncommitted_v = 0; + uncommitted_pn = 0; + uncommitted_value.clear(); + + // look for uncommitted value + if (mon->store->exists_bl_sn(machine_name, last_committed+1)) { + uncommitted_v = last_committed+1; + uncommitted_pn = accepted_pn; + mon->store->get_bl_sn(uncommitted_value, machine_name, last_committed+1); + dout(10) << "learned uncommitted " << (last_committed+1) + << " (" << uncommitted_value.length() << " bytes) from myself" + << endl; + } + + // pick new pn accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn)); accepted_pn_from = last_committed; num_last = 1; - old_accepted_pn = 0; - old_accepted_value.clear(); - dout(10) << "collect with pn " << accepted_pn << endl; // send collect - for (int i=0; imonmap->num_mon; ++i) { - if (i == whoami) continue; + for (set::const_iterator p = mon->get_quorum().begin(); + p != mon->get_quorum().end(); + ++p) { + if (*p == whoami) continue; - MMonPaxos *collect = new MMonPaxos(MMonPaxos::OP_COLLECT, machine_id); + MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, machine_id); collect->last_committed = last_committed; collect->pn = accepted_pn; - mon->messenger->send_message(collect, mon->monmap->get_inst(i)); + mon->messenger->send_message(collect, mon->monmap->get_inst(*p)); } + } + +// peon void Paxos::handle_collect(MMonPaxos *collect) { dout(10) << "handle_collect " << *collect << endl; + assert(mon->is_peon()); // mon epoch filter should catch strays + + // we're recoverying, it seems! + state = STATE_RECOVERING; + // reply - MMonPaxos *last = new MMonPaxos(MMonPaxos::OP_LAST, machine_id); + MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST, machine_id); last->last_committed = last_committed; // do we have an accepted but uncommitted value? @@ -66,9 +101,10 @@ void Paxos::handle_collect(MMonPaxos *collect) if (mon->store->exists_bl_sn(machine_name, last_committed+1)) { mon->store->get_bl_sn(bl, machine_name, last_committed+1); assert(bl.length() > 0); - dout(10) << "sharing our accepted but uncommitted value for " << last_committed+1 << endl; + dout(10) << " sharing our accepted but uncommitted value for " << last_committed+1 + << " (" << bl.length() << " bytes)" << endl; last->values[last_committed+1] = bl; - last->old_accepted_pn = accepted_pn; + last->uncommitted_pn = accepted_pn; } // can we accept this pn? @@ -77,6 +113,7 @@ void Paxos::handle_collect(MMonPaxos *collect) accepted_pn = collect->pn; accepted_pn_from = collect->pn_from; dout(10) << "accepting pn " << accepted_pn << " from " << accepted_pn_from << endl; + mon->store->put_int(accepted_pn, machine_name, "accepted_pn"); } else { // don't accept! dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from @@ -87,13 +124,13 @@ void Paxos::handle_collect(MMonPaxos *collect) last->pn_from = accepted_pn_from; // and share whatever data we have - for (version_t v = collect->last_committed; + for (version_t v = collect->last_committed+1; v <= last_committed; v++) { if (mon->store->exists_bl_sn(machine_name, v)) { mon->store->get_bl_sn(last->values[v], machine_name, v); - dout(10) << " sharing " << v << " " - << last->values[v].length() << " bytes" << endl; + dout(10) << " sharing " << v << " (" + << last->values[v].length() << " bytes)" << endl; } } @@ -103,28 +140,36 @@ void Paxos::handle_collect(MMonPaxos *collect) } +// leader void Paxos::handle_last(MMonPaxos *last) { dout(10) << "handle_last " << *last << endl; + if (!mon->is_leader()) { + dout(10) << "not leader, dropping" << endl; + delete last; + return; + } + // share committed values? if (last->last_committed < last_committed) { // share committed values dout(10) << "sending commit to " << last->get_source() << endl; - MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id); - for (version_t v = last->last_committed; + MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id); + for (version_t v = last->last_committed+1; v <= last_committed; v++) { mon->store->get_bl_sn(commit->values[v], machine_name, v); - dout(10) << "sharing " << v << " " - << commit->values[v].length() << " bytes" << endl; + dout(10) << " sharing " << v << " (" + << commit->values[v].length() << " bytes)" << endl; } + commit->last_committed = last_committed; mon->messenger->send_message(commit, last->get_source_inst()); } - // did we receive committed value? + // did we receive a committed value? if (last->last_committed > last_committed) { - for (version_t v = last_committed; + for (version_t v = last_committed+1; v <= last->last_committed; v++) { mon->store->put_bl_sn(last->values[v], machine_name, v); @@ -132,37 +177,53 @@ void Paxos::handle_last(MMonPaxos *last) << last->values[v].length() << " bytes" << endl; } last_committed = last->last_committed; - mon->store->put_int(last_committed, machine_name, "last_commtted"); + mon->store->put_int(last_committed, machine_name, "last_committed"); dout(10) << "last_committed now " << last_committed << endl; } // do they accept your pn? - if (last->old_accepted_pn > accepted_pn) { - dout(10) << "uh oh, they have a higher pn than us. pick a new one." << endl; - collect(last->old_accepted_pn); + if (last->pn > accepted_pn) { + // no, try again. + dout(10) << " they had a higher pn than us, picking a new one." << endl; + collect(last->pn); } else { - // they accepted our pn. great. + // yes, they accepted our pn. great. num_last++; - dout(10) << "great, they accepted our pn, we now have " << num_last << endl; + dout(10) << " they accepted our pn, we now have " + << num_last << " peons" << endl; // did this person send back an accepted but uncommitted value? - if (last->old_accepted_pn && - last->old_accepted_pn > old_accepted_pn) { - version_t v = last->last_committed+1; - dout(10) << "we learned an old value for " << v << " pn " << last->old_accepted_pn; - old_accepted_pn = last->old_accepted_pn; - old_accepted_value = last->values[v]; + if (last->uncommitted_pn && + last->uncommitted_pn > uncommitted_pn) { + uncommitted_v = last->last_committed+1; + uncommitted_pn = last->uncommitted_pn; + uncommitted_value = last->values[uncommitted_v]; + dout(10) << "we learned an uncommitted value for " << uncommitted_v + << " pn " << uncommitted_pn + << " " << uncommitted_value.length() << " bytes" + << endl; } - // do we have a majority? - if (num_last == mon->monmap->num_mon/2+1) { - // do this once. + // is that everyone? + if (num_last == mon->get_quorum().size()) { + // almost... + state = STATE_ACTIVE; // did we learn an old value? - if (old_accepted_value.length()) { - dout(10) << "begin on old learned value" << endl; - begin(old_accepted_value); - } + if (uncommitted_v == last_committed+1 && + uncommitted_value.length()) { + dout(10) << "that's everyone. begin on old learned value" << endl; + begin(uncommitted_value); + } else { + // active! + dout(10) << "that's everyone. active!" << endl; + extend_lease(); + + // wake people up + finish_contexts(waiting_for_active); + finish_contexts(waiting_for_readable); + finish_contexts(waiting_for_writeable); + } } } @@ -170,54 +231,86 @@ void Paxos::handle_last(MMonPaxos *last) } +// leader void Paxos::begin(bufferlist& v) { dout(10) << "begin for " << last_committed+1 << " " - << new_value.length() << " bytes" + << v.length() << " bytes" << endl; - // we must already have a majority for this to work. - assert(num_last > mon->monmap->num_mon/2); + assert(mon->is_leader()); + assert(is_active()); + state = STATE_UPDATING; + // we must already have a majority for this to work. + assert(mon->get_quorum().size() == 1 || + num_last > (unsigned)mon->monmap->num_mon/2); + // and no value, yet. assert(new_value.length() == 0); - + // accept it ourselves - num_accepted = 1; + accepted.clear(); + accepted.insert(whoami); new_value = v; mon->store->put_bl_sn(new_value, machine_name, last_committed+1); - // ask others to accept it to! - for (int i=0; imonmap->num_mon; ++i) { - if (i == whoami) continue; + if (mon->get_quorum().size() == 1) { + // we're alone, take it easy + commit(); + state = STATE_ACTIVE; + finish_contexts(waiting_for_active); + finish_contexts(waiting_for_commit); + finish_contexts(waiting_for_readable); + finish_contexts(waiting_for_writeable); + return; + } - dout(10) << " sending begin to mon" << i << endl; - MMonPaxos *begin = new MMonPaxos(MMonPaxos::OP_BEGIN, machine_id); + // ask others to accept it to! + for (set::const_iterator p = mon->get_quorum().begin(); + p != mon->get_quorum().end(); + ++p) { + if (*p == whoami) continue; + + dout(10) << " sending begin to mon" << *p << endl; + MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN, machine_id); begin->values[last_committed+1] = new_value; + begin->last_committed = last_committed; begin->pn = accepted_pn; - mon->messenger->send_message(begin, mon->monmap->get_inst(i)); + mon->messenger->send_message(begin, mon->monmap->get_inst(*p)); } + + // set timeout event + accept_timeout_event = new C_AcceptTimeout(this); + mon->timer.add_event_after(g_conf.mon_accept_timeout, accept_timeout_event); } +// peon void Paxos::handle_begin(MMonPaxos *begin) { dout(10) << "handle_begin " << *begin << endl; // can we accept this? - if (begin->pn != accepted_pn) { + if (begin->pn < accepted_pn) { dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << endl; delete begin; return; } + assert(begin->pn == accepted_pn); + assert(begin->last_committed == last_committed); + // set state. + state = STATE_UPDATING; + lease_expire = utime_t(); // cancel lease + // yes. version_t v = last_committed+1; dout(10) << "accepting value for " << v << " pn " << accepted_pn << endl; mon->store->put_bl_sn(begin->values[v], machine_name, v); // reply - MMonPaxos *accept = new MMonPaxos(MMonPaxos::OP_ACCEPT, machine_id); + MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT, machine_id); accept->pn = accepted_pn; accept->last_committed = last_committed; mon->messenger->send_message(accept, begin->get_source_inst()); @@ -225,33 +318,67 @@ void Paxos::handle_begin(MMonPaxos *begin) delete begin; } - +// leader void Paxos::handle_accept(MMonPaxos *accept) { dout(10) << "handle_accept " << *accept << endl; - + int from = accept->get_source().num(); + if (accept->pn != accepted_pn) { // we accepted a higher pn, from some other leader dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << endl; delete accept; return; } - if (accept->last_committed != last_committed) { - dout(10) << " this is from an old round that's already committed, ignoring" << endl; + if (last_committed > 0 && + accept->last_committed < last_committed-1) { + dout(10) << " this is from an old round, ignoring" << endl; delete accept; return; } + assert(accept->last_committed == last_committed || // not committed + accept->last_committed == last_committed-1); // committed - num_accepted++; - dout(10) << "now " << num_accepted << " have accepted" << endl; + assert(state == STATE_UPDATING); + assert(accepted.count(from) == 0); + accepted.insert(from); + dout(10) << " now " << accepted << " have accepted" << endl; // new majority? - if (num_accepted == mon->monmap->num_mon/2+1) { + if (accepted.size() == (unsigned)mon->monmap->num_mon/2+1) { // yay, commit! - dout(10) << "we got a majority, committing too" << endl; + // note: this may happen before the lease is reextended (below) + dout(10) << " got majority, committing" << endl; commit(); - } + } + + // done? + if (accepted == mon->get_quorum()) { + dout(10) << " got quorum, done with update" << endl; + // cancel timeout event + mon->timer.cancel_event(accept_timeout_event); + accept_timeout_event = 0; + // yay! + state = STATE_ACTIVE; + extend_lease(); + + // wake people up + finish_contexts(waiting_for_active); + finish_contexts(waiting_for_commit); + finish_contexts(waiting_for_readable); + finish_contexts(waiting_for_writeable); + } +} + +void Paxos::accept_timeout() +{ + dout(5) << "accept timeout, calling fresh election" << endl; + accept_timeout_event = 0; + assert(mon->is_leader()); + assert(is_updating()); + cancel_events(); + mon->call_election(); } void Paxos::commit() @@ -263,20 +390,21 @@ void Paxos::commit() mon->store->put_int(last_committed, machine_name, "last_committed"); // tell everyone - for (int i=0; imonmap->num_mon; ++i) { - if (i == whoami) continue; + for (set::const_iterator p = mon->get_quorum().begin(); + p != mon->get_quorum().end(); + ++p) { + if (*p == whoami) continue; - dout(10) << " sending commit to mon" << i << endl; - MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id); + dout(10) << " sending commit to mon" << *p << endl; + MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id); commit->values[last_committed] = new_value; commit->pn = accepted_pn; - mon->messenger->send_message(commit, mon->monmap->get_inst(i)); + mon->messenger->send_message(commit, mon->monmap->get_inst(*p)); } // get ready for a new round. new_value.clear(); - } @@ -284,14 +412,162 @@ void Paxos::handle_commit(MMonPaxos *commit) { dout(10) << "handle_commit on " << commit->last_committed << endl; + if (!mon->is_peon()) { + dout(10) << "not a peon, dropping" << endl; + assert(0); + delete commit; + return; + } + // commit locally. - last_committed = commit->last_committed; - mon->store->put_bl_sn(commit->values[last_committed], machine_name, last_committed); + for (map::iterator p = commit->values.begin(); + p != commit->values.end(); + ++p) { + assert(p->first == last_committed+1); + last_committed = p->first; + dout(10) << " storing " << last_committed << " (" << p->second.length() << " bytes)" << endl; + mon->store->put_bl_sn(p->second, machine_name, last_committed); + } mon->store->put_int(last_committed, machine_name, "last_committed"); delete commit; -} +} + +void Paxos::extend_lease() +{ + assert(mon->is_leader()); + assert(is_active()); + + lease_expire = g_clock.now(); + lease_expire += g_conf.mon_lease; + acked_lease.clear(); + acked_lease.insert(whoami); + + dout(7) << "extend_lease now+" << g_conf.mon_lease << " (" << lease_expire << ")" << endl; + + // bcast + for (set::const_iterator p = mon->get_quorum().begin(); + p != mon->get_quorum().end(); + ++p) { + if (*p == whoami) continue; + MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id); + lease->last_committed = last_committed; + lease->lease_expire = lease_expire; + mon->messenger->send_message(lease, mon->monmap->get_inst(*p)); + } + // set timeout event. + // if old timeout is still in place, leave it. + if (!lease_ack_timeout_event) { + lease_ack_timeout_event = new C_LeaseAckTimeout(this); + mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_ack_timeout_event); + } + + // set renew event + lease_renew_event = new C_LeaseRenew(this); + utime_t at = lease_expire; + at -= g_conf.mon_lease; + at += g_conf.mon_lease_renew_interval; + mon->timer.add_event_at(at, lease_renew_event); +} + + +// peon +void Paxos::handle_lease(MMonPaxos *lease) +{ + // sanity + if (!mon->is_peon() || + last_committed != lease->last_committed) { + dout(10) << "handle_lease i'm not a peon, or they're not the leader, or the last_committed doesn't match, dropping" << endl; + delete lease; + return; + } + + // extend lease + if (lease_expire < lease->lease_expire) + lease_expire = lease->lease_expire; + + state = STATE_ACTIVE; + + dout(10) << "handle_lease on " << lease->last_committed + << " now " << lease_expire << endl; + + // ack + MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK, machine_id); + ack->last_committed = last_committed; + ack->lease_expire = lease_expire; + mon->messenger->send_message(ack, lease->get_source_inst()); + + // (re)set timeout event. + if (lease_timeout_event) + mon->timer.cancel_event(lease_timeout_event); + lease_timeout_event = new C_LeaseTimeout(this); + mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_timeout_event); + + // kick waiters + finish_contexts(waiting_for_active); + if (is_readable()) + finish_contexts(waiting_for_readable); + + delete lease; +} + +void Paxos::handle_lease_ack(MMonPaxos *ack) +{ + int from = ack->get_source().num(); + + if (!lease_ack_timeout_event) { + dout(10) << "handle_lease_ack from " << ack->get_source() << " -- stray (probably since revoked)" << endl; + } + else if (acked_lease.count(from) == 0) { + acked_lease.insert(from); + + if (acked_lease == mon->get_quorum()) { + // yay! + dout(10) << "handle_lease_ack from " << ack->get_source() + << " -- got everyone" << endl; + mon->timer.cancel_event(lease_ack_timeout_event); + lease_ack_timeout_event = 0; + } else { + dout(10) << "handle_lease_ack from " << ack->get_source() + << " -- still need " + << mon->get_quorum().size() - acked_lease.size() + << " more" << endl; + } + } else { + dout(10) << "handle_lease_ack from " << ack->get_source() + << " dup (lagging!), ignoring" << endl; + } + + delete ack; +} + +void Paxos::lease_ack_timeout() +{ + dout(5) << "lease_ack_timeout -- calling new election" << endl; + assert(mon->is_leader()); + assert(is_active()); + + lease_ack_timeout_event = 0; + cancel_events(); + mon->call_election(); +} + +void Paxos::lease_timeout() +{ + dout(5) << "lease_timeout -- calling new election" << endl; + assert(mon->is_peon()); + + lease_timeout_event = 0; + cancel_events(); + mon->call_election(); +} + +void Paxos::lease_renew_timeout() +{ + lease_renew_event = 0; + extend_lease(); +} /* @@ -299,37 +575,93 @@ void Paxos::handle_commit(MMonPaxos *commit) */ version_t Paxos::get_new_proposal_number(version_t gt) { - // read last - version_t last = mon->store->get_int("last_paxos_proposal"); - if (last < gt) - last = gt; + if (last_pn < gt) + last_pn = gt; - // update - last /= 100; - last++; - - // make it unique among all monitors. - version_t pn = last*100 + (version_t)whoami; + // update. make it unique among all monitors. + last_pn /= 100; + last_pn++; + last_pn *= 100; + last_pn += (version_t)whoami; // write - mon->store->put_int(pn, "last_paxos_proposal"); + mon->store->put_int(last_pn, machine_name, "last_pn"); - dout(10) << "get_new_proposal_number = " << pn << endl; - return pn; + dout(10) << "get_new_proposal_number = " << last_pn << endl; + return last_pn; } -void Paxos::leader_start() +void Paxos::cancel_events() { - dout(10) << "leader_start -- i am the leader, start paxos" << endl; + if (accept_timeout_event) { + mon->timer.cancel_event(accept_timeout_event); + accept_timeout_event = 0; + } + if (lease_renew_event) { + mon->timer.cancel_event(lease_renew_event); + lease_renew_event = 0; + } + if (lease_ack_timeout_event) { + mon->timer.cancel_event(lease_ack_timeout_event); + lease_ack_timeout_event = 0; + } + if (lease_timeout_event) { + mon->timer.cancel_event(lease_timeout_event); + lease_timeout_event = 0; + } +} + +void Paxos::leader_init() +{ + if (mon->get_quorum().size() == 1) { + state = STATE_ACTIVE; + return; + } + cancel_events(); + state = STATE_RECOVERING; + lease_expire = utime_t(); + dout(10) << "leader_init -- starting paxos recovery" << endl; collect(0); } +void Paxos::peon_init() +{ + cancel_events(); + state = STATE_RECOVERING; + lease_expire = utime_t(); + dout(10) << "peon_init -- i am a peon" << endl; + + // no chance to write now! + finish_contexts(waiting_for_writeable, -1); + finish_contexts(waiting_for_commit, -1); +} + +void Paxos::election_starting() +{ + dout(10) << "election_starting -- canceling timeouts" << endl; + cancel_events(); + new_value.clear(); + + finish_contexts(waiting_for_commit, -1); +} + void Paxos::dispatch(Message *m) { + // election in progress? + if (mon->is_starting()) { + dout(5) << "election in progress, dropping " << *m << endl; + delete m; + return; + } + + // check sanity + assert(mon->is_leader() || + (mon->is_peon() && m->get_source().num() == mon->get_leader())); + switch (m->get_type()) { - + case MSG_MON_PAXOS: { MMonPaxos *pm = (MMonPaxos*)m; @@ -340,23 +672,24 @@ void Paxos::dispatch(Message *m) case MMonPaxos::OP_COLLECT: handle_collect(pm); break; - case MMonPaxos::OP_LAST: handle_last(pm); break; - case MMonPaxos::OP_BEGIN: handle_begin(pm); break; - case MMonPaxos::OP_ACCEPT: handle_accept(pm); break; - case MMonPaxos::OP_COMMIT: handle_commit(pm); break; - + case MMonPaxos::OP_LEASE: + handle_lease(pm); + break; + case MMonPaxos::OP_LEASE_ACK: + handle_lease_ack(pm); + break; default: assert(0); } @@ -368,3 +701,84 @@ void Paxos::dispatch(Message *m) } } + + + +// ----------------- +// service interface + +// -- READ -- + +bool Paxos::is_readable() +{ + //dout(15) << "is_readable now=" << g_clock.now() << " lease_expire=" << lease_expire << endl; + return + (mon->is_peon() || mon->is_leader()) && + is_active() && + last_committed > 0 && // must have a value + (mon->get_quorum().size() == 1 || // alone, or + g_clock.now() < lease_expire); // have lease +} + +bool Paxos::read(version_t v, bufferlist &bl) +{ + if (!is_readable()) + return false; + + if (!mon->store->get_bl_sn(bl, machine_name, v)) + return false; + return true; +} + +version_t Paxos::read_current(bufferlist &bl) +{ + if (!is_readable()) + return 0; + if (read(last_committed, bl)) + return last_committed; + return 0; +} + + + + +// -- WRITE -- + +bool Paxos::is_writeable() +{ + if (mon->get_quorum().size() == 1) return true; + return + mon->is_leader() && + is_active() && + g_clock.now() < lease_expire; +} + +bool Paxos::propose_new_value(bufferlist& bl, Context *oncommit) +{ + /* + // writeable? + if (!is_writeable()) { + dout(5) << "propose_new_value " << last_committed+1 << " " << bl.length() << " bytes" + << " -- not writeable" << endl; + if (oncommit) { + oncommit->finish(-1); + delete oncommit; + } + return false; + } + */ + + assert(mon->is_leader() && is_active()); + + // cancel lease renewal and timeout events. + cancel_events(); + + // ok! + dout(5) << "propose_new_value " << last_committed+1 << " " << bl.length() << " bytes" << endl; + if (oncommit) + waiting_for_commit.push_back(oncommit); + begin(bl); + + return true; +} + diff --git a/branches/sage/pgs/mon/Paxos.h b/branches/sage/pgs/mon/Paxos.h index 777d175685bc9..403e6d6eeaf96 100644 --- a/branches/sage/pgs/mon/Paxos.h +++ b/branches/sage/pgs/mon/Paxos.h @@ -35,10 +35,22 @@ e 12v */ + +/* + * NOTE: This libary is based on the Paxos algorithm, but varies in a few key ways: + * 1- Only a single new value is generated at a time, simplifying the recovery logic. + * 2- Nodes track "committed" values, and share them generously (and trustingly) + * 3- A 'leasing' mechism is built-in, allowing nodes to determine when it is safe to + * "read" their copy of the last committed value. + * + * This provides a simple replication substrate that services can be built on top of. + */ + #ifndef __MON_PAXOS_H #define __MON_PAXOS_H #include "include/types.h" +#include "mon_types.h" #include "include/buffer.h" #include "msg/Message.h" @@ -49,6 +61,7 @@ e 12v class Monitor; class MMonPaxos; + // i am one state machine. class Paxos { Monitor *mon; @@ -58,40 +71,176 @@ class Paxos { int machine_id; const char *machine_name; - // phase 1 + friend class PaxosService; + + // LEADER+PEON + + // -- generic state -- +public: + const static int STATE_RECOVERING = 1; // leader|peon: recovering paxos state + const static int STATE_ACTIVE = 2; // leader|peon: idle. peon may or may not have valid lease + const static int STATE_UPDATING = 3; // leader|peon: updating to new value + const char *get_statename(int s) { + switch (s) { + case STATE_RECOVERING: return "recovering"; + case STATE_ACTIVE: return "active"; + case STATE_UPDATING: return "updating"; + default: assert(0); return 0; + } + } + +private: + int state; + +public: + bool is_recovering() { return state == STATE_RECOVERING; } + bool is_active() { return state == STATE_ACTIVE; } + bool is_updating() { return state == STATE_UPDATING; } + +private: + // recovery (phase 1) + version_t last_pn; version_t last_committed; version_t accepted_pn; version_t accepted_pn_from; - - // results from our last replies - int num_last; - version_t old_accepted_pn; - bufferlist old_accepted_value; - // phase 2 + // active (phase 2) + utime_t lease_expire; + list waiting_for_active; + list waiting_for_readable; + + + // -- leader -- + // recovery (paxos phase 1) + unsigned num_last; + version_t uncommitted_v; + version_t uncommitted_pn; + bufferlist uncommitted_value; + + // active + set acked_lease; + Context *lease_renew_event; + Context *lease_ack_timeout_event; + Context *lease_timeout_event; + + // updating (paxos phase 2) bufferlist new_value; - int num_accepted; - + set accepted; + + Context *accept_timeout_event; + + list waiting_for_writeable; + list waiting_for_commit; + + class C_AcceptTimeout : public Context { + Paxos *paxos; + public: + C_AcceptTimeout(Paxos *p) : paxos(p) {} + void finish(int r) { + paxos->accept_timeout(); + } + }; + + class C_LeaseAckTimeout : public Context { + Paxos *paxos; + public: + C_LeaseAckTimeout(Paxos *p) : paxos(p) {} + void finish(int r) { + paxos->lease_ack_timeout(); + } + }; + + class C_LeaseTimeout : public Context { + Paxos *paxos; + public: + C_LeaseTimeout(Paxos *p) : paxos(p) {} + void finish(int r) { + paxos->lease_timeout(); + } + }; + + class C_LeaseRenew : public Context { + Paxos *paxos; + public: + C_LeaseRenew(Paxos *p) : paxos(p) {} + void finish(int r) { + paxos->lease_renew_timeout(); + } + }; + + void collect(version_t oldpn); void handle_collect(MMonPaxos*); void handle_last(MMonPaxos*); void begin(bufferlist& value); void handle_begin(MMonPaxos*); void handle_accept(MMonPaxos*); + void accept_timeout(); void commit(); void handle_commit(MMonPaxos*); + void extend_lease(); + void handle_lease(MMonPaxos*); + void handle_lease_ack(MMonPaxos*); + + void lease_ack_timeout(); // on leader, if lease isn't acked by all peons + void lease_renew_timeout(); // on leader, to renew the lease + void lease_timeout(); // on peon, if lease isn't extended + + void cancel_events(); version_t get_new_proposal_number(version_t gt=0); public: Paxos(Monitor *m, int w, - int mid,const char *mnm) : mon(m), whoami(w), - machine_id(mid), machine_name(mnm) { - } + int mid) : mon(m), whoami(w), + machine_id(mid), + machine_name(get_paxos_name(mid)), + state(STATE_RECOVERING), + lease_renew_event(0), + lease_ack_timeout_event(0), + lease_timeout_event(0), + accept_timeout_event(0) { } void dispatch(Message *m); - void leader_start(); + void init(); + + void election_starting(); + void leader_init(); + void peon_init(); + + + // -- service interface -- + void wait_for_active(Context *c) { + assert(!is_active()); + waiting_for_active.push_back(c); + } + + // read + version_t get_version() { return last_committed; } + bool is_readable(); + bool read(version_t v, bufferlist &bl); + version_t read_current(bufferlist &bl); + void wait_for_readable(Context *onreadable) { + assert(!is_readable()); + waiting_for_readable.push_back(onreadable); + } + + // write + bool is_leader(); + bool is_writeable(); + void wait_for_writeable(Context *c) { + assert(!is_writeable()); + waiting_for_writeable.push_back(c); + } + + bool propose_new_value(bufferlist& bl, Context *oncommit=0); + void wait_for_commit(Context *oncommit) { + waiting_for_commit.push_back(oncommit); + } + void wait_for_commit_front(Context *oncommit) { + waiting_for_commit.push_front(oncommit); + } }; diff --git a/branches/sage/pgs/mon/PaxosService.cc b/branches/sage/pgs/mon/PaxosService.cc new file mode 100644 index 0000000000000..0816b0db467b0 --- /dev/null +++ b/branches/sage/pgs/mon/PaxosService.cc @@ -0,0 +1,136 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "PaxosService.h" +#include "common/Clock.h" +#include "Monitor.h" + + + +#include "config.h" +#undef dout +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxosservice(" << get_paxos_name(paxos->machine_id) << ") " +//#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << "." << get_paxos_name(paxos->machine_id) << " " + + + + +void PaxosService::dispatch(Message *m) +{ + dout(10) << "dispatch " << *m << " from " << m->get_source_inst() << endl; + + // make sure our map is readable and up to date + if (!paxos->is_readable()) { + dout(10) << " waiting for paxos -> readable" << endl; + paxos->wait_for_readable(new C_RetryMessage(this, m)); + return; + } + + // make sure service has latest from paxos. + update_from_paxos(); + + // preprocess + if (preprocess_query(m)) + return; // easy! + + // leader? + if (!mon->is_leader()) { + // fw to leader + dout(10) << " fw to leader mon" << mon->get_leader() << endl; + mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader())); + return; + } + + // writeable? + if (!paxos->is_writeable()) { + dout(10) << " waiting for paxos -> writeable" << endl; + paxos->wait_for_writeable(new C_RetryMessage(this, m)); + return; + } + + // update + if (prepare_update(m) && + should_propose_now()) + propose_pending(); +} + +void PaxosService::_commit() +{ + dout(7) << "_commit" << endl; + update_from_paxos(); // notify service of new paxos state + + if (mon->is_leader()) { + dout(7) << "_commit creating new pending" << endl; + assert(have_pending == false); + create_pending(); + have_pending = true; + } +} + + +void PaxosService::propose_pending() +{ + dout(10) << "propose_pending" << endl; + assert(have_pending); + + // finish and encode + bufferlist bl; + encode_pending(bl); + have_pending = false; + + // apply to paxos + paxos->wait_for_commit_front(new C_Commit(this)); + paxos->propose_new_value(bl); +} + + +void PaxosService::election_finished() +{ + dout(10) << "election_finished" << endl; + + if (have_pending && + !mon->is_leader()) { + discard_pending(); + have_pending = false; + } + + // make sure we update our state + if (paxos->is_active()) + _active(); + else + paxos->wait_for_active(new C_Active(this)); +} + +void PaxosService::_active() +{ + dout(10) << "_active" << endl; + assert(paxos->is_active()); + + // pull latest from paxos + update_from_paxos(); + + // create pending state? + if (mon->is_leader()) { + if (!have_pending) { + create_pending(); + have_pending = true; + } + + if (g_conf.mkfs && + paxos->get_version() == 0) { + create_initial(); + propose_pending(); + } + } +} diff --git a/branches/sage/pgs/mon/PaxosService.h b/branches/sage/pgs/mon/PaxosService.h new file mode 100644 index 0000000000000..32bcb3e4b11fb --- /dev/null +++ b/branches/sage/pgs/mon/PaxosService.h @@ -0,0 +1,91 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __PAXOSSERVICE_H +#define __PAXOSSERVICE_H + +#include "msg/Dispatcher.h" +#include "include/Context.h" + +class Monitor; +class Paxos; + +class PaxosService : public Dispatcher { +protected: + Monitor *mon; + Paxos *paxos; + + class C_RetryMessage : public Context { + PaxosService *svc; + Message *m; + public: + C_RetryMessage(PaxosService *s, Message *m_) : svc(s), m(m_) {} + void finish(int r) { + svc->dispatch(m); + } + }; + class C_Active : public Context { + PaxosService *svc; + public: + C_Active(PaxosService *s) : svc(s) {} + void finish(int r) { + if (r >= 0) + svc->_active(); + } + }; + class C_Commit : public Context { + PaxosService *svc; + public: + C_Commit(PaxosService *s) : svc(s) {} + void finish(int r) { + if (r >= 0) + svc->_commit(); + } + }; + friend class C_Update; + +private: + bool have_pending; + +public: + PaxosService(Monitor *mn, Paxos *p) : mon(mn), paxos(p), + have_pending(false) { } + + // i implement and you ignore + void dispatch(Message *m); + void election_finished(); + +private: + void _active(); + void _commit(); + +public: + // i implement and you use + void propose_pending(); // propose current pending as new paxos state + + // you implement + virtual bool update_from_paxos() = 0; // assimilate latest paxos state + virtual void create_pending() = 0; // [leader] create new pending structures + virtual void create_initial() = 0; // [leader] populate pending with initial state (1) + virtual void encode_pending(bufferlist& bl) = 0; // [leader] finish and encode pending for next paxos state + virtual void discard_pending() { } // [leader] discard pending + + virtual bool preprocess_query(Message *m) = 0; // true if processed (e.g., read-only) + virtual bool prepare_update(Message *m) = 0; + virtual bool should_propose_now() { return true; } + +}; + +#endif + diff --git a/branches/sage/pgs/mon/mon_types.h b/branches/sage/pgs/mon/mon_types.h new file mode 100644 index 0000000000000..852e42b8d983f --- /dev/null +++ b/branches/sage/pgs/mon/mon_types.h @@ -0,0 +1,33 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __MON_TYPES_H +#define __MON_TYPES_H + +#define PAXOS_TEST 0 +#define PAXOS_MDSMAP 1 +#define PAXOS_OSDMAP 2 +#define PAXOS_CLIENTMAP 3 + +inline const char *get_paxos_name(int p) { + switch (p) { + case PAXOS_TEST: return "test"; + case PAXOS_MDSMAP: return "mdsmap"; + case PAXOS_OSDMAP: return "osdmap"; + case PAXOS_CLIENTMAP: return "clientmap"; + default: assert(0); return 0; + } +} + +#endif -- 2.39.5